StreamMonitor.java

  1. /* Copyright 2002-2024 CS GROUP
  2.  * Licensed to CS GROUP (CS) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * CS licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *   http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package org.orekit.gnss.metric.ntrip;

  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.net.HttpURLConnection;
  21. import java.net.SocketTimeoutException;
  22. import java.net.URISyntaxException;
  23. import java.util.ArrayList;
  24. import java.util.Arrays;
  25. import java.util.HashMap;
  26. import java.util.List;
  27. import java.util.Map;
  28. import java.util.concurrent.atomic.AtomicBoolean;
  29. import java.util.concurrent.atomic.AtomicReference;

  30. import org.hipparchus.util.FastMath;
  31. import org.orekit.errors.OrekitException;
  32. import org.orekit.errors.OrekitInternalError;
  33. import org.orekit.errors.OrekitMessages;
  34. import org.orekit.gnss.metric.messages.ParsedMessage;
  35. import org.orekit.gnss.metric.parser.AbstractEncodedMessage;
  36. import org.orekit.gnss.metric.parser.MessagesParser;

  37. /** Monitor for retrieving streamed data from one mount point.
  38.  * @author Luc Maisonobe
  39.  * @since 11.0
  40.  */
  41. public class StreamMonitor extends AbstractEncodedMessage implements Runnable {

  42.     /** GGA header key. */
  43.     private static final String GGA_HEADER_KEY = "Ntrip-GGA";

  44.     /** Content type for GNSS data. */
  45.     private static final String GNSS_DATA_CONTENT_TYPE = "gnss/data";

  46.     /** Size of buffer for retrieving data. */
  47.     private static final int BUFFER_SIZE = 0x4000;

  48.     /** Frame preamble. */
  49.     private static final int PREAMBLE = 0xD3;

  50.     /** Frame preamble size. */
  51.     private static final int PREAMBLE_SIZE = 3;

  52.     /** Frame CRC size. */
  53.     private static final int CRC_SIZE = 3;

  54.     /** Generator polynomial for CRC. */
  55.     private static final int GENERATOR = 0x1864CFB;

  56.     /** High bit of the generator polynomial. */
  57.     private static final int HIGH = 0x1000000;

  58.     /** CRC 24Q lookup table. */
  59.     private static final int[] CRC_LOOKUP = new int[256];

  60.     static {

  61.         // set up lookup table
  62.         CRC_LOOKUP[0] = 0;
  63.         CRC_LOOKUP[1] = GENERATOR;

  64.         int h = GENERATOR;
  65.         for (int i = 2; i < 256; i <<= 1) {
  66.             h <<= 1;
  67.             if ((h & HIGH) != 0) {
  68.                 h ^= GENERATOR;
  69.             }
  70.             for (int j = 0; j < i; ++j) {
  71.                 CRC_LOOKUP[i + j] = CRC_LOOKUP[j] ^ h;
  72.             }
  73.         }

  74.     }

  75.     /** Associated NTRIP client. */
  76.     private final NtripClient client;

  77.     /** Mount point providing the stream. */
  78.     private final String mountPoint;

  79.     /** Messages type of the mount point. */
  80.     private final Type type;

  81.     /** Indicator for required NMEA. */
  82.     private final boolean nmeaRequired;

  83.     /** Indicator for ignoring unknown messages. */
  84.     private final boolean ignoreUnknownMessageTypes;

  85.     /** Delay before we reconnect after connection close. */
  86.     private final double reconnectDelay;

  87.     /** Multiplication factor for reconnection delay. */
  88.     private final double reconnectDelayFactor;

  89.     /** Max number of reconnections. */
  90.     private final int maxRetries;

  91.     /** Stop flag. */
  92.     private AtomicBoolean stop;

  93.     /** Circular buffer. */
  94.     private byte[] buffer;

  95.     /** Read index. */
  96.     private int readIndex;

  97.     /** Message end index. */
  98.     private int messageEndIndex;

  99.     /** Write index. */
  100.     private int writeIndex;

  101.     /** Observers for encoded messages. */
  102.     private final Map<Integer, List<MessageObserver>> observers;

  103.     /** Last available message for each type. */
  104.     private final Map<Integer, ParsedMessage> lastMessages;

  105.     /** Exception caught during monitoring. */
  106.     private final AtomicReference<OrekitException> exception;

  107.     /** Build a monitor for streaming data from a mount point.
  108.      * @param client associated NTRIP client
  109.      * @param mountPoint mount point providing the stream
  110.      * @param type messages type of the mount point
  111.      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
  112.      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
  113.      * @param reconnectDelay delay before we reconnect after connection close
  114.      * @param reconnectDelayFactor factor by which reconnection delay is multiplied after each attempt
  115.      * @param maxRetries max number of reconnect a attempts without reading any data
  116.      */
  117.     public StreamMonitor(final NtripClient client,
  118.                          final String mountPoint, final Type type,
  119.                          final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes,
  120.                          final double reconnectDelay, final double reconnectDelayFactor,
  121.                          final int maxRetries) {
  122.         this.client                    = client;
  123.         this.mountPoint                = mountPoint;
  124.         this.type                      = type;
  125.         this.nmeaRequired              = requiresNMEA;
  126.         this.ignoreUnknownMessageTypes = ignoreUnknownMessageTypes;
  127.         this.reconnectDelay            = reconnectDelay;
  128.         this.reconnectDelayFactor      = reconnectDelayFactor;
  129.         this.maxRetries                = maxRetries;
  130.         this.stop                      = new AtomicBoolean(false);
  131.         this.observers                 = new HashMap<>();
  132.         this.lastMessages              = new HashMap<>();
  133.         this.exception                 = new AtomicReference<OrekitException>(null);
  134.     }

  135.     /** Add an observer for encoded messages.
  136.      * <p>
  137.      * If messages of the specified type have already been retrieved from
  138.      * a stream, the observer will be immediately notified with the last
  139.      * message as a side effect of being added.
  140.      * </p>
  141.      * @param typeCode code for the message type (if set to 0, notification
  142.      * will be triggered regardless of message type)
  143.      * @param observer observer for this message type
  144.      */
  145.     public void addObserver(final int typeCode, final MessageObserver observer) {
  146.         synchronized (observers) {

  147.             // register the observer
  148.             List<MessageObserver> list = observers.get(typeCode);
  149.             if (list == null) {
  150.                 // create a new list the first time we register an observer for a message
  151.                 list =  new ArrayList<>();
  152.                 observers.put(typeCode, list);
  153.             }
  154.             list.add(observer);

  155.             // if we already have a message of the proper type
  156.             // immediately notify the new observer about it
  157.             final ParsedMessage last = lastMessages.get(typeCode);
  158.             if (last != null) {
  159.                 observer.messageAvailable(mountPoint, last);
  160.             }

  161.         }
  162.     }

  163.     /** Stop monitoring. */
  164.     public void stopMonitoring() {
  165.         stop.set(true);
  166.     }

  167.     /** Retrieve exception caught during monitoring.
  168.      * @return exception caught
  169.      */
  170.     public OrekitException getException() {
  171.         return exception.get();
  172.     }

  173.     /** {@inheritDoc} */
  174.     @Override
  175.     public void run() {

  176.         try {

  177.             final MessagesParser parser = type.getParser(extractUsedMessages());
  178.             int nbAttempts = 0;
  179.             double delay = reconnectDelay;
  180.             while (nbAttempts < maxRetries) {

  181.                 try {
  182.                     // prepare request
  183.                     final HttpURLConnection connection = client.connect(mountPoint);
  184.                     if (nmeaRequired) {
  185.                         if (client.getGGA() == null) {
  186.                             throw new OrekitException(OrekitMessages.STREAM_REQUIRES_NMEA_FIX, mountPoint);
  187.                         } else {
  188.                             // update NMEA GGA sentence in the extra headers for this mount point
  189.                             connection.setRequestProperty(GGA_HEADER_KEY, client.getGGA());
  190.                         }
  191.                     }

  192.                     // perform request
  193.                     final int responseCode = connection.getResponseCode();
  194.                     if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
  195.                         throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, mountPoint);
  196.                     } else if (responseCode != HttpURLConnection.HTTP_OK) {
  197.                         throw new OrekitException(OrekitMessages.CONNECTION_ERROR,
  198.                                                   connection.getURL().getHost(),
  199.                                                   connection.getResponseMessage());
  200.                     }

  201.                     // for this request, we MUST get GNSS data
  202.                     if (!GNSS_DATA_CONTENT_TYPE.equals(connection.getContentType())) {
  203.                         throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
  204.                     }

  205.                     // data extraction loop
  206.                     resetCircularBuffer();
  207.                     try (InputStream is = connection.getInputStream()) {

  208.                         for (int r = fillUp(is); r >= 0; r = fillUp(is)) {

  209.                             // we have read something, reset reconnection attempts counters
  210.                             nbAttempts = 0;
  211.                             delay      = reconnectDelay;

  212.                             if (stop.get()) {
  213.                                 // stop monitoring immediately
  214.                                 // (returning closes the input stream automatically)
  215.                                 return;
  216.                             }

  217.                             while (bufferSize() >= 3) {
  218.                                 if (peekByte(0) != PREAMBLE) {
  219.                                     // we are out of synch with respect to frame structure
  220.                                     // drop the unknown byte
  221.                                     moveRead(1);
  222.                                 } else {
  223.                                     final int size = (peekByte(1) & 0x03) << 8 | peekByte(2);
  224.                                     if (bufferSize() >= PREAMBLE_SIZE + size + CRC_SIZE) {
  225.                                         // check CRC
  226.                                         final int crc = (peekByte(PREAMBLE_SIZE + size)     << 16) |
  227.                                                         (peekByte(PREAMBLE_SIZE + size + 1) <<  8) |
  228.                                                          peekByte(PREAMBLE_SIZE + size + 2);
  229.                                         if (crc == computeCRC(PREAMBLE_SIZE + size)) {
  230.                                             // we have a complete and consistent frame
  231.                                             // we can extract the message it contains
  232.                                             messageEndIndex = (readIndex + PREAMBLE_SIZE + size) % BUFFER_SIZE;
  233.                                             moveRead(PREAMBLE_SIZE);
  234.                                             start();
  235.                                             final ParsedMessage message = parser.parse(this, ignoreUnknownMessageTypes);
  236.                                             if (message != null) {
  237.                                                 storeAndNotify(message);
  238.                                             }
  239.                                             // jump to expected message end, in case the message was corrupted
  240.                                             // and parsing did not reach message end
  241.                                             readIndex = (messageEndIndex + CRC_SIZE) % BUFFER_SIZE;
  242.                                         } else {
  243.                                             // CRC is not consistent, we are probably not really synched
  244.                                             // and the preamble byte was just a random byte
  245.                                             // we drop this single byte and continue looking for sync
  246.                                             moveRead(1);
  247.                                         }
  248.                                     } else {
  249.                                         // the frame is not complete, we need more data
  250.                                         break;
  251.                                     }
  252.                                 }
  253.                             }

  254.                         }

  255.                     }
  256.                 } catch (SocketTimeoutException ste) {
  257.                     // ignore exception, it will be handled by reconnection attempt below
  258.                 } catch (IOException | URISyntaxException e) {
  259.                     throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_GNSS_DATA, client.getHost());
  260.                 }

  261.                 // manage reconnection
  262.                 try {
  263.                     Thread.sleep((int) FastMath.rint(delay * 1000));
  264.                 } catch (InterruptedException ie) {
  265.                     // Restore interrupted state...
  266.                     Thread.currentThread().interrupt();
  267.                 }
  268.                 ++nbAttempts;
  269.                 delay *= reconnectDelayFactor;

  270.             }

  271.         } catch (OrekitException oe) {
  272.             // store the exception so it can be retrieved by Ntrip client
  273.             exception.set(oe);
  274.         }

  275.     }

  276.     /** Store a parsed encoded message and notify observers.
  277.      * @param message parsed message
  278.      */
  279.     private void storeAndNotify(final ParsedMessage message) {
  280.         synchronized (observers) {

  281.             for (int typeCode : Arrays.asList(0, message.getTypeCode())) {

  282.                 // store message
  283.                 lastMessages.put(typeCode, message);

  284.                 // notify observers
  285.                 final List<MessageObserver> list = observers.get(typeCode);
  286.                 if (list != null) {
  287.                     for (final MessageObserver observer : list) {
  288.                         // notify observer
  289.                         observer.messageAvailable(mountPoint, message);
  290.                     }
  291.                 }

  292.             }

  293.         }
  294.     }

  295.     /** Reset the circular buffer.
  296.      */
  297.     private void resetCircularBuffer() {
  298.         buffer     = new byte[BUFFER_SIZE];
  299.         readIndex  = 0;
  300.         writeIndex = 0;
  301.     }

  302.     /** Extract data from input stream.
  303.      * @param is input stream to extract data from
  304.      * @return number of byes read or -1
  305.      * @throws IOException if data cannot be extracted properly
  306.      */
  307.     private int fillUp(final InputStream is) throws IOException {
  308.         final int max = bufferMaxWrite();
  309.         if (max == 0) {
  310.             // this should never happen
  311.             // the buffer is large enough for almost 16 encoded messages, including wrapping frame
  312.             throw new OrekitInternalError(null);
  313.         }
  314.         final int r = is.read(buffer, writeIndex, max);
  315.         if (r >= 0) {
  316.             writeIndex = (writeIndex + r) % BUFFER_SIZE;
  317.         }
  318.         return r;
  319.     }

  320.     /** {@inheritDoc} */
  321.     @Override
  322.     protected int fetchByte() {
  323.         if (readIndex == messageEndIndex || readIndex == writeIndex) {
  324.             return -1;
  325.         }

  326.         final int ret = buffer[readIndex] & 0xFF;
  327.         moveRead(1);
  328.         return ret;
  329.     }

  330.     /** Get the number of bytes currently in the buffer.
  331.      * @return number of bytes currently in the buffer
  332.      */
  333.     private int bufferSize() {
  334.         final int n = writeIndex - readIndex;
  335.         return n >= 0 ? n : BUFFER_SIZE + n;
  336.     }

  337.     /** Peek a buffer byte without moving read pointer.
  338.      * @param offset offset counted from read pointer
  339.      * @return value of the byte at given offset
  340.      */
  341.     private int peekByte(final int offset) {
  342.         return buffer[(readIndex + offset) % BUFFER_SIZE] & 0xFF;
  343.     }

  344.     /** Move read pointer.
  345.      * @param n number of bytes to move read pointer
  346.      */
  347.     private void moveRead(final int n) {
  348.         readIndex = (readIndex + n) % BUFFER_SIZE;
  349.     }

  350.     /** Get the number of bytes that can be added to the buffer without wrapping around.
  351.      * @return number of bytes that can be added
  352.      */
  353.     private int bufferMaxWrite() {
  354.         if (writeIndex >= readIndex) {
  355.             return (readIndex == 0 ? BUFFER_SIZE - 1 : BUFFER_SIZE) - writeIndex;
  356.         } else {
  357.             return readIndex - writeIndex - 1;
  358.         }
  359.     }

  360.     /** Compute QualCom CRC.
  361.      * @param length length of the byte stream
  362.      * @return QualCom CRC
  363.      */
  364.     private int computeCRC(final int length) {
  365.         int crc = 0;
  366.         for (int i = 0; i < length; ++i) {
  367.             crc = ((crc << 8) ^ CRC_LOOKUP[peekByte(i) ^ (crc >>> 16)]) & (HIGH - 1);
  368.         }
  369.         return crc;
  370.     }

  371.     /** Extract the used messages.
  372.      * @return the extracted messages
  373.      */
  374.     private List<Integer> extractUsedMessages() {
  375.         synchronized (observers) {

  376.             // List of needed messages
  377.             final List<Integer> messages = new ArrayList<>();

  378.             // Loop on observers entries
  379.             for (Map.Entry<Integer, List<MessageObserver>> entry : observers.entrySet()) {
  380.                 // Extract message type code
  381.                 final int typeCode = entry.getKey();
  382.                 // Add to the list
  383.                 messages.add(typeCode);
  384.             }

  385.             return messages;
  386.         }
  387.     }

  388. }