Skip to content

Commit

Permalink
fix: set MaxBytes for AckQueue (#1963)
Browse files Browse the repository at this point in the history
* fix: #1864

* feat: update maxBytes changes to use a fixed max size

* chore: remove "discount polyfill" for allSettled

* tests: add unit tests for maxBytes handling

* fix: do the maxMessages and maxBytes checks _before_ adding

* fix: roll back nise package to avoid semver breakage

* chore: move the nise version fix to devDependencies (oops)

* chore: pin rimraf in samples package to avoid typescript breakage in lru-cache

* chore: also pin lru-cache, remove unneeded rimraf types

---------

Co-authored-by: Cheskel Twersky <[email protected]>
  • Loading branch information
feywind and dermasmid authored Sep 13, 2024
1 parent c2b75bb commit 5945563
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 43 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,9 @@
"mocha": "^9.2.2",
"mv": "^2.1.1",
"ncp": "^2.0.0",
"nise": "6.0.0",
"null-loader": "^4.0.0",
"path-to-regexp": "6.2.2",
"protobufjs": "^7.0.0",
"proxyquire": "^2.0.0",
"sinon": "^18.0.0",
Expand Down
4 changes: 2 additions & 2 deletions samples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
"devDependencies": {
"@google-cloud/bigquery": "^7.0.0",
"@types/chai": "^4.2.16",
"@types/rimraf": "^4.0.0",
"chai": "^4.2.0",
"gts": "^5.0.0",
"lru-cache": "9.1.2",
"mocha": "^9.2.2",
"rimraf": "^5.0.0",
"rimraf": "5.0.9",
"typescript": "^5.1.6",
"uuid": "^9.0.0"
}
Expand Down
30 changes: 26 additions & 4 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ export interface BatchOptions {
maxMilliseconds?: number;
}

// This is the maximum number of bytes we will send for a batch of
// ack/modack messages. The server itself has a maximum of 512KiB, so
// we just pull back a little from that in case of unknown fenceposts.
export const MAX_BATCH_BYTES = 510 * 1024 * 1024;

/**
* Error class used to signal a batch failure.
*
Expand Down Expand Up @@ -113,6 +118,7 @@ export abstract class MessageQueue {
numPendingRequests: number;
numInFlightRequests: number;
numInRetryRequests: number;
bytes: number;
protected _onFlush?: defer.DeferredPromise<void>;
protected _onDrain?: defer.DeferredPromise<void>;
protected _options!: BatchOptions;
Expand All @@ -127,6 +133,7 @@ export abstract class MessageQueue {
this.numPendingRequests = 0;
this.numInFlightRequests = 0;
this.numInRetryRequests = 0;
this.bytes = 0;
this._requests = [];
this._subscriber = sub;
this._retrier = new ExponentialRetry<QueuedMessage>(
Expand Down Expand Up @@ -195,7 +202,18 @@ export abstract class MessageQueue {
}

const {maxMessages, maxMilliseconds} = this._options;
const size = Buffer.byteLength(message.ackId, 'utf8');

// If we will go over maxMessages or MAX_BATCH_BYTES by adding this
// message, flush first. (maxMilliseconds is handled by timers.)
if (
this._requests.length + 1 >= maxMessages! ||
this.bytes + size >= MAX_BATCH_BYTES
) {
this.flush();
}

// Add the message to the current batch.
const responsePromise = defer<void>();
this._requests.push({
message: {
Expand All @@ -208,10 +226,10 @@ export abstract class MessageQueue {
});
this.numPendingRequests++;
this.numInFlightRequests++;
this.bytes += size;

if (this._requests.length >= maxMessages!) {
this.flush();
} else if (!this._timer) {
// Ensure that we are counting toward maxMilliseconds by timer.
if (!this._timer) {
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
}

Expand Down Expand Up @@ -273,6 +291,7 @@ export abstract class MessageQueue {
const deferred = this._onFlush;

this._requests = [];
this.bytes = 0;
this.numPendingRequests -= batchSize;
delete this._onFlush;

Expand Down Expand Up @@ -339,7 +358,10 @@ export abstract class MessageQueue {
* @private
*/
setOptions(options: BatchOptions): void {
const defaults: BatchOptions = {maxMessages: 3000, maxMilliseconds: 100};
const defaults: BatchOptions = {
maxMessages: 3000,
maxMilliseconds: 100,
};

this._options = Object.assign(defaults, options);
}
Expand Down
60 changes: 23 additions & 37 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import defer = require('p-defer');

import * as messageTypes from '../src/message-queues';
import {BatchError} from '../src/message-queues';
import {AckError, Message, Subscriber} from '../src/subscriber';
import {Message, Subscriber} from '../src/subscriber';
import {DebugMessage} from '../src/debug';

class FakeClient {
Expand Down Expand Up @@ -99,36 +99,6 @@ class ModAckQueue extends messageTypes.ModAckQueue {
}
}

// This discount polyfill for Promise.allSettled can be removed after we drop Node 12.
type AllSettledResult<T, U> = {
status: 'fulfilled' | 'rejected';
value?: T;
reason?: U;
};
function allSettled<T, U>(
proms: Promise<T>[]
): Promise<AllSettledResult<T, U>[]> {
const checkedProms = proms.map((r: Promise<T>) =>
r
.then(
(value: T) =>
({
status: 'fulfilled',
value,
}) as AllSettledResult<T, U>
)
.catch(
(error: U) =>
({
status: 'rejected',
reason: error,
}) as AllSettledResult<T, U>
)
);

return Promise.all(checkedProms);
}

describe('MessageQueues', () => {
const sandbox = sinon.createSandbox();

Expand Down Expand Up @@ -190,6 +160,15 @@ describe('MessageQueues', () => {
assert.strictEqual(stub.callCount, 1);
});

it('should flush the queue if at byte capacity', () => {
const stub = sandbox.stub(messageQueue, 'flush');

messageQueue.bytes = messageTypes.MAX_BATCH_BYTES - 10;
messageQueue.add(new FakeMessage() as Message);

assert.strictEqual(stub.callCount, 1);
});

it('should schedule a flush if needed', () => {
const clock = sandbox.useFakeTimers();
const stub = sandbox.stub(messageQueue, 'flush');
Expand Down Expand Up @@ -244,6 +223,13 @@ describe('MessageQueues', () => {
assert.strictEqual(messageQueue.numPendingRequests, 0);
});

it('should remove the bytes of messages from the queue', () => {
messageQueue.add(new FakeMessage() as Message);
messageQueue.flush();

assert.strictEqual(messageQueue.bytes, 0);
});

it('should send the batch', () => {
const message = new FakeMessage();
const deadline = 10;
Expand Down Expand Up @@ -498,7 +484,7 @@ describe('MessageQueues', () => {
(r: messageTypes.QueuedMessage) => r.responsePromise!.promise
);
await ackQueue.flush();
const results = await allSettled(proms);
const results = await Promise.allSettled(proms);
const oneSuccess = {status: 'fulfilled', value: undefined};
assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]);
});
Expand All @@ -522,7 +508,7 @@ describe('MessageQueues', () => {
proms.shift();
await ackQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'OTHER');
assert.strictEqual(results[1].status, 'rejected');
Expand Down Expand Up @@ -552,7 +538,7 @@ describe('MessageQueues', () => {
];
await ackQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'INVALID');

Expand Down Expand Up @@ -789,7 +775,7 @@ describe('MessageQueues', () => {
(r: messageTypes.QueuedMessage) => r.responsePromise!.promise
);
await modAckQueue.flush();
const results = await allSettled(proms);
const results = await Promise.allSettled(proms);
const oneSuccess = {status: 'fulfilled', value: undefined};
assert.deepStrictEqual(results, [oneSuccess, oneSuccess, oneSuccess]);
});
Expand All @@ -815,7 +801,7 @@ describe('MessageQueues', () => {
proms.shift();
await modAckQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'OTHER');
assert.strictEqual(results[1].status, 'rejected');
Expand Down Expand Up @@ -847,7 +833,7 @@ describe('MessageQueues', () => {
];
await modAckQueue.flush();

const results = await allSettled<void, AckError>(proms);
const results = await Promise.allSettled<void>(proms);
assert.strictEqual(results[0].status, 'rejected');
assert.strictEqual(results[0].reason?.errorCode, 'INVALID');

Expand Down

0 comments on commit 5945563

Please sign in to comment.