From d6eec10abd6a0cff979b7232b244d2804cb41ba9 Mon Sep 17 00:00:00 2001 From: James Simpson Date: Mon, 3 Dec 2018 14:05:05 -0600 Subject: [PATCH] Add support for automatic chunking with maxPacketSize option --- README.md | 1 + lib/democracy.js | 67 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 64 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 59cd767..a599c46 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,7 @@ dem.publish('my-channel', {hello: 'world'}); new Democracy({ interval: 1000, // The interval (ms) at which `hello` heartbeats are sent to the other peers. timeout: 3000, // How long a peer must go without sending a `hello` to be considered down. + maxPacketSize: 508, // Maximum size per packet. If the data exceeds this, it will be chunked. source: '0.0.0.0:12345', // The IP and port to listen to (usually the local IP). peers: [], // The other servers/ports you want to communicate with (can be on the same or different server). weight: Math.random() * Date.now(), // The highest weight is used to determine the new leader. Must be unique for each node. diff --git a/lib/democracy.js b/lib/democracy.js index 1d5354d..327dda8 100644 --- a/lib/democracy.js +++ b/lib/democracy.js @@ -24,11 +24,13 @@ class Democracy extends EventEmitter { super(); this._nodes = {}; + this._chunks = {}; // Merge the passed options with the defaults. this.options = { interval: options.interval || 1000, timeout: options.timeout || 3000, + maxPacketSize: options.maxPacketSize || 508, source: options.source || '0.0.0.0:12345', peers: options.peers || [], weight: options.weight || Math.random() * Date.now(), @@ -137,13 +139,38 @@ class Democracy extends EventEmitter { data.source = `${this.options.source[0]}:${this.options.source[1]}`; + // Adjust the max size by the max size of the chunk wrapper data. + const maxSize = this.options.maxPacketSize; + const chunkSize = maxSize - 52; + + // Check if the packet needs to be chunked. + const str = JSON.stringify(data); + let chunks = []; + if (str.length > maxSize) { + const count = Math.ceil(str.length / chunkSize); + const packetId = shortid.generate(); + + for (let i = 0; i < count; i += 1) { + chunks.push(JSON.stringify({ + chunk: str.substr(i * chunkSize, chunkSize), + id: packetId, + c: count, + i, + })); + } + } else { + chunks.push(str); + } + // Data must be sent as a Buffer over the UDP socket. - const msg = Buffer.from(JSON.stringify(data)); + chunks = chunks.map(chunk => Buffer.from(chunk)); // Loop through each connect node and send the packet over. - for (let i = 0; i < this.options.peers.length; i += 1) { - if (!id || this._nodes[id].source === `${this.options.peers[i][0]}:${this.options.peers[i][1]}`) { - this.socket.send(msg, 0, msg.length, this.options.peers[i][1], this.options.peers[i][0]); + for (let x = 0; x < chunks.length; x += 1) { + for (let i = 0; i < this.options.peers.length; i += 1) { + if (!id || this._nodes[id].source === `${this.options.peers[i][0]}:${this.options.peers[i][1]}`) { + this.socket.send(chunks[x], 0, chunks[x].length, this.options.peers[i][1], this.options.peers[i][0]); + } } } @@ -243,6 +270,38 @@ class Democracy extends EventEmitter { processEvent(msg) { const data = this.decodeMsg(msg); + // Check if this is a chunk and put in the store. + if (data && data.chunk && data.id) { + // Add the chunk to the buffer. + this._chunks[data.id] = this._chunks[data.id] || []; + this._chunks[data.id].push(data); + + // If the buffer is full, combine and process. + if (this._chunks[data.id].length === data.c) { + // Sort the chunks by index. + this._chunks[data.id].sort((a, b) => { + if (a.i < b.i) { + return -1; + } + if (a.i > b.i) { + return 1; + } + + return 0; + }); + + // Merge the data into a single string. + const newData = this._chunks[data.id].reduce((acc, val) => acc + val.chunk, ''); + delete this._chunks[data.id]; + + // Process the data as a buffer. + this.processEvent(Buffer.from(newData)); + } + + return this; + } + + // Validate the data. if (!data || data.id === this._id) { return this; }