1   /* Copyright 2002-2022 CS GROUP
2    * Licensed to CS GROUP (CS) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * CS licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *   http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.orekit.gnss.metric.ntrip;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.net.HttpURLConnection;
22  import java.net.SocketTimeoutException;
23  import java.util.ArrayList;
24  import java.util.Arrays;
25  import java.util.HashMap;
26  import java.util.List;
27  import java.util.Map;
28  import java.util.concurrent.atomic.AtomicBoolean;
29  import java.util.concurrent.atomic.AtomicReference;
30  
31  import org.hipparchus.util.FastMath;
32  import org.orekit.errors.OrekitException;
33  import org.orekit.errors.OrekitInternalError;
34  import org.orekit.errors.OrekitMessages;
35  import org.orekit.gnss.metric.messages.ParsedMessage;
36  import org.orekit.gnss.metric.parser.AbstractEncodedMessages;
37  import org.orekit.gnss.metric.parser.MessagesParser;
38  
39  /** Monitor for retrieving streamed data from one mount point.
40   * @author Luc Maisonobe
41   * @since 11.0
42   */
43  public class StreamMonitor extends AbstractEncodedMessages implements Runnable {
44  
45      /** GGA header key. */
46      private static final String GGA_HEADER_KEY = "Ntrip-GGA";
47  
48      /** Content type for GNSS data. */
49      private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";
50  
51      /** Size of buffer for retrieving data. */
52      private static final int BUFFER_SIZE = 0x4000;
53  
54      /** Frame preamble. */
55      private static final int PREAMBLE = 0xD3;
56  
57      /** Frame preamble size. */
58      private static final int PREAMBLE_SIZE = 3;
59  
60      /** Frame CRC size. */
61      private static final int CRC_SIZE = 3;
62  
63      /** Generator polynomial for CRC. */
64      private static final int GENERATOR = 0x1864CFB;
65  
66      /** High bit of the generator polynomial. */
67      private static final int HIGH = 0x1000000;
68  
69      /** CRC 24Q lookup table. */
70      private static final int[] CRC_LOOKUP = new int[256];
71  
72      static {
73  
74          // set up lookup table
75          CRC_LOOKUP[0] = 0;
76          CRC_LOOKUP[1] = GENERATOR;
77  
78          int h = GENERATOR;
79          for (int i = 2; i < 256; i <<= 1) {
80              h <<= 1;
81              if ((h & HIGH) != 0) {
82                  h ^= GENERATOR;
83              }
84              for (int j = 0; j < i; ++j) {
85                  CRC_LOOKUP[i + j] = CRC_LOOKUP[j] ^ h;
86              }
87          }
88  
89      }
90  
91      /** Associated NTRIP client. */
92      private final NtripClient client;
93  
94      /** Mount point providing the stream. */
95      private final String mountPoint;
96  
97      /** Messages type of the mount point. */
98      private final Type type;
99  
100     /** Indicator for required NMEA. */
101     private final boolean nmeaRequired;
102 
103     /** Indicator for ignoring unknown messages. */
104     private final boolean ignoreUnknownMessageTypes;
105 
106     /** Delay before we reconnect after connection close. */
107     private final double reconnectDelay;
108 
109     /** Multiplication factor for reconnection delay. */
110     private final double reconnectDelayFactor;
111 
112     /** Max number of reconnections. */
113     private final int maxRetries;
114 
115     /** Stop flag. */
116     private AtomicBoolean stop;
117 
118     /** Circular buffer. */
119     private byte[] buffer;
120 
121     /** Read index. */
122     private int readIndex;
123 
124     /** Message end index. */
125     private int messageEndIndex;
126 
127     /** Write index. */
128     private int writeIndex;
129 
130     /** Observers for encoded messages. */
131     private final Map<Integer, List<MessageObserver>> observers;
132 
133     /** Last available message for each type. */
134     private final Map<Integer, ParsedMessage> lastMessages;
135 
136     /** Exception caught during monitoring. */
137     private final AtomicReference<OrekitException> exception;
138 
139     /** Build a monitor for streaming data from a mount point.
140      * @param client associated NTRIP client
141      * @param mountPoint mount point providing the stream
142      * @param type messages type of the mount point
143      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
144      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
145      * @param reconnectDelay delay before we reconnect after connection close
146      * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
147      * @param maxRetries max number of reconnect a attempts without reading any data
148      */
149     public StreamMonitor(final NtripClient client,
150                          final String mountPoint, final Type type,
151                          final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
152                          final double reconnectDelay, final double reconnectDelayFactor,
153                          final int maxRetries) {
154         this.client                    = client;
155         this.mountPoint                = mountPoint;
156         this.type                      = type;
157         this.nmeaRequired              = requiresNMEA;
158         this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;
159         this.reconnectDelay            = reconnectDelay;
160         this.reconnectDelayFactor      = reconnectDelayFactor;
161         this.maxRetries                = maxRetries;
162         this.stop                      = new AtomicBoolean(false);
163         this.observers                 = new HashMap<>();
164         this.lastMessages              = new HashMap<>();
165         this.exception                 = new AtomicReference<OrekitException>(null);
166     }
167 
168     /** Add an observer for encoded messages.
169      * <p>
170      * If messages of the specified type have already been retrieved from
171      * a stream, the observer will be immediately notified with the last
172      * message as a side effect of being added.
173      * </p>
174      * @param typeCode code for the message type (if set to 0, notification
175      * will be triggered regardless of message type)
176      * @param observer observer for this message type
177      */
178     public void addObserver(final int typeCode, final MessageObserver observer) {
179         synchronized (observers) {
180 
181             // register the observer
182             List<MessageObserver> list = observers.get(typeCode);
183             if (list == null) {
184                 // create a new list the first time we register an observer for a message
185                 list =  new ArrayList<>();
186                 observers.put(typeCode, list);
187             }
188             list.add(observer);
189 
190             // if we already have a message of the proper type
191             // immediately notify the new observer about it
192             final ParsedMessage last = lastMessages.get(typeCode);
193             if (last != null) {
194                 observer.messageAvailable(mountPoint, last);
195             }
196 
197         }
198     }
199 
200     /** Stop monitoring. */
201     public void stopMonitoring() {
202         stop.set(true);
203     }
204 
205     /** Retrieve exception caught during monitoring.
206      * @return exception caught
207      */
208     public OrekitException getException() {
209         return exception.get();
210     }
211 
212     /** {@inheritDoc} */
213     @Override
214     public void run() {
215 
216         try {
217 
218             final MessagesParser parser = type.getParser(extractUsedMessages());
219             int nbAttempts = 0;
220             double delay = reconnectDelay;
221             while (nbAttempts < maxRetries) {
222 
223                 try {
224                     // prepare request
225                     final HttpURLConnection connection = client.connect(mountPoint);
226                     if (nmeaRequired) {
227                         if (client.getGGA() == null) {
228                             throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
229                         } else {
230                             // update NMEA GGA sentence in the extra headers for this mount point
231                             connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
232                         }
233                     }
234 
235                     // perform request
236                     final int responseCode = connection.getResponseCode();
237                     if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
238                         throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
239                     } else if (responseCode != HttpURLConnection.HTTP_OK) {
240                         throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
241                                                   connection.getURL().getHost(),
242                                                   connection.getResponseMessage());
243                     }
244 
245                     // for this request, we MUST get GNSS data
246                     if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
247                         throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
248                     }
249 
250                     // data extraction loop
251                     resetCircularBuffer();
252                     try (InputStream is = connection.getInputStream()) {
253 
254                         for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
255 
256                             // we have read something, reset reconnection attempts counters
257                             nbAttempts = 0;
258                             delay      = reconnectDelay;
259 
260                             if (stop.get()) {
261                                 // stop monitoring immediately
262                                 // (returning closes the input stream automatically)
263                                 return;
264                             }
265 
266                             while (bufferSize() >= 3) {
267                                 if (peekByte(0) != PREAMBLE) {
268                                     // we are out of synch with respect to frame structure
269                                     // drop the unknown byte
270                                     moveRead(1);
271                                 } else {
272                                     final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
273                                     if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
274                                         // check CRC
275                                         final int crc = (peekByte(PREAMBLE_SIZE + size)     << 16) |
276                                                         (peekByte(PREAMBLE_SIZE + size + 1) <<  8) |
277                                                          peekByte(PREAMBLE_SIZE + size + 2);
278                                         if (crc == computeCRC(PREAMBLE_SIZE + size)) {
279                                             // we have a complete and consistent frame
280                                             // we can extract the message it contains
281                                             messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
282                                             moveRead(PREAMBLE_SIZE);
283                                             start();
284                                             final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
285                                             if (message != null) {
286                                                 storeAndNotify(message);
287                                             }
288                                             // jump to expected message end, in case the message was corrupted
289                                             // and parsing did not reach message end
290                                             readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
291                                         } else {
292                                             // CRC is not consistent, we are probably not really synched
293                                             // and the preamble byte was just a random byte
294                                             // we drop this single byte and continue looking for sync
295                                             moveRead(1);
296                                         }
297                                     } else {
298                                         // the frame is not complete, we need more data
299                                         break;
300                                     }
301                                 }
302                             }
303 
304                         }
305 
306                     }
307                 } catch (SocketTimeoutException ste) {
308                     // ignore exception, it will be handled by reconnection attempt below
309                 } catch (IOException ioe) {
310                     throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
311                 }
312 
313                 // manage reconnection
314                 try {
315                     Thread.sleep((int) FastMath.rint(delay * 1000));
316                 } catch (InterruptedException ie) {
317                     // Restore interrupted state...
318                     Thread.currentThread().interrupt();
319                 }
320                 ++nbAttempts;
321                 delay *= reconnectDelayFactor;
322 
323             }
324 
325         } catch (OrekitException oe) {
326             // store the exception so it can be retrieved by Ntrip client
327             exception.set(oe);
328         }
329 
330     }
331 
332     /** Store a parsed encoded message and notify observers.
333      * @param message parsed message
334      */
335     private void storeAndNotify(final ParsedMessage message) {
336         synchronized (observers) {
337 
338             for (int typeCode : Arrays.asList(0, message.getTypeCode())) {
339 
340                 // store message
341                 lastMessages.put(typeCode, message);
342 
343                 // notify observers
344                 final List<MessageObserver> list = observers.get(typeCode);
345                 if (list != null) {
346                     for (final MessageObserver observer : list) {
347                         // notify observer
348                         observer.messageAvailable(mountPoint, message);
349                     }
350                 }
351 
352             }
353 
354         }
355     }
356 
357     /** Reset the circular buffer.
358      */
359     private void resetCircularBuffer() {
360         buffer     = new byte[BUFFER_SIZE];
361         readIndex  = 0;
362         writeIndex = 0;
363     }
364 
365     /** Extract data from input stream.
366      * @param is input stream to extract data from
367      * @return number of byes read or -1
368      * @throws IOException if data cannot be extracted properly
369      */
370     private int fillUp(final InputStream is) throws IOException {
371         final int max = bufferMaxWrite();
372         if (max == 0) {
373             // this should never happen
374             // the buffer is large enough for almost 16 encoded messages, including wrapping frame
375             throw new OrekitInternalError(null);
376         }
377         final int r = is.read(buffer, writeIndex, max);
378         if (r >= 0) {
379             writeIndex = (writeIndex + r) % BUFFER_SIZE;
380         }
381         return r;
382     }
383 
384     /** {@inheritDoc} */
385     @Override
386     protected int fetchByte() {
387         if (readIndex == messageEndIndex || readIndex == writeIndex) {
388             return -1;
389         }
390 
391         final int ret = buffer[readIndex] & 0xFF;
392         moveRead(1);
393         return ret;
394     }
395 
396     /** Get the number of bytes currently in the buffer.
397      * @return number of bytes currently in the buffer
398      */
399     private int bufferSize() {
400         final int n = writeIndex - readIndex;
401         return n >= 0 ? n : BUFFER_SIZE + n;
402     }
403 
404     /** Peek a buffer byte without moving read pointer.
405      * @param offset offset counted from read pointer
406      * @return value of the byte at given offset
407      */
408     private int peekByte(final int offset) {
409         return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
410     }
411 
412     /** Move read pointer.
413      * @param n number of bytes to move read pointer
414      */
415     private void moveRead(final int n) {
416         readIndex = (readIndex + n) % BUFFER_SIZE;
417     }
418 
419     /** Get the number of bytes that can be added to the buffer without wrapping around.
420      * @return number of bytes that can be added
421      */
422     private int bufferMaxWrite() {
423         if (writeIndex >= readIndex) {
424             return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
425         } else {
426             return readIndex - writeIndex - 1;
427         }
428     }
429 
430     /** Compute QualCom CRC.
431      * @param length length of the byte stream
432      * @return QualCom CRC
433      */
434     private int computeCRC(final int length) {
435         int crc = 0;
436         for (int i = 0; i < length; ++i) {
437             crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
438         }
439         return crc;
440     }
441 
442     private List<Integer> extractUsedMessages() {
443         synchronized (observers) {
444 
445             // List of needed messages
446             final List<Integer> messages = new ArrayList<>();
447 
448             // Loop on observers entries
449             for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
450                 // Extract message type code
451                 final int typeCode = entry.getKey();
452                 // Add to the list
453                 messages.add(typeCode);
454             }
455 
456             return messages;
457         }
458     }
459 
460 }