Skip to content

Commit

Permalink
Merge pull request #20 from onicagroup/issue17
Browse files Browse the repository at this point in the history
Refactoring of CargoplaneClient to emit connection events
  • Loading branch information
kernwig authored Feb 26, 2020
2 parents e0789d0 + e281163 commit 602de02
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 132 deletions.
234 changes: 175 additions & 59 deletions client/lib/cargoplane-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export interface CargoplaneCredential {
sessionToken: string;
/** ISO-8601 date-time of when the credentials expire - normally one hour after issuing */
expiration: string;

}

/** Message queued for publishing */
Expand All @@ -21,6 +20,84 @@ interface QueuedMessage {
message?: any;
}

const MillisecondsInMinute = 60000;

/**
* Specialized version of setTimer():
* - Emits events
* - Checks actual wall time, not just clock ticks
* - Detects discrepancy which indicates the CPU had been suspended.
*/
class ClockTimer {
private timer?: any;
private event$?: Subject<Event>;
private expiringTime: number;
private lastMinuteTime: number;

/**
* Set timer
* @param expiringTime emit 'expiring' event around this time: milliseconds since epoch
* @param outputEvents$ stream to emit to
*/
set(expiringTime: number, outputEvents$: Subject<Event>) {
this.event$ = outputEvents$;
this.expiringTime = expiringTime;
this.lastMinuteTime = Date.now();
this.timer = setInterval(() => this.checkClock(), MillisecondsInMinute);
}

/** Clear/stop timer */
clear() {
this.timer && clearTimeout(this.timer);
this.timer = undefined;
this.event$ = undefined;
}

private checkClock() {
const now = Date.now();
if ((now - this.lastMinuteTime) > (2*MillisecondsInMinute)) {
this.event$ && this.event$.next(new Event('clock-resume'));
}
if (now >= this.expiringTime) {
this.event$ && this.event$.next(new Event('expiring'));
}
this.lastMinuteTime = now;
}
}

/**
* Helper to decode JSON messages from IoT Core.
* Sometimes the message is a string, sometimes its a Uint8Array.
* Sometimes the runtime environment has TextDecoder, sometimes it doesn't.
*/
class MessageParser {
private decoder = !!TextDecoder ? new TextDecoder() : null;

/**
* Parse message into an object.
* @throws SyntaxError if message is not valid JSON
*/
parse(message: string | Uint8Array | undefined): any {
if (typeof message === 'string') {
return JSON.parse(message);
}
else if (message instanceof Uint8Array) {
const str = this.decoder ? this.decoder.decode(message) : this.shimDecoder(message);
return JSON.parse(str);
}
else {
return '';
}
}

/** Simple shim for old browsers that don't support TextDecoder */
shimDecoder(array: Uint8Array): string {
let str = '';
array.forEach(byte => str += String.fromCharCode(byte));
return str;
}
}

/**
* Web browser-side API for Cargoplane.
* Must use as a singleton.
Expand All @@ -40,90 +117,59 @@ export class CargoplaneClient {
private client?: any;
private clientOnline = false;
private publishQueue: QueuedMessage[] = [];
private connectionEvent$?: Subject<Event>;
private clock = new ClockTimer();
private messageParser = new MessageParser();

/**
* Is the service currently connected to the cloud?
*/
isOnline(): boolean {
return this.clientOnline;
return this.client && this.clientOnline;
}

/**
* Connect to cloud Cargoplane with given credentials.
*
* @param credential as received by calling a CargoplaneCloud#createCredentials()
* @param emitEventMsBeforeExpiration how many milliseconds before credential#expiration
* shall the return observable emit an Event of type "expiring"?
* @return observable of events about the connection. See documentation for details.
*/
connect(credential: CargoplaneCredential): void {
console.debug("Cargoplane connecting");
if (this.client) {
this.client.end();
}
connect(credential: CargoplaneCredential,
emitEventMsBeforeExpiration = MillisecondsInMinute,
): Observable<Event> {
this.cleanupConnection();

if (!credential.iotEndpoint) {
console.error("No IoT endpoint defined. Cargoplane disabled");
return;
const error = new Subject<Event>();
error.next(new Event('error'));
error.complete();
return error;
}

const params = {
region: credential.region,
protocol: 'wss',
accessKeyId: credential.accessKey,
secretKey: credential.secretKey,
sessionToken: credential.sessionToken,
port: 443,
host: credential.iotEndpoint,
autoResubscribe: false
};
console.debug("Cargoplane IoT Device ", params);
this.client = awsIot.device(params);

this.client.on('connect', () => {
console.debug("Cargoplane connected");
this.clientOnline = true;
this.resubscribeToAllTopics();
this.publishQueuedMessages();
});

this.client.on('message', (topic: string, message: any) => {
console.debug("Cargoplane received topic", topic, "with content:", message);
const subject = this.typeSubjects.get(topic);
if (subject) {
subject.next(message && message.length ? JSON.parse(message) : '');
}
});

this.client.on('error', (err) => {
console.error("Cargoplane error: ", err);
});
this.setupClient(credential);
this.connectionEvent$ = new Subject<Event>();

this.client.on('reconnect', () => {
console.info("Cargoplane attempting reconnect");
});
// Set expiration clock
const credentialExpirationTime = Date.parse(credential.expiration) || MillisecondsInMinute * 59;
console.debug('Cargoplane credential expires in',
Math.round((credentialExpirationTime - Date.now()) / MillisecondsInMinute),
'minutes');
this.clock.set(credentialExpirationTime - emitEventMsBeforeExpiration, this.connectionEvent$);

this.client.on('offline', () => {
console.debug("Cargoplane offline");
this.clientOnline = false;
});

this.client.on('close', () => {
console.debug("Cargoplane closed");
this.clientOnline = false;
});
return this.connectionEvent$.asObservable();
}

/**
* Disconnect and clear all subscriptions. (Ex: upon user logout)
*/
disconnect(): void {
this.typeSubjects.forEach(subject => {
subject.complete();
});
this.typeSubjects.forEach(subject => subject.complete());
this.typeSubjects.clear();

if (this.client) {
this.client.end();
this.client = undefined;
}
this.publishQueue = [];
this.cleanupConnection();
}

/**
Expand Down Expand Up @@ -184,4 +230,74 @@ export class CargoplaneClient {

this.publishQueue.length = 0;
}

/** Helper for connecting as an IoT Device */
private setupClient(credential: CargoplaneCredential): void {
const params = {
region: credential.region,
protocol: 'wss',
accessKeyId: credential.accessKey,
secretKey: credential.secretKey,
sessionToken: credential.sessionToken,
port: 443,
host: credential.iotEndpoint,
autoResubscribe: false
};
console.debug("Cargoplane connecting as IoT Device ", params);
this.client = awsIot.device(params);

this.client.on('connect', () => {
console.debug("Cargoplane connected");
this.clientOnline = true;
this.connectionEvent$ && this.connectionEvent$.next(new Event('connected'));
this.resubscribeToAllTopics();
this.publishQueuedMessages();
});

this.client.on('message', (topic: string, message: any) => {
try {
const payload = this.messageParser.parse(message);
console.debug("Cargoplane received topic", topic, "with content:", payload);
const subject = this.typeSubjects.get(topic);
if (subject) {
subject.next(payload);
}
}
catch (ex) {
console.warn("Cargoplane received topic", topic, "with unparsable content:", message);
}
});

this.client.on('error', (event: Event) => {
console.error("Cargoplane error: ", event);
this.connectionEvent$ && event && this.connectionEvent$.next(event);
});

this.client.on('reconnect', () => {
console.info("Cargoplane attempting reconnect");
});

this.client.on('offline', () => {
console.debug("Cargoplane offline");
this.clientOnline = false;
this.connectionEvent$ && this.connectionEvent$.next(new Event('offline'));
});

this.client.on('close', () => {
console.debug("Cargoplane closed");
this.clientOnline = false;
this.connectionEvent$ && this.connectionEvent$.next(new Event('disconnected'));
});
}

/** Helper for cleaning up connection-based content */
private cleanupConnection(): void {
this.clock.clear();
this.connectionEvent$ && this.connectionEvent$.complete();
this.connectionEvent$ = undefined;

this.client && this.client.end();
this.client = undefined;
this.clientOnline = false;
}
}
2 changes: 1 addition & 1 deletion client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cargoplane/client",
"version": "1.0.1",
"version": "1.1.0",
"description": "Web client library for publishing messages to and subscribing to to messages via AWS.",
"keywords": [
"aws",
Expand Down
17 changes: 7 additions & 10 deletions demo/client/ng-cargoplane-demo/src/app/chat/chat.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,13 @@ export class ChatService {
const credential = await this.http.get(credentialsPath).toPromise() as CargoplaneCredential;

// Connect to Cargoplane
await this.cargoplane.connect(credential);

// Schedule next reconnect 60s before timeout
const now = Date.now();
const expire = Date.parse(credential.expiration);
const waitMs = expire - now - 60000;
if (waitMs > 0) {
console.log('Credential renewal in', waitMs / 1000, 'seconds');
setTimeout(() => this.reconnect(), waitMs);
}
this.cargoplane.connect(credential)
.subscribe(event => {
console.log('Event from cargoplane:', event.type);
if (event.type === 'expiring') {
this.reconnect();
}
});
}

/**
Expand Down
17 changes: 7 additions & 10 deletions demo/client/react-cargoplane-demo/src/chat/chat.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ export class ChatService {
const credential = (await Axios.get(credentialsPath)).data;

// Connect to Cargoplane
await this.cargoplane.connect(credential);

// Schedule next reconnect 60s before timeout
const now = Date.now();
const expire = Date.parse(credential.expiration);
const waitMs = expire - now - 60000;
if (waitMs > 0) {
console.log('Credential renewal in', waitMs / 1000, 'seconds');
setTimeout(() => this._reconnect(), waitMs);
}
this.cargoplane.connect(credential)
.subscribe(event => {
console.log('Event from cargoplane:', event.type);
if (event.type === 'expiring') {
this._reconnect();
}
});
}

/**
Expand Down
2 changes: 2 additions & 0 deletions docs/Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ name = "pypi"
[packages]
Sphinx = "===1.8.1"
sphinx_rtd_theme = "===0.4.2"
sphinxcontrib-websupport = "==1.1.2"
typing = {version = "*", markers="python_version < '3.0.0'"}

# just for http.server py2/py3 syntax consistency in manual doc testing
future = "~= 0.16"
Loading

0 comments on commit 602de02

Please sign in to comment.