-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathkoa-insert.js
143 lines (120 loc) · 3.81 KB
/
koa-insert.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
const path = require('path');
const dotenv = require('dotenv-safe');
const Koa = require('koa');
const KoaRouter = require('koa-router');
const bodyParser = require('koa-bodyparser');
const { MongoClient } = require('mongodb');
const _ = require('lodash');
const Bulker = require('./bulk');
dotenv.config({
path: path.join(__dirname, '.env'),
sample: path.join(__dirname, '.env.example'),
});
const MONGO_URI = process.env.MONGO_URI;
const MONGO_DB_NAME = process.env.MONGO_DB_NAME;
const PORT = parseInt(process.env.PORT, 10) || 3000;
const INSERT_BULK_TIMEOUT =
parseInt(process.env.INSERT_BULK_TIMEOUT, 10) || 1000;
const INSERT_BULK_SIZE = parseInt(process.env.INSERT_BULK_SIZE, 10) || 1000;
const INSERT_GROUP_TIMEOUT =
parseInt(process.env.INSERT_GROUP_TIMEOUT, 10) || 60000;
const INSERT_GROUP_SIZE = parseInt(process.env.INSERT_GROUP_SIZE, 10) || 100000;
const INSERT_GROUP_CHUNK_SIZE =
parseInt(process.env.INSERT_GROUP_CHUNK_SIZE, 10) || 100;
const mongoClient = new MongoClient(MONGO_URI, {
useUnifiedTopology: true,
maxPoolSize: 100, // Set equal to concurrent request to avoid waiting for connection
maxIdleTimeMS: 10000,
// loggerLevel: 'debug',
});
const db = mongoClient.db(MONGO_DB_NAME);
const app = new Koa();
const router = new KoaRouter();
app.use(bodyParser());
// Init bulk operations
const insertBulker = new Bulker(
INSERT_BULK_SIZE,
INSERT_BULK_TIMEOUT,
async (items) => {
await db.collection('logs').insertMany(items);
}
);
const groupBulker = new Bulker(
INSERT_GROUP_SIZE,
INSERT_GROUP_TIMEOUT,
async (items) => {
let chunk = [];
const start = new Date();
for (let i = 0; i < items.length; i++) {
await (async function (item) {
chunk.push(db.collection('logs').insertOne(item));
if (i % INSERT_GROUP_CHUNK_SIZE === 0) {
await Promise.all(chunk);
chunk = [];
}
})(items[i]);
}
console.log(`Group taked ${new Date() - start} ms`);
}
);
// Insert a log
router.post('/insert_sync', async (ctx, next) => {
const body = ctx.request.body;
await db.collection('logs').insertOne(body);
ctx.response.body = 'Ok';
next();
});
router.post('/insert_group', async (ctx, next) => {
const body = ctx.request.body;
groupBulker.push(body);
ctx.response.body = 'Ok';
next();
});
router.post('/insert_async', async (ctx, next) => {
const body = ctx.request.body;
db.collection('logs').insertOne(body);
ctx.response.body = 'Ok';
next();
});
router.post('/insert_bulk', async (ctx, next) => {
const body = ctx.request.body;
insertBulker.push(body);
ctx.response.body = 'Ok';
next();
});
// Empty GET request to test overhead of framework
router.get('/', async (ctx, next) => {
ctx.response.body = 'Ok';
next();
});
// Empty POST request to test base network latency
router.post('/', async (ctx, next) => {
ctx.response.body = 'Ok';
next();
});
app.use(router.routes());
mongoClient.connect((err) => {
if (err) {
console.error(err);
process.exit(1);
}
const server = app.listen(PORT, () => {
console.log(`Application is listening at http://localhost:${PORT}`);
});
const terminate = function () {
server.close(async () => {
console.log('HTTP server closed, flushing batch...');
await groupBulker.flush();
await mongoClient.close();
console.log('Cleaned everything, bye bye.');
});
}
process.on('SIGTERM', () => {
console.log('SIGTERM signal received: closing HTTP server');
terminate();
});
process.on('SIGINT', () => {
console.log('SIGINT signal received: closing HTTP server');
terminate();
});
});