Skip to content

Commit 4292811

Browse files
callmehiphopstephenplusplus
authored andcommitted
pubsub: tokenize connection pool project id (#2579)
* pubsub: interpolate projectId in connection pool * add delay between making sub & snap
1 parent 51df102 commit 4292811

3 files changed

Lines changed: 45 additions & 2 deletions

File tree

packages/pubsub/src/connection-pool.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ var RETRY_CODES = [
5656
*/
5757
function ConnectionPool(subscription) {
5858
this.subscription = subscription;
59+
this.projectId = subscription.projectId;
60+
5961
this.connections = new Map();
6062

6163
this.isPaused = false;
@@ -230,7 +232,8 @@ ConnectionPool.prototype.createConnection = function() {
230232
}
231233

232234
connection.write({
233-
subscription: self.subscription.name,
235+
subscription: common.util.replaceProjectIdToken(
236+
self.subscription.name, self.projectId),
234237
streamAckDeadlineSeconds: self.settings.ackDeadline / 1000
235238
});
236239

@@ -302,6 +305,10 @@ ConnectionPool.prototype.getClient = function(callback) {
302305
grpc.credentials.createFromGoogleCredential(authClient)
303306
);
304307

308+
if (!self.projectId || self.projectId === '{{projectId}}') {
309+
self.projectId = pubsub.auth.projectId;
310+
}
311+
305312
var Subscriber = v1(pubsub.options).Subscriber;
306313

307314
self.client = new Subscriber(v1.SERVICE_ADDRESS, credentials, {

packages/pubsub/system-test/pubsub.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,7 @@ describe('pubsub', function() {
535535
return deleteAllSnapshots()
536536
.then(wait(2500))
537537
.then(subscription.create.bind(subscription))
538+
.then(wait(2500))
538539
.then(snapshot.create.bind(snapshot))
539540
.then(wait(2500));
540541
});

packages/pubsub/test/connection-pool.js

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,20 @@ describe('ConnectionPool', function() {
6666
var pool;
6767

6868
var FAKE_PUBSUB_OPTIONS = {};
69+
var PROJECT_ID = 'grapce-spacheship-123';
6970

7071
var PUBSUB = {
72+
projectId: PROJECT_ID,
7173
auth: {
74+
projectId: PROJECT_ID,
7275
getAuthClient: fakeUtil.noop
7376
},
7477
options: FAKE_PUBSUB_OPTIONS
7578
};
7679

7780
var SUB_NAME = 'test-subscription';
7881
var SUBSCRIPTION = {
82+
projectId: PROJECT_ID,
7983
name: SUB_NAME,
8084
pubsub: PUBSUB,
8185
request: fakeUtil.noop
@@ -344,6 +348,7 @@ describe('ConnectionPool', function() {
344348
});
345349

346350
describe('connection', function() {
351+
var TOKENIZED_SUB_NAME = 'project/p/subscriptions/' + SUB_NAME;
347352
var fakeId;
348353

349354
beforeEach(function() {
@@ -352,12 +357,20 @@ describe('ConnectionPool', function() {
352357
fakeUuid.v4 = function() {
353358
return fakeId;
354359
};
360+
361+
fakeUtil.replaceProjectIdToken = common.util.replaceProjectIdToken;
355362
});
356363

357364
it('should create a connection', function(done) {
365+
fakeUtil.replaceProjectIdToken = function(subName, projectId) {
366+
assert.strictEqual(subName, SUB_NAME);
367+
assert.strictEqual(projectId, PROJECT_ID);
368+
return TOKENIZED_SUB_NAME;
369+
};
370+
358371
fakeConnection.write = function(reqOpts) {
359372
assert.deepEqual(reqOpts, {
360-
subscription: SUB_NAME,
373+
subscription: TOKENIZED_SUB_NAME,
361374
streamAckDeadlineSeconds: pool.settings.ackDeadline / 1000
362375
});
363376
};
@@ -662,6 +675,7 @@ describe('ConnectionPool', function() {
662675
});
663676

664677
describe('getClient', function() {
678+
var AUTH_PROJECT_ID = 'auth-project-id-123';
665679
var fakeAuthClient = {};
666680

667681
function FakeSubscriber(address, creds, options) {
@@ -671,6 +685,7 @@ describe('ConnectionPool', function() {
671685
}
672686

673687
beforeEach(function() {
688+
PUBSUB.auth.projectId = AUTH_PROJECT_ID;
674689
PUBSUB.auth.getAuthClient = function(callback) {
675690
callback(null, fakeAuthClient);
676691
};
@@ -734,6 +749,26 @@ describe('ConnectionPool', function() {
734749
});
735750
});
736751

752+
it('should capture the projectId when falsey', function(done) {
753+
delete pool.projectId;
754+
755+
pool.getClient(function(err) {
756+
assert.ifError(err);
757+
assert.strictEqual(pool.projectId, AUTH_PROJECT_ID);
758+
done();
759+
});
760+
});
761+
762+
it('should capture the projectId if it needs tokenization', function(done) {
763+
pool.projectId = '{{projectId}}';
764+
765+
pool.getClient(function(err) {
766+
assert.ifError(err);
767+
assert.strictEqual(pool.projectId, AUTH_PROJECT_ID);
768+
done();
769+
});
770+
});
771+
737772
it('should pass the pubsub options into the gax fn', function(done) {
738773
v1Override = function(options) {
739774
assert.strictEqual(options, FAKE_PUBSUB_OPTIONS);

0 commit comments

Comments
 (0)