1   /* Copyright 2002-2025 CS GROUP
2    * Licensed to CS GROUP (CS) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * CS licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *   http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.orekit.gnss.metric.ntrip;
18  
19  import java.io.BufferedReader;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.InputStreamReader;
23  import java.net.Authenticator;
24  import java.net.HttpURLConnection;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.Proxy;
28  import java.net.Proxy.Type;
29  import java.net.SocketAddress;
30  import java.net.URI;
31  import java.net.URISyntaxException;
32  import java.net.URL;
33  import java.net.URLConnection;
34  import java.net.UnknownHostException;
35  import java.nio.charset.StandardCharsets;
36  import java.util.ArrayList;
37  import java.util.Formatter;
38  import java.util.HashMap;
39  import java.util.List;
40  import java.util.Locale;
41  import java.util.Map;
42  import java.util.concurrent.ExecutorService;
43  import java.util.concurrent.Executors;
44  import java.util.concurrent.TimeUnit;
45  import java.util.concurrent.atomic.AtomicReference;
46  
47  import org.hipparchus.util.FastMath;
48  import org.orekit.errors.OrekitException;
49  import org.orekit.errors.OrekitMessages;
50  import org.orekit.gnss.metric.messages.ParsedMessage;
51  import org.orekit.time.TimeScales;
52  
53  /** Source table for ntrip streams retrieval.
54   * <p>
55   * Note that all authentication is performed automatically by just
56   * calling the standard {@link Authenticator#setDefault(Authenticator)}
57   * method to set up an authenticator.
58   * </p>
59   * @author Luc Maisonobe
60   * @since 11.0
61   */
62  public class NtripClient {
63  
64      /** Default timeout for connections and reads (ms). */
65      public static final int DEFAULT_TIMEOUT = 10000;
66  
67      /** Default port for ntrip communication. */
68      public static final int DEFAULT_PORT = 2101;
69  
70      /** Default delay before we reconnect after connection close (s). */
71      public static final double DEFAULT_RECONNECT_DELAY = 1.0;
72  
73      /** Default factor by which reconnection delay is multiplied after each attempt. */
74      public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
75  
76      /** Default maximum number of reconnect a attempts without readin any data. */
77      public static final int DEFAULT_MAX_RECONNECT = 20;
78  
79      /** Host header. */
80      private static final String HOST_HEADER_KEY = "Host";
81  
82      /** User-agent header key. */
83      private static final String USER_AGENT_HEADER_KEY = "User-Agent";
84  
85      /** User-agent header value. */
86      private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
87  
88      /** Version header key. */
89      private static final String VERSION_HEADER_KEY = "Ntrip-Version";
90  
91      /** Version header value. */
92      private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
93  
94      /** Connection header key. */
95      private static final String CONNECTION_HEADER_KEY = "Connection";
96  
97      /** Connection header value. */
98      private static final String CONNECTION_HEADER_VALUE = "close";
99  
100     /** Flags header key. */
101     private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
102 
103     /** Content type for source table. */
104     private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
105 
106     /** Degrees to arc minutes conversion factor. */
107     private static final double DEG_TO_MINUTES = 60.0;
108 
109     /** Caster host. */
110     private final String host;
111 
112     /** Caster port. */
113     private final int port;
114 
115     /** Delay before we reconnect after connection close. */
116     private double reconnectDelay;
117 
118     /** Multiplication factor for reconnection delay. */
119     private double reconnectDelayFactor;
120 
121     /** Max number of reconnections. */
122     private int maxRetries;
123 
124     /** Timeout for connections and reads. */
125     private int timeout;
126 
127     /** Proxy to use. */
128     private Proxy proxy;
129 
130     /** NMEA GGA sentence (may be null). */
131     private AtomicReference<String> gga;
132 
133     /** Observers for encoded messages. */
134     private final List<ObserverHolder> observers;
135 
136     /** Monitors for data streams. */
137     private final Map<String, StreamMonitor> monitors;
138 
139     /** Source table. */
140     private SourceTable sourceTable;
141 
142     /** Executor for stream monitoring tasks. */
143     private ExecutorService executorService;
144 
145     /** Known time scales.
146      * @since 13.0
147      */
148     private final TimeScales timeScales;
149 
150     /** Build a client for NTRIP.
151      * <p>
152      * The default configuration uses default timeout, default reconnection
153      * parameters, no GPS fix and no proxy.
154      * </p>
155      * @param host caster host providing the source table
156      * @param port port to use for connection
157      * @param timeScales known time scales
158      * @since 13.0
159      * see {@link #DEFAULT_PORT}
160      */
161     public NtripClient(final String host, final int port, final TimeScales timeScales) {
162         this.host         = host;
163         this.port         = port;
164         this.observers    = new ArrayList<>();
165         this.monitors     = new HashMap<>();
166         setTimeout(DEFAULT_TIMEOUT);
167         setReconnectParameters(DEFAULT_RECONNECT_DELAY,
168                                DEFAULT_RECONNECT_DELAY_FACTOR,
169                                DEFAULT_MAX_RECONNECT);
170         setProxy(Type.DIRECT, null, -1);
171         this.gga             = new AtomicReference<>(null);
172         this.sourceTable     = null;
173         this.executorService = null;
174         this.timeScales      = timeScales;
175     }
176 
177     /** Get the caster host.
178      * @return caster host
179      */
180     public String getHost() {
181         return host;
182     }
183 
184     /** Get the port to use for connection.
185      * @return port to use for connection
186      */
187     public int getPort() {
188         return port;
189     }
190 
191     /** Get the known time scales.
192      * @return known time scales
193      * @since 13.0
194      */
195     public TimeScales getTimeScales() {
196         return timeScales;
197     }
198 
199     /** Set timeout for connections and reads.
200      * @param timeout timeout for connections and reads (ms)
201      */
202     public void setTimeout(final int timeout) {
203         this.timeout = timeout;
204     }
205 
206     /** Set Reconnect parameters.
207      * @param delay delay before we reconnect after connection close
208      * @param delayFactor factor by which reconnection delay is multiplied after each attempt
209      * @param max max number of reconnect a attempts without reading any data
210      */
211     public void setReconnectParameters(final double delay,
212                                        final double delayFactor,
213                                        final int max) {
214         this.reconnectDelay       = delay;
215         this.reconnectDelayFactor = delayFactor;
216         this.maxRetries           = max;
217     }
218 
219     /** Set proxy parameters.
220      * @param type proxy type
221      * @param proxyHost host name of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
222      * @param proxyPort port number of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
223      */
224     public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
225         try {
226             if (type == Proxy.Type.DIRECT) {
227                 // disable proxy
228                 proxy = Proxy.NO_PROXY;
229             } else {
230                 // enable proxy
231                 final InetAddress   hostAddress  = InetAddress.getByName(proxyHost);
232                 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
233                 proxy = new Proxy(type, proxyAddress);
234             }
235         } catch (UnknownHostException uhe) {
236             throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
237         }
238     }
239 
240     /** Get proxy.
241      * @return proxy to use
242      */
243     public Proxy getProxy() {
244         return proxy;
245     }
246 
247     /** Set GPS fix data to send as NMEA sentence to Ntrip caster if required.
248      * @param hour hour of the fix (UTC time)
249      * @param minute minute of the fix (UTC time)
250      * @param second second of the fix (UTC time)
251      * @param latitude latitude (radians)
252      * @param longitude longitude (radians)
253      * @param ellAltitude altitude above ellipsoid (m)
254      * @param undulation height of the geoid above ellipsoid (m)
255      */
256     public void setFix(final int hour, final int minute, final double second,
257                        final double latitude, final double longitude, final double ellAltitude,
258                        final double undulation) {
259 
260         // convert latitude
261         final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
262         final int    dLat   = (int) FastMath.floor(latDeg);
263         final double mLat   = DEG_TO_MINUTES * (latDeg - dLat);
264         final char   cLat   = latitude >= 0.0 ? 'N' : 'S';
265 
266         // convert longitude
267         final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
268         final int    dLon   = (int) FastMath.floor(lonDeg);
269         final double mLon   = DEG_TO_MINUTES * (lonDeg - dLon);
270         final char   cLon   = longitude >= 0.0 ? 'E' : 'W';
271 
272         // build NMEA GGA sentence
273         final StringBuilder builder = new StringBuilder(82);
274         try (Formatter formatter = new Formatter(builder, Locale.US)) {
275 
276             // dummy values
277             final int    fixQuality = 1;
278             final int    nbSat      = 4;
279             final double hdop       = 1.0;
280 
281             // sentence body
282             formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
283                              hour, minute, second,
284                              dLat, mLat, cLat, dLon, mLon, cLon,
285                              fixQuality, nbSat, hdop,
286                              ellAltitude, undulation);
287 
288             // checksum
289             byte sum = 0;
290             for (int i = 1; i < builder.length(); ++i) {
291                 sum ^= builder.charAt(i);
292             }
293             formatter.format("*%02X", sum);
294 
295         }
296         gga.set(builder.toString());
297 
298     }
299 
300     /** Get NMEA GGA sentence.
301      * @return NMEA GGA sentence (may be null)
302      */
303     String getGGA() {
304         return gga.get();
305     }
306 
307     /** Add an observer for an encoded messages.
308      * <p>
309      * If messages of the specified type have already been retrieved from
310      * a stream, the observer will be immediately notified with the last
311      * message from each mount point (in unspecified order) as a side effect
312      * of being added.
313      * </p>
314      * @param typeCode code for the message type (if set to 0, notification
315      * will be triggered regardless of message type)
316      * @param mountPoint mountPoint from which data must come (if null, notification
317      * will be triggered regardless of mount point)
318      * @param observer observer for this message type
319      */
320     public void addObserver(final int typeCode, final String mountPoint,
321                             final MessageObserver observer) {
322 
323         // store the observer for future monitored mount points
324         observers.add(new ObserverHolder(typeCode, mountPoint, observer));
325 
326         // check if we should also add it to already monitored mount points
327         for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
328             if (mountPoint == null || mountPoint.equals(entry.getKey())) {
329                 entry.getValue().addObserver(typeCode, observer);
330             }
331         }
332 
333     }
334 
335     /** Get a sourcetable.
336      * @return source table from the caster
337      */
338     public SourceTable getSourceTable() {
339         if (sourceTable == null) {
340             try {
341 
342                 // perform request
343                 final HttpURLConnection connection = connect("");
344 
345                 final int responseCode = connection.getResponseCode();
346                 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
347                     throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
348                 } else if (responseCode != HttpURLConnection.HTTP_OK) {
349                     throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
350                 }
351 
352                 // for this request, we MUST get a source table
353                 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
354                     throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
355                 }
356 
357                 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
358 
359                 // parse source table records
360                 try (InputStream is = connection.getInputStream();
361                      InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
362                      BufferedReader br = new BufferedReader(isr)) {
363                     int lineNumber = 0;
364                     for (String line = br.readLine(); line != null; line = br.readLine()) {
365 
366                         ++lineNumber;
367                         line = line.trim();
368                         if (line.length() == 0) {
369                             continue;
370                         }
371 
372                         if (line.startsWith(RecordType.CAS.toString())) {
373                             table.addCasterRecord(new CasterRecord(line));
374                         } else if (line.startsWith(RecordType.NET.toString())) {
375                             table.addNetworkRecord(new NetworkRecord(line));
376                         } else if (line.startsWith(RecordType.STR.toString())) {
377                             table.addDataStreamRecord(new DataStreamRecord(line));
378                         } else if (line.startsWith("ENDSOURCETABLE")) {
379                             // we have reached end of table
380                             break;
381                         } else {
382                             throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
383                                                       connection.getURL().getHost(), lineNumber, line);
384                         }
385 
386                     }
387                 }
388 
389                 sourceTable = table;
390                 return table;
391 
392             } catch (IOException | URISyntaxException e) {
393                 throw new OrekitException(e, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
394             }
395         }
396 
397         return sourceTable;
398 
399     }
400 
401     /** Connect to a mount point and start streaming data from it.
402      * <p>
403      * This method sets up an internal dedicated thread for continuously
404      * monitoring data incoming from a mount point. When new complete
405      * {@link ParsedMessage parsed messages} becomes available, the
406      * {@link MessageObserver observers} that have been registered
407      * using {@link #addObserver(int, String, MessageObserver) addObserver()}
408      * method will be notified about the message.
409      * </p>
410      * <p>
411      * This method must be called once for each stream to monitor.
412      * </p>
413      * @param mountPoint mount point providing the stream
414      * @param type messages type of the mount point
415      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
416      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
417      */
418     public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
419                                final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
420 
421         if (executorService == null) {
422             // lazy creation of executor service, with one thread for each possible data stream
423             executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
424         }
425 
426         // safety check
427         if (monitors.containsKey(mountPoint)) {
428             throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
429         }
430 
431         // create the monitor
432         final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
433                                                         reconnectDelay, reconnectDelayFactor, maxRetries);
434         monitors.put(mountPoint, monitor);
435 
436         // set up the already known observers
437         for (final ObserverHolder observerHolder : observers) {
438             if (observerHolder.mountPoint == null ||
439                 observerHolder.mountPoint.equals(mountPoint)) {
440                 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
441             }
442         }
443 
444         // start streaming data
445         executorService.execute(monitor);
446 
447     }
448 
449     /** Check if any of the streaming thread has thrown an exception.
450      * <p>
451      * If a streaming thread has thrown an exception, it will be rethrown here
452      * </p>
453      */
454     public void checkException() {
455         // check if any of the stream got an exception
456         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
457             final OrekitException exception = entry.getValue().getException();
458             if (exception != null) {
459                 throw exception;
460             }
461         }
462     }
463 
464     /** Stop streaming data from all connected mount points.
465      * <p>
466      * If an exception was encountered during data streaming, it will be rethrown here
467      * </p>
468      * @param time timeout for waiting underlying threads termination (ms)
469      */
470     public void stopStreaming(final int time) {
471 
472         // ask all monitors to stop retrieving data
473         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
474             entry.getValue().stopMonitoring();
475         }
476 
477         try {
478             // wait for proper ending
479             executorService.shutdown();
480             executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
481         } catch (InterruptedException ie) {
482             // Restore interrupted state...
483             Thread.currentThread().interrupt();
484         }
485 
486         checkException();
487 
488     }
489 
490     /** Connect to caster.
491      * @param mountPoint mount point (empty for getting sourcetable)
492      * @return performed connection
493      * @throws IOException if an I/O exception occurs during connection
494      * @throws URISyntaxException if the built URI is invalid
495      */
496     HttpURLConnection connect(final String mountPoint)
497         throws IOException, URISyntaxException {
498 
499         // set up connection
500         final String scheme = "http";
501         final URL casterURL = new URI(scheme, null, host, port, "/" + mountPoint, null, null).toURL();
502         final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
503         connection.setConnectTimeout(timeout);
504         connection.setReadTimeout(timeout);
505 
506         // common headers
507         connection.setRequestProperty(HOST_HEADER_KEY,       host);
508         connection.setRequestProperty(VERSION_HEADER_KEY,    VERSION_HEADER_VALUE);
509         connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
510         connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
511 
512         return connection;
513 
514     }
515 
516     /** Get an header from a response.
517      * @param connection connection to analyze
518      * @param key header key
519      * @return header value
520      */
521     private String getHeaderValue(final URLConnection connection, final String key) {
522         final String value = connection.getHeaderField(key);
523         if (value == null) {
524             throw new OrekitException(OrekitMessages.MISSING_HEADER,
525                                       connection.getURL().getHost(), key);
526         }
527         return value;
528     }
529 
530     /** Local holder for observers. */
531     private static class ObserverHolder {
532 
533         /** Code for the message type. */
534         private final int typeCode;
535 
536         /** Mount point. */
537         private final String mountPoint;
538 
539         /** Observer to notify. */
540         private final MessageObserver observer;
541 
542         /** Simple constructor.
543          * @param typeCode code for the message type
544          * @param mountPoint mountPoint from which data must come (if null, notification
545          * will be triggered regardless of mount point)
546          * @param observer observer for this message type
547          */
548         ObserverHolder(final int typeCode, final String mountPoint,
549                             final MessageObserver observer) {
550             this.typeCode   = typeCode;
551             this.mountPoint = mountPoint;
552             this.observer   = observer;
553         }
554 
555     }
556 
557 }