1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.orekit.gnss.metric.ntrip;
18
19 import java.io.BufferedReader;
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.InputStreamReader;
23 import java.net.Authenticator;
24 import java.net.HttpURLConnection;
25 import java.net.InetAddress;
26 import java.net.InetSocketAddress;
27 import java.net.Proxy;
28 import java.net.Proxy.Type;
29 import java.net.SocketAddress;
30 import java.net.URL;
31 import java.net.URLConnection;
32 import java.net.UnknownHostException;
33 import java.nio.charset.StandardCharsets;
34 import java.util.ArrayList;
35 import java.util.Formatter;
36 import java.util.HashMap;
37 import java.util.List;
38 import java.util.Locale;
39 import java.util.Map;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.TimeUnit;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import org.hipparchus.util.FastMath;
46 import org.orekit.errors.OrekitException;
47 import org.orekit.errors.OrekitMessages;
48 import org.orekit.gnss.metric.messages.ParsedMessage;
49
50
51
52
53
54
55
56
57
58
59 public class NtripClient {
60
61
62 public static final int DEFAULT_TIMEOUT = 10000;
63
64
65 public static final int DEFAULT_PORT = 2101;
66
67
68 public static final double DEFAULT_RECONNECT_DELAY = 1.0;
69
70
71 public static final double DEFAULT_RECONNECT_DELAY_FACTOR = 1.5;
72
73
74 public static final int DEFAULT_MAX_RECONNECT = 20;
75
76
77 private static final String HOST_HEADER_KEY = "Host";
78
79
80 private static final String USER_AGENT_HEADER_KEY = "User-Agent";
81
82
83 private static final String USER_AGENT_HEADER_VALUE = "NTRIP orekit/11.0";
84
85
86 private static final String VERSION_HEADER_KEY = "Ntrip-Version";
87
88
89 private static final String VERSION_HEADER_VALUE = "Ntrip/2.0";
90
91
92 private static final String CONNECTION_HEADER_KEY = "Connection";
93
94
95 private static final String CONNECTION_HEADER_VALUE = "close";
96
97
98 private static final String FLAGS_HEADER_KEY = "Ntrip-Flags";
99
100
101 private static final String SOURCETABLE_CONTENT_TYPE = "gnss/sourcetable";
102
103
104 private static final double DEG_TO_MINUTES = 60.0;
105
106
107 private final String host;
108
109
110 private final int port;
111
112
113 private double reconnectDelay;
114
115
116 private double reconnectDelayFactor;
117
118
119 private int maxRetries;
120
121
122 private int timeout;
123
124
125 private Proxy proxy;
126
127
128 private AtomicReference<String> gga;
129
130
131 private final List<ObserverHolder> observers;
132
133
134 private final Map<String, StreamMonitor> monitors;
135
136
137 private SourceTable sourceTable;
138
139
140 private ExecutorService executorService;
141
142
143
144
145
146
147
148
149
150
/** Build a client for a caster reachable at the given host and port.
 * <p>
 * The client starts with the default timeout, the default reconnection
 * parameters, a direct (no proxy) connection, no GGA sentence, no cached
 * source table and no executor.
 * </p>
 * @param host caster host
 * @param port caster port
 */
public NtripClient(final String host, final int port) {

    this.host      = host;
    this.port      = port;
    this.observers = new ArrayList<>();
    this.monitors  = new HashMap<>();

    // user-configurable settings start from the class-level defaults
    setTimeout(DEFAULT_TIMEOUT);
    setReconnectParameters(DEFAULT_RECONNECT_DELAY, DEFAULT_RECONNECT_DELAY_FACTOR, DEFAULT_MAX_RECONNECT);
    setProxy(Proxy.Type.DIRECT, null, -1);

    // empty holder: no fix is known yet
    this.gga = new AtomicReference<>();

    // the source table and the executor service are created lazily,
    // their fields keep the default null value until then

}
165
166
167
168
169 public String getHost() {
170 return host;
171 }
172
173
174
175
176 public int getPort() {
177 return port;
178 }
179
180
181
182
183 public void setTimeout(final int timeout) {
184 this.timeout = timeout;
185 }
186
187
188
189
190
191
192 public void setReconnectParameters(final double delay,
193 final double delayFactor,
194 final int max) {
195 this.reconnectDelay = delay;
196 this.reconnectDelayFactor = delayFactor;
197 this.maxRetries = max;
198 }
199
200
201
202
203
204
205 public void setProxy(final Proxy.Type type, final String proxyHost, final int proxyPort) {
206 try {
207 if (type == Proxy.Type.DIRECT) {
208
209 proxy = Proxy.NO_PROXY;
210 } else {
211
212 final InetAddress hostAddress = InetAddress.getByName(proxyHost);
213 final SocketAddress proxyAddress = new InetSocketAddress(hostAddress, proxyPort);
214 proxy = new Proxy(type, proxyAddress);
215 }
216 } catch (UnknownHostException uhe) {
217 throw new OrekitException(uhe, OrekitMessages.UNKNOWN_HOST, proxyHost);
218 }
219 }
220
221
222
223
224 public Proxy getProxy() {
225 return proxy;
226 }
227
228
229
230
231
232
233
234
235
236
237 public void setFix(final int hour, final int minute, final double second,
238 final double latitude, final double longitude, final double ellAltitude,
239 final double undulation) {
240
241
242 final double latDeg = FastMath.abs(FastMath.toDegrees(latitude));
243 final int dLat = (int) FastMath.floor(latDeg);
244 final double mLat = DEG_TO_MINUTES * (latDeg - dLat);
245 final char cLat = latitude >= 0.0 ? 'N' : 'S';
246
247
248 final double lonDeg = FastMath.abs(FastMath.toDegrees(longitude));
249 final int dLon = (int) FastMath.floor(lonDeg);
250 final double mLon = DEG_TO_MINUTES * (lonDeg - dLon);
251 final char cLon = longitude >= 0.0 ? 'E' : 'W';
252
253
254 final StringBuilder builder = new StringBuilder(82);
255 try (Formatter formatter = new Formatter(builder, Locale.US)) {
256
257
258 final int fixQuality = 1;
259 final int nbSat = 4;
260 final double hdop = 1.0;
261
262
263 formatter.format("$GPGGA,%02d%02d%06.3f,%02d%07.4f,%c,%02d%07.4f,%c,%1d,%02d,%3.1f,%.1f,M,%.1f,M,,",
264 hour, minute, second,
265 dLat, mLat, cLat, dLon, mLon, cLon,
266 fixQuality, nbSat, hdop,
267 ellAltitude, undulation);
268
269
270 byte sum = 0;
271 for (int i = 1; i < builder.length(); ++i) {
272 sum ^= builder.charAt(i);
273 }
274 formatter.format("*%02X", sum);
275
276 }
277 gga.set(builder.toString());
278
279 }
280
281
282
283
284 String getGGA() {
285 return gga.get();
286 }
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301 public void addObserver(final int typeCode, final String mountPoint,
302 final MessageObserver observer) {
303
304
305 observers.add(new ObserverHolder(typeCode, mountPoint, observer));
306
307
308 for (Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
309 if (mountPoint == null || mountPoint.equals(entry.getKey())) {
310 entry.getValue().addObserver(typeCode, observer);
311 }
312 }
313
314 }
315
316
317
318
319 public SourceTable getSourceTable() {
320 if (sourceTable == null) {
321 try {
322
323
324 final HttpURLConnection connection = connect("");
325
326 final int responseCode = connection.getResponseCode();
327 if (responseCode == HttpURLConnection.HTTP_UNAUTHORIZED) {
328 throw new OrekitException(OrekitMessages.FAILED_AUTHENTICATION, "caster");
329 } else if (responseCode != HttpURLConnection.HTTP_OK) {
330 throw new OrekitException(OrekitMessages.CONNECTION_ERROR, host, connection.getResponseMessage());
331 }
332
333
334 if (!SOURCETABLE_CONTENT_TYPE.equals(connection.getContentType())) {
335 throw new OrekitException(OrekitMessages.UNEXPECTED_CONTENT_TYPE, connection.getContentType());
336 }
337
338 final SourceTable table = new SourceTable(getHeaderValue(connection, FLAGS_HEADER_KEY));
339
340
341 try (InputStream is = connection.getInputStream();
342 InputStreamReader isr = new InputStreamReader(is, StandardCharsets.UTF_8);
343 BufferedReader br = new BufferedReader(isr)) {
344 int lineNumber = 0;
345 for (String line = br.readLine(); line != null; line = br.readLine()) {
346
347 ++lineNumber;
348 line = line.trim();
349 if (line.length() == 0) {
350 continue;
351 }
352
353 if (line.startsWith(RecordType.CAS.toString())) {
354 table.addCasterRecord(new CasterRecord(line));
355 } else if (line.startsWith(RecordType.NET.toString())) {
356 table.addNetworkRecord(new NetworkRecord(line));
357 } else if (line.startsWith(RecordType.STR.toString())) {
358 table.addDataStreamRecord(new DataStreamRecord(line));
359 } else if (line.startsWith("ENDSOURCETABLE")) {
360
361 break;
362 } else {
363 throw new OrekitException(OrekitMessages.SOURCETABLE_PARSE_ERROR,
364 connection.getURL().getHost(), lineNumber, line);
365 }
366
367 }
368 }
369
370 sourceTable = table;
371 return table;
372
373 } catch (IOException ioe) {
374 throw new OrekitException(ioe, OrekitMessages.CANNOT_PARSE_SOURCETABLE, host);
375 }
376 }
377
378 return sourceTable;
379
380 }
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399 public void startStreaming(final String mountPoint, final org.orekit.gnss.metric.ntrip.Type type,
400 final boolean requiresNMEA, final boolean ignoreUnknownMessageTypes) {
401
402 if (executorService == null) {
403
404 executorService = Executors.newFixedThreadPool(getSourceTable().getDataStreams().size());
405 }
406
407
408 if (monitors.containsKey(mountPoint)) {
409 throw new OrekitException(OrekitMessages.MOUNPOINT_ALREADY_CONNECTED, mountPoint);
410 }
411
412
413 final StreamMonitor monitor = new StreamMonitor(this, mountPoint, type, requiresNMEA, ignoreUnknownMessageTypes,
414 reconnectDelay, reconnectDelayFactor, maxRetries);
415 monitors.put(mountPoint, monitor);
416
417
418 for (final ObserverHolder observerHolder : observers) {
419 if (observerHolder.mountPoint == null ||
420 observerHolder.mountPoint.equals(mountPoint)) {
421 monitor.addObserver(observerHolder.typeCode, observerHolder.observer);
422 }
423 }
424
425
426 executorService.execute(monitor);
427
428 }
429
430
431
432
433
434
435 public void checkException() {
436
437 for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
438 final OrekitException exception = entry.getValue().getException();
439 if (exception != null) {
440 throw exception;
441 }
442 }
443 }
444
445
446
447
448
449
450
451 public void stopStreaming(final int time) {
452
453
454 for (final Map.Entry<String, StreamMonitor> entry : monitors.entrySet()) {
455 entry.getValue().stopMonitoring();
456 }
457
458 try {
459
460 executorService.awaitTermination(time, TimeUnit.MILLISECONDS);
461 } catch (InterruptedException ie) {
462
463 Thread.currentThread().interrupt();
464 }
465
466 checkException();
467
468 }
469
470
471
472
473
474
475 HttpURLConnection connect(final String mountPoint)
476 throws IOException {
477
478
479 final String protocol = "http";
480 final URL casterURL = new URL(protocol, host, port, "/" + mountPoint);
481 final HttpURLConnection connection = (HttpURLConnection) casterURL.openConnection(proxy);
482 connection.setConnectTimeout(timeout);
483 connection.setReadTimeout(timeout);
484
485
486 connection.setRequestProperty(HOST_HEADER_KEY, host);
487 connection.setRequestProperty(VERSION_HEADER_KEY, VERSION_HEADER_VALUE);
488 connection.setRequestProperty(USER_AGENT_HEADER_KEY, USER_AGENT_HEADER_VALUE);
489 connection.setRequestProperty(CONNECTION_HEADER_KEY, CONNECTION_HEADER_VALUE);
490
491 return connection;
492
493 }
494
495
496
497
498
499
500 private String getHeaderValue(final URLConnection connection, final String key) {
501 final String value = connection.getHeaderField(key);
502 if (value == null) {
503 throw new OrekitException(OrekitMessages.MISSING_HEADER,
504 connection.getURL().getHost(), key);
505 }
506 return value;
507 }
508
509
510 private static class ObserverHolder {
511
512
513 private final int typeCode;
514
515
516 private final String mountPoint;
517
518
519 private final MessageObserver observer;
520
521
522
523
524
525
526
527 ObserverHolder(final int typeCode, final String mountPoint,
528 final MessageObserver observer) {
529 this.typeCode = typeCode;
530 this.mountPoint = mountPoint;
531 this.observer = observer;
532 }
533
534 }
535
536 }