Skip to content
This repository was archived by the owner on Mar 17, 2026. It is now read-only.

Commit 98cf540

Browse files
fix: detect subscription properties and warn for exactly-once (#1561)
* feat: detect subscription properties and warn for exactly-once * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: check for changes to subscription options on every message * chore: fix copyright in new file * fix: update copyright header again ("once more with feeling" edition) * fix: still yet more copyright header * fix: timing in CI is tricky, replace setTimeout Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 179b617 commit 98cf540

3 files changed

Lines changed: 117 additions & 2 deletions

File tree

src/subscriber.ts

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ import {Subscription} from './subscription';
3030
import {defaultOptions} from './default-options';
3131
import {SubscriberClient} from './v1';
3232
import {createSpan} from './opentelemetry-tracing';
33+
import {Throttler} from './util';
3334

34-
export type PullResponse = google.pubsub.v1.IPullResponse;
35+
export type PullResponse = google.pubsub.v1.IStreamingPullResponse;
36+
export type SubscriptionProperties =
37+
google.pubsub.v1.StreamingPullResponse.ISubscriptionProperties;
3538

3639
/**
3740
* Date object with nanosecond precision. Supports all standard Date arguments
@@ -253,6 +256,10 @@ export class Subscriber extends EventEmitter {
253256
private _options!: SubscriberOptions;
254257
private _stream!: MessageStream;
255258
private _subscription: Subscription;
259+
private _errorLog: Throttler;
260+
261+
subscriptionProperties?: SubscriptionProperties;
262+
256263
constructor(subscription: Subscription, options = {}) {
257264
super();
258265

@@ -266,8 +273,32 @@ export class Subscriber extends EventEmitter {
266273
this._histogram = new Histogram({min: 10, max: 600});
267274
this._latencies = new Histogram();
268275
this._subscription = subscription;
276+
this._errorLog = new Throttler(60 * 1000);
277+
269278
this.setOptions(options);
270279
}
280+
281+
/**
282+
* Sets our subscription properties from the first incoming message.
283+
*
284+
* @param {SubscriptionProperties} subscriptionProperties The new properties.
285+
* @private
286+
*/
287+
setSubscriptionProperties(subscriptionProperties: SubscriptionProperties) {
288+
this.subscriptionProperties = subscriptionProperties;
289+
290+
// If this is an exactly-once subscription, warn the user that they may have difficulty.
291+
if (this.subscriptionProperties.exactlyOnceDeliveryEnabled) {
292+
this._errorLog.doMaybe(() =>
293+
console.error(
294+
'WARNING: Exactly-once subscriptions are not yet supported ' +
295+
'by the Node client library. This feature will be added ' +
296+
'in a future release.'
297+
)
298+
);
299+
}
300+
}
301+
271302
/**
272303
* The 99th percentile of request latencies.
273304
*
@@ -517,7 +548,13 @@ export class Subscriber extends EventEmitter {
517548
*
518549
* @private
519550
*/
520-
private _onData({receivedMessages}: PullResponse): void {
551+
private _onData(response: PullResponse): void {
552+
// Grab the subscription properties for exactly once and ordering flags.
553+
if (response.subscriptionProperties) {
554+
this.setSubscriptionProperties(response.subscriptionProperties);
555+
}
556+
557+
const {receivedMessages} = response;
521558
for (const data of receivedMessages!) {
522559
const message = new Message(this, data);
523560

src/util.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,35 @@ export function promisifySome<T>(
4040
}
4141

4242
export function noop() {}
43+
44+
/**
45+
* Provides a very simple throttling capability for tasks like error logs.
46+
* This ensures that no task is actually completed unless N millis have passed
47+
* since the last one.
48+
*
49+
* @private
50+
*/
51+
export class Throttler {
52+
minMillis: number;
53+
lastTime?: number;
54+
55+
constructor(minMillis: number) {
56+
this.minMillis = minMillis;
57+
}
58+
59+
/**
60+
* Performs the task requested, if enough time has passed since the
61+
* last successful call.
62+
*/
63+
doMaybe(task: Function) {
64+
const now = Date.now();
65+
const doTask =
66+
!this.lastTime ||
67+
(this.lastTime && now - this.lastTime >= this.minMillis);
68+
69+
if (doTask) {
70+
task();
71+
this.lastTime = now;
72+
}
73+
}
74+
}

test/util.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import {describe, it} from 'mocha';
16+
import {Throttler} from '../src/util';
17+
import * as assert from 'assert';
18+
19+
describe('utils', () => {
20+
describe('Throttler', () => {
21+
it('does not allow too many calls through at once', () => {
22+
const throttler = new Throttler(300);
23+
let totalCalls = '';
24+
25+
// This one should succeed.
26+
throttler.doMaybe(() => {
27+
totalCalls += 'FIRST';
28+
});
29+
30+
// This one should fail.
31+
throttler.doMaybe(() => {
32+
totalCalls += 'SECOND';
33+
});
34+
35+
// Simulate time passing.
36+
throttler.lastTime! -= 1000;
37+
38+
// This one should succeed.
39+
throttler.doMaybe(() => {
40+
totalCalls += 'THIRD';
41+
});
42+
43+
assert.strictEqual(totalCalls, 'FIRSTTHIRD');
44+
});
45+
});
46+
});

0 commit comments

Comments
 (0)