1   /* Copyright 2002-2022 CS GROUP
2    * Licensed to CS GROUP (CS) under one or more
3    * contributor license agreements.  See the NOTICE file distributed with
4    * this work for additional information regarding copyright ownership.
5    * CS licenses this file to You under the Apache License, Version 2.0
6    * (the "License"); you may not use this file except in compliance with
7    * the License.  You may obtain a copy of the License at
8    *
9    *   http://www.apache.org/licenses/LICENSE-2.0
10   *
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS,
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14   * See the License for the specific language governing permissions and
15   * limitations under the License.
16   */
17  package org.orekit.gnss.metric.ntrip;
18  
19  import java.io.BufferedReader;
20  import java.io.IOException;
21  import java.io.InputStream;
22  import java.io.InputStreamReader;
23  import java.net.Authenticator;
24  import java.net.HttpURLConnection;
25  import java.net.InetAddress;
26  import java.net.InetSocketAddress;
27  import java.net.Proxy;
28  import java.net.Proxy.Type;
29  import java.net.SocketAddress;
30  import java.net.URL;
31  import java.net.URLConnection;
32  import java.net.UnknownHostException;
33  import java.nio.charset.StandardCharsets;
34  import java.util.ArrayList;
35  import java.util.Formatter;
36  import java.util.HashMap;
37  import java.util.List;
38  import java.util.Locale;
39  import java.util.Map;
40  import java.util.concurrent.ExecutorService;
41  import java.util.concurrent.Executors;
42  import java.util.concurrent.TimeUnit;
43  import java.util.concurrent.atomic.AtomicReference;
44  
45  import org.hipparchus.util.FastMath;
46  import org.orekit.errors.OrekitException;
47  import org.orekit.errors.OrekitMessages;
48  import org.orekit.gnss.metric.messages.ParsedMessage;
49  
50  /** Source table for ntrip streams retrieval.
51   * <p>
52   * Note that all authentication is performed automatically by just
53   * calling the standard {@link Authenticator#setDefault(Authenticator)}
54   * method to set up an authenticator.
55   * </p>
56   * @author Luc Maisonobe
57   * @since 11.0
58   */
59  public class NtripClient {
60  
61      /** Default timeout for connections and reads (ms). */
62      public static final int DEFAULT_TIMEOUT = 10000;
63  
64      /** Default port for ntrip communication. */
65      public static final int DEFAULT_PORT = 2101;
66  
67      /** Default delay before we reconnect after connection close (s). */
68      public static final double DEFAULT_RECONNECT_DELAY = 1.0;
69  
70      /** Default factor by which reconnection delay is multiplied after each attempt. */
71      public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
72  
73      /** Default maximum number of reconnect a attempts without readin any data. */
74      public static final int DEFAULT_MAX_RECONNECT = 20;
75  
76      /** Host header. */
77      private static final String HOST_HEADER_KEY = "Host";
78  
79      /** User-agent header key. */
80      private static final String USER_AGENT_HEADER_KEY = "User-Agent";
81  
82      /** User-agent header value. */
83      private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
84  
85      /** Version header key. */
86      private static final String VERSION_HEADER_KEY = "Ntrip-Version";
87  
88      /** Version header value. */
89      private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
90  
91      /** Connection header key. */
92      private static final String CONNECTION_HEADER_KEY = "Connection";
93  
94      /** Connection header value. */
95      private static final String CONNECTION_HEADER_VALUE = "close";
96  
97      /** Flags header key. */
98      private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
99  
100     /** Content type for source table. */
101     private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
102 
103     /** Degrees to arc minutes conversion factor. */
104     private static final double DEG_TO_MINUTES = 60.0;
105 
106     /** Caster host. */
107     private final String host;
108 
109     /** Caster port. */
110     private final int port;
111 
112     /** Delay before we reconnect after connection close. */
113     private double reconnectDelay;
114 
115     /** Multiplication factor for reconnection delay. */
116     private double reconnectDelayFactor;
117 
118     /** Max number of reconnections. */
119     private int maxRetries;
120 
121     /** Timeout for connections and reads. */
122     private int timeout;
123 
124     /** Proxy to use. */
125     private Proxy proxy;
126 
127     /** NMEA GGA sentence (may be null). */
128     private AtomicReference<String> gga;
129 
130     /** Observers for encoded messages. */
131     private final List<ObserverHolder> observers;
132 
133     /** Monitors for data streams. */
134     private final Map<String, StreamMonitor> monitors;
135 
136     /** Source table. */
137     private SourceTable sourceTable;
138 
139     /** Executor for stream monitoring tasks. */
140     private ExecutorService executorService;
141 
142     /** Build a client for NTRIP.
143      * <p>
144      * The default configuration uses default timeout, default reconnection
145      * parameters, no GPS fix and no proxy.
146      * </p>
147      * @param host caster host providing the source table
148      * @param port port to use for connection
149      * see {@link #DEFAULT_PORT}
150      */
151     public NtripClient(final String host, final int port) {
152         this.host         = host;
153         this.port         = port;
154         this.observers    = new ArrayList<>();
155         this.monitors     = new HashMap<>();
156         setTimeout(DEFAULT_TIMEOUT);
157         setReconnectParameters(DEFAULT_RECONNECT_DELAY,
158                                DEFAULT_RECONNECT_DELAY_FACTOR,
159                                DEFAULT_MAX_RECONNECT);
160         setProxy(Type.DIRECT, null, -1);
161         this.gga             = new AtomicReference<String>(null);
162         this.sourceTable     = null;
163         this.executorService = null;
164     }
165 
166     /** Get the caster host.
167      * @return caster host
168      */
169     public String getHost() {
170         return host;
171     }
172 
173     /** Get the port to use for connection.
174      * @return port to use for connection
175      */
176     public int getPort() {
177         return port;
178     }
179 
180     /** Set timeout for connections and reads.
181      * @param timeout timeout for connections and reads (ms)
182      */
183     public void setTimeout(final int timeout) {
184         this.timeout = timeout;
185     }
186 
187     /** Set Reconnect parameters.
188      * @param delay delay before we reconnect after connection close
189      * @param delayFactor factor by which reconnection delay is multiplied after each attempt
190      * @param max max number of reconnect a attempts without reading any data
191      */
192     public void setReconnectParameters(final double delay,
193                                        final double delayFactor,
194                                        final int max) {
195         this.reconnectDelay       = delay;
196         this.reconnectDelayFactor = delayFactor;
197         this.maxRetries           = max;
198     }
199 
200     /** Set proxy parameters.
201      * @param type proxy type
202      * @param proxyHost host name of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
203      * @param proxyPort port number of the proxy (ignored if {@code type} is {@code Proxy.Type.DIRECT})
204      */
205     public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
206         try {
207             if (type == Proxy.Type.DIRECT) {
208                 // disable proxy
209                 proxy = Proxy.NO_PROXY;
210             } else {
211                 // enable proxy
212                 final InetAddress   hostAddress  = InetAddress.getByName(proxyHost);
213                 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
214                 proxy = new Proxy(type, proxyAddress);
215             }
216         } catch (UnknownHostException uhe) {
217             throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
218         }
219     }
220 
221     /** Get proxy.
222      * @return proxy to use
223      */
224     public Proxy getProxy() {
225         return proxy;
226     }
227 
228     /** Set GPS fix data to send as NMEA sentence to Ntrip caster if required.
229      * @param hour hour of the fix (UTC time)
230      * @param minute minute of the fix (UTC time)
231      * @param second second of the fix (UTC time)
232      * @param latitude latitude (radians)
233      * @param longitude longitude (radians)
234      * @param ellAltitude altitude above ellipsoid (m)
235      * @param undulation height of the geoid above ellipsoid (m)
236      */
237     public void setFix(final int hour, final int minute, final double second,
238                        final double latitude, final double longitude, final double ellAltitude,
239                        final double undulation) {
240 
241         // convert latitude
242         final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
243         final int    dLat   = (int) FastMath.floor(latDeg);
244         final double mLat   = DEG_TO_MINUTES * (latDeg - dLat);
245         final char   cLat   = latitude >= 0.0 ? 'N' : 'S';
246 
247         // convert longitude
248         final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
249         final int    dLon   = (int) FastMath.floor(lonDeg);
250         final double mLon   = DEG_TO_MINUTES * (lonDeg - dLon);
251         final char   cLon   = longitude >= 0.0 ? 'E' : 'W';
252 
253         // build NMEA GGA sentence
254         final StringBuilder builder = new StringBuilder(82);
255         try (Formatter formatter = new Formatter(builder, Locale.US)) {
256 
257             // dummy values
258             final int    fixQuality = 1;
259             final int    nbSat      = 4;
260             final double hdop       = 1.0;
261 
262             // sentence body
263             formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
264                              hour, minute, second,
265                              dLat, mLat, cLat, dLon, mLon, cLon,
266                              fixQuality, nbSat, hdop,
267                              ellAltitude, undulation);
268 
269             // checksum
270             byte sum = 0;
271             for (int i = 1; i < builder.length(); ++i) {
272                 sum ^= builder.charAt(i);
273             }
274             formatter.format("*%02X", sum);
275 
276         }
277         gga.set(builder.toString());
278 
279     }
280 
281     /** Get NMEA GGA sentence.
282      * @return NMEA GGA sentence (may be null)
283      */
284     String getGGA() {
285         return gga.get();
286     }
287 
288     /** Add an observer for an encoded messages.
289      * <p>
290      * If messages of the specified type have already been retrieved from
291      * a stream, the observer will be immediately notified with the last
292      * message from each mount point (in unspecified order) as a side effect
293      * of being added.
294      * </p>
295      * @param typeCode code for the message type (if set to 0, notification
296      * will be triggered regardless of message type)
297      * @param mountPoint mountPoint from which data must come (if null, notification
298      * will be triggered regardless of mount point)
299      * @param observer observer for this message type
300      */
301     public void addObserver(final int typeCode, final String mountPoint,
302                             final MessageObserver observer) {
303 
304         // store the observer for future monitored mount points
305         observers.add(new ObserverHolder(typeCode, mountPoint, observer));
306 
307         // check if we should also add it to already monitored mount points
308         for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
309             if (mountPoint == null || mountPoint.equals(entry.getKey())) {
310                 entry.getValue().addObserver(typeCode, observer);
311             }
312         }
313 
314     }
315 
316     /** Get a sourcetable.
317      * @return source table from the caster
318      */
319     public SourceTable getSourceTable() {
320         if (sourceTable == null) {
321             try {
322 
323                 // perform request
324                 final HttpURLConnection connection = connect("");
325 
326                 final int responseCode = connection.getResponseCode();
327                 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
328                     throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
329                 } else if (responseCode != HttpURLConnection.HTTP_OK) {
330                     throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
331                 }
332 
333                 // for this request, we MUST get a source table
334                 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
335                     throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
336                 }
337 
338                 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
339 
340                 // parse source table records
341                 try (InputStream is = connection.getInputStream();
342                      InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
343                      BufferedReader br = new BufferedReader(isr)) {
344                     int lineNumber = 0;
345                     for (String line = br.readLine(); line != null; line = br.readLine()) {
346 
347                         ++lineNumber;
348                         line = line.trim();
349                         if (line.length() == 0) {
350                             continue;
351                         }
352 
353                         if (line.startsWith(RecordType.CAS.toString())) {
354                             table.addCasterRecord(new CasterRecord(line));
355                         } else if (line.startsWith(RecordType.NET.toString())) {
356                             table.addNetworkRecord(new NetworkRecord(line));
357                         } else if (line.startsWith(RecordType.STR.toString())) {
358                             table.addDataStreamRecord(new DataStreamRecord(line));
359                         } else if (line.startsWith("ENDSOURCETABLE")) {
360                             // we have reached end of table
361                             break;
362                         } else {
363                             throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
364                                                       connection.getURL().getHost(), lineNumber, line);
365                         }
366 
367                     }
368                 }
369 
370                 sourceTable = table;
371                 return table;
372 
373             } catch (IOException ioe) {
374                 throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
375             }
376         }
377 
378         return sourceTable;
379 
380     }
381 
382     /** Connect to a mount point and start streaming data from it.
383      * <p>
384      * This method sets up an internal dedicated thread for continuously
385      * monitoring data incoming from a mount point. When new complete
386      * {@link ParsedMessage parsed messages} becomes available, the
387      * {@link MessageObserver observers} that have been registered
388      * using {@link #addObserver(int, String, MessageObserver) addObserver()}
389      * method will be notified about the message.
390      * </p>
391      * <p>
392      * This method must be called once for each stream to monitor.
393      * </p>
394      * @param mountPoint mount point providing the stream
395      * @param type messages type of the mount point
396      * @param requiresNMEA if true, the mount point requires a NMEA GGA sentence in the request
397      * @param ignoreUnknownMessageTypes if true, unknown messages types are silently ignored
398      */
399     public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
400                                final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
401 
402         if (executorService == null) {
403             // lazy creation of executor service, with one thread for each possible data stream
404             executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
405         }
406 
407         // safety check
408         if (monitors.containsKey(mountPoint)) {
409             throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
410         }
411 
412         // create the monitor
413         final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
414                                                         reconnectDelay, reconnectDelayFactor, maxRetries);
415         monitors.put(mountPoint, monitor);
416 
417         // set up the already known observers
418         for (final ObserverHolder observerHolder : observers) {
419             if (observerHolder.mountPoint == null ||
420                 observerHolder.mountPoint.equals(mountPoint)) {
421                 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
422             }
423         }
424 
425         // start streaming data
426         executorService.execute(monitor);
427 
428     }
429 
430     /** Check if any of the streaming thread has thrown an exception.
431      * <p>
432      * If a streaming thread has thrown an exception, it will be rethrown here
433      * </p>
434      */
435     public void checkException() {
436         // check if any of the stream got an exception
437         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
438             final OrekitException exception = entry.getValue().getException();
439             if (exception != null) {
440                 throw exception;
441             }
442         }
443     }
444 
445     /** Stop streaming data from all connected mount points.
446      * <p>
447      * If an exception was encountered during data streaming, it will be rethrown here
448      * </p>
449      * @param time timeout for waiting underlying threads termination (ms)
450      */
451     public void stopStreaming(final int time) {
452 
453         // ask all monitors to stop retrieving data
454         for (final  Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
455             entry.getValue().stopMonitoring();
456         }
457 
458         try {
459             // wait for proper ending
460             executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
461         } catch (InterruptedException ie) {
462             // Restore interrupted state...
463             Thread.currentThread().interrupt();
464         }
465 
466         checkException();
467 
468     }
469 
470     /** Connect to caster.
471      * @param mountPoint mount point (empty for getting sourcetable)
472      * @return performed connection
473      * @throws IOException if an I/O exception occurs during connection
474      */
475     HttpURLConnection connect(final String mountPoint)
476         throws IOException {
477 
478         // set up connection
479         final String protocol = "http";
480         final URL casterURL = new URL(protocol, host, port, "/" + mountPoint);
481         final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
482         connection.setConnectTimeout(timeout);
483         connection.setReadTimeout(timeout);
484 
485         // common headers
486         connection.setRequestProperty(HOST_HEADER_KEY,       host);
487         connection.setRequestProperty(VERSION_HEADER_KEY,    VERSION_HEADER_VALUE);
488         connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
489         connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
490 
491         return connection;
492 
493     }
494 
495     /** Get an header from a response.
496      * @param connection connection to analyze
497      * @param key header key
498      * @return header value
499      */
500     private String getHeaderValue(final URLConnection connection, final String key) {
501         final String value = connection.getHeaderField(key);
502         if (value == null) {
503             throw new OrekitException(OrekitMessages.MISSING_HEADER,
504                                       connection.getURL().getHost(), key);
505         }
506         return value;
507     }
508 
509     /** Local holder for observers. */
510     private static class ObserverHolder {
511 
512         /** Code for the message type. */
513         private final int typeCode;
514 
515         /** Mount point. */
516         private final String mountPoint;
517 
518         /** Observer to notify. */
519         private final MessageObserver observer;
520 
521         /** Simple constructor.
522          * @param typeCode code for the message type
523          * @param mountPoint mountPoint from which data must come (if null, notification
524          * will be triggered regardless of mount point)
525          * @param observer observer for this message type
526          */
527         ObserverHolder(final int typeCode, final String mountPoint,
528                             final MessageObserver observer) {
529             this.typeCode   = typeCode;
530             this.mountPoint = mountPoint;
531             this.observer   = observer;
532         }
533 
534     }
535 
536 }