1 /* Copyright 2002-2025 CS GROUP
2 * Licensed to CS GROUP (CS) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * CS licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.orekit.gnss.metric.ntrip;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.net.HttpURLConnection;
22 import java.net.SocketTimeoutException;
23 import java.net.URISyntaxException;
24 import java.util.ArrayList;
25 import java.util.Arrays;
26 import java.util.HashMap;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.atomic.AtomicReference;
31
32 import org.hipparchus.util.FastMath;
33 import org.orekit.errors.OrekitException;
34 import org.orekit.errors.OrekitInternalError;
35 import org.orekit.errors.OrekitMessages;
36 import org.orekit.gnss.metric.messages.ParsedMessage;
37 import org.orekit.gnss.metric.parser.AbstractEncodedMessage;
38 import org.orekit.gnss.metric.parser.MessagesParser;
39
40 /** Monitor for retrieving streamed data from one mount point.
41 * @author Luc Maisonobe
42 * @since 11.0
43 */
44 public class StreamMonitor extends AbstractEncodedMessage implements Runnable {
45
46 /** GGA header key. */
47 private static final String GGA_HEADER_KEY = "Ntrip-GGA";
48
49 /** Content type for GNSS data. */
50 private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";
51
52 /** Size of buffer for retrieving data. */
53 private static final int BUFFER_SIZE = 0x4000;
54
55 /** Frame preamble. */
56 private static final int PREAMBLE = 0xD3;
57
58 /** Frame preamble size. */
59 private static final int PREAMBLE_SIZE = 3;
60
61 /** Frame CRC size. */
62 private static final int CRC_SIZE = 3;
63
64 /** Generator polynomial for CRC. */
65 private static final int GENERATOR = 0x1864CFB;
66
67 /** High bit of the generator polynomial. */
68 private static final int HIGH = 0x1000000;
69
70 /** CRC 24Q lookup table. */
71 private static final int[] CRC_LOOKUP = new int[256];
72
73 static {
74
75 // set up lookup table
76 CRC_LOOKUP[0] = 0;
77 CRC_LOOKUP[1] = GENERATOR;
78
79 int h = GENERATOR;
80 for (int i = 2; i < 256; i <<= 1) {
81 h <<= 1;
82 if ((h & HIGH) != 0) {
83 h ^= GENERATOR;
84 }
85 for (int j = 0; j < i; ++j) {
86 CRC_LOOKUP[i + j] = CRC_LOOKUP[j] ^ h;
87 }
88 }
89
90 }
91
92 /** Associated NTRIP client. */
93 private final NtripClient client;
94
95 /** Mount point providing the stream. */
96 private final String mountPoint;
97
98 /** Messages type of the mount point. */
99 private final Type type;
100
101 /** Indicator for required NMEA. */
102 private final boolean nmeaRequired;
103
104 /** Indicator for ignoring unknown messages. */
105 private final boolean ignoreUnknownMessageTypes;
106
107 /** Delay before we reconnect after connection close. */
108 private final double reconnectDelay;
109
110 /** Multiplication factor for reconnection delay. */
111 private final double reconnectDelayFactor;
112
113 /** Max number of reconnections. */
114 private final int maxRetries;
115
116 /** Stop flag. */
117 private AtomicBoolean stop;
118
119 /** Circular buffer. */
120 private byte[] buffer;
121
122 /** Read index. */
123 private int readIndex;
124
125 /** Message end index. */
126 private int messageEndIndex;
127
128 /** Write index. */
129 private int writeIndex;
130
131 /** Observers for encoded messages. */
132 private final Map<Integer, List<MessageObserver>> observers;
133
134 /** Last available message for each type. */
135 private final Map<Integer, ParsedMessage> lastMessages;
136
137 /** Exception caught during monitoring. */
138 private final AtomicReference<OrekitException> exception;
139
140 /** Build a monitor for streaming data from a mount point.
141 * @param client associated NTRIP client
142 * @param mountPoint mount point providing the stream
143 * @param type messages type of the mount point
144 * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
145 * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
146 * @param reconnectDelay delay before we reconnect after connection close
147 * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
148 * @param maxRetries max number of reconnect attempts without reading any data
149 */
150 public StreamMonitor(final NtripClient client,
151 final String mountPoint, final Type type,
152 final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
153 final double reconnectDelay, final double reconnectDelayFactor,
154 final int maxRetries) {
155 this.client = client;
156 this.mountPoint = mountPoint;
157 this.type = type;
158 this.nmeaRequired = requiresNMEA;
159 this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;
160 this.reconnectDelay = reconnectDelay;
161 this.reconnectDelayFactor = reconnectDelayFactor;
162 this.maxRetries = maxRetries;
163 this.stop = new AtomicBoolean(false);
164 this.observers = new HashMap<>();
165 this.lastMessages = new HashMap<>();
166 this.exception = new AtomicReference<>(null);
167 }
168
169 /** Add an observer for encoded messages.
170 * <p>
171 * If messages of the specified type have already been retrieved from
172 * a stream, the observer will be immediately notified with the last
173 * message as a side effect of being added.
174 * </p>
175 * @param typeCode code for the message type (if set to 0, notification
176 * will be triggered regardless of message type)
177 * @param observer observer for this message type
178 */
179 public void addObserver(final int typeCode, final MessageObserver observer) {
180 synchronized (observers) {
181
182 // register the observer
183 observers.computeIfAbsent(typeCode, tc -> new ArrayList<>()).add(observer);
184
185 // if we already have a message of the proper type
186 // immediately notify the new observer about it
187 final ParsedMessage last = lastMessages.get(typeCode);
188 if (last != null) {
189 observer.messageAvailable(mountPoint, last);
190 }
191
192 }
193 }
194
195 /** Stop monitoring. */
196 public void stopMonitoring() {
197 stop.set(true);
198 }
199
200 /** Retrieve exception caught during monitoring.
201 * @return exception caught
202 */
203 public OrekitException getException() {
204 return exception.get();
205 }
206
207 /** {@inheritDoc} */
208 @Override
209 public void run() {
210
211 try {
212
213 final MessagesParser parser = type.getParser(extractUsedMessages(), client.getTimeScales());
214 int nbAttempts = 0;
215 double delay = reconnectDelay;
216 while (nbAttempts < maxRetries) {
217
218 try {
219 // prepare request
220 final HttpURLConnection connection = client.connect(mountPoint);
221 if (nmeaRequired) {
222 if (client.getGGA() == null) {
223 throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
224 } else {
225 // update NMEA GGA sentence in the extra headers for this mount point
226 connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
227 }
228 }
229
230 // perform request
231 final int responseCode = connection.getResponseCode();
232 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
233 throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
234 } else if (responseCode != HttpURLConnection.HTTP_OK) {
235 throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
236 connection.getURL().getHost(),
237 connection.getResponseMessage());
238 }
239
240 // for this request, we MUST get GNSS data
241 if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
242 throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
243 }
244
245 // data extraction loop
246 resetCircularBuffer();
247 try (InputStream is = connection.getInputStream()) {
248
249 for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
250
251 // we have read something, reset reconnection attempts counters
252 nbAttempts = 0;
253 delay = reconnectDelay;
254
255 if (stop.get()) {
256 // stop monitoring immediately
257 // (returning closes the input stream automatically)
258 return;
259 }
260
261 while (bufferSize() >= 3) {
262 if (peekByte(0) != PREAMBLE) {
263 // we are out of synch with respect to frame structure
264 // drop the unknown byte
265 moveRead(1);
266 } else {
267 final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
268 if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
269 // check CRC
270 final int crc = (peekByte(PREAMBLE_SIZE + size) << 16) |
271 (peekByte(PREAMBLE_SIZE + size + 1) << 8) |
272 peekByte(PREAMBLE_SIZE + size + 2);
273 if (crc == computeCRC(PREAMBLE_SIZE + size)) {
274 // we have a complete and consistent frame
275 // we can extract the message it contains
276 messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
277 moveRead(PREAMBLE_SIZE);
278 start();
279 final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
280 if (message != null) {
281 storeAndNotify(message);
282 }
283 // jump to expected message end, in case the message was corrupted
284 // and parsing did not reach message end
285 readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
286 } else {
287 // CRC is not consistent, we are probably not really synched
288 // and the preamble byte was just a random byte
289 // we drop this single byte and continue looking for sync
290 moveRead(1);
291 }
292 } else {
293 // the frame is not complete, we need more data
294 break;
295 }
296 }
297 }
298
299 }
300
301 }
302 } catch (SocketTimeoutException ste) {
303 // ignore exception, it will be handled by reconnection attempt below
304 } catch (IOException | URISyntaxException e) {
305 throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
306 }
307
308 // manage reconnection
309 try {
310 Thread.sleep((int) FastMath.rint(delay * 1000));
311 } catch (InterruptedException ie) {
312 // Restore interrupted state...
313 Thread.currentThread().interrupt();
314 }
315 ++nbAttempts;
316 delay *= reconnectDelayFactor;
317
318 }
319
320 } catch (OrekitException oe) {
321 // store the exception so it can be retrieved by Ntrip client
322 exception.set(oe);
323 }
324
325 }
326
327 /** Store a parsed encoded message and notify observers.
328 * @param message parsed message
329 */
330 private void storeAndNotify(final ParsedMessage message) {
331 synchronized (observers) {
332
333 for (int typeCode : Arrays.asList(0, message.getTypeCode())) {
334
335 // store message
336 lastMessages.put(typeCode, message);
337
338 // notify observers
339 final List<MessageObserver> list = observers.get(typeCode);
340 if (list != null) {
341 for (final MessageObserver observer : list) {
342 // notify observer
343 observer.messageAvailable(mountPoint, message);
344 }
345 }
346
347 }
348
349 }
350 }
351
352 /** Reset the circular buffer.
353 */
354 private void resetCircularBuffer() {
355 buffer = new byte[BUFFER_SIZE];
356 readIndex = 0;
357 writeIndex = 0;
358 }
359
360 /** Extract data from input stream.
361 * @param is input stream to extract data from
362 * @return number of byes read or -1
363 * @throws IOException if data cannot be extracted properly
364 */
365 private int fillUp(final InputStream is) throws IOException {
366 final int max = bufferMaxWrite();
367 if (max == 0) {
368 // this should never happen
369 // the buffer is large enough for almost 16 encoded messages, including wrapping frame
370 throw new OrekitInternalError(null);
371 }
372 final int r = is.read(buffer, writeIndex, max);
373 if (r >= 0) {
374 writeIndex = (writeIndex + r) % BUFFER_SIZE;
375 }
376 return r;
377 }
378
379 /** {@inheritDoc} */
380 @Override
381 protected int fetchByte() {
382 if (readIndex == messageEndIndex || readIndex == writeIndex) {
383 return -1;
384 }
385
386 final int ret = buffer[readIndex] & 0xFF;
387 moveRead(1);
388 return ret;
389 }
390
391 /** Get the number of bytes currently in the buffer.
392 * @return number of bytes currently in the buffer
393 */
394 private int bufferSize() {
395 final int n = writeIndex - readIndex;
396 return n >= 0 ? n : BUFFER_SIZE + n;
397 }
398
399 /** Peek a buffer byte without moving read pointer.
400 * @param offset offset counted from read pointer
401 * @return value of the byte at given offset
402 */
403 private int peekByte(final int offset) {
404 return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
405 }
406
407 /** Move read pointer.
408 * @param n number of bytes to move read pointer
409 */
410 private void moveRead(final int n) {
411 readIndex = (readIndex + n) % BUFFER_SIZE;
412 }
413
414 /** Get the number of bytes that can be added to the buffer without wrapping around.
415 * @return number of bytes that can be added
416 */
417 private int bufferMaxWrite() {
418 if (writeIndex >= readIndex) {
419 return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
420 } else {
421 return readIndex - writeIndex - 1;
422 }
423 }
424
425 /** Compute QualCom CRC.
426 * @param length length of the byte stream
427 * @return QualCom CRC
428 */
429 private int computeCRC(final int length) {
430 int crc = 0;
431 for (int i = 0; i < length; ++i) {
432 crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
433 }
434 return crc;
435 }
436
437 /** Extract the used messages.
438 * @return the extracted messages
439 */
440 private List<Integer> extractUsedMessages() {
441 synchronized (observers) {
442
443 // List of needed messages
444 final List<Integer> messages = new ArrayList<>();
445
446 // Loop on observers entries
447 for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
448 // Extract message type code
449 final int typeCode = entry.getKey();
450 // Add to the list
451 messages.add(typeCode);
452 }
453
454 return messages;
455 }
456 }
457
458 }