diff --git a/src/app/services/websocket/api2.service.ts b/src/app/services/websocket/api2.service.ts new file mode 100644 index 00000000000..202bae030a4 --- /dev/null +++ b/src/app/services/websocket/api2.service.ts @@ -0,0 +1,190 @@ +import { Injectable } from '@angular/core'; +import { Router } from '@angular/router'; +import { TranslateService } from '@ngx-translate/core'; +import { UUID } from 'angular2-uuid'; +import { + filter, map, merge, Observable, of, share, startWith, Subject, Subscriber, switchMap, take, takeUntil, throwError, +} from 'rxjs'; +import { ApiErrorName } from 'app/enums/api-error-name.enum'; +import { IncomingApiMessageType } from 'app/enums/api-message-type.enum'; +import { ResponseErrorType } from 'app/enums/response-error-type.enum'; +import { applyApiEvent } from 'app/helpers/operators/apply-api-event.operator'; +import { observeJob } from 'app/helpers/operators/observe-job.operator'; +import { ApiCallAndSubscribeMethod, ApiCallAndSubscribeResponse } from 'app/interfaces/api/api-call-and-subscribe-directory.interface'; +import { ApiCallMethod, ApiCallParams, ApiCallResponse } from 'app/interfaces/api/api-call-directory.interface'; +import { ApiJobMethod, ApiJobParams, ApiJobResponse } from 'app/interfaces/api/api-job-directory.interface'; +import { ApiError } from 'app/interfaces/api-error.interface'; +import { + ApiEvent, ApiEventMethod, ApiEventTyped, IncomingApiMessage, ResultMessage, +} from 'app/interfaces/api-message.interface'; +import { Job } from 'app/interfaces/job.interface'; +import { WebSocketHandlerService } from 'app/services/websocket/websocket-handler.service'; + +@Injectable({ + providedIn: 'root', +}) +export class Api2Service { + private readonly eventSubscribers = new Map>(); + readonly clearSubscriptions$ = new Subject(); + + constructor( + protected router: Router, + protected wsHandler: WebSocketHandlerService, + protected translate: TranslateService, + ) { + this.wsHandler.isConnected$?.subscribe((isConnected) => { + if (!isConnected) { + this.clearSubscriptions(); + } + }); + } + + call(method: M, params?: ApiCallParams): Observable> { + return this.callMethod(method, params); + } + + /** + * For jobs better to use the `selectJob` store selector. + */ + callAndSubscribe( + method: M, + params?: ApiCallParams, + ): Observable[]> { + return this.callMethod(method, params) + .pipe( + switchMap((items) => this.subscribe(method).pipe( + startWith(null), + map((event) => ([items, event])), + )), + applyApiEvent(), + takeUntil(this.clearSubscriptions$), + ); + } + + /** + * Use `job` when you care about job progress or result. + */ + startJob(method: M, params?: ApiJobParams): Observable { + return this.callMethod(method, params); + } + + /** + * In your subscription, next will be next job update, complete will be when the job is complete. + */ + job( + method: M, + params?: ApiJobParams, + ): Observable>> { + return this.callMethod(method, params).pipe( + switchMap((jobId: number) => { + return merge( + this.subscribeToJobUpdates(jobId), + // Get job status here for jobs that complete too fast. + this.call('core.get_jobs', [[['id', '=', jobId]]]).pipe(map((jobs) => jobs[0])), + ) + .pipe(observeJob()); + }), + takeUntil(this.clearSubscriptions$), + ) as Observable>>; + } + + subscribe(method: K | `${K}:${string}`): Observable> { + if (this.eventSubscribers.has(method as K)) { + return this.eventSubscribers.get(method as K); + } + const observable$ = new Observable((trigger: Subscriber>) => { + const subscription = this.wsHandler.buildSubscriber>(method as K).subscribe(trigger); + return () => { + subscription.unsubscribe(); + this.eventSubscribers.delete(method as K); + }; + }).pipe( + switchMap((apiEvent) => { + const erroredEvent = apiEvent as unknown as ResultMessage; + if (erroredEvent?.error) { + console.error('Error: ', erroredEvent.error); + return throwError(() => erroredEvent.error); + } + return of(apiEvent); + }), + share(), + takeUntil(this.clearSubscriptions$), + ); + + this.eventSubscribers.set(method as K, observable$); + return observable$; + } + + subscribeToLogs(name: string): Observable> { + return this.subscribe(name as ApiEventMethod) as unknown as Observable>; + } + + clearSubscriptions(): void { + this.clearSubscriptions$.next(); + this.eventSubscribers.clear(); + } + + private callMethod(method: M, params?: ApiCallParams): Observable>; + private callMethod(method: M, params?: ApiJobParams): Observable; + private callMethod(method: M, params?: unknown): Observable { + const uuid = UUID.UUID(); + return of(uuid).pipe( + switchMap(() => { + performance.mark(`${method} - ${uuid} - start`); + this.wsHandler.scheduleCall({ + id: uuid, msg: IncomingApiMessageType.Method, method, params, + }); + return this.wsHandler.responses$.pipe( + filter((data: IncomingApiMessage) => data.msg === IncomingApiMessageType.Result && data.id === uuid), + ); + }), + switchMap((data: IncomingApiMessage) => { + if ('error' in data && data.error) { + this.printError(data.error, { method, params }); + const error = this.enhanceError(data.error, { method }); + return throwError(() => error); + } + + performance.mark(`${method} - ${uuid} - end`); + performance.measure(method, `${method} - ${uuid} - start`, `${method} - ${uuid} - end`); + return of(data); + }), + + map((data: ResultMessage) => data.result), + take(1), + ); + } + + private subscribeToJobUpdates(jobId: number): Observable { + return this.subscribe('core.get_jobs').pipe( + filter((apiEvent) => apiEvent.id === jobId), + map((apiEvent) => apiEvent.fields), + takeUntil(this.clearSubscriptions$), + ); + } + + private printError(error: ApiError, context: { method: string; params: unknown }): void { + if (error.errname === ApiErrorName.NoAccess) { + console.error(`Access denied to ${context.method} with ${context.params ? JSON.stringify(context.params) : 'no params'}`); + return; + } + + // Do not log validation errors. + if (error.type === ResponseErrorType.Validation) { + return; + } + + console.error('Error: ', error); + } + + // TODO: Probably doesn't belong here. Consider building something similar to interceptors. + private enhanceError(error: ApiError, context: { method: string }): ApiError { + if (error.errname === ApiErrorName.NoAccess) { + return { + ...error, + reason: this.translate.instant('Access denied to {method}', { method: context.method }), + }; + } + return error; + } +} diff --git a/src/app/services/websocket/websocket-connection.class.ts b/src/app/services/websocket/websocket-connection.class.ts new file mode 100644 index 00000000000..6651ff0bb4d --- /dev/null +++ b/src/app/services/websocket/websocket-connection.class.ts @@ -0,0 +1,45 @@ +import { Observable } from 'rxjs'; +import { webSocket as rxjsWebSocket, WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket'; + +export class WebSocketConnection { + private ws$: WebSocketSubject; + private wsAsObservable$: Observable; + get stream$(): Observable { + return this.wsAsObservable$; + } + + constructor( + private webSocket: typeof rxjsWebSocket, + ) { } + + connect(config: WebSocketSubjectConfig): void { + if (this.ws$) { + this.ws$.complete(); + } + + performance.mark(`WS Init. URL: ${config.url}"`); + this.ws$ = this.webSocket(config); + + this.wsAsObservable$ = this.ws$.asObservable(); + } + + send(payload: unknown): void { + this.ws$.next(payload); + } + + close(): void { + this.ws$?.complete(); + } + + event( + subMsg: () => unknown, + unsubMsg: () => unknown, + messageFilter: (value: unknown) => boolean, + ): Observable { + return this.ws$.multiplex( + subMsg, + unsubMsg, + messageFilter, + ) as Observable; + } +} diff --git a/src/app/services/websocket/websocket-handler.service.ts b/src/app/services/websocket/websocket-handler.service.ts new file mode 100644 index 00000000000..694772fb058 --- /dev/null +++ b/src/app/services/websocket/websocket-handler.service.ts @@ -0,0 +1,213 @@ +import { Inject, Injectable } from '@angular/core'; +import { UUID } from 'angular2-uuid'; +import { environment } from 'environments/environment'; +import { + BehaviorSubject, filter, interval, map, mergeMap, NEVER, Observable, of, Subject, switchMap, take, tap, timer, +} from 'rxjs'; +import { webSocket as rxjsWebSocket } from 'rxjs/webSocket'; +import { IncomingApiMessageType, OutgoingApiMessageType } from 'app/enums/api-message-type.enum'; +import { WEBSOCKET } from 'app/helpers/websocket.helper'; +import { WINDOW } from 'app/helpers/window.helper'; +import { ApiEventMethod, ApiEventTyped, IncomingApiMessage } from 'app/interfaces/api-message.interface'; +import { WebSocketConnection } from 'app/services/websocket/websocket-connection.class'; + +@Injectable({ + providedIn: 'root', +}) +export class WebSocketHandlerService { + private readonly wsConnection: WebSocketConnection = new WebSocketConnection(this.webSocket); + private connectionUrl = (this.window.location.protocol === 'https:' ? 'wss://' : 'ws://') + environment.remote + '/websocket'; + + private readonly connectMsgReceived$ = new BehaviorSubject(false); + readonly isConnected$ = this.connectMsgReceived$.asObservable(); + + private readonly pingTimeoutMillis = 20 * 1000; + private readonly reconnectTimeoutMillis = 5 * 1000; + private readonly maxConcurrentCalls = 20; + + private isReconnectScheduled = false; + private shutDownInProgress = false; + + private readonly hasRestrictedError$ = new BehaviorSubject(false); + set isAccessRestricted$(value: boolean) { + this.hasRestrictedError$.next(value); + } + + get isAccessRestricted$(): Observable { + return this.hasRestrictedError$.asObservable(); + } + + private readonly isConnectionLive$ = new BehaviorSubject(false); + get isClosed$(): Observable { + return this.isConnectionLive$.pipe(map((isLive) => !isLive)); + } + + get responses$(): Observable { + return this.wsConnection.stream$ as Observable; + } + + private readonly triggerNextCall$ = new Subject(); + private activeCalls = 0; + private readonly queuedCalls: { id: string; [key: string]: unknown }[] = []; + private readonly pendingCalls = new Map(); + + constructor( + @Inject(WINDOW) protected window: Window, + @Inject(WEBSOCKET) private webSocket: typeof rxjsWebSocket, + ) { + this.setupWebSocket(); + } + + private setupWebSocket(): void { + this.connectWebSocket(); + this.setupSubscriptionUpdates(); + this.setupScheduledCalls(); + this.setupPing(); + } + + private setupScheduledCalls(): void { + this.triggerNextCall$.pipe( + tap(() => { + if (this.activeCalls + 1 < this.maxConcurrentCalls) { + return; + } + this.raiseConcurrentCallsError(); + }), + mergeMap(() => { + return this.queuedCalls.length > 0 ? this.processCall(this.queuedCalls.shift()) : of(null); + }, this.maxConcurrentCalls), + ).subscribe(); + } + + private processCall(call: { id: string; [key: string]: unknown }): Observable { + this.activeCalls++; + this.pendingCalls.set(call.id, call); + this.wsConnection.send(call); + + return this.responses$.pipe( + filter((data: IncomingApiMessage) => data.msg === IncomingApiMessageType.Result && data.id === call.id), + take(1), + tap(() => { + this.activeCalls--; + this.pendingCalls.delete(call.id); + this.triggerNextCall$.next(); + }), + ); + } + + private raiseConcurrentCallsError(): void { + console.error( + 'Max concurrent calls', + JSON.stringify( + [ + ...this.queuedCalls, + ...(this.pendingCalls.values()), + ].map((call: { id: string; method: string }) => call.method), + ), + ); + if (!environment.production) { + throw new Error( + `Max concurrent calls limit reached. + There are more than 20 calls queued. + See queued calls in the browser's console logs`, + ); + } + } + + private connectWebSocket(): void { + this.wsConnection.connect({ + url: this.connectionUrl, + openObserver: { + next: this.onOpen.bind(this), + }, + closeObserver: { + next: this.onClose.bind(this), + }, + }); + } + + private setupSubscriptionUpdates(): void { + this.wsConnection.stream$.pipe( + tap((response: IncomingApiMessage) => { + if (response.msg === IncomingApiMessageType.Connected) { + performance.mark('WS Connected'); + performance.measure('Establishing WS connection', 'WS Init', 'WS Connected'); + this.connectMsgReceived$.next(true); + } + }), + ).subscribe(); + } + + private setupPing(): void { + this.isConnected$.pipe( + switchMap((isConnected) => { + if (!isConnected) { + return NEVER; + } + + return interval(this.pingTimeoutMillis); + }), + ).subscribe(() => { + this.wsConnection.send({ msg: OutgoingApiMessageType.Ping, id: UUID.UUID() }); + }); + } + + private sendConnectMessage(): void { + this.wsConnection.send({ + msg: OutgoingApiMessageType.Connect, + version: '1', + support: ['1'], + }); + } + + private onClose(event: CloseEvent): void { + if (this.isReconnectScheduled) { + return; + } + this.isReconnectScheduled = true; + this.connectMsgReceived$.next(false); + this.isConnectionLive$.next(false); + if (event.code === 1008) { + this.isAccessRestricted$ = true; + } else { + this.reconnect(); + } + } + + private reconnect(): void { + this.isReconnectScheduled = true; + timer(this.reconnectTimeoutMillis).subscribe({ + next: () => { + this.isReconnectScheduled = false; + this.setupWebSocket(); + }, + }); + } + + private onOpen(): void { + if (this.isReconnectScheduled) { + this.wsConnection.close(); + return; + } + this.shutDownInProgress = false; + this.sendConnectMessage(); + } + + scheduleCall(payload: { id: string; [key: string]: unknown }): void { + this.queuedCalls.push(payload); + this.triggerNextCall$.next(); + } + + buildSubscriber>(name: K): Observable { + const id = UUID.UUID(); + return this.wsConnection.event( + () => ({ id, name, msg: OutgoingApiMessageType.Sub }), + () => ({ id, msg: OutgoingApiMessageType.UnSub }), + (message: R) => (message.collection === name && message.msg !== IncomingApiMessageType.NoSub), + ); + } + + prepareShutdown(): void { + this.shutDownInProgress = true; + } +}