Skip to content

Commit

Permalink
NAS-131557: New classes with changed approach to websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
RehanY147 committed Nov 13, 2024
1 parent 33618ef commit 6ce4718
Show file tree
Hide file tree
Showing 3 changed files with 448 additions and 0 deletions.
190 changes: 190 additions & 0 deletions src/app/services/websocket/api2.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import { Injectable } from '@angular/core';
import { Router } from '@angular/router';
import { TranslateService } from '@ngx-translate/core';
import { UUID } from 'angular2-uuid';
import {
filter, map, merge, Observable, of, share, startWith, Subject, Subscriber, switchMap, take, takeUntil, throwError,
} from 'rxjs';
import { ApiErrorName } from 'app/enums/api-error-name.enum';
import { IncomingApiMessageType } from 'app/enums/api-message-type.enum';
import { ResponseErrorType } from 'app/enums/response-error-type.enum';
import { applyApiEvent } from 'app/helpers/operators/apply-api-event.operator';
import { observeJob } from 'app/helpers/operators/observe-job.operator';
import { ApiCallAndSubscribeMethod, ApiCallAndSubscribeResponse } from 'app/interfaces/api/api-call-and-subscribe-directory.interface';
import { ApiCallMethod, ApiCallParams, ApiCallResponse } from 'app/interfaces/api/api-call-directory.interface';
import { ApiJobMethod, ApiJobParams, ApiJobResponse } from 'app/interfaces/api/api-job-directory.interface';
import { ApiError } from 'app/interfaces/api-error.interface';
import {
ApiEvent, ApiEventMethod, ApiEventTyped, IncomingApiMessage, ResultMessage,
} from 'app/interfaces/api-message.interface';
import { Job } from 'app/interfaces/job.interface';
import { WebSocketHandlerService } from 'app/services/websocket/websocket-handler.service';

@Injectable({
providedIn: 'root',
})
export class Api2Service {
private readonly eventSubscribers = new Map<ApiEventMethod, Observable<ApiEventTyped>>();
readonly clearSubscriptions$ = new Subject<void>();

constructor(
protected router: Router,
protected wsHandler: WebSocketHandlerService,
protected translate: TranslateService,
) {
this.wsHandler.isConnected$?.subscribe((isConnected) => {
if (!isConnected) {
this.clearSubscriptions();
}
});
}

call<M extends ApiCallMethod>(method: M, params?: ApiCallParams<M>): Observable<ApiCallResponse<M>> {
return this.callMethod(method, params);
}

/**
* For jobs better to use the `selectJob` store selector.
*/
callAndSubscribe<M extends ApiCallAndSubscribeMethod>(
method: M,
params?: ApiCallParams<M>,
): Observable<ApiCallAndSubscribeResponse<M>[]> {
return this.callMethod<M>(method, params)
.pipe(
switchMap((items) => this.subscribe(method).pipe(
startWith(null),
map((event) => ([items, event])),
)),
applyApiEvent(),
takeUntil(this.clearSubscriptions$),
);
}

/**
* Use `job` when you care about job progress or result.
*/
startJob<M extends ApiJobMethod>(method: M, params?: ApiJobParams<M>): Observable<number> {
return this.callMethod(method, params);
}

/**
* In your subscription, next will be next job update, complete will be when the job is complete.
*/
job<M extends ApiJobMethod>(
method: M,
params?: ApiJobParams<M>,
): Observable<Job<ApiJobResponse<M>>> {
return this.callMethod(method, params).pipe(
switchMap((jobId: number) => {
return merge(
this.subscribeToJobUpdates(jobId),
// Get job status here for jobs that complete too fast.
this.call('core.get_jobs', [[['id', '=', jobId]]]).pipe(map((jobs) => jobs[0])),
)
.pipe(observeJob());
}),
takeUntil(this.clearSubscriptions$),
) as Observable<Job<ApiJobResponse<M>>>;
}

subscribe<K extends ApiEventMethod = ApiEventMethod>(method: K | `${K}:${string}`): Observable<ApiEventTyped<K>> {
if (this.eventSubscribers.has(method as K)) {
return this.eventSubscribers.get(method as K);
}
const observable$ = new Observable((trigger: Subscriber<ApiEventTyped<K>>) => {
const subscription = this.wsHandler.buildSubscriber<K, ApiEventTyped<K>>(method as K).subscribe(trigger);
return () => {
subscription.unsubscribe();
this.eventSubscribers.delete(method as K);
};
}).pipe(
switchMap((apiEvent) => {
const erroredEvent = apiEvent as unknown as ResultMessage;
if (erroredEvent?.error) {
console.error('Error: ', erroredEvent.error);
return throwError(() => erroredEvent.error);
}
return of(apiEvent);
}),
share(),
takeUntil(this.clearSubscriptions$),
);

this.eventSubscribers.set(method as K, observable$);
return observable$;
}

subscribeToLogs(name: string): Observable<ApiEvent<{ data: string }>> {
return this.subscribe(name as ApiEventMethod) as unknown as Observable<ApiEvent<{ data: string }>>;
}

clearSubscriptions(): void {
this.clearSubscriptions$.next();
this.eventSubscribers.clear();
}

private callMethod<M extends ApiCallMethod>(method: M, params?: ApiCallParams<M>): Observable<ApiCallResponse<M>>;
private callMethod<M extends ApiJobMethod>(method: M, params?: ApiJobParams<M>): Observable<number>;
private callMethod<M extends ApiCallMethod | ApiJobMethod>(method: M, params?: unknown): Observable<unknown> {
const uuid = UUID.UUID();
return of(uuid).pipe(
switchMap(() => {
performance.mark(`${method} - ${uuid} - start`);
this.wsHandler.scheduleCall({
id: uuid, msg: IncomingApiMessageType.Method, method, params,
});
return this.wsHandler.responses$.pipe(
filter((data: IncomingApiMessage) => data.msg === IncomingApiMessageType.Result && data.id === uuid),
);
}),
switchMap((data: IncomingApiMessage) => {
if ('error' in data && data.error) {
this.printError(data.error, { method, params });
const error = this.enhanceError(data.error, { method });
return throwError(() => error);
}

performance.mark(`${method} - ${uuid} - end`);
performance.measure(method, `${method} - ${uuid} - start`, `${method} - ${uuid} - end`);
return of(data);
}),

map((data: ResultMessage) => data.result),
take(1),
);
}

private subscribeToJobUpdates(jobId: number): Observable<Job> {
return this.subscribe('core.get_jobs').pipe(
filter((apiEvent) => apiEvent.id === jobId),
map((apiEvent) => apiEvent.fields),
takeUntil(this.clearSubscriptions$),
);
}

private printError(error: ApiError, context: { method: string; params: unknown }): void {
if (error.errname === ApiErrorName.NoAccess) {
console.error(`Access denied to ${context.method} with ${context.params ? JSON.stringify(context.params) : 'no params'}`);
return;
}

// Do not log validation errors.
if (error.type === ResponseErrorType.Validation) {
return;
}

console.error('Error: ', error);
}

// TODO: Probably doesn't belong here. Consider building something similar to interceptors.
private enhanceError(error: ApiError, context: { method: string }): ApiError {
if (error.errname === ApiErrorName.NoAccess) {
return {
...error,
reason: this.translate.instant('Access denied to {method}', { method: context.method }),
};
}
return error;
}
}
45 changes: 45 additions & 0 deletions src/app/services/websocket/websocket-connection.class.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { Observable } from 'rxjs';
import { webSocket as rxjsWebSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

export class WebSocketConnection {
private ws$: WebSocketSubject<unknown>;
private wsAsObservable$: Observable<unknown>;
get stream$(): Observable<unknown> {
return this.wsAsObservable$;
}

constructor(
private webSocket: typeof rxjsWebSocket,
) { }

connect(config: WebSocketSubjectConfig<unknown>): void {
if (this.ws$) {
this.ws$.complete();
}

performance.mark(`WS Init. URL: ${config.url}"`);
this.ws$ = this.webSocket(config);

this.wsAsObservable$ = this.ws$.asObservable();
}

send(payload: unknown): void {
this.ws$.next(payload);
}

close(): void {
this.ws$?.complete();
}

event<R>(
subMsg: () => unknown,
unsubMsg: () => unknown,
messageFilter: (value: unknown) => boolean,
): Observable<R> {
return this.ws$.multiplex(
subMsg,
unsubMsg,
messageFilter,
) as Observable<R>;
}
}
Loading

0 comments on commit 6ce4718

Please sign in to comment.