Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pool): added optional setting for killing processes and not retaining them #1173

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions REFERENCE.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ interface AdvancedSettings {
retryProcessDelay: number = 5000; // delay before processing next job in case of internal error.
backoffStrategies: {}; // A set of custom backoff strategies keyed by name.
drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs).
reuseProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish
}
```

Expand Down
15 changes: 13 additions & 2 deletions lib/process/child-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const ChildPool = function ChildPool() {
return new ChildPool();
}

// Flag for keeping or killing processes after finished processing, default keep always
this.reuseProcesses = true;

this.retained = {};
this.free = {};
};
Expand All @@ -39,6 +42,10 @@ const convertExecArgv = function(execArgv) {
});
};

ChildPool.prototype.setReuseProcesses = function(reuseProcesses) {
this.reuseProcesses = reuseProcesses;
}

ChildPool.prototype.retain = function(processFile) {
const _this = this;
let child = _this.getFree(processFile).pop();
Expand All @@ -65,8 +72,12 @@ ChildPool.prototype.retain = function(processFile) {
};

ChildPool.prototype.release = function(child) {
delete this.retained[child.pid];
this.getFree(child.processFile).push(child);
if (this.reuseProcesses === true) {
delete this.retained[child.pid];
this.getFree(child.processFile).push(child);
} else {
this.kill(child);
}
};

ChildPool.prototype.remove = function(child) {
Expand Down
4 changes: 3 additions & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ const Queue = function Queue(name, url, opts) {
guardInterval: 5000,
retryProcessDelay: 5000,
drainDelay: 5,
backoffStrategies: {}
backoffStrategies: {},
reuseProcesses: true
});

this.settings.lockRenewTime =
Expand Down Expand Up @@ -650,6 +651,7 @@ Queue.prototype.setHandler = function(name, handler) {
}

this.childPool = this.childPool || require('./process/child-pool')();
this.childPool.setReuseProcesses(this.settings.reuseProcesses);

const sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler, this.childPool).bind(this);
Expand Down
14 changes: 14 additions & 0 deletions test/test_child-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,18 @@ describe('Child pool', () => {
expect(children).to.include(child);
});
});

it('should kill child after processing is finished and not retain it', function() {
const processor = __dirname + '/fixtures/fixture_processor_bar.js';

pool.setReuseProcesses(false);

return pool.retain(processor).then(_child => {
expect(_child).to.be.ok;
pool.release(_child);

expect(pool.retained).to.be.empty;
expect(pool.free[processor]).to.be.empty;
});
});
});