Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Storage Module #96

Merged
merged 6 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 13 additions & 133 deletions lib/countly-bulk.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@
* server.add_request({begin_session:1, metrics:{_os:"Linux"}, device_id:"users_device_id", events:[{key:"Test", count:1}]});
*/

var fs = require("fs");
var path = require("path");
var http = require("http");
var https = require("https");
var cluster = require("cluster");
var cc = require("./countly-common");
var BulkUser = require("./countly-bulk-user");
var CountlyStorage = require("./countly-storage");

/**
* @lends module:lib/countly-bulk
Expand All @@ -40,7 +39,7 @@ var BulkUser = require("./countly-bulk-user");
* @param {number} [conf.max_events=100] - maximum amount of events to send in one batch
* @param {boolean} [conf.persist_queue=false] - persistently store queue until processed, default is false if you want to keep queue in memory and process all in one process run
* @param {boolean} [conf.force_post=false] - force using post method for all requests
* @param {string} [conf.storage_path="../bulk_data/"] - where SDK would store data, including id, queues, etc
* @param {string} [conf.storage_path] - where SDK would store data, including id, queues, etc
* @param {string} [conf.http_options=] - function to get http options by reference and overwrite them, before running each request
* @param {number} [conf.max_key_length=128] - maximum size of all string keys
* @param {number} [conf.max_value_size=256] - maximum size of all values in our key-value pairs (Except "picture" field, that has a limit of 4096 chars)
Expand Down Expand Up @@ -73,7 +72,6 @@ function CountlyBulk(conf) {
var maxBreadcrumbCount = 100;
var maxStackTraceLinesPerThread = 30;
var maxStackTraceLineLength = 200;
var __data = {};

cc.debugBulk = conf.debug || false;
if (!conf.app_key) {
Expand All @@ -96,7 +94,6 @@ function CountlyBulk(conf) {
conf.max_events = conf.max_events || 100;
conf.force_post = conf.force_post || false;
conf.persist_queue = conf.persist_queue || false;
conf.storage_path = conf.storage_path || "../bulk_data/";
conf.http_options = conf.http_options || null;
conf.maxKeyLength = conf.max_key_length || maxKeyLength;
conf.maxValueSize = conf.max_value_size || maxValueSize;
Expand All @@ -105,19 +102,7 @@ function CountlyBulk(conf) {
conf.maxStackTraceLinesPerThread = conf.max_stack_trace_lines_per_thread || maxStackTraceLinesPerThread;
conf.maxStackTraceLineLength = conf.max_stack_trace_line_length || maxStackTraceLineLength;

var mainDir = path.resolve(__dirname, conf.storage_path);
if (conf.persist_queue) {
try {
if (!fs.existsSync(mainDir)) {
fs.mkdirSync(mainDir);
}
}
catch (ex) {
// problem creating directory
// eslint-disable-next-line no-console
cc.log(cc.logLevelEnums.ERROR, "CountlyBulk, Failed white creating the '/data' directory. Error: ", ex.stack);
}
}
CountlyStorage.setBulkDataPath(conf.storage_path, conf.persist_queue);

this.conf = conf;
/**
Expand Down Expand Up @@ -157,7 +142,7 @@ function CountlyBulk(conf) {

requestQueue.push(query);
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Adding request to the queue.");
storeSet("cly_req_queue", requestQueue);
CountlyStorage.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -205,7 +190,7 @@ function CountlyBulk(conf) {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, adding the request into queue.");
requestQueue.push(query);
}
storeSet("cly_req_queue", requestQueue);
CountlyStorage.storeSet("cly_req_queue", requestQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, "CountlyBulk add_bulk_request, Sending message to the parent process. Adding the raw request to the queue.");
Expand Down Expand Up @@ -260,7 +245,7 @@ function CountlyBulk(conf) {
eventQueue[device_id] = [];
}
eventQueue[device_id].push(e);
storeSet("cly_bulk_event", eventQueue);
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
}
else {
cc.log(cc.logLevelEnums.INFO, `CountlyBulk add_event, Sending message to the parent process. Adding event: [${event.key}].`);
Expand Down Expand Up @@ -358,7 +343,7 @@ function CountlyBulk(conf) {
*/
function toBulkRequestQueue(bulkRequest) {
bulkQueue.push(bulkRequest);
storeSet("cly_bulk_queue", bulkQueue);
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
}
var self = this;

Expand All @@ -384,7 +369,7 @@ function CountlyBulk(conf) {
}
if (eventChanges) {
isEmpty = false;
storeSet("cly_bulk_event", eventQueue);
CountlyStorage.storeSet("cly_bulk_event", eventQueue);
}

// process request queue into bulk requests
Expand All @@ -398,7 +383,7 @@ function CountlyBulk(conf) {
var requests = requestQueue.splice(0, conf.bulk_size);
toBulkRequestQueue({ app_key: conf.app_key, requests: JSON.stringify(requests) });
}
storeSet("cly_req_queue", requestQueue);
CountlyStorage.storeSet("cly_req_queue", requestQueue);
}

// process bulk request queue
Expand All @@ -413,7 +398,7 @@ function CountlyBulk(conf) {
bulkQueue.unshift(res);
failTimeout = cc.getTimestamp() + conf.fail_timeout;
}
storeSet("cly_bulk_queue", bulkQueue);
CountlyStorage.storeSet("cly_bulk_queue", bulkQueue);
readyToProcess = true;
}, "heartBeat", false);
}
Expand Down Expand Up @@ -594,111 +579,6 @@ function CountlyBulk(conf) {
}
}

/**
* Read value from file
* @param {String} key - key for file
* @returns {varies} value in file
*/
var readFile = function(key) {
var data;
if (conf.persist_queue) {
var dir = path.resolve(__dirname, `${conf.storage_path}__${key}.json`);

// try reading data file
try {
data = fs.readFileSync(dir);
}
catch (ex) {
// there was no file, probably new init
cc.log(cc.logLevelEnums.ERROR, "CountlyBulk readFile, Nothing to read. Might be first init. Error: ", ex);
data = null;
}

try {
// trying to parse json string
data = JSON.parse(data);
}
catch (ex) {
// problem parsing, corrupted file?
cc.log(cc.logLevelEnums.ERROR, "CountlyBulk readFile, Problem while parsing. Error:", ex.stack);
// backup corrupted file data
fs.writeFile(path.resolve(__dirname, `${conf.storage_path}__${key}.${cc.getTimestamp()}${Math.random()}.json`), data, () => {});
// start with new clean object
data = null;
}
}
return data;
};

var asyncWriteLock = false;
var asyncWriteQueue = [];

/**
* Write to file and process queue while in asyncWriteLock
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
var writeFile = function(key, value, callback) {
var ob = {};
ob[key] = value;
var dir = path.resolve(__dirname, `${conf.storage_path}__${key}.json`);
fs.writeFile(dir, JSON.stringify(ob), (err) => {
if (err) {
// eslint-disable-next-line no-console
cc.log(cc.logLevelEnums.ERROR, "CountlyBulk writeFile, Problem while writing. Error:", err);
}
if (typeof callback === "function") {
callback(err);
}
if (asyncWriteQueue.length) {
setTimeout(() => {
var arr = asyncWriteQueue.shift();
writeFile(arr[0], arr[1], arr[2]);
}, 0);
}
else {
asyncWriteLock = false;
}
});
};

/**
* Save value in storage
* @param {String} key - key for value to store
* @param {varies} value - value to store
* @param {Function} callback - callback to call when done storing
*/
var storeSet = function(key, value, callback) {
__data[key] = value;
if (!asyncWriteLock) {
asyncWriteLock = true;
writeFile(key, value, callback);
}
else {
asyncWriteQueue.push([key, value, callback]);
}
};

/**
* Get value from storage
* @param {String} key - key of value to get
* @param {varies} def - default value to use if not set
* @returns {varies} value for the key
*/
var storeGet = function(key, def) {
if (typeof __data[key] === "undefined") {
var ob = readFile(key);
if (!ob) {
__data[key] = def;
}
else {
__data[key] = ob[key];
}
}
return __data[key];
};

// listen to current workers
if (cluster.workers) {
for (var id in cluster.workers) {
Expand All @@ -711,9 +591,9 @@ function CountlyBulk(conf) {
worker.on("message", handleWorkerMessage);
});

var requestQueue = storeGet("cly_req_queue", []);
var eventQueue = storeGet("cly_bulk_event", {});
var bulkQueue = storeGet("cly_bulk_queue", []);
var requestQueue = CountlyStorage.storeGet("cly_req_queue", []);
var eventQueue = CountlyStorage.storeGet("cly_bulk_event", {});
var bulkQueue = CountlyStorage.storeGet("cly_bulk_queue", []);
}

module.exports = CountlyBulk;
Loading
Loading