1 /* Copyright 2002-2022 CS GROUP
2 * Licensed to CS GROUP (CS) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * CS licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17 package org.orekit.gnss.metric.ntrip;
18
19 import java.io.IOException;
20 import java.io.InputStream;
21 import java.net.HttpURLConnection;
22 import java.net.SocketTimeoutException;
23 import java.util.ArrayList;
24 import java.util.Arrays;
25 import java.util.HashMap;
26 import java.util.List;
27 import java.util.Map;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 import java.util.concurrent.atomic.AtomicReference;
30
31 import org.hipparchus.util.FastMath;
32 import org.orekit.errors.OrekitException;
33 import org.orekit.errors.OrekitInternalError;
34 import org.orekit.errors.OrekitMessages;
35 import org.orekit.gnss.metric.messages.ParsedMessage;
36 import org.orekit.gnss.metric.parser.AbstractEncodedMessages;
37 import org.orekit.gnss.metric.parser.MessagesParser;
38
39 /** Monitor for retrieving streamed data from one mount point.
40 * @author Luc Maisonobe
41 * @since 11.0
42 */
43 public class StreamMonitor extends AbstractEncodedMessages implements Runnable {
44
/** Name of the request header carrying the NMEA GGA sentence. */
private static final String GGA_HEADER_KEY = "Ntrip-GGA";

/** Content type expected in the response when streaming GNSS data. */
private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";

/** Capacity of the circular buffer used for retrieving data. */
private static final int BUFFER_SIZE = 0x4000;

/** Value of the first byte of a frame. */
private static final int PREAMBLE = 0xD3;

/** Number of bytes in a frame header (preamble byte followed by two bytes holding the length). */
private static final int PREAMBLE_SIZE = 3;

/** Number of bytes in the CRC that ends a frame. */
private static final int CRC_SIZE = 3;

/** Generator polynomial for CRC. */
private static final int GENERATOR = 0x1864CFB;

/** High bit of the generator polynomial (one bit above the 24 bits CRC). */
private static final int HIGH = 0x1000000;

/** CRC 24Q lookup table, indexed by byte value. */
private static final int[] CRC_LOOKUP = new int[256];

static {

    // the first two entries seed the table
    CRC_LOOKUP[0] = 0;
    CRC_LOOKUP[1] = GENERATOR;

    // each pass doubles the populated part of the table:
    // entries [block, 2 * block) are derived from entries [0, block)
    int shifted = GENERATOR;
    int block   = 2;
    while (block < 256) {

        // multiply by two modulo the generator polynomial
        shifted <<= 1;
        if ((shifted & HIGH) != 0) {
            shifted ^= GENERATOR;
        }

        for (int k = 0; k < block; ++k) {
            CRC_LOOKUP[block + k] = CRC_LOOKUP[k] ^ shifted;
        }

        block <<= 1;

    }

}
90
91 /** Associated NTRIP client. */
92 private final NtripClient client;
93
94 /** Mount point providing the stream. */
95 private final String mountPoint;
96
97 /** Messages type of the mount point. */
98 private final Type type;
99
100 /** Indicator for required NMEA. */
101 private final boolean nmeaRequired;
102
103 /** Indicator for ignoring unknown messages. */
104 private final boolean ignoreUnknownMessageTypes;
105
106 /** Delay before we reconnect after connection close. */
107 private final double reconnectDelay;
108
109 /** Multiplication factor for reconnection delay. */
110 private final double reconnectDelayFactor;
111
112 /** Max number of reconnections. */
113 private final int maxRetries;
114
115 /** Stop flag. */
116 private AtomicBoolean stop;
117
118 /** Circular buffer. */
119 private byte[] buffer;
120
121 /** Read index. */
122 private int readIndex;
123
124 /** Message end index. */
125 private int messageEndIndex;
126
127 /** Write index. */
128 private int writeIndex;
129
130 /** Observers for encoded messages. */
131 private final Map<Integer, List<MessageObserver>> observers;
132
133 /** Last available message for each type. */
134 private final Map<Integer, ParsedMessage> lastMessages;
135
136 /** Exception caught during monitoring. */
137 private final AtomicReference<OrekitException> exception;
138
/** Build a monitor for streaming data from a mount point.
 * <p>
 * Building the monitor does not open any connection, streaming
 * only starts when {@link #run()} is executed.
 * </p>
 * @param client associated NTRIP client
 * @param mountPoint mount point providing the stream
 * @param type messages type of the mount point
 * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
 * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
 * @param reconnectDelay delay before we reconnect after connection close
 * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
 * @param maxRetries max number of reconnection attempts without reading any data
 */
public StreamMonitor(final NtripClient client,
                     final String mountPoint, final Type type,
                     final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
                     final double reconnectDelay, final double reconnectDelayFactor,
                     final int maxRetries) {

    // stream identification
    this.client     = client;
    this.mountPoint = mountPoint;
    this.type       = type;

    // stream handling options
    this.nmeaRequired              = requiresNMEA;
    this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;

    // reconnection policy
    this.reconnectDelay       = reconnectDelay;
    this.reconnectDelayFactor = reconnectDelayFactor;
    this.maxRetries           = maxRetries;

    // runtime state
    this.stop         = new AtomicBoolean(false);
    this.exception    = new AtomicReference<>();
    this.observers    = new HashMap<>();
    this.lastMessages = new HashMap<>();

}
167
168 /** Add an observer for encoded messages.
169 * <p>
170 * If messages of the specified type have already been retrieved from
171 * a stream, the observer will be immediately notified with the last
172 * message as a side effect of being added.
173 * </p>
174 * @param typeCode code for the message type (if set to 0, notification
175 * will be triggered regardless of message type)
176 * @param observer observer for this message type
177 */
178 public void addObserver(final int typeCode, final MessageObserver observer) {
179 synchronized (observers) {
180
181 // register the observer
182 List<MessageObserver> list = observers.get(typeCode);
183 if (list == null) {
184 // create a new list the first time we register an observer for a message
185 list = new ArrayList<>();
186 observers.put(typeCode, list);
187 }
188 list.add(observer);
189
190 // if we already have a message of the proper type
191 // immediately notify the new observer about it
192 final ParsedMessage last = lastMessages.get(typeCode);
193 if (last != null) {
194 observer.messageAvailable(mountPoint, last);
195 }
196
197 }
198 }
199
200 /** Stop monitoring. */
201 public void stopMonitoring() {
202 stop.set(true);
203 }
204
205 /** Retrieve exception caught during monitoring.
206 * @return exception caught
207 */
208 public OrekitException getException() {
209 return exception.get();
210 }
211
212 /** {@inheritDoc} */
213 @Override
214 public void run() {
215
216 try {
217
218 final MessagesParser parser = type.getParser(extractUsedMessages());
219 int nbAttempts = 0;
220 double delay = reconnectDelay;
221 while (nbAttempts < maxRetries) {
222
223 try {
224 // prepare request
225 final HttpURLConnection connection = client.connect(mountPoint);
226 if (nmeaRequired) {
227 if (client.getGGA() == null) {
228 throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
229 } else {
230 // update NMEA GGA sentence in the extra headers for this mount point
231 connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
232 }
233 }
234
235 // perform request
236 final int responseCode = connection.getResponseCode();
237 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
238 throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
239 } else if (responseCode != HttpURLConnection.HTTP_OK) {
240 throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
241 connection.getURL().getHost(),
242 connection.getResponseMessage());
243 }
244
245 // for this request, we MUST get GNSS data
246 if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
247 throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
248 }
249
250 // data extraction loop
251 resetCircularBuffer();
252 try (InputStream is = connection.getInputStream()) {
253
254 for (int r = fillUp(is); r >= 0; r = fillUp(is)) {
255
256 // we have read something, reset reconnection attempts counters
257 nbAttempts = 0;
258 delay = reconnectDelay;
259
260 if (stop.get()) {
261 // stop monitoring immediately
262 // (returning closes the input stream automatically)
263 return;
264 }
265
266 while (bufferSize() >= 3) {
267 if (peekByte(0) != PREAMBLE) {
268 // we are out of synch with respect to frame structure
269 // drop the unknown byte
270 moveRead(1);
271 } else {
272 final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
273 if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
274 // check CRC
275 final int crc = (peekByte(PREAMBLE_SIZE + size) << 16) |
276 (peekByte(PREAMBLE_SIZE + size + 1) << 8) |
277 peekByte(PREAMBLE_SIZE + size + 2);
278 if (crc == computeCRC(PREAMBLE_SIZE + size)) {
279 // we have a complete and consistent frame
280 // we can extract the message it contains
281 messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
282 moveRead(PREAMBLE_SIZE);
283 start();
284 final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
285 if (message != null) {
286 storeAndNotify(message);
287 }
288 // jump to expected message end, in case the message was corrupted
289 // and parsing did not reach message end
290 readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
291 } else {
292 // CRC is not consistent, we are probably not really synched
293 // and the preamble byte was just a random byte
294 // we drop this single byte and continue looking for sync
295 moveRead(1);
296 }
297 } else {
298 // the frame is not complete, we need more data
299 break;
300 }
301 }
302 }
303
304 }
305
306 }
307 } catch (SocketTimeoutException ste) {
308 // ignore exception, it will be handled by reconnection attempt below
309 } catch (IOException ioe) {
310 throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
311 }
312
313 // manage reconnection
314 try {
315 Thread.sleep((int) FastMath.rint(delay * 1000));
316 } catch (InterruptedException ie) {
317 // Restore interrupted state...
318 Thread.currentThread().interrupt();
319 }
320 ++nbAttempts;
321 delay *= reconnectDelayFactor;
322
323 }
324
325 } catch (OrekitException oe) {
326 // store the exception so it can be retrieved by Ntrip client
327 exception.set(oe);
328 }
329
330 }
331
332 /** Store a parsed encoded message and notify observers.
333 * @param message parsed message
334 */
335 private void storeAndNotify(final ParsedMessage message) {
336 synchronized (observers) {
337
338 for (int typeCode : Arrays.asList(0, message.getTypeCode())) {
339
340 // store message
341 lastMessages.put(typeCode, message);
342
343 // notify observers
344 final List<MessageObserver> list = observers.get(typeCode);
345 if (list != null) {
346 for (final MessageObserver observer : list) {
347 // notify observer
348 observer.messageAvailable(mountPoint, message);
349 }
350 }
351
352 }
353
354 }
355 }
356
357 /** Reset the circular buffer.
358 */
359 private void resetCircularBuffer() {
360 buffer = new byte[BUFFER_SIZE];
361 readIndex = 0;
362 writeIndex = 0;
363 }
364
365 /** Extract data from input stream.
366 * @param is input stream to extract data from
367 * @return number of byes read or -1
368 * @throws IOException if data cannot be extracted properly
369 */
370 private int fillUp(final InputStream is) throws IOException {
371 final int max = bufferMaxWrite();
372 if (max == 0) {
373 // this should never happen
374 // the buffer is large enough for almost 16 encoded messages, including wrapping frame
375 throw new OrekitInternalError(null);
376 }
377 final int r = is.read(buffer, writeIndex, max);
378 if (r >= 0) {
379 writeIndex = (writeIndex + r) % BUFFER_SIZE;
380 }
381 return r;
382 }
383
384 /** {@inheritDoc} */
385 @Override
386 protected int fetchByte() {
387 if (readIndex == messageEndIndex || readIndex == writeIndex) {
388 return -1;
389 }
390
391 final int ret = buffer[readIndex] & 0xFF;
392 moveRead(1);
393 return ret;
394 }
395
396 /** Get the number of bytes currently in the buffer.
397 * @return number of bytes currently in the buffer
398 */
399 private int bufferSize() {
400 final int n = writeIndex - readIndex;
401 return n >= 0 ? n : BUFFER_SIZE + n;
402 }
403
404 /** Peek a buffer byte without moving read pointer.
405 * @param offset offset counted from read pointer
406 * @return value of the byte at given offset
407 */
408 private int peekByte(final int offset) {
409 return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
410 }
411
412 /** Move read pointer.
413 * @param n number of bytes to move read pointer
414 */
415 private void moveRead(final int n) {
416 readIndex = (readIndex + n) % BUFFER_SIZE;
417 }
418
419 /** Get the number of bytes that can be added to the buffer without wrapping around.
420 * @return number of bytes that can be added
421 */
422 private int bufferMaxWrite() {
423 if (writeIndex >= readIndex) {
424 return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
425 } else {
426 return readIndex - writeIndex - 1;
427 }
428 }
429
430 /** Compute QualCom CRC.
431 * @param length length of the byte stream
432 * @return QualCom CRC
433 */
434 private int computeCRC(final int length) {
435 int crc = 0;
436 for (int i = 0; i < length; ++i) {
437 crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
438 }
439 return crc;
440 }
441
442 private List<Integer> extractUsedMessages() {
443 synchronized (observers) {
444
445 // List of needed messages
446 final List<Integer> messages = new ArrayList<>();
447
448 // Loop on observers entries
449 for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
450 // Extract message type code
451 final int typeCode = entry.getKey();
452 // Add to the list
453 messages.add(typeCode);
454 }
455
456 return messages;
457 }
458 }
459
460 }