1   /* Copyright 2002-2025 CS GROUP
2    * Licensed to CS GROUP (CS) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * CS licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *   http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.orekit.gnss.metric.ntrip;
18  
19  import java.io.IOException;
20  import java.io.InputStream;
21  import java.net.HttpURLConnection;
22  import java.net.SocketTimeoutException;
23  import java.net.URISyntaxException;
24  import java.util.ArrayList;
25  import java.util.Arrays;
26  import java.util.HashMap;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.concurrent.atomic.AtomicBoolean;
30  import java.util.concurrent.atomic.AtomicReference;
31  
32  import org.hipparchus.util.FastMath;
33  import org.orekit.errors.OrekitException;
34  import org.orekit.errors.OrekitInternalError;
35  import org.orekit.errors.OrekitMessages;
36  import org.orekit.gnss.metric.messages.ParsedMessage;
37  import org.orekit.gnss.metric.parser.AbstractEncodedMessage;
38  import org.orekit.gnss.metric.parser.MessagesParser;
39  
40  /** Monitor for retrieving streamed data from one mount point.
41   * @author Luc Maisonobe
42   * @since 11.0
43   */
44  public class StreamMonitor extends AbstractEncodedMessage implements Runnable {
45  
    /** Name of the request header used to send the NMEA GGA sentence to mount points that require it. */
    private static final String GGA_HEADER_KEY = "Ntrip-GGA";

    /** Content type a response must declare for its stream to be accepted as GNSS data. */
    private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";

    /** Size of the circular buffer used for retrieving data (0x4000 = 16384 bytes). */
    private static final int BUFFER_SIZE = 0x4000;

    /** Value of the first byte of a frame, used to synchronize on frame boundaries. */
    private static final int PREAMBLE = 0xD3;

    /** Number of bytes preceding the message in a frame: the preamble byte
     * followed by the two bytes from which the message size is decoded.
     */
    private static final int PREAMBLE_SIZE = 3;

    /** Number of bytes of the CRC that follows the message in a frame. */
    private static final int CRC_SIZE = 3;

    /** Generator polynomial for CRC. */
    private static final int GENERATOR = 0x1864CFB;

    /** High bit of the generator polynomial (bit 24);
     * {@code HIGH - 1} is the mask used to keep the CRC on 24 bits.
     */
    private static final int HIGH = 0x1000000;

    /** CRC 24Q lookup table, indexed by byte value and filled by the static initializer. */
    private static final int[] CRC_LOOKUP = new int[256];
72  
73      static {
74  
75          // set up lookup table
76          CRC_LOOKUP[0] = 0;
77          CRC_LOOKUP[1] = GENERATOR;
78  
79          int h = GENERATOR;
80          for (int i = 2; i < 256; i <<= 1) {
81              h <<= 1;
82              if ((h & HIGH) != 0) {
83                  h ^= GENERATOR;
84              }
85              for (int j = 0; j < i; ++j) {
86                  CRC_LOOKUP[i + j] = CRC_LOOKUP[j] ^ h;
87              }
88          }
89  
90      }
91  
92      /** Associated NTRIP client. */
93      private final NtripClient client;
94  
95      /** Mount point providing the stream. */
96      private final String mountPoint;
97  
98      /** Messages type of the mount point. */
99      private final Type type;
100 
101     /** Indicator for required NMEA. */
102     private final boolean nmeaRequired;
103 
104     /** Indicator for ignoring unknown messages. */
105     private final boolean ignoreUnknownMessageTypes;
106 
107     /** Delay before we reconnect after connection close. */
108     private final double reconnectDelay;
109 
110     /** Multiplication factor for reconnection delay. */
111     private final double reconnectDelayFactor;
112 
113     /** Max number of reconnections. */
114     private final int maxRetries;
115 
116     /** Stop flag. */
117     private AtomicBoolean stop;
118 
119     /** Circular buffer. */
120     private byte[] buffer;
121 
122     /** Read index. */
123     private int readIndex;
124 
125     /** Message end index. */
126     private int messageEndIndex;
127 
128     /** Write index. */
129     private int writeIndex;
130 
131     /** Observers for encoded messages. */
132     private final Map<Integer, List<MessageObserver>> observers;
133 
134     /** Last available message for each type. */
135     private final Map<Integer, ParsedMessage> lastMessages;
136 
137     /** Exception caught during monitoring. */
138     private final AtomicReference<OrekitException> exception;
139 
140     /** Build a monitor for streaming data from a mount point.
141      * @param client associated NTRIP client
142      * @param mountPoint mount point providing the stream
143      * @param type messages type of the mount point
144      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
145      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
146      * @param reconnectDelay delay before we reconnect after connection close
147      * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
148      * @param maxRetries max number of reconnect attempts without reading any data
149      */
150     public StreamMonitor(final NtripClient client,
151                          final String mountPoint, final Type type,
152                          final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
153                          final double reconnectDelay, final double reconnectDelayFactor,
154                          final int maxRetries) {
155         this.client                    = client;
156         this.mountPoint                = mountPoint;
157         this.type                      = type;
158         this.nmeaRequired              = requiresNMEA;
159         this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;
160         this.reconnectDelay            = reconnectDelay;
161         this.reconnectDelayFactor      = reconnectDelayFactor;
162         this.maxRetries                = maxRetries;
163         this.stop                      = new AtomicBoolean(false);
164         this.observers                 = new HashMap<>();
165         this.lastMessages              = new HashMap<>();
166         this.exception                 = new AtomicReference<>(null);
167     }
168 
169     /** Add an observer for encoded messages.
170      * <p>
171      * If messages of the specified type have already been retrieved from
172      * a stream, the observer will be immediately notified with the last
173      * message as a side effect of being added.
174      * </p>
175      * @param typeCode code for the message type (if set to 0, notification
176      * will be triggered regardless of message type)
177      * @param observer observer for this message type
178      */
179     public void addObserver(final int typeCode, final MessageObserver observer) {
180         synchronized (observers) {
181 
182             // register the observer
183             observers.computeIfAbsent(typeCode, tc -> new ArrayList<>()).add(observer);
184 
185             // if we already have a message of the proper type
186             // immediately notify the new observer about it
187             final ParsedMessage last = lastMessages.get(typeCode);
188             if (last != null) {
189                 observer.messageAvailable(mountPoint, last);
190             }
191 
192         }
193     }
194 
195     /** Stop monitoring. */
196     public void stopMonitoring() {
197         stop.set(true);
198     }
199 
200     /** Retrieve exception caught during monitoring.
201      * @return exception caught
202      */
203     public OrekitException getException() {
204         return exception.get();
205     }
206 
207     /** {@inheritDoc} */
208     @Override
209     public void run() {
210 
211         try {
212 
213             final MessagesParser parser = type.getParser(extractUsedMessages(), client.getTimeScales());
214             int nbAttempts = 0;
215             double delay = reconnectDelay;
216             while (nbAttempts < maxRetries) {
217 
218                 try {
219                     // prepare request
220                     final HttpURLConnection connection = client.connect(mountPoint);
221                     if (nmeaRequired) {
222                         if (client.getGGA() == null) {
223                             throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
224                         } else {
225                             // update NMEA GGA sentence in the extra headers for this mount point
226                             connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
227                         }
228                     }
229 
230                     // perform request
231                     final int responseCode = connection.getResponseCode();
232                     if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
233                         throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
234                     } else if (responseCode != HttpURLConnection.HTTP_OK) {
235                         throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
236                                                   connection.getURL().getHost(),
237                                                   connection.getResponseMessage());
238                     }
239 
240                     // for this request, we MUST get GNSS data
241                     if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
242                         throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
243                     }
244 
245                     // data extraction loop
246                     resetCircularBuffer();
247                     try (InputStream is = connection.getInputStream()) {
248 
249                         for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
250 
251                             // we have read something, reset reconnection attempts counters
252                             nbAttempts = 0;
253                             delay      = reconnectDelay;
254 
255                             if (stop.get()) {
256                                 // stop monitoring immediately
257                                 // (returning closes the input stream automatically)
258                                 return;
259                             }
260 
261                             while (bufferSize() >= 3) {
262                                 if (peekByte(0) != PREAMBLE) {
263                                     // we are out of synch with respect to frame structure
264                                     // drop the unknown byte
265                                     moveRead(1);
266                                 } else {
267                                     final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
268                                     if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
269                                         // check CRC
270                                         final int crc = (peekByte(PREAMBLE_SIZE + size)     << 16) |
271                                                         (peekByte(PREAMBLE_SIZE + size + 1) <<  8) |
272                                                          peekByte(PREAMBLE_SIZE + size + 2);
273                                         if (crc == computeCRC(PREAMBLE_SIZE + size)) {
274                                             // we have a complete and consistent frame
275                                             // we can extract the message it contains
276                                             messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
277                                             moveRead(PREAMBLE_SIZE);
278                                             start();
279                                             final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
280                                             if (message != null) {
281                                                 storeAndNotify(message);
282                                             }
283                                             // jump to expected message end, in case the message was corrupted
284                                             // and parsing did not reach message end
285                                             readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
286                                         } else {
287                                             // CRC is not consistent, we are probably not really synched
288                                             // and the preamble byte was just a random byte
289                                             // we drop this single byte and continue looking for sync
290                                             moveRead(1);
291                                         }
292                                     } else {
293                                         // the frame is not complete, we need more data
294                                         break;
295                                     }
296                                 }
297                             }
298 
299                         }
300 
301                     }
302                 } catch (SocketTimeoutException ste) {
303                     // ignore exception, it will be handled by reconnection attempt below
304                 } catch (IOException | URISyntaxException e) {
305                     throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
306                 }
307 
308                 // manage reconnection
309                 try {
310                     Thread.sleep((int) FastMath.rint(delay * 1000));
311                 } catch (InterruptedException ie) {
312                     // Restore interrupted state...
313                     Thread.currentThread().interrupt();
314                 }
315                 ++nbAttempts;
316                 delay *= reconnectDelayFactor;
317 
318             }
319 
320         } catch (OrekitException oe) {
321             // store the exception so it can be retrieved by Ntrip client
322             exception.set(oe);
323         }
324 
325     }
326 
327     /** Store a parsed encoded message and notify observers.
328      * @param message parsed message
329      */
330     private void storeAndNotify(final ParsedMessage message) {
331         synchronized (observers) {
332 
333             for (int typeCode : Arrays.asList(0, message.getTypeCode())) {
334 
335                 // store message
336                 lastMessages.put(typeCode, message);
337 
338                 // notify observers
339                 final List<MessageObserver> list = observers.get(typeCode);
340                 if (list != null) {
341                     for (final MessageObserver observer : list) {
342                         // notify observer
343                         observer.messageAvailable(mountPoint, message);
344                     }
345                 }
346 
347             }
348 
349         }
350     }
351 
352     /** Reset the circular buffer.
353      */
354     private void resetCircularBuffer() {
355         buffer     = new byte[BUFFER_SIZE];
356         readIndex  = 0;
357         writeIndex = 0;
358     }
359 
360     /** Extract data from input stream.
361      * @param is input stream to extract data from
362      * @return number of byes read or -1
363      * @throws IOException if data cannot be extracted properly
364      */
365     private int fillUp(final InputStream is) throws IOException {
366         final int max = bufferMaxWrite();
367         if (max == 0) {
368             // this should never happen
369             // the buffer is large enough for almost 16 encoded messages, including wrapping frame
370             throw new OrekitInternalError(null);
371         }
372         final int r = is.read(buffer, writeIndex, max);
373         if (r >= 0) {
374             writeIndex = (writeIndex + r) % BUFFER_SIZE;
375         }
376         return r;
377     }
378 
379     /** {@inheritDoc} */
380     @Override
381     protected int fetchByte() {
382         if (readIndex == messageEndIndex || readIndex == writeIndex) {
383             return -1;
384         }
385 
386         final int ret = buffer[readIndex] & 0xFF;
387         moveRead(1);
388         return ret;
389     }
390 
391     /** Get the number of bytes currently in the buffer.
392      * @return number of bytes currently in the buffer
393      */
394     private int bufferSize() {
395         final int n = writeIndex - readIndex;
396         return n >= 0 ? n : BUFFER_SIZE + n;
397     }
398 
399     /** Peek a buffer byte without moving read pointer.
400      * @param offset offset counted from read pointer
401      * @return value of the byte at given offset
402      */
403     private int peekByte(final int offset) {
404         return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
405     }
406 
407     /** Move read pointer.
408      * @param n number of bytes to move read pointer
409      */
410     private void moveRead(final int n) {
411         readIndex = (readIndex + n) % BUFFER_SIZE;
412     }
413 
414     /** Get the number of bytes that can be added to the buffer without wrapping around.
415      * @return number of bytes that can be added
416      */
417     private int bufferMaxWrite() {
418         if (writeIndex >= readIndex) {
419             return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
420         } else {
421             return readIndex - writeIndex - 1;
422         }
423     }
424 
425     /** Compute QualCom CRC.
426      * @param length length of the byte stream
427      * @return QualCom CRC
428      */
429     private int computeCRC(final int length) {
430         int crc = 0;
431         for (int i = 0; i < length; ++i) {
432             crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
433         }
434         return crc;
435     }
436 
437     /** Extract the used messages.
438      * @return the extracted messages
439      */
440     private List<Integer> extractUsedMessages() {
441         synchronized (observers) {
442 
443             // List of needed messages
444             final List<Integer> messages = new ArrayList<>();
445 
446             // Loop on observers entries
447             for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
448                 // Extract message type code
449                 final int typeCode = entry.getKey();
450                 // Add to the list
451                 messages.add(typeCode);
452             }
453 
454             return messages;
455         }
456     }
457 
458 }