2020
2121'use strict' ;
2222
23+ var events = require ( 'events' ) ;
2324var is = require ( 'is' ) ;
24- var nodeutil = require ( 'util ' ) ;
25+ var modelo = require ( 'modelo ' ) ;
2526
2627/**
2728 * @type {module:common/service-object }
@@ -72,6 +73,30 @@ var util = require('../common/util.js');
7273 * var bigquery = gcloud.bigquery();
7374 *
7475 * var job = bigquery.job('job-id');
76+ *
77+ * //-
78+ * // All jobs are event emitters. The status of each job is polled
79+ * // continuously, starting only after you register a "complete" listener.
80+ * //-
81+ * job.on('complete', function(metadata) {
82+ * // The job is complete.
83+ * });
84+ *
85+ * //-
86+ * // Be sure to register an error handler as well to catch any issues which
87+ * // impeded the job.
88+ * //-
89+ * job.on('error', function(err) {
90+ * // An error occurred during the job.
91+ * });
92+ *
93+ * //-
94+ * // To force the Job object to stop polling for updates, simply remove any
95+ * // "complete" listeners you've registered.
96+ * //
97+ * // The easiest way to do this is with `removeAllListeners()`.
98+ * //-
99+ * job.removeAllListeners();
75100 */
76101function Job ( bigQuery , id ) {
77102 var methods = {
@@ -126,6 +151,8 @@ function Job(bigQuery, id) {
126151 methods : methods
127152 } ) ;
128153
154+ events . EventEmitter . call ( this ) ;
155+
129156 this . bigQuery = bigQuery ;
130157
131158 // The API endpoint for cancel is: .../bigquery/v2/project/projectId/...
@@ -139,9 +166,14 @@ function Job(bigQuery, id) {
139166 return reqOpts ;
140167 }
141168 } ) ;
169+
170+ this . completeListeners = 0 ;
171+ this . hasActiveListeners = false ;
172+
173+ this . listenForEvents_ ( ) ;
142174}
143175
144- nodeutil . inherits ( Job , ServiceObject ) ;
176+ modelo . inherits ( Job , ServiceObject , events . EventEmitter ) ;
145177
146178/**
147179 * Cancel a job. Use {module:bigquery/job#getMetadata} to see if the cancel
@@ -156,35 +188,9 @@ nodeutil.inherits(Job, ServiceObject);
156188 * @example
157189 * job.cancel(function(err, apiResponse) {
158190 * // Check to see if the job completes successfully.
159- * onJobComplete(function(err) {
160- * if (!err) {
161- * // Job cancelled successfully.
162- * }
163- * });
191+ * job.on('error', function(err) {});
192+ * job.on('complete', function(metadata) {});
164193 * });
165- *
166- * function onJobComplete(callback) {
167- * // Start a loop to check the status of the operation.
168- * checkJobStatus();
169- *
170- * function checkJobStatus() {
171- * job.getMetadata(function(err, apiResponse) {
172- * if (err) {
173- * callback(err);
174- * return;
175- * }
176- *
177- * if (apiResponse.status.state !== 'DONE') {
178- * // Job has not completed yet. Check again in 3 seconds.
179- * setTimeout(checkJobStatus, 3000);
180- * return;
181- * }
182- *
183- * // Job completed sucessfully.
184- * callback();
185- * });
186- * }
187- * }
188194 */
189195Job . prototype . cancel = function ( callback ) {
190196 callback = callback || util . noop ;
@@ -279,4 +285,70 @@ Job.prototype.getQueryResults = function(options, callback) {
279285 return this . bigQuery . query ( options , callback ) ;
280286} ;
281287
288+ /**
289+ * Begin listening for events on the job. This method keeps track of how many
290+ * "complete" listeners are registered and removed, making sure polling is
291+ * handled automatically.
292+ *
293+ * As long as there is one active "complete" listener, the connection is open.
294+ * When there are no more listeners, the polling stops.
295+ *
296+ * @private
297+ */
298+ Job . prototype . listenForEvents_ = function ( ) {
299+ var self = this ;
300+
301+ this . on ( 'newListener' , function ( event ) {
302+ if ( event === 'complete' ) {
303+ self . completeListeners ++ ;
304+
305+ if ( ! self . hasActiveListeners ) {
306+ self . hasActiveListeners = true ;
307+ self . startPolling_ ( ) ;
308+ }
309+ }
310+ } ) ;
311+
312+ this . on ( 'removeListener' , function ( event ) {
313+ if ( event === 'complete' && -- self . completeListeners === 0 ) {
314+ self . hasActiveListeners = false ;
315+ }
316+ } ) ;
317+ } ;
318+
319+ /**
320+ * Poll `getMetadata` to check the operation's status. This runs a loop to ping
321+ * the API on an interval.
322+ *
323+ * Note: This method is automatically called once a "complete" event handler is
324+ * registered on the operation.
325+ *
326+ * @private
327+ */
328+ Job . prototype . startPolling_ = function ( ) {
329+ var self = this ;
330+
331+ if ( ! this . hasActiveListeners ) {
332+ return ;
333+ }
334+
335+ this . getMetadata ( function ( err , metadata , apiResponse ) {
336+ if ( apiResponse . status && apiResponse . status . errors ) {
337+ err = util . ApiError ( apiResponse . status ) ;
338+ }
339+
340+ if ( err ) {
341+ self . emit ( 'error' , err ) ;
342+ return ;
343+ }
344+
345+ if ( metadata . status . state !== 'DONE' ) {
346+ setTimeout ( self . startPolling_ . bind ( self ) , 500 ) ;
347+ return ;
348+ }
349+
350+ self . emit ( 'complete' , metadata ) ;
351+ } ) ;
352+ } ;
353+
282354module . exports = Job ;
0 commit comments