From e5e6cbb81357355bb039e758127b438abbb38a5f Mon Sep 17 00:00:00 2001 From: Adam Fanello Date: Fri, 21 Feb 2020 13:01:15 -0800 Subject: [PATCH 1/5] Refactoring of CargoplaneClient to emit connection events - CargoplaneClient now returns an RxJs stream - Emit underlying events from IoT connection - Detect expiring credentials and emit event - Detect resumption of suspended CPU and emit event - Split-up long connect function with some helpers - Bug fix: Clear pending publishQueue upon disconnect - Bug fix: Handle message processing failures - Bug fix: Display message content as JSON, not array of integers. - Bump client version to 1.1.0 - Updated client demos - Updated client documentation --- client/lib/cargoplane-client.ts | 200 ++++++++++++------ client/package-lock.json | 2 +- client/package.json | 2 +- .../src/app/chat/chat.service.ts | 17 +- .../src/chat/chat.service.js | 17 +- docs/Pipfile | 1 + docs/Pipfile.lock | 107 ++++++---- docs/source/client.rst | 24 ++- docs/source/conf.py | 4 +- 9 files changed, 239 insertions(+), 135 deletions(-) diff --git a/client/lib/cargoplane-client.ts b/client/lib/cargoplane-client.ts index f502c5e..b0bbf69 100644 --- a/client/lib/cargoplane-client.ts +++ b/client/lib/cargoplane-client.ts @@ -12,7 +12,6 @@ export interface CargoplaneCredential { sessionToken: string; /** ISO-8601 date-time of when the credentials expire - normally one hour after issuing */ expiration: string; - } /** Message queued for publishing */ @@ -21,6 +20,51 @@ interface QueuedMessage { message?: any; } +const MillisecondsInMinute = 60000; + +/** + * Specialized version of setTimer(): + * - Emits events + * - Checks actual wall time, not just clock ticks + * - Detects discrepancy which indicates the CPU had been suspended. + */ +class ClockTimer { + private timer?: NodeJS.Timer; + private event$?: Subject; + private expiringTime: number; + private lastMinuteTime: number; + + /** + * Set timer + * @param expiringTime emit 'expiring' event around this time: milliseconds since epoch + * @param outputEvents$ stream to emit to + */ + set(expiringTime: number, outputEvents$: Subject) { + this.event$ = outputEvents$; + this.expiringTime = expiringTime; + this.lastMinuteTime = Date.now(); + this.timer = setInterval(() => this.checkClock(), MillisecondsInMinute); + } + + /** Clear/stop timer */ + clear() { + this.timer && clearTimeout(this.timer); + this.timer = undefined; + this.event$ = undefined; + } + + private checkClock() { + const now = Date.now(); + if ((now - this.lastMinuteTime) > (2*MillisecondsInMinute)) { + this.event$ && this.event$.next(new Event('clock-resume')); + } + if (now >= this.expiringTime) { + this.event$ && this.event$.next(new Event('expiring')); + } + this.lastMinuteTime = now; + } +} + /** * Web browser-side API for Cargoplane. * Must use as a singleton. @@ -40,90 +84,58 @@ export class CargoplaneClient { private client?: any; private clientOnline = false; private publishQueue: QueuedMessage[] = []; + private connectionEvent$?: Subject; + private clock = new ClockTimer(); /** * Is the service currently connected to the cloud? */ isOnline(): boolean { - return this.clientOnline; + return this.client && this.clientOnline; } /** * Connect to cloud Cargoplane with given credentials. * * @param credential as received by calling a CargoplaneCloud#createCredentials() + * @param emitEventMsBeforeExpiration how many milliseconds before credential#expiration + * shall the return observable emit an Event of type "expiring"? + * @return observable of events about the connection. See documentation for details. */ - connect(credential: CargoplaneCredential): void { - console.debug("Cargoplane connecting"); - if (this.client) { - this.client.end(); - } + connect(credential: CargoplaneCredential, + emitEventMsBeforeExpiration = MillisecondsInMinute, + ): Observable { + this.cleanupConnection(); if (!credential.iotEndpoint) { console.error("No IoT endpoint defined. Cargoplane disabled"); - return; + const error = new Subject(); + error.next(new Event('error')); + error.complete(); + return error; } - const params = { - region: credential.region, - protocol: 'wss', - accessKeyId: credential.accessKey, - secretKey: credential.secretKey, - sessionToken: credential.sessionToken, - port: 443, - host: credential.iotEndpoint, - autoResubscribe: false - }; - console.debug("Cargoplane IoT Device ", params); - this.client = awsIot.device(params); - - this.client.on('connect', () => { - console.debug("Cargoplane connected"); - this.clientOnline = true; - this.resubscribeToAllTopics(); - this.publishQueuedMessages(); - }); + this.setupClient(credential); + this.connectionEvent$ = new Subject(); - this.client.on('message', (topic: string, message: any) => { - console.debug("Cargoplane received topic", topic, "with content:", message); - const subject = this.typeSubjects.get(topic); - if (subject) { - subject.next(message && message.length ? JSON.parse(message) : ''); - } - }); + // Set expiration clock + const credentialExpirationTime = Date.parse(credential.expiration) || MillisecondsInMinute * 59; + console.debug('Cargoplane credential expires in', + Math.round((credentialExpirationTime - Date.now()) / MillisecondsInMinute), + 'minutes'); + this.clock.set(credentialExpirationTime - emitEventMsBeforeExpiration, this.connectionEvent$); - this.client.on('error', (err) => { - console.error("Cargoplane error: ", err); - }); - - this.client.on('reconnect', () => { - console.info("Cargoplane attempting reconnect"); - }); - - this.client.on('offline', () => { - console.debug("Cargoplane offline"); - this.clientOnline = false; - }); - - this.client.on('close', () => { - console.debug("Cargoplane closed"); - this.clientOnline = false; - }); + return this.connectionEvent$.asObservable(); } /** * Disconnect and clear all subscriptions. (Ex: upon user logout) */ disconnect(): void { - this.typeSubjects.forEach(subject => { - subject.complete(); - }); + this.typeSubjects.forEach(subject => subject.complete()); this.typeSubjects.clear(); - - if (this.client) { - this.client.end(); - this.client = undefined; - } + this.publishQueue = []; + this.cleanupConnection(); } /** @@ -184,4 +196,74 @@ export class CargoplaneClient { this.publishQueue.length = 0; } + + /** Helper for connecting as an IoT Device */ + private setupClient(credential: CargoplaneCredential): void { + const params = { + region: credential.region, + protocol: 'wss', + accessKeyId: credential.accessKey, + secretKey: credential.secretKey, + sessionToken: credential.sessionToken, + port: 443, + host: credential.iotEndpoint, + autoResubscribe: false + }; + console.debug("Cargoplane connecting as IoT Device ", params); + this.client = awsIot.device(params); + + this.client.on('connect', () => { + console.debug("Cargoplane connected"); + this.clientOnline = true; + this.connectionEvent$ && this.connectionEvent$.next(new Event('connected')); + this.resubscribeToAllTopics(); + this.publishQueuedMessages(); + }); + + this.client.on('message', (topic: string, message: any) => { + try { + const payload = message && message.length ? JSON.parse(message) : ''; + console.debug("Cargoplane received topic", topic, "with content:", payload); + const subject = this.typeSubjects.get(topic); + if (subject) { + subject.next(payload); + } + } + catch (ex) { + console.warn("Cargoplane received topic", topic, "with unparsable content:", message); + } + }); + + this.client.on('error', (event: Event) => { + console.error("Cargoplane error: ", event); + this.connectionEvent$ && event && this.connectionEvent$.next(event); + }); + + this.client.on('reconnect', () => { + console.info("Cargoplane attempting reconnect"); + }); + + this.client.on('offline', () => { + console.debug("Cargoplane offline"); + this.clientOnline = false; + this.connectionEvent$ && this.connectionEvent$.next(new Event('offline')); + }); + + this.client.on('close', () => { + console.debug("Cargoplane closed"); + this.clientOnline = false; + this.connectionEvent$ && this.connectionEvent$.next(new Event('disconnected')); + }); + } + + /** Helper for cleaning up connection-based content */ + private cleanupConnection(): void { + this.clock.clear(); + this.connectionEvent$ && this.connectionEvent$.complete(); + this.connectionEvent$ = undefined; + + this.client && this.client.end(); + this.client = undefined; + this.clientOnline = false; + } } diff --git a/client/package-lock.json b/client/package-lock.json index 9bc603e..56eff87 100644 --- a/client/package-lock.json +++ b/client/package-lock.json @@ -1,6 +1,6 @@ { "name": "@cargoplane/client", - "version": "1.0.1", + "version": "1.1.0", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/client/package.json b/client/package.json index 7459649..9eef350 100644 --- a/client/package.json +++ b/client/package.json @@ -1,6 +1,6 @@ { "name": "@cargoplane/client", - "version": "1.0.1", + "version": "1.1.0", "description": "Web client library for publishing messages to and subscribing to to messages via AWS.", "keywords": [ "aws", diff --git a/demo/client/ng-cargoplane-demo/src/app/chat/chat.service.ts b/demo/client/ng-cargoplane-demo/src/app/chat/chat.service.ts index 8fd4710..8e04a88 100644 --- a/demo/client/ng-cargoplane-demo/src/app/chat/chat.service.ts +++ b/demo/client/ng-cargoplane-demo/src/app/chat/chat.service.ts @@ -32,16 +32,13 @@ export class ChatService { const credential = await this.http.get(credentialsPath).toPromise() as CargoplaneCredential; // Connect to Cargoplane - await this.cargoplane.connect(credential); - - // Schedule next reconnect 60s before timeout - const now = Date.now(); - const expire = Date.parse(credential.expiration); - const waitMs = expire - now - 60000; - if (waitMs > 0) { - console.log('Credential renewal in', waitMs / 1000, 'seconds'); - setTimeout(() => this.reconnect(), waitMs); - } + this.cargoplane.connect(credential) + .subscribe(event => { + console.log('Event from cargoplane:', event.type); + if (event.type === 'expiring') { + this.reconnect(); + } + }); } /** diff --git a/demo/client/react-cargoplane-demo/src/chat/chat.service.js b/demo/client/react-cargoplane-demo/src/chat/chat.service.js index af0c632..b13d27b 100644 --- a/demo/client/react-cargoplane-demo/src/chat/chat.service.js +++ b/demo/client/react-cargoplane-demo/src/chat/chat.service.js @@ -22,16 +22,13 @@ export class ChatService { const credential = (await Axios.get(credentialsPath)).data; // Connect to Cargoplane - await this.cargoplane.connect(credential); - - // Schedule next reconnect 60s before timeout - const now = Date.now(); - const expire = Date.parse(credential.expiration); - const waitMs = expire - now - 60000; - if (waitMs > 0) { - console.log('Credential renewal in', waitMs / 1000, 'seconds'); - setTimeout(() => this._reconnect(), waitMs); - } + this.cargoplane.connect(credential) + .subscribe(event => { + console.log('Event from cargoplane:', event.type); + if (event.type === 'expiring') { + this._reconnect(); + } + }); } /** diff --git a/docs/Pipfile b/docs/Pipfile index 8f42afd..76b9eb6 100644 --- a/docs/Pipfile +++ b/docs/Pipfile @@ -6,6 +6,7 @@ name = "pypi" [packages] Sphinx = "===1.8.1" sphinx_rtd_theme = "===0.4.2" +typing = {version = "*", markers="python_version < '3.0.0'"} # just for http.server py2/py3 syntax consistency in manual doc testing future = "~= 0.16" diff --git a/docs/Pipfile.lock b/docs/Pipfile.lock index 6f11286..68bed4c 100644 --- a/docs/Pipfile.lock +++ b/docs/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "0890e60a56c617a6799874b969d8e4c4350d5a5bae476349ba75fa8a6d4e4bf0" + "sha256": "ff0ada53e61249cf22c02a3fc27d1117ca3c021630cd1963d3817157e05bc5a3" }, "pipfile-spec": 6, "requires": {}, @@ -23,17 +23,17 @@ }, "babel": { "hashes": [ - "sha256:af92e6106cb7c55286b25b38ad7695f8b4efb36a90ba483d7f7a6628c46158ab", - "sha256:e86135ae101e31e2c8ec20a4e0c5220f4eed12487d5cf3f78be7e98d3a57fc28" + "sha256:1aac2ae2d0d8ea368fa90906567f5c08463d98ade155c0c4bfedd6a0f7160e38", + "sha256:d670ea0b10f8b723672d3a6abeb87b565b244da220d76b4dba1b66269ec152d4" ], - "version": "==2.7.0" + "version": "==2.8.0" }, "certifi": { "hashes": [ - "sha256:e4f3620cfea4f83eedc95b24abd9cd56f3c4b146dd0177e83a21b4eb49e21e50", - "sha256:fd7c7c74727ddcf00e9acd26bba8da604ffec95bf1c2144e67aff7a8b50e6cef" + "sha256:017c25db2a153ce562900032d5bc68e9f191e44e9a0f762f373977de9df1fbb3", + "sha256:25b64c7da4cd7479594d035c08c2d809eb4aab3a26e5a990ea98cc450c320f1f" ], - "version": "==2019.9.11" + "version": "==2019.11.28" }, "chardet": { "hashes": [ @@ -44,39 +44,38 @@ }, "docutils": { "hashes": [ - "sha256:6c4f696463b79f1fb8ba0c594b63840ebd41f059e92b31957c46b74a4599b6d0", - "sha256:9e4d7ecfc600058e07ba661411a2b7de2fd0fafa17d1a7f7361cd47b1175c827", - "sha256:a2aeea129088da402665e92e0b25b04b073c04b2dce4ab65caaa38b7ce2e1a99" + "sha256:0c5b78adfbf7762415433f5515cd5c9e762339e23369dbe8000d84a4bf4ab3af", + "sha256:c2de3a60e9e7d07be26b7f2b00ca0309c207e06c100f9cc2a94931fc75a478fc" ], - "version": "==0.15.2" + "version": "==0.16" }, "future": { "hashes": [ - "sha256:67045236dcfd6816dc439556d009594abf643e5eb48992e36beac09c2ca659b8" + "sha256:b1bead90b70cf6ec3f0710ae53a525360fa360d306a86583adc6bf83a4db537d" ], "index": "pypi", - "version": "==0.17.1" + "version": "==0.18.2" }, "idna": { "hashes": [ - "sha256:c357b3f628cf53ae2c4c05627ecc484553142ca23264e593d327bcde5e9c3407", - "sha256:ea8b7f6188e6fa117537c3df7da9fc686d485087abf6ac197f9c46432f7e4a3c" + "sha256:7588d1c14ae4c77d74036e8c22ff447b26d0fde8f007354fd48a7814db15b7cb", + "sha256:a068a21ceac8a4d63dbfd964670474107f541babbd2250d61922f029858365fa" ], - "version": "==2.8" + "version": "==2.9" }, "imagesize": { "hashes": [ - "sha256:3f349de3eb99145973fefb7dbe38554414e5c30abd0c8e4b970a7c9d09f3a1d8", - "sha256:f3832918bc3c66617f92e35f5d70729187676313caa60c187eb0f28b8fe5e3b5" + "sha256:6965f19a6a2039c7d48bca7dba2473069ff854c36ae6f19d2cde309d998228a1", + "sha256:b1f6b5a4eab1f73479a50fb79fcf729514a900c341d8503d62a62dbc4127a2b1" ], - "version": "==1.1.0" + "version": "==1.2.0" }, "jinja2": { "hashes": [ - "sha256:74320bb91f31270f9551d46522e33af46a80c3d619f4a4bf42b3164d30b5911f", - "sha256:9fe95f19286cfefaa917656583d020be14e7859c6b0252588391e47db34527de" + "sha256:93187ffbc7808079673ef52771baa950426fd664d3aad1d0fa3e95644360e250", + "sha256:b0eaf100007721b5c16c1fc1eecb87409464edc10469ddc9a22a27a99123be49" ], - "version": "==2.10.3" + "version": "==2.11.1" }, "markupsafe": { "hashes": [ @@ -84,13 +83,16 @@ "sha256:09027a7803a62ca78792ad89403b1b7a73a01c8cb65909cd876f7fcebd79b161", "sha256:09c4b7f37d6c648cb13f9230d847adf22f8171b1ccc4d5682398e77f40309235", "sha256:1027c282dad077d0bae18be6794e6b6b8c91d58ed8a8d89a89d59693b9131db5", + "sha256:13d3144e1e340870b25e7b10b98d779608c02016d5184cfb9927a9f10c689f42", "sha256:24982cc2533820871eba85ba648cd53d8623687ff11cbb805be4ff7b4c971aff", "sha256:29872e92839765e546828bb7754a68c418d927cd064fd4708fab9fe9c8bb116b", "sha256:43a55c2930bbc139570ac2452adf3d70cdbb3cfe5912c71cdce1c2c6bbd9c5d1", "sha256:46c99d2de99945ec5cb54f23c8cd5689f6d7177305ebff350a58ce5f8de1669e", "sha256:500d4957e52ddc3351cabf489e79c91c17f6e0899158447047588650b5e69183", "sha256:535f6fc4d397c1563d08b88e485c3496cf5784e927af890fb3c3aac7f933ec66", + "sha256:596510de112c685489095da617b5bcbbac7dd6384aeebeda4df6025d0256a81b", "sha256:62fe6c95e3ec8a7fad637b7f3d372c15ec1caa01ab47926cfdf7a75b40e0eac1", + "sha256:6788b695d50a51edb699cb55e35487e430fa21f1ed838122d722e0ff0ac5ba15", "sha256:6dd73240d2af64df90aa7c4e7481e23825ea70af4b4922f8ede5b9e35f78a3b1", "sha256:717ba8fe3ae9cc0006d7c451f0bb265ee07739daf76355d06366154ee68d221e", "sha256:79855e1c5b8da654cf486b830bd42c06e8780cea587384cf6545b7d9ac013a0b", @@ -107,30 +109,32 @@ "sha256:ba59edeaa2fc6114428f1637ffff42da1e311e29382d81b339c1817d37ec93c6", "sha256:c8716a48d94b06bb3b2524c2b77e055fb313aeb4ea620c8dd03a105574ba704f", "sha256:cd5df75523866410809ca100dc9681e301e3c27567cf498077e8551b6d20e42f", - "sha256:e249096428b3ae81b08327a63a485ad0878de3fb939049038579ac0ef61e17e7" + "sha256:cdb132fc825c38e1aeec2c8aa9338310d29d337bebbd7baa06889d09a60a1fa2", + "sha256:e249096428b3ae81b08327a63a485ad0878de3fb939049038579ac0ef61e17e7", + "sha256:e8313f01ba26fbbe36c7be1966a7b7424942f670f38e666995b88d012765b9be" ], "version": "==1.1.1" }, "packaging": { "hashes": [ - "sha256:28b924174df7a2fa32c1953825ff29c61e2f5e082343165438812f00d3a7fc47", - "sha256:d9551545c6d761f3def1677baf08ab2a3ca17c56879e70fecba2fc4dde4ed108" + "sha256:170748228214b70b672c581a3dd610ee51f733018650740e98c7df862a583f73", + "sha256:e665345f9eef0c621aa0bf2f8d78cf6d21904eef16a93f020240b704a57f1334" ], - "version": "==19.2" + "version": "==20.1" }, "pygments": { "hashes": [ - "sha256:71e430bc85c88a430f000ac1d9b331d2407f681d6f6aec95e8bcfbc3df5b0127", - "sha256:881c4c157e45f30af185c1ffe8d549d48ac9127433f2c380c24b84572ad66297" + "sha256:2a3fe295e54a20164a9df49c75fa58526d3be48e14aceba6d6b1e8ac0bfd6f1b", + "sha256:98c8aa5a9f778fcd1026a17361ddaf7330d1b7c62ae97c3bb0ae73e0b9b6b0fe" ], - "version": "==2.4.2" + "version": "==2.5.2" }, "pyparsing": { "hashes": [ - "sha256:6f98a7b9397e206d78cc01df10131398f1c8b8510a2f4d97d9abd82e1aacdd80", - "sha256:d9338df12903bbf5d65a0e4e87c2161968b10d2e489652bb47001d82a9b028b4" + "sha256:4c830582a84fb022400b85429791bc551f1f4871c33f23e44f353119e92f969f", + "sha256:c342dccb5250c08d45fd6f8b4a559613ca603b57498511740e65cd11a2e7dcec" ], - "version": "==2.4.2" + "version": "==2.4.6" }, "pytz": { "hashes": [ @@ -141,17 +145,17 @@ }, "requests": { "hashes": [ - "sha256:11e007a8a2aa0323f5a921e9e6a2d7e4e67d9877e85773fba9ba6419025cbeb4", - "sha256:9cf5292fcd0f598c671cfc1e0d7d1a7f13bb8085e9a590f48c010551dc6c4b31" + "sha256:43999036bfa82904b6af1d99e4882b560e5e2c68e5c4b0aa03b655f3d7d73fee", + "sha256:b3f43d496c6daba4493e7c431722aeb7dbc6288f52a6e04e7b6023b0247817e6" ], - "version": "==2.22.0" + "version": "==2.23.0" }, "six": { "hashes": [ - "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", - "sha256:d16a0141ec1a18405cd4ce8b4613101da75da0e9a7aec5bdd4fa804d0e0eba73" + "sha256:236bdbdce46e6e6a3d61a337c0f8b763ca1e8717c03b369e87a7ec7ce1319c0a", + "sha256:8f3cd2e254d8f793e7f3d6d9df77b92252b52637291d0f0da013c76ea2724b6c" ], - "version": "==1.12.0" + "version": "==1.14.0" }, "snowballstemmer": { "hashes": [ @@ -166,7 +170,7 @@ "sha256:d222626d8356de702431e813a05c68a35967e3d66c6cd1c2c89539bb179a7464" ], "index": "pypi", - "version": "==1.8.1" + "version": "===1.8.1" }, "sphinx-rtd-theme": { "hashes": [ @@ -174,22 +178,31 @@ "sha256:d0f6bc70f98961145c5b0e26a992829363a197321ba571b31b24ea91879e0c96" ], "index": "pypi", - "version": "==0.4.2" + "version": "===0.4.2" }, "sphinxcontrib-websupport": { "hashes": [ - "sha256:1501befb0fdf1d1c29a800fdbf4ef5dc5369377300ddbdd16d2cd40e54c6eefc", - "sha256:e02f717baf02d0b6c3dd62cf81232ffca4c9d5c331e03766982e3ff9f1d2bc3f" + "sha256:50fb98fcb8ff2a8869af2afa6b8ee51b3baeb0b17dacd72505105bf15d506ead", + "sha256:bad3fbd312bc36a31841e06e7617471587ef642bdacdbdddaa8cc30cf251b5ea" ], - "version": "==1.1.2" + "version": "==1.2.0" }, - "urllib3": { + "typing": { "hashes": [ - "sha256:4c291ca23bbb55c76518905869ef34bdd5f0e46af7afe6861e8375643ffee1a0", - "sha256:9a247273df709c4fedb38c711e44292304f73f39ab01beda9f6b9fc375669ac3" + "sha256:91dfe6f3f706ee8cc32d38edbbf304e9b7583fb37108fef38229617f8b3eba23", + "sha256:c8cabb5ab8945cd2f54917be357d134db9cc1eb039e59d1606dc1e60cb1d9d36", + "sha256:f38d83c5a7a7086543a0f649564d661859c5146a85775ab90c0d2f93ffaa9714" ], "index": "pypi", - "version": "==1.24.2" + "markers": "python_version < '3.0.0'", + "version": "==3.7.4.1" + }, + "urllib3": { + "hashes": [ + "sha256:2f3db8b19923a873b3e5256dc9c2dedfa883e33d87c690d9c7913e1f40673cdc", + "sha256:87716c2d2a7121198ebcb7ce7cccf6ce5e9ba539041cfbaeecfb641dc0bf6acc" + ], + "version": "==1.25.8" } }, "develop": {} diff --git a/docs/source/client.rst b/docs/source/client.rst index c2ac198..c334cb5 100644 --- a/docs/source/client.rst +++ b/docs/source/client.rst @@ -44,15 +44,29 @@ connect Connects/reconnects to Cargoplane Cloud with the given credentials. -``connect(credential: CargoplaneCredential): void`` +``connect(credential: CargoplaneCredential, emitEventMsBeforeExpiration?: number): Observable`` The ``credential`` must be retrieved from the companion Lambda in the cloud package calling ``CargoplaneCloud#createCredentials``. -The ``expiration`` field in ``CargoplaneCredential`` provides the expiration ISO date-time of the credentials. -You must obtain new credentials and reconnect prior to expiration in order to remain connected. Subscriptions -are automatically re-applied upon reconnect. The expiration period is normally one hour, but it is best to use -this value rather than assume one hour. +``connect`` will return a stream of `Events `_ +about the connection. Check the Event ``type`` field for what happened. +Unless otherwise stated, Cargoplane will log these events and manage them automatically. + +* ``type === 'connected'``: Connection has completed. +* ``type === 'disconnected'``: Connection has been dropped. +* ``type === 'offline'``: Network is offline. +* ``type === 'expiring'``: The current credentials are expiring (or has already). + Use this to trigger your application to obtain new credentials to call ``connect`` with again. + Subscriptions are automatically re-applied upon reconnect. + You can control how early the expiration warning comes by optionally passing in a value + for ``emitEventMsBeforeExpiration``. The default is one minute. +* ``type === 'clock-resume'``: If the computer sleeps or the browser tab is suspended, this will be + emitted when processing resumes. *Messages may have been lost while suspended* - you may need to + take action to account for this. If the credentials expired during the suspension, a separate + ``expiring`` event will follow this. +* ``type === 'error'``: There was an error with the connection. (Cargoplane will try to reconnect if needed.) + disconnect ^^^^^^^^^^ diff --git a/docs/source/conf.py b/docs/source/conf.py index 21bc3a8..6e02a9d 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -25,13 +25,13 @@ # -- Project information ----------------------------------------------------- project = u'cargoplane' -copyright = u'2019, Onica Group' +copyright = u'2019-2020, Onica Group' author = u'Onica Group' # The short X.Y version version = u'' # The full version, including alpha/beta/rc tags -release = u'1.0.0' +release = u'1.1.0' # -- General configuration --------------------------------------------------- From f48486f5b07ec5149afb257d7cfb6af1a8c3e6c2 Mon Sep 17 00:00:00 2001 From: Adam Fanello Date: Fri, 21 Feb 2020 14:08:05 -0800 Subject: [PATCH 2/5] Issue 16 - Angular unit test documentation --- docs/source/client.rst | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/docs/source/client.rst b/docs/source/client.rst index c334cb5..c5c42f9 100644 --- a/docs/source/client.rst +++ b/docs/source/client.rst @@ -28,6 +28,17 @@ Add the file ``./webpack-custom.config.ts`` to the project's root folder with th } }; +Similarly, if you use the default Karma unit test running, add the following in ``karma.conf.js`` as another property +passed to ``config.set``:: + + webpack: { + node: { + fs: 'empty', + tls: 'empty', + path: 'empty' + } + } + See also `AWS Javascript SDK with Angular `_ for other possible changes that may be needed. From 16b6c8e755d8828978da4b6953e2e6c99c92a964 Mon Sep 17 00:00:00 2001 From: Adam Fanello Date: Mon, 24 Feb 2020 14:00:09 -0800 Subject: [PATCH 3/5] Removed Node.js data type from browser client code. - No functional difference here, just removing a point of confusion based on feedback from a reviewer. --- client/lib/cargoplane-client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/lib/cargoplane-client.ts b/client/lib/cargoplane-client.ts index b0bbf69..c3e1e93 100644 --- a/client/lib/cargoplane-client.ts +++ b/client/lib/cargoplane-client.ts @@ -29,7 +29,7 @@ const MillisecondsInMinute = 60000; * - Detects discrepancy which indicates the CPU had been suspended. */ class ClockTimer { - private timer?: NodeJS.Timer; + private timer?: any; private event$?: Subject; private expiringTime: number; private lastMinuteTime: number; From 20a8741d87c48d9291cfbb6843798c1f8cd62cc4 Mon Sep 17 00:00:00 2001 From: Adam Fanello Date: Mon, 24 Feb 2020 16:05:52 -0800 Subject: [PATCH 4/5] Fixed documentation generation --- docs/Pipfile | 1 + docs/Pipfile.lock | 9 +++++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/Pipfile b/docs/Pipfile index 76b9eb6..ee47a91 100644 --- a/docs/Pipfile +++ b/docs/Pipfile @@ -6,6 +6,7 @@ name = "pypi" [packages] Sphinx = "===1.8.1" sphinx_rtd_theme = "===0.4.2" +sphinxcontrib-websupport = "==1.1.2" typing = {version = "*", markers="python_version < '3.0.0'"} # just for http.server py2/py3 syntax consistency in manual doc testing diff --git a/docs/Pipfile.lock b/docs/Pipfile.lock index 68bed4c..c9bae77 100644 --- a/docs/Pipfile.lock +++ b/docs/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "ff0ada53e61249cf22c02a3fc27d1117ca3c021630cd1963d3817157e05bc5a3" + "sha256": "6f56be0e3258ab92c511d197af164b1b8ec523c1584cc52a564161c2d075e60f" }, "pipfile-spec": 6, "requires": {}, @@ -182,10 +182,11 @@ }, "sphinxcontrib-websupport": { "hashes": [ - "sha256:50fb98fcb8ff2a8869af2afa6b8ee51b3baeb0b17dacd72505105bf15d506ead", - "sha256:bad3fbd312bc36a31841e06e7617471587ef642bdacdbdddaa8cc30cf251b5ea" + "sha256:1501befb0fdf1d1c29a800fdbf4ef5dc5369377300ddbdd16d2cd40e54c6eefc", + "sha256:e02f717baf02d0b6c3dd62cf81232ffca4c9d5c331e03766982e3ff9f1d2bc3f" ], - "version": "==1.2.0" + "index": "pypi", + "version": "==1.1.2" }, "typing": { "hashes": [ From e28116331e9a8300fd3730ce234e0b0f2b2fc735 Mon Sep 17 00:00:00 2001 From: Adam Fanello Date: Tue, 25 Feb 2020 13:03:16 -0800 Subject: [PATCH 5/5] Issue #17 - Message parsing failure handling - Per comment on issue #17, the logic to parse the message payload needs to handle multiple cases. - Created MessageParser class to handle these complexities. --- client/lib/cargoplane-client.ts | 36 ++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/client/lib/cargoplane-client.ts b/client/lib/cargoplane-client.ts index c3e1e93..7ec84f2 100644 --- a/client/lib/cargoplane-client.ts +++ b/client/lib/cargoplane-client.ts @@ -65,6 +65,39 @@ class ClockTimer { } } +/** + * Helper to decode JSON messages from IoT Core. + * Sometimes the message is a string, sometimes its a Uint8Array. + * Sometimes the runtime environment has TextDecoder, sometimes it doesn't. + */ +class MessageParser { + private decoder = !!TextDecoder ? new TextDecoder() : null; + + /** + * Parse message into an object. + * @throws SyntaxError if message is not valid JSON + */ + parse(message: string | Uint8Array | undefined): any { + if (typeof message === 'string') { + return JSON.parse(message); + } + else if (message instanceof Uint8Array) { + const str = this.decoder ? this.decoder.decode(message) : this.shimDecoder(message); + return JSON.parse(str); + } + else { + return ''; + } + } + + /** Simple shim for old browsers that don't support TextDecoder */ + shimDecoder(array: Uint8Array): string { + let str = ''; + array.forEach(byte => str += String.fromCharCode(byte)); + return str; + } +} + /** * Web browser-side API for Cargoplane. * Must use as a singleton. @@ -86,6 +119,7 @@ export class CargoplaneClient { private publishQueue: QueuedMessage[] = []; private connectionEvent$?: Subject; private clock = new ClockTimer(); + private messageParser = new MessageParser(); /** * Is the service currently connected to the cloud? @@ -222,7 +256,7 @@ export class CargoplaneClient { this.client.on('message', (topic: string, message: any) => { try { - const payload = message && message.length ? JSON.parse(message) : ''; + const payload = this.messageParser.parse(message); console.debug("Cargoplane received topic", topic, "with content:", payload); const subject = this.typeSubjects.get(topic); if (subject) {