@@ -93,8 +93,6 @@ var util = require('../common/util.js');
9393 * //-
9494 * // Once you have obtained a subscription object, you may begin to register
9595 * // listeners. This will automatically trigger pulling for messages.
96- * //
97- * // This is a new paragraph.
9896 * //-
9997 *
10098 * // Register an error handler.
@@ -117,7 +115,7 @@ function Subscription(pubsub, options) {
117115 this . closed = false ;
118116 this . interval = util . is ( options . interval , 'number' ) ? options . interval : 10 ;
119117
120- this . once ( 'newListener' , this . startPulling_ . bind ( this ) ) ;
118+ this . listenForEvents_ ( ) ;
121119}
122120
123121nodeutil . inherits ( Subscription , events . EventEmitter ) ;
@@ -136,6 +134,95 @@ Subscription.formatName_ = function(projectId, name) {
136134 return '/subscriptions/' + projectId + '/' + name ;
137135} ;
138136
137+ /**
138+ * Simplify a message from an API response to have two properties, `id` and
139+ * `data`. `data` is always converted to a string.
140+ *
141+ * @private
142+ */
143+ Subscription . formatMessage_ = function ( msg ) {
144+ var message = {
145+ id : msg . ackId
146+ } ;
147+ if ( msg . pubsubEvent && msg . pubsubEvent . message ) {
148+ message . data =
149+ new Buffer ( msg . pubsubEvent . message . data , 'base64' ) . toString ( 'utf-8' ) ;
150+ try {
151+ message . data = JSON . parse ( message . data ) ;
152+ } catch ( e ) { }
153+ }
154+ return message ;
155+ } ;
156+
157+ /**
158+ * Begin listening for events on the subscription. This method keeps track of
159+ * how many message listeners are assigned, and then removed, making sure
160+ * polling is handled automatically.
161+ *
162+ * As long as there is one active message listener, the connection is open. As
163+ * soon as there are no more message listeners, the connection is closed.
164+ *
165+ * @private
166+ *
167+ * @example
168+ * this.listenForEvents_();
169+ */
170+ Subscription . prototype . listenForEvents_ = function ( ) {
171+ var that = this ;
172+ var messageListeners = 0 ;
173+
174+ this . on ( 'newListener' , function ( event ) {
175+ if ( event === 'message' ) {
176+ messageListeners ++ ;
177+ if ( that . closed ) {
178+ that . closed = false ;
179+ }
180+ that . startPulling_ ( ) ;
181+ }
182+ } ) ;
183+
184+ this . on ( 'removeListener' , function ( event ) {
185+ if ( event === 'message' && -- messageListeners === 0 ) {
186+ that . closed = true ;
187+ }
188+ } ) ;
189+ } ;
190+
191+ /**
192+ * Poll the backend for new messages. This runs a loop to ping the API at the
193+ * provided interval from the subscription's instantiation. If one wasn't
194+ * provided, the default value is 10 milliseconds.
195+ *
196+ * If messages are received, they are emitted on the `message` event.
197+ *
198+ * Note: This method is automatically called once a message event handler is
199+ * assigned to the description.
200+ *
201+ * To stop pulling, see {@linkcode module:pubsub/subscription#close}.
202+ *
203+ * @private
204+ *
205+ * @example
206+ * subscription.startPulling_();
207+ */
208+ Subscription . prototype . startPulling_ = function ( ) {
209+ var that = this ;
210+ if ( this . closed ) {
211+ return ;
212+ }
213+ this . pull ( {
214+ returnImmediately : false
215+ } , function ( err , message ) {
216+ if ( err ) {
217+ that . emit ( 'error' , err ) ;
218+ }
219+ if ( message ) {
220+ that . emit ( 'message' , message ) ;
221+ }
222+ setTimeout ( that . startPulling_ . bind ( that ) , that . interval ) ;
223+ } ) ;
224+ } ;
225+
139226/**
140227 * Acknowledge to the backend that the message was retrieved. You must provide
141228 * either a single ID, or an array of IDs.
@@ -162,15 +249,35 @@ Subscription.prototype.ack = function(ids, callback) {
162249} ;
163250
164251/**
165- * Pull messages from the subscribed topic. If messages were found, they are
166- * passed along with a `message` event.
252+ * Delete the subscription. Pull requests from the current subscription will be
253+ * errored once unsubscription is complete.
254+ *
255+ * @param {function= } callback - The callback function.
256+ *
257+ * @example
258+ * subscription.delete(function(err) {});
259+ */
260+ Subscription . prototype . delete = function ( callback ) {
261+ callback = callback || util . noop ;
262+ this . request (
263+ 'DELETE' , 'subscriptions/' + this . name , null , true , function ( err ) {
264+ if ( err ) {
265+ callback ( err ) ;
266+ return ;
267+ }
268+ this . closed = true ;
269+ callback ( null ) ;
270+ } . bind ( this ) ) ;
271+ } ;
272+
273+ /**
274+ * Pull messages from the subscribed topic. If messages were found, your
275+ * callback is executed with the message object.
167276 *
168277 * Note that messages are pulled automatically once you register your first
169278 * event listener to the subscription, thus the call to `pull` is handled for
170- * you.
171- *
172- * Calling `pull` directly can be helpful after your subscription has been
173- * closed with {@linkcode module:pubsub/subscription#close}.
279+ * you. If you don't want to start pulling, simply don't register a
280+ * `subscription.on('message', function() {})` event handler.
174281 *
175282 * @param {object= } options - Configuration object.
176283 * @param {boolean= } options.returnImmediately - If set, the system will respond
@@ -179,7 +286,10 @@ Subscription.prototype.ack = function(ids, callback) {
179286 * @param {function } callback - The callback function.
180287 *
181288 * @example
182- * subscription.pull(function(err) {});
289+ * subscription.pull(function(err, message) {
290+ * // message.id = ID used to acknowledge its receival.
291+ * // message.data = Contents of the message.
292+ * });
183293 */
184294Subscription . prototype . pull = function ( options , callback ) {
185295 var that = this ;
@@ -199,104 +309,15 @@ Subscription.prototype.pull = function(options, callback) {
199309 callback ( err ) ;
200310 return ;
201311 }
202- if ( ! that . autoAck ) {
203- that . emitMessage_ ( message ) ;
204- callback ( ) ;
205- return ;
312+ message = Subscription . formatMessage_ ( message ) ;
313+ if ( that . autoAck ) {
314+ that . ack ( message . id , function ( err ) {
315+ callback ( err , message ) ;
316+ } ) ;
317+ } else {
318+ callback ( null , message ) ;
206319 }
207- that . ack ( message . ackId , function ( err ) {
208- if ( err ) {
209- callback ( err ) ;
210- return ;
211- }
212- that . emitMessage_ ( message ) ;
213- callback ( ) ;
214- } ) ;
215320 } ) ;
216321} ;
217322
218- /**
219- * Delete the subscription. Pull requests from the current subscription will be
220- * errored once unsubscription is complete.
221- *
222- * @param {function= } callback - The callback function.
223- *
224- * @example
225- * subscription.delete(function(err) {});
226- */
227- Subscription . prototype . delete = function ( callback ) {
228- callback = callback || util . noop ;
229- this . request (
230- 'DELETE' , 'subscriptions/' + this . name , null , true , function ( err ) {
231- if ( err ) {
232- callback ( err ) ;
233- return ;
234- }
235- this . closed = true ;
236- callback ( null ) ;
237- } . bind ( this ) ) ;
238- } ;
239-
240- /**
241- * Poll the backend for new messages. This runs a loop to ping the API at the
242- * provided interval from the subscription's instantiation. If you didn't
243- * provide one, the default value is 10 milliseconds.
244- *
245- * If messages are received, you can catch them by registering a listener for
246- * the `message` event.
247- *
248- * To stop pulling, see {@linkcode module:pubsub/subscription#stopPulling}.
249- *
250- * @private
251- *
252- * @example
253- * subscription.startPulling_();
254- */
255- Subscription . prototype . startPulling_ = function ( ) {
256- if ( this . closed ) {
257- return ;
258- }
259- this . pull ( {
260- returnImmediately : false
261- } , function ( err ) {
262- if ( err && err . code === 400 ) {
263- this . emit ( 'error' , err ) ;
264- }
265- setTimeout ( this . startPulling_ . bind ( this ) , this . interval ) ;
266- } . bind ( this ) ) ;
267- } ;
268-
269- /**
270- * Stop the subscription from automatically pulling. You will still be able to
271- * call {@linkcode module:pubsub/subscription#pull} directly.
272- *
273- * @example
274- * subscription.close();
275- */
276- Subscription . prototype . close = function ( ) {
277- this . closed = true ;
278- } ;
279-
280- /**
281- * Emits a 'message' event with the provided message.
282- *
283- * The message is simplified from the API response to have simply two
284- * properties, `id` and `data`. `data` is always converted to a string.
285- *
286- * @private
287- */
288- Subscription . prototype . emitMessage_ = function ( msg ) {
289- var message = {
290- id : msg . ackId
291- } ;
292- if ( msg . pubsubEvent && msg . pubsubEvent . message ) {
293- message . data =
294- new Buffer ( msg . pubsubEvent . message . data , 'base64' ) . toString ( 'utf-8' ) ;
295- try {
296- message . data = JSON . parse ( message . data ) ;
297- } catch ( e ) { }
298- }
299- this . emit ( 'message' , message ) ;
300- } ;
301-
302323module . exports = Subscription ;
0 commit comments