Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal: Abortable and timeouts #478

Closed
72 changes: 60 additions & 12 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ class Context {
}
}

let getClientAbortable = null;
class Client extends EventEmitter {
#transport;
#abortable;
#streamId;

constructor(transport) {
constructor(transport, abortable) {
super();
this.#transport = transport;
this.#abortable = abortable;
this.#streamId = 0;
this.ip = transport.ip;
this.session = null;
Expand All @@ -63,6 +66,19 @@ class Client extends EventEmitter {
this.destroy();
transport.server.clients.delete(this);
});
transport.once('timeout', () => {
abortable.abort('Request Timeout');
});
abortable.once('aborted', (error) => {
if (transport instanceof HttpTransport) {
error.code = error.httpCode = 408;
this.error(error.code, { error });
}
});
}

static {
getClientAbortable = (client) => client.#abortable;
}

error(code, options) {
Expand Down Expand Up @@ -139,6 +155,14 @@ class Client extends EventEmitter {
return true;
}

get aborted() {
return this.#abortable.aborted;
}

throwIfAborted() {
this.#abortable.throwIfAborted();
}

close() {
this.#transport.close();
}
Expand Down Expand Up @@ -171,12 +195,13 @@ class Server {
init() {
const { application, balancer, options } = this;
const { protocol, nagle = true, key, cert, SNICallback } = options;
const { timeouts } = options;
const proto = protocol === 'http' || balancer ? http : https;
const opt = { key, cert, noDelay: !nagle, SNICallback };
this.httpServer = proto.createServer(opt);

this.httpServer.on('request', async (req, res) => {
const transport = new HttpTransport(this, req, res);
const transport = new HttpTransport(this, req, res, timeouts.request);
const api = req.url.startsWith('/api');
if (!api && !(balancer && req.url === '/')) {
if (application.static.constructor.name !== 'Static') return;
Expand All @@ -185,7 +210,8 @@ class Server {
if (balancer) this.balancing(transport);
if (res.writableEnded) return;

const client = new Client(transport);
const abortable = new metautil.Abortable();
const client = new Client(transport, abortable);
const data = await metautil.receiveBody(req).catch(() => null);

if (req.url === '/api') {
Expand All @@ -201,7 +227,8 @@ class Server {

this.wsServer.on('connection', (connection, req) => {
const transport = new WsTransport(this, req, connection);
const client = new Client(transport);
const abortable = new metautil.Abortable();
const client = new Client(transport, abortable);

connection.on('message', (data, isBinary) => {
if (isBinary) this.binary(client, new Uint8Array(data));
Expand Down Expand Up @@ -241,8 +268,10 @@ class Server {
}
const packet = metautil.jsonParse(data) || {};
const { id, type, method } = packet;
if (type === 'call' && id && method) return void this.rpc(client, packet);
else if (type === 'stream' && id) return void this.stream(client, packet);
if (type === 'call' && id && method) {
const abortable = getClientAbortable(client);
return void abortable.run(this.rpc.bind(this), client, packet);
} else if (type === 'stream' && id) return void this.stream(client, packet);
const error = new Error('Packet structure error');
client.error(500, { error, pass: true });
}
Expand All @@ -262,17 +291,25 @@ class Server {
} catch {
return void client.error(503, { id });
}
const { timeouts } = this.options;
const abortable = getClientAbortable(client);
if (typeof proc.onAborted === 'function')
abortable.once('aborted', proc.onAborted);
if (proc.timeout && proc.timeout < timeouts.request)
abortable.resetTimeout(proc.timeout);
let result = null;
try {
result = await proc.invoke(context, args);
} catch (error) {
if (error === abortable.reason) abortable.throwIfAborted();
let code = error.code === 'ETIMEOUT' ? 408 : 500;
if (typeof error.code === 'number') code = error.code;
error.httpCode = code;
return void client.error(code, { id, error });
} finally {
proc.leave();
}
abortable.throwIfAborted();
if (metautil.isError(result)) {
const { code, httpCode = 200 } = result;
return void client.error(code, { id, error: result, httpCode });
Expand Down Expand Up @@ -333,21 +370,32 @@ class Server {
const args = { ...parameters, ...body };
const packet = { id: 0, method: unit + '/' + method, args };
const hook = application.getHook(unit);
if (hook) this.hook(client, hook, packet, verb, headers);
else this.rpc(client, packet);
const abortable = getClientAbortable(client);
if (hook)
abortable.run(this.hook.bind(this), client, hook, packet, verb, headers);
else abortable.run(this.rpc.bind(this), client, packet);
}

async hook(client, proc, packet, verb, headers) {
const { id, method, args } = packet;
if (!proc) return void client.error(404, { id });
const context = client.createContext();
const par = { verb, method, args, headers };
const { timeouts } = this.options;
const abortable = getClientAbortable(client);
if (typeof proc.onAborted === 'function')
abortable.once('aborted', proc.onAborted);
if (proc.timeout && proc.timeout < timeouts.request)
abortable.resetTimeout(proc.timeout);
let result = null;
try {
const par = { verb, method, args, headers };
const result = await proc.invoke(context, par);
client.send(result);
result = await proc.invoke(context, par);
} catch (error) {
client.error(500, { id, error });
if (error === abortable.reason) abortable.throwIfAborted();
return void client.error(500, { id, error });
}
abortable.throwIfAborted();
client.send(result);
this.console.log(`${client.ip}\t${method}`);
}

Expand Down
3 changes: 2 additions & 1 deletion lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,14 @@ class Transport extends EventEmitter {
}

class HttpTransport extends Transport {
constructor(server, req, res) {
constructor(server, req, res, timeout = 0) {
super(server, req);
this.res = res;
if (req.method === 'OPTIONS') this.options();
req.on('close', () => {
this.emit('close');
});
res.setTimeout(timeout, () => this.emit('timeout'));
}

write(data, httpCode = 200, ext = 'json', options = {}) {
Expand Down
Loading