Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #140 and #139 by adding an Inward/Outward mechanism for remote call #141

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions lib/http.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ exports.listen = function (options, transportUtil) {
var listener
var listenAttempts = 0
var listen_details = _.clone(msg)
var pins = transportUtil.resolve_pins(msg)

server.on('request', function (req, res) {
internals.timeout(listenOptions, req, res)
Expand All @@ -34,7 +35,7 @@ exports.listen = function (options, transportUtil) {
return res.end()
}

internals.trackHeaders(listenOptions, seneca, transportUtil, req, res)
internals.trackHeaders(listenOptions, seneca, transportUtil, req, res, pins)
})
})

Expand Down Expand Up @@ -200,7 +201,7 @@ internals.setBody = function (seneca, transportUtil, req, res, next) {
})
}

internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, res) {
internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, res, pins) {
if (req.url.indexOf(listenOptions.path) !== 0) {
return
}
Expand Down Expand Up @@ -232,6 +233,17 @@ internals.trackHeaders = function (listenOptions, seneca, transportUtil, req, re
}
}

// Issue: #97
if (pins) {
// any topic is not pinned, verifying the calling one is pinned
var argspatrun = transportUtil.make_argspatrun(pins)
if (!argspatrun.find(req.body)) {
data.error = transportUtil.error('not_pinned', { act: req.body })
data.error.statusCode = 404
return internals.sendResponse(seneca, transportUtil, res, data, {})
}
}

transportUtil.handle_request(seneca, data, listenOptions, function (out) {
internals.sendResponse(seneca, transportUtil, res, out, data)
})
Expand All @@ -241,14 +253,14 @@ internals.sendResponse = function (seneca, transportUtil, res, out, data) {
var outJson = 'null'
var httpcode = 200

if (out && out.res) {
httpcode = out.res.statusCode || httpcode
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.res)
}
else if (out && out.error) {
if (out && out.error) {
httpcode = out.error.statusCode || 500
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.error)
}
else if (out && out.res) {
httpcode = out.res.statusCode || httpcode
outJson = transportUtil.stringifyJSON(seneca, 'listen-web', out.res)
}

var headers = {
'Content-Type': 'application/json',
Expand Down
17 changes: 16 additions & 1 deletion lib/tcp.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ exports.listen = function (options, transportUtil) {
var connections = []
var listenAttempts = 0

var pins = transportUtil.resolve_pins(args)

var listener = Net.createServer(function (connection) {
seneca.log.debug('listen', 'connection', listenOptions,
'remote', connection.remoteAddress, connection.remotePort)
Expand All @@ -40,7 +42,20 @@ exports.listen = function (options, transportUtil) {
return
}

transportUtil.handle_request(seneca, data, options, function (out) {
// Issue: #97
if (pins) {
// any topic is not pinned, verifying the calling one is pinned
var argspatrun = transportUtil.make_argspatrun(pins)
if (!argspatrun.find(data.act)) {
out = transportUtil.prepareResponse(seneca, data)
out.error = transportUtil.error('not_pinned', data)

stringifier.write(out)
return
}
}

transportUtil.handle_request(seneca, data, listenOptions, function (out) {
if (out === null || !out.sync) {
return
}
Expand Down
43 changes: 36 additions & 7 deletions lib/transport-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ var internals = {
'message_loop': 'Inbound message rejected as looping back to this server.',
'data_error': 'Inbound message included an error description.',
'invalid_json': 'Invalid JSON: <%=input%>.',
'unexcepted_async_error': 'Unexcepted error response to asynchronous message.'
'unexcepted_async_error': 'Unexcepted error response to asynchronous message.',
'not_pinned': 'Inbound message <%=act%> rejected as not pinned'
},
override: true
})
Expand Down Expand Up @@ -98,7 +99,6 @@ internals.Utils.prototype.handle_response = function (seneca, data, client_optio
return false
}


var actinfo = {
id: data.id,
accept: data.accept,
Expand Down Expand Up @@ -233,14 +233,34 @@ internals.Utils.prototype.handle_request = function (seneca, data, listen_option

input.id$ = data.id

this.requestAct(seneca, input, output, respond)
this.requestAct(seneca, input, output, respond, listen_options.inward, listen_options.outward)
}

internals.Utils.prototype.requestAct = function (seneca, input, output, respond) {
internals.Utils.prototype.requestAct = function (seneca, input, output, respond, inward, outward) {
var self = this

try {
if (inward) {
inward({ seneca }, { msg: input })
}
seneca.act(input, function (err, out) {
if (outward) {
try {
var outward_data = {
err: err,
msg: input,
res: out
}
outward({ seneca }, outward_data)
err = outward_data.err
out = outward_data.res
}
catch (e) {
// outward failed, keep a trace of the original err if there was one
e.act_error = err
err = e
}
}
self.update_output(input, output, err, out)
respond(output)
})
Expand Down Expand Up @@ -483,16 +503,25 @@ internals.Utils.prototype.handle_entity = function (seneca, raw) {

raw = _.isObject(raw) ? raw : {}

// FIX #135 - entity is now optional in seneca, entity sent remotely may arrive
// in a seneca context that does not support entity.
// Don't try to "make" them as it is not supported and lead to:
// TypeError: seneca.make$ is not a function
function make (entity) {
if (seneca.make$) return seneca.make$(entity)
else seneca.log.warn('[', entity, '] cannot be used as entity. seneca-entity plugin is missing.')
return entity
}

if (raw.entity$) {
return seneca.make$(raw)
return make(raw)
}

_.each(raw, function (value, key) {
if (_.isObject(value) && value.entity$) {
raw[key] = seneca.make$(value)
raw[key] = make(value)
}
})

return raw
}

Expand Down
32 changes: 32 additions & 0 deletions test/entity.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,36 @@ describe('Transporting Entities', function () {
})
})
})

it('supports client that are not using entity (#135)', function (done) {
var server = CreateInstance()

if (server.version >= '2.0.0') {
server.use(Entity)
}

server.ready(function () {
server.add({cmd: 'test'}, function (args, cb) {
let entity = this.make$('test').data$(args.entity)
entity.save$(function (err, entitySaveResponse) {
if (err) return cb(err)
cb(null, entitySaveResponse)
})
})
.add({role: 'entity', cmd: 'save'}, function (args, cb) {
cb(null, { entity: args.ent, tx: args.tx$ })
})
.listen({ type: 'tcp', port: 20105 })

var client = CreateInstance()
client.client({ type: 'tcp', port: 20105 })
client.ready(function () {
this.act({cmd: 'test', entity: { name: 'bar' }}, function (err, res) {
Assert(!err)
Assert(res.entity.name === 'bar')
done()
})
})
})
})
})
101 changes: 100 additions & 1 deletion test/http.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ describe('Specific http', function () {
})
})


it('http-query', function (fin) {
CreateInstance({errhandler: fin})
.add('a:1', function (args, done) {
Expand Down Expand Up @@ -193,6 +192,106 @@ describe('Specific http', function () {
})
})
})

it('apply inward/outward listen options on HTTP remote act call', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(null, { BAR: args.bar })
})
.listen({type: 'http', port: '18997',
inward: (context, data) => {
data.msg.bar += 1
data.msg.inward = 'INPUT UPGRADED'
},
outward: (context, data) => {
data.res.BAR += 10
data.res.inward = data.msg.inward
data.res.outward = 'OUTPUT UPGRADED'
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18997'})

siClient.act('foo:1,bar:2', function (err, out) {
if (err) return fin(err)
Assert.equal(out.BAR, 13)
Assert.equal(out.inward, 'INPUT UPGRADED')
Assert.equal(out.outward, 'OUTPUT UPGRADED')
fin()
})
})
})

it('reject HTTP remote act call in inward listen option', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(null, { BAR: args.bar })
})
.listen({type: 'http', port: '18996',
inward: (context, data) => {
var e = new Error('HTTP inward rejected!')
e.error_code = 'inward_rejected'
throw e
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18996'})

siClient.act('foo:1,bar:2', function (err, out) {
Assert.equal(err.error_code, 'inward_rejected')
if (err) return fin()
fin(new Error('Inward does not reject remote call'))
})
})
})

it('reject HTTP remote act call in outward listen option', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(null, { BAR: args.bar })
})
.listen({type: 'http', port: '18995',
outward: (context, data) => {
var e = new Error('HTTP outward rejected!')
e.error_code = 'outward_rejected'
throw e
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18995'})

siClient.act('foo:1,bar:2', function (err, out) {
Assert.equal(err.error_code, 'outward_rejected')
if (err) return fin()
fin(new Error('Outward does not reject remote call'))
})
})
})

it('catch HTTP remote act call error in outward listen option', function (fin) {
CreateInstance()
.add('foo:1', function (args, done) {
done(new Error('Catchable failure'), {BAR: args.bar})
})
.listen({type: 'http', port: '18994',
outward: (context, data) => {
delete data.err
}
})
.ready(function () {
var siClient = CreateInstance()
.client({type: 'http', port: '18994'})

siClient.act('foo:1,bar:2', function (err, out) {
if (err) return fin(err)
Assert.equal(out.BAR, 2)
fin()
})
})
})
})

describe('Specific https', function () {
Expand Down
Loading