From 9b31359cefd89762b4ddd5d5d333a33a93829368 Mon Sep 17 00:00:00 2001 From: Daryl D'Souza Date: Sun, 18 Aug 2019 12:11:39 +1000 Subject: [PATCH 1/4] Handling shutdown action once again in KCL manager --- lib/kcl/kcl_manager.js | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/kcl/kcl_manager.js b/lib/kcl/kcl_manager.js index 0bac8afa..5e997121 100644 --- a/lib/kcl/kcl_manager.js +++ b/lib/kcl/kcl_manager.js @@ -53,6 +53,7 @@ KCLManager.prototype._onAction = function(action) { switch (actionType) { case 'initialize': case 'processRecords': + case 'shutdown': case 'leaseLost': case 'shardEnded': this._onRecordProcessorAction(action); From e18e0b9481cb5e23ae3f0be3a50fa573128dc0e0 Mon Sep 17 00:00:00 2001 From: Daryl D'Souza Date: Sun, 18 Aug 2019 12:12:45 +1000 Subject: [PATCH 2/4] Minor clean up of doc + passing correct number of args to KCLManager --- lib/kcl/kcl_manager.js | 2 +- lib/kcl/kcl_process.js | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/kcl/kcl_manager.js b/lib/kcl/kcl_manager.js index 5e997121..b74685b9 100644 --- a/lib/kcl/kcl_manager.js +++ b/lib/kcl/kcl_manager.js @@ -26,7 +26,7 @@ KCLManager.VERSION2 = Symbol("version2"); * @param {object} kclManagerInput - Object containing the recordprocessor and the version of the record processor. * @param {file} inputFile - A file to read action messages from. * @param {file} outputFile - A file to write action messages to. - * @param {file} errorfile - A file to write error messages to. + * @param {file} errorFile - A file to write error messages to. */ function KCLManager(kclManagerInput, inputFile, outputFile, errorFile) { this._version = kclManagerInput.version; diff --git a/lib/kcl/kcl_process.js b/lib/kcl/kcl_process.js index 1a7e239d..d6c2b696 100644 --- a/lib/kcl/kcl_process.js +++ b/lib/kcl/kcl_process.js @@ -60,7 +60,7 @@ var KCLManager = require('./kcl_manager'); * @param {object} recordProcessor - A record processor to use for processing a shard. * @param {file} inputFile - A file to read action messages from. Defaults to STDIN. * @param {file} outputFile - A file to write action messages to. Defaults to STDOUT. - * @param {file} errorfile - A file to write error messages to. Defaults to STDERR. + * @param {file} errorFile - A file to write error messages to. Defaults to STDERR. */ function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) { var allMethodsPresent = typeof recordProcessor.initialize === 'function' && @@ -84,7 +84,7 @@ function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) { version: version }; - var kclManager = new KCLManager(kclManagerInput, inputFile, outputFile, errorFile, version); + var kclManager = new KCLManager(kclManagerInput, inputFile, outputFile, errorFile); return { // For testing only. From 665f370b5f68d0ff1b0f456f5ae86e8cba7e0a15 Mon Sep 17 00:00:00 2001 From: Daryl D'Souza Date: Sun, 18 Aug 2019 12:40:58 +1000 Subject: [PATCH 3/4] Handling shutdown action in KCLManager._onRecordProcessorAction properly --- lib/kcl/kcl_manager.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/kcl/kcl_manager.js b/lib/kcl/kcl_manager.js index b74685b9..2551ed6d 100644 --- a/lib/kcl/kcl_manager.js +++ b/lib/kcl/kcl_manager.js @@ -99,6 +99,10 @@ KCLManager.prototype._onRecordProcessorAction = function(action) { recordProcessorFuncInput.checkpointer = checkpointer; recordProcessorFunc = recordProcessor.processRecords; break; + case 'shutdown': + recordProcessorFunc = recordProcessor.shutdown; + recordProcessorFuncInput.checkpointer = checkpointer; + break; case 'leaseLost': if (this._version === KCLManager.VERSION1) { recordProcessorFuncInput.reason = 'ZOMBIE'; From 61d33bff977ede4146ee96994296b98f4ace79e7 Mon Sep 17 00:00:00 2001 From: Daryl D'Souza Date: Sun, 18 Aug 2019 12:42:07 +1000 Subject: [PATCH 4/4] Rearranged lines to match other case blocks --- lib/kcl/kcl_manager.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/kcl/kcl_manager.js b/lib/kcl/kcl_manager.js index 2551ed6d..acf4a68f 100644 --- a/lib/kcl/kcl_manager.js +++ b/lib/kcl/kcl_manager.js @@ -100,8 +100,8 @@ KCLManager.prototype._onRecordProcessorAction = function(action) { recordProcessorFunc = recordProcessor.processRecords; break; case 'shutdown': - recordProcessorFunc = recordProcessor.shutdown; recordProcessorFuncInput.checkpointer = checkpointer; + recordProcessorFunc = recordProcessor.shutdown; break; case 'leaseLost': if (this._version === KCLManager.VERSION1) {