diff --git a/README.md b/README.md index 9c39904..9ea20f2 100644 --- a/README.md +++ b/README.md @@ -16,42 +16,43 @@ Javascript client (for browser and node.js) ## Table of Contents -- [Wampy.js](#wampyjs) - - [About](#about) - - [Table of Contents](#table-of-contents) - - [Description](#description) - - [Usage example](#usage-example) - - [Installation](#installation) - - [Exported components](#exported-components) - - [CLI tool](#cli-tool) - - [Migrating or Updating versions](#migrating-or-updating-versions) - - [API](#api) - - [Constructor(\[url\[, options\]\])](#constructorurl-options) - - [options(\[opts\])](#optionsopts) - - [getOptions()](#getoptions) - - [setOptions(\[newOptions\])](#setoptionsnewoptions) - - [getOpStatus()](#getopstatus) - - [getSessionId()](#getsessionid) - - [connect(\[url\])](#connecturl) - - [disconnect()](#disconnect) - - [abort()](#abort) - - [Ticket-based Authentication](#ticket-based-authentication) - - [Challenge Response Authentication](#challenge-response-authentication) - - [Cryptosign-based Authentication](#cryptosign-based-authentication) - - [Automatically chosen Authentication](#automatically-chosen-authentication) - - [subscribe(topicURI, onEvent\[, advancedOptions\])](#subscribetopicuri-onevent-advancedoptions) - - [unsubscribe(subscriptionIdKey\[, onEvent\])](#unsubscribesubscriptionidkey-onevent) - - [publish(topicURI\[, payload\[, advancedOptions\]\])](#publishtopicuri-payload-advancedoptions) - - [call(topicURI\[, payload\[, advancedOptions\]\])](#calltopicuri-payload-advancedoptions) - - [cancel(reqId\[, advancedOptions\])](#cancelreqid-advancedoptions) - - [register(topicURI, rpc\[, advancedOptions\])](#registertopicuri-rpc-advancedoptions) - - [unregister(topicURI)](#unregistertopicuri) - - [Error handling](#error-handling) - - [Using custom serializer](#using-custom-serializer) - - [Connecting through TLS in node environment](#connecting-through-tls-in-node-environment) - - [Tests and code coverage](#tests-and-code-coverage) - - [Copyright and License](#copyright-and-license) - - [See Also](#see-also) +* [Wampy.js](#wampyjs) + * [About](#about) + * [Table of Contents](#table-of-contents) + * [Description](#description) + * [Usage example](#usage-example) + * [Installation](#installation) + * [Exported components](#exported-components) + * [CLI tool](#cli-tool) + * [Migrating or Updating versions](#migrating-or-updating-versions) + * [API](#api) + * [Constructor([url[, options]])](#constructorurl-options) + * [options([opts])](#optionsopts) + * [getOptions()](#getoptions) + * [setOptions([newOptions])](#setoptionsnewoptions) + * [getOpStatus()](#getopstatus) + * [getSessionId()](#getsessionid) + * [connect([url])](#connecturl) + * [disconnect()](#disconnect) + * [abort()](#abort) + * [Ticket-based Authentication](#ticket-based-authentication) + * [Challenge Response Authentication](#challenge-response-authentication) + * [Cryptosign-based Authentication](#cryptosign-based-authentication) + * [Automatically chosen Authentication](#automatically-chosen-authentication) + * [subscribe(topicURI, onEvent[, advancedOptions])](#subscribetopicuri-onevent-advancedoptions) + * [unsubscribe(subscriptionIdKey[, onEvent])](#unsubscribesubscriptionidkey-onevent) + * [publish(topicURI[, payload[, advancedOptions]])](#publishtopicuri-payload-advancedoptions) + * [call(topicURI[, payload[, advancedOptions]])](#calltopicuri-payload-advancedoptions) + * [progressiveCall(topicURI[, payload[, advancedOptions]])](#progressivecalltopicuri-payload-advancedoptions) + * [cancel(reqId[, advancedOptions])](#cancelreqid-advancedoptions) + * [register(topicURI, rpc[, advancedOptions])](#registertopicuri-rpc-advancedoptions) + * [unregister(topicURI)](#unregistertopicuri) + * [Error handling](#error-handling) + * [Using custom serializer](#using-custom-serializer) + * [Connecting through TLS in node environment](#connecting-through-tls-in-node-environment) + * [Tests and code coverage](#tests-and-code-coverage) + * [Copyright and License](#copyright-and-license) + * [See Also](#see-also) ## Description @@ -82,12 +83,15 @@ Wampy.js supports the following WAMP roles and features: - event retention - caller: - caller identification + - progressive call invocations - progressive call results - call canceling - call timeout - payload passthru mode - callee: - caller identification + - progressive call invocations + - progressive call results - call trust levels - pattern-based registration - shared registration @@ -98,7 +102,7 @@ Wampy supports the following serializers: - JSON (default, native) - MsgPack (See [MessagePack][] Site for more info) - CBOR (See [CBOR][] Site for more info) -- Any new serializer can be added easily +- Any new serializer can be easily added In node.js environment Wampy is compatible with the following websocket clients: @@ -325,7 +329,8 @@ wampy = new Wampy({ ### options([opts]) -.options() method is now deprecated, so this is here only for documentation purposes. Please use `getOptions()/setOptions()` instead. +.options() method is now deprecated, so this is here only for documentation purposes. Please +use `getOptions()/setOptions()` instead. .options() can be called in two forms: -- without parameters it will behave the same as new method [getOptions()](#getoptions) @@ -920,6 +925,60 @@ try { [Back to Table of Contents](#table-of-contents) +### progressiveCall(topicURI[, payload[, advancedOptions]]) + +Make an RPC progressive invocation call to topicURI. + +Input parameters to this function are the same as described in the [call()](#calltopicuri-payload-advancedoptions) +above. The difference is in the result. + +Returns an object containing the result promise and the `sendData` function +which is supposed to be used to send additional data chunks in bounds of the +initiated remote procedure call: + +- **result**. A promise that resolves to the result of the RPC call. +- **sendData**. A function to send additional data to the ongoing RPC call. + This function has signature: `([payload], [advancedOptions])`: + - **payload**. RPC data. Optional. Same as in [call()](#calltopicuri-payload-advancedoptions). + - **advancedOptions**. Optional parameters hash table with properties: + - **progress**. Bool. Flag, indicating the ongoing (true) or final (false) call invocation. + If this parameter is omitted - it is treated as TRUE, meaning the + intermediate ongoing call invocation. For the final call invocation + this flag must be passed and set to FALSE. In other case the call + invocation wil never end. + - function returns nothing, but exception maybe thrown in case of error. + +```javascript +let pc; +try { + pc = ws.progressiveCall('sum.numbers', 1); +} catch (e) { + console.log('RPC call failed!', e.error); +} + +// Somewhere where you have additional data to be send to the procedure +try { + pc.sendData(2); + pc.sendData(3); + pc.sendData(4); + // This is final data chunk, so the Callee can understand that + // no more input data will be send and it can finish its + // calculations and send the result. + // Note that nothing stops the Callee to send the + // intermediate results if that makes sense. + pc.sendData(5, { progress: false }); +} catch (e) { + console.log('RPC call send data failed!', e.error); +} + + +// And here we are waiting for the final call result +const res = await pc.result +console.log("Res:", res); // Res: 15 +``` + +[Back to Table of Contents](#table-of-contents) + ### cancel(reqId[, advancedOptions]) RPC invocation cancelling. diff --git a/src/wampy.js b/src/wampy.js index 21fc240..da7efb4 100644 --- a/src/wampy.js +++ b/src/wampy.js @@ -14,11 +14,12 @@ * */ -import { WAMP_MSG_SPEC, WAMP_ERROR_MSG, E2EE_SERIALIZERS, SUCCESS } from './constants.js'; +import { E2EE_SERIALIZERS, SUCCESS, WAMP_ERROR_MSG, WAMP_MSG_SPEC } from './constants.js'; import * as Errors from './errors.js'; -import { getWebSocket, getNewPromise } from './utils.js'; -import { JsonSerializer } from './serializers/json-serializer.js'; import { WebsocketError } from './errors.js'; +import { getNewPromise, getWebSocket } from './utils.js'; +import { JsonSerializer } from './serializers/json-serializer.js'; + const jsonSerializer = new JsonSerializer(); /** @@ -1947,29 +1948,44 @@ class Wampy { } /** - * Remote Procedure Call - * @param {string} topic - a topic URI to be called - * @param {string|number|Array|object} [payload] - can be either a value of any type or null. Also, it - * is possible to pass array and object-like data simultaneously. - * In this case pass a hash-table with next attributes: - * { - * argsList: array payload (may be omitted) - * argsDict: object payload (may be omitted) - * } - * @param {object} [advancedOptions] - optional parameter. Must include any or all of the options: - * { disclose_me: bool flag of disclosure of Caller identity (WAMP session ID) - * to endpoints of a routed call - * progress_callback: function for handling progressive call results - * timeout: integer timeout (in ms) for the call to finish - * ppt_scheme: string Identifies the Payload Schema - * ppt_serializer: string Specifies what serializer was used to encode the payload - * ppt_cipher: string Specifies the cryptographic algorithm that was used to encrypt - * the payload - * ppt_keyid: string Contains the encryption key id that was used to encrypt the payload - * } - * @returns {Promise} + * Process CALL advanced options and transform them for the WAMP CALL message Options + * + * @param {object} advancedOptions + * @private + * @returns {object} */ - async call (topic, payload, advancedOptions) { + _getCallMessageOptionsFromAdvancedOptions(advancedOptions) { + const { + timeout, + progress, + progress_callback, + disclose_me, + ppt_scheme, + ppt_serializer, + ppt_cipher, + ppt_keyid + } = advancedOptions || {}; + + return { + ...(progress_callback ? { receive_progress: true } : {}), + ...(progress ? { progress: true } : {}), + ...(disclose_me ? { disclose_me: true } : {}), + ...(timeout ? { timeout } : {}), + ...(ppt_scheme ? { ppt_scheme } : {}), + ...(ppt_serializer ? { ppt_serializer } : {}), + ...(ppt_cipher ? { ppt_cipher } : {}), + ...(ppt_keyid ? { ppt_keyid } : {}), + }; + } + + /** + * Remote Procedure Call Internal Implementation + * @param {string} topic - same as in call method + * @param {string|number|Array|object} [payload] - same as in call method + * @param {object} [advancedOptions] - same as in call method + * @returns {number} Request ID + */ + _callInternal(topic, payload, advancedOptions) { if (!this._preReqChecks({ topic, patternBased: false, allowWAMP: true }, 'dealer')) { throw this._cache.opStatus.error; } @@ -1980,7 +1996,7 @@ class Wampy { throw invalidParamError; } - const { timeout, progress_callback, disclose_me } = advancedOptions || {}; + const { timeout, progress_callback, ppt_scheme } = advancedOptions || {}; const isTimeoutInvalid = (timeout && typeof timeout !== 'number'); const isProgressCallbackInvalid = (progress_callback && typeof progress_callback !== 'function'); @@ -1991,8 +2007,6 @@ class Wampy { throw invalidParamError; } - const { ppt_scheme, ppt_serializer, ppt_cipher, ppt_keyid } = advancedOptions || {}; - // Check and handle Payload PassThru Mode // @see https://wamp-proto.org/wamp_latest_ietf.html#name-payload-passthru-mode if (ppt_scheme && !this._checkPPTOptions('dealer', advancedOptions)) { @@ -2004,15 +2018,7 @@ class Wampy { reqId = this._getReqId(); } while (reqId in this._calls); - const messageOptions = { - ...(progress_callback ? { receive_progress: true } : {}), - ...(disclose_me ? { disclose_me: true } : {}), - ...(timeout ? { timeout } : {}), - ...(ppt_scheme ? { ppt_scheme } : {}), - ...(ppt_serializer ? { ppt_serializer } : {}), - ...(ppt_cipher ? { ppt_cipher } : {}), - ...(ppt_keyid ? { ppt_keyid } : {}), - }; + const messageOptions = this._getCallMessageOptionsFromAdvancedOptions(advancedOptions); const { err, payloadItems } = payload ? this._packPPTPayload(payload, messageOptions) : {}; @@ -2029,9 +2035,136 @@ class Wampy { this._calls[reqId].onProgress = progress_callback; } + return reqId; + } + + /** + * Remote Procedure Call + * @param {string} topic - a topic URI to be called + * @param {string|number|Array|object} [payload] - can be either a value of any type or null. Also, it + * is possible to pass array and object-like data simultaneously. + * In this case pass a hash-table with next attributes: + * { + * argsList: array payload (may be omitted) + * argsDict: object payload (may be omitted) + * } + * @param {object} [advancedOptions] - optional parameter. Must include any or all of the options: + * { disclose_me: bool flag of disclosure of Caller identity (WAMP session ID) + * to endpoints of a routed call + * progress_callback: function for handling progressive call results + * timeout: integer timeout (in ms) for the call to finish + * ppt_scheme: string Identifies the Payload Schema + * ppt_serializer: string Specifies what serializer was used to encode the payload + * ppt_cipher: string Specifies the cryptographic algorithm that was used to encrypt + * the payload + * ppt_keyid: string Contains the encryption key id that was used to encrypt the payload + * } + * @returns {Promise} + */ + async call (topic, payload, advancedOptions) { + const reqId = this._callInternal(topic, payload, advancedOptions); return this._calls[reqId].promise; } + /** + * @typedef {function} ProgressiveCallSendData + * @param {string|number|Array|object} [payload] - can be either a value of any type or null. Also, it + * is possible to pass array and object-like data simultaneously. + * In this case pass a hash-table with next attributes: + * { + * argsList: array payload (maybe omitted) + * argsDict: object payload (maybe omitted) + * } + * @param {object} [advancedOptions] - optional parameter - Must include next options: + * { + * progress: bool flag, indicating the ongoing (true) or final (false) call invocation. + * If this parameter is omitted - it is treated as TRUE, meaning the + * intermediate ongoing call invocation. For the final call invocation + * this flag must be passed and set to FALSE. In other case the call + * invocation wil never end. + * } + */ + /** + * @typedef {Object} ProgressiveCallReturn + * @property {Promise} result - A promise that resolves to the result of the RPC call. + * @property {ProgressiveCallSendData} sendData - A function to send additional data to the ongoing RPC call. + */ + /** + * Remote Procedure Progressive Call + * + * You can send additional input data which won't be treated as a new independent but instead + * will be transferred as another input data chunk to the same remote procedure call. Of course + * Callee and Dealer should support the "progressive_call_invocations" feature as well. + * + * @param {string} topic - a topic URI to be called + * @param {string|number|Array|object} [payload] - can be either a value of any type or null. Also, it + * is possible to pass array and object-like data simultaneously. + * In this case pass a hash-table with next attributes: + * { + * argsList: array payload (maybe omitted) + * argsDict: object payload (maybe omitted) + * } + * @param {object} [advancedOptions] - optional parameter. Must include any or all of the options: + * { disclose_me: bool flag of disclosure of Caller identity (WAMP session ID) + * to endpoints of a routed call + * progress_callback: function for handling progressive call results + * timeout: integer timeout (in ms) for the call to finish + * ppt_scheme: string Identifies the Payload Schema + * ppt_serializer: string Specifies what serializer was used to encode the payload + * ppt_cipher: string Specifies the cryptographic algorithm that was used to encrypt + * the payload + * ppt_keyid: string Contains the encryption key id that was used to encrypt the payload + * } + * @returns {ProgressiveCallReturn} - An object containing the result promise and the sendData function. + */ + progressiveCall (topic, payload, advancedOptions) { + if (!this._checkRouterFeature('dealer', 'progressive_call_invocations')) { + throw this._cache.opStatus.error; + } + + advancedOptions = advancedOptions || {}; + advancedOptions.progress = true; // Implicitly set the progress flag before making the first call + const reqId = this._callInternal(topic, payload, advancedOptions); + + const messageOptions = this._getCallMessageOptionsFromAdvancedOptions(advancedOptions); + + // Now we need to construct the function tha client may call to pass another input data chunk + const cb = (payload, advancedOptions) => { + if (advancedOptions && !this._isPlainObject(advancedOptions)) { + const invalidParamError = new Errors.InvalidParamError('advancedOptions'); + this._fillOpStatusByError(invalidParamError); + throw invalidParamError; + } + + const msgOpt = messageOptions; + const { progress } = advancedOptions || {}; + if (progress !== undefined) { + if (typeof progress === 'boolean') { + msgOpt.progress = progress; + } else { + const invalidParamError = new Errors.InvalidParamError('progress'); + this._fillOpStatusByError(invalidParamError); + throw invalidParamError; + } + } + + const { err, payloadItems } = payload ? this._packPPTPayload(payload, messageOptions) : {}; + + if (err) { + throw this._cache.opStatus.error; + } + + // WAMP SPEC: [CALL, Request|id, Options|dict, Procedure|uri, (Arguments|list, ArgumentsKw|dict)] + this._send([WAMP_MSG_SPEC.CALL, reqId, messageOptions, topic, ...(payloadItems || [])]); + this._cache.opStatus = { ...SUCCESS, reqId }; + } + + return { + result: this._calls[reqId].promise, + sendData: cb + }; + } + /** * RPC invocation cancelling * diff --git a/test/send-data.js b/test/send-data.js index 7d1935d..6a5d3ce 100644 --- a/test/send-data.js +++ b/test/send-data.js @@ -1678,7 +1678,7 @@ const WAMP_MSG_SPEC = { WAMP_MSG_SPEC.RESULT, 'RequestId', { progress: true }, - [50] + [555] ], from: [1], to: [1] @@ -1703,6 +1703,126 @@ const WAMP_MSG_SPEC = { from: [1], to: [1] }, + // disallows progressive call invocations if dealer does not support it + { + data: [ + WAMP_MSG_SPEC.GOODBYE, + {}, + 'wamp.error.goodbye_and_out' + ] + }, + { + data: [ + WAMP_MSG_SPEC.WELCOME, + 7, + { + agent: 'Wampy.js test suite', + roles: { + dealer: { + features: { + caller_identification : true, + progressive_call_results: true, + call_timeout : true, + payload_passthru_mode : true + } + } + } + } + ] + }, + // allows progressive call invocations + { + data: [ + WAMP_MSG_SPEC.GOODBYE, + {}, + 'wamp.error.goodbye_and_out' + ] + }, + { + data: [ + WAMP_MSG_SPEC.WELCOME, + 7, + { + agent: 'Wampy.js test suite', + roles: { + dealer: { + features: { + caller_identification : true, + progressive_call_results : true, + progressive_call_invocations: true, + call_canceling : true, + call_timeout : true, + payload_passthru_mode : true + } + } + } + } + ] + }, + { + data: null, + silent: true + }, + { + data: null, + silent: true + }, + { + data: [ + WAMP_MSG_SPEC.RESULT, + 'RequestId', + {}, + [100] + ], + from: [1], + to: [1] + }, + // allows progressive call invocations with progressive result receiving + { + data: [ + WAMP_MSG_SPEC.RESULT, + 'RequestId', + { progress: true }, + [1] + ], + from: [1], + to: [1], + }, + { + data: [ + WAMP_MSG_SPEC.RESULT, + 'RequestId', + { progress: true }, + [2] + ], + from: [1], + to: [1], + }, + { + data: [ + WAMP_MSG_SPEC.RESULT, + 'RequestId', + {}, + [3] + ], + from: [1], + to: [1] + }, + // checks for advanced options during progressive call invocations + { + data: null, + silent: true + }, + { + data: [ + WAMP_MSG_SPEC.RESULT, + 'RequestId', + {}, + [100] + ], + from: [1], + to: [1] + }, // allows to invoke asynchronous RPC without value { data: [ diff --git a/test/wampy-cli-test.js b/test/wampy-cli-test.js index 0104ea1..b48bf48 100644 --- a/test/wampy-cli-test.js +++ b/test/wampy-cli-test.js @@ -27,7 +27,7 @@ describe('Wampy CLI test suite', function () { after(function () { // Dirty hack 'cause there is no legal way of mocking external dependency let data = readFileSync(replaceFileName, { encoding: 'utf8' }); - data = data.replace('import { FakeWampyMock as Wampy } from \'../test/fakeWampyMock.js\';', 'import { Wampy } from \'../src/wampy.js\';'); + data = data.replace('import { FakeWampyMock as Wampy } from \'../test/fake-wampy-mock.js\';', 'import { Wampy } from \'../src/wampy.js\';'); writeFileSync(replaceFileName, data, { encoding: 'utf8' }); }); diff --git a/test/wampy-common-test.js b/test/wampy-common-test.js index 88d01ab..720cba8 100644 --- a/test/wampy-common-test.js +++ b/test/wampy-common-test.js @@ -1999,6 +1999,7 @@ for (const item of serializers) { expect(progress).to.be.true; }); + it('checks options during canceling RPC invocation', function (done) { let reqId; // eslint-disable-line prefer-const @@ -2051,6 +2052,100 @@ for (const item of serializers) { return wampy.unregister('register.rpc1'); }); + it('disallows progressive call invocations if dealer does not support it', function(done) { + wampy.setOptions({ + onClose: function () { + setTimeout(async function () { + await wampy.connect(); + + try { + wampy.progressiveCall('call.rpc9', 'payload'); + } catch (e) { + expect(e).to.be.instanceOf(Errors.FeatureNotSupportedError); + done(); + } + }, 1); + } + }).disconnect(); + }); + + it('allows progressive call invocations', function(done) { + wampy.setOptions({ + onClose: function () { + wampy.setOptions({ onClose: null }); + setTimeout(async function () { + await wampy.connect(); + + const { sendData, result } = wampy.progressiveCall('call.rpc.progressive', 1); + + try { + sendData(50); + sendData(100, { progress: false }); + } catch (e) { + done(e); + } + + const res = await result; + expect(res).to.be.an('object'); + expect(res.argsList).to.be.an('array'); + expect(res.argsDict).to.be.undefined; + expect(res.argsList[0]).to.be.equal(100); + done(); + + }, 1); + } + }).disconnect(); + }); + + it('allows progressive call invocations with progressive result receiving', async function () { + let resArg = 1; + + const { sendData, result } = + wampy.progressiveCall('call.rpc.progressive', 1, { + progress_callback: function (e) { + expect(e).to.be.an('object'); + expect(e.argsList).to.be.an('array'); + expect(e.argsDict).to.be.undefined; + expect(e.argsList[0]).to.be.equal(resArg++); + } + }); + + sendData(50); + sendData(100, { progress: false }); + + const res = await result; + expect(res).to.be.an('object'); + expect(res.argsList).to.be.an('array'); + expect(res.argsDict).to.be.undefined; + expect(res.argsList[0]).to.be.equal(3); + }); + + it('checks for advanced options during progressive call invocations', async function () { + let resArg = 1; + + const { sendData, result } = wampy.progressiveCall('call.rpc.progressive', 1); + + try { + sendData(2, 'invalid_options'); + } catch (e) { + expect(e).to.be.instanceOf(Errors.InvalidParamError); + } + + try { + sendData(2, { progress: 'string instead of boolean'}); + } catch (e) { + expect(e).to.be.instanceOf(Errors.InvalidParamError); + } + + sendData(100, { progress: false }); + + const res = await result; + expect(res).to.be.an('object'); + expect(res.argsList).to.be.an('array'); + expect(res.argsDict).to.be.undefined; + expect(res.argsList[0]).to.be.equal(100); + }); + it('allows to invoke asynchronous RPC without value', async function () { await wampy.register('register.rpc3', function (e) { return new Promise(function (resolve, reject) {