Skip to content

Commit 3f8f60f

Browse files
Jon Wayne Parrottryanseys
authored andcommitted
Adding autoCreate option to Topic constructor and reuseExisting option to subscribe.
1 parent 7077411 commit 3f8f60f

3 files changed

Lines changed: 105 additions & 5 deletions

File tree

lib/pubsub/index.js

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,18 +196,25 @@ PubSub.prototype.createTopic = function(name, callback) {
196196
* @throws {Error} If a name is not provided.
197197
*
198198
* @param {string} name - The name of the topic.
199+
* @param {object=} options - Configuration object.
200+
* @param {boolean=} options.autoCreate - Automatically create topic if it
201+
* doesn't exist. Note that messages published to a topic with no
202+
* subscribers will not be delivered.
199203
* @return {module:pubsub/topic}
200204
*
201205
* @example
202206
* var topic = pubsub.topic('my-existing-topic');
207+
* var topic = pubsub.topic('topic-that-maybe-exists', { autoCreate: true });
203208
* topic.publish('New message!');
204209
*/
205-
PubSub.prototype.topic = function(name) {
210+
PubSub.prototype.topic = function(name, options) {
206211
if (!name) {
207212
throw new Error('A name must be specified for a new topic.');
208213
}
214+
options = options || {};
209215
return new Topic(this, {
210-
name: name
216+
name: name,
217+
autoCreate: options.autoCreate
211218
});
212219
};
213220

lib/pubsub/topic.js

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,12 @@ function Topic(pubsub, options) {
6060
this.name = Topic.formatName_(pubsub.projectId, options.name);
6161
this.projectId = pubsub.projectId;
6262
this.pubsub = pubsub;
63+
64+
if (options.autoCreate) {
65+
this.unformattedName = options.name;
66+
this.origMakeReq_ = this.makeReq_;
67+
this.makeReq_ = this.autoCreateWrapper_;
68+
}
6369
}
6470

6571
/**
@@ -95,6 +101,34 @@ Topic.formatName_ = function(projectId, name) {
95101
return 'projects/' + projectId + '/topics/' + name;
96102
};
97103

104+
/**
105+
* Wrapper for makeReq_ that automatically attempts to create a topic if it
106+
* does not yet exist.
107+
*
108+
* @private
109+
*/
110+
Topic.prototype.autoCreateWrapper_ = function(method, path, q, body, callback) {
111+
var self = this;
112+
113+
function createAndRetry() {
114+
self.pubsub.createTopic(self.unformattedName, function(err) {
115+
if (err) {
116+
callback(err);
117+
return;
118+
}
119+
self.origMakeReq_(method, path, q, body, callback);
120+
});
121+
}
122+
123+
this.origMakeReq_(method, path, q, body, function(err, res) {
124+
if (err && err.code === 404 && method !== 'DELETE') {
125+
createAndRetry();
126+
} else {
127+
callback(err, res);
128+
}
129+
});
130+
};
131+
98132
/**
99133
* Publish the provided message or array of messages. A message can be of any
100134
* type. On success, an array of messageIds is returned in the response.
@@ -243,6 +277,10 @@ Topic.prototype.getSubscriptions = function(query, callback) {
243277
* once it's pulled. (default: false)
244278
* @param {number=} options.interval - Interval in milliseconds to check for new
245279
* messages. (default: 10)
280+
* @param {boolean=} options.reuseExisting - If the subscription already exists,
281+
* reuse it. The options of the existing subscription are not changed. If
282+
* false, attempting to create a subscription that already exists will fail.
283+
* (default: false)
246284
* @param {function} callback - The callback function.
247285
*
248286
* @example
@@ -276,11 +314,13 @@ Topic.prototype.subscribe = function(name, options, callback) {
276314

277315
var path = Subscription.formatName_(this.projectId, name);
278316
this.makeReq_('PUT', path, null, body, function(err) {
279-
if (err) {
317+
if (options.reuseExisting && err && err.code === 409) {
318+
callback(null, self.subscription(name, options));
319+
} else if (err) {
280320
callback(err);
281-
return;
321+
} else {
322+
callback(null, self.subscription(name, options));
282323
}
283-
callback(null, self.subscription(name, options));
284324
});
285325
};
286326

test/pubsub/topic.js

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,43 @@ describe('Topic', function() {
161161
});
162162
});
163163

164+
describe('publish to non-existing topic', function() {
165+
var messageObject = { data: 'howdy' };
166+
167+
it('should generate 404 error without autoCreate', function(done) {
168+
topic.makeReq_ = function(method, path, query, body, callback) {
169+
callback({ code: 404 });
170+
};
171+
172+
topic.publish(messageObject, function(err) {
173+
assert.equal(err.code, 404);
174+
done();
175+
});
176+
});
177+
178+
it('should publish successfully with autoCreate', function(done) {
179+
var acTopic = new Topic(pubsubMock, {
180+
name: TOPIC_NAME, autoCreate: true
181+
});
182+
var created = false;
183+
184+
acTopic.origMakeReq_ = function(method, path, query, body, callback) {
185+
if (!created) {
186+
callback({ code: 404 });
187+
} else {
188+
callback(null);
189+
}
190+
};
191+
192+
pubsubMock.createTopic = function(name, callback) {
193+
created = true;
194+
callback();
195+
};
196+
197+
acTopic.publish(messageObject, done);
198+
});
199+
});
200+
164201
describe('delete', function() {
165202
it('should delete a topic', function(done) {
166203
topic.makeReq_ = function(method, path) {
@@ -276,6 +313,22 @@ describe('Topic', function() {
276313
};
277314
topic.subscribe(SUB_NAME, CONFIG, assert.ifError);
278315
});
316+
317+
it('should re-use existing subscription if specified', function(done) {
318+
topic.subscription = function() {
319+
done();
320+
};
321+
322+
topic.makeReq_ = function(method, path, qs, body, callback) {
323+
callback({ code: 409 });
324+
};
325+
326+
topic.subscribe(SUB_NAME, function(err) {
327+
assert.equal(err.code, 409);
328+
});
329+
330+
topic.subscribe(SUB_NAME, { reuseExisting: true }, assert.ifError);
331+
});
279332
});
280333

281334
describe('subscription', function() {

0 commit comments

Comments
 (0)