diff --git a/lib/kcl/kcl_manager.js b/lib/kcl/kcl_manager.js index 0bac8afa..acf4a68f 100644 --- a/lib/kcl/kcl_manager.js +++ b/lib/kcl/kcl_manager.js @@ -26,7 +26,7 @@ KCLManager.VERSION2 = Symbol("version2"); * @param {object} kclManagerInput - Object containing the recordprocessor and the version of the record processor. * @param {file} inputFile - A file to read action messages from. * @param {file} outputFile - A file to write action messages to. - * @param {file} errorfile - A file to write error messages to. + * @param {file} errorFile - A file to write error messages to. */ function KCLManager(kclManagerInput, inputFile, outputFile, errorFile) { this._version = kclManagerInput.version; @@ -53,6 +53,7 @@ KCLManager.prototype._onAction = function(action) { switch (actionType) { case 'initialize': case 'processRecords': + case 'shutdown': case 'leaseLost': case 'shardEnded': this._onRecordProcessorAction(action); @@ -98,6 +99,10 @@ KCLManager.prototype._onRecordProcessorAction = function(action) { recordProcessorFuncInput.checkpointer = checkpointer; recordProcessorFunc = recordProcessor.processRecords; break; + case 'shutdown': + recordProcessorFuncInput.checkpointer = checkpointer; + recordProcessorFunc = recordProcessor.shutdown; + break; case 'leaseLost': if (this._version === KCLManager.VERSION1) { recordProcessorFuncInput.reason = 'ZOMBIE'; diff --git a/lib/kcl/kcl_process.js b/lib/kcl/kcl_process.js index 1a7e239d..d6c2b696 100644 --- a/lib/kcl/kcl_process.js +++ b/lib/kcl/kcl_process.js @@ -60,7 +60,7 @@ var KCLManager = require('./kcl_manager'); * @param {object} recordProcessor - A record processor to use for processing a shard. * @param {file} inputFile - A file to read action messages from. Defaults to STDIN. * @param {file} outputFile - A file to write action messages to. Defaults to STDOUT. - * @param {file} errorfile - A file to write error messages to. Defaults to STDERR. + * @param {file} errorFile - A file to write error messages to. Defaults to STDERR. */ function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) { var allMethodsPresent = typeof recordProcessor.initialize === 'function' && @@ -84,7 +84,7 @@ function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) { version: version }; - var kclManager = new KCLManager(kclManagerInput, inputFile, outputFile, errorFile, version); + var kclManager = new KCLManager(kclManagerInput, inputFile, outputFile, errorFile); return { // For testing only.