Skip to content

Commit bd09731

Browse files
bigquery: implement Job as an event emitter
1 parent f4e431f commit bd09731

4 files changed

Lines changed: 143 additions & 69 deletions

File tree

lib/bigquery/index.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,7 @@ BigQuery.prototype.job = function(id) {
468468
* });
469469
*/
470470
BigQuery.prototype.query = function(options, callback) {
471-
var that = this;
471+
var self = this;
472472

473473
if (is.string(options)) {
474474
options = {
@@ -485,13 +485,13 @@ BigQuery.prototype.query = function(options, callback) {
485485

486486
if (job) {
487487
// Get results of the query.
488-
that.request({
488+
self.request({
489489
uri: '/queries/' + job.id,
490490
qs: requestQuery
491491
}, responseHandler);
492492
} else {
493493
// Create a job.
494-
that.request({
494+
self.request({
495495
method: 'POST',
496496
uri: '/queries',
497497
json: options
@@ -521,7 +521,7 @@ BigQuery.prototype.query = function(options, callback) {
521521
}
522522
if (nextQuery && !nextQuery.job && resp.jobReference.jobId) {
523523
// Create a prepared Job to continue the query.
524-
nextQuery.job = that.job(resp.jobReference.jobId);
524+
nextQuery.job = self.job(resp.jobReference.jobId);
525525
}
526526

527527
callback(null, rows, nextQuery, resp);

lib/bigquery/job.js

Lines changed: 102 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@
2020

2121
'use strict';
2222

23+
var events = require('events');
2324
var is = require('is');
24-
var nodeutil = require('util');
25+
var modelo = require('modelo');
2526

2627
/**
2728
* @type {module:common/service-object}
@@ -72,6 +73,30 @@ var util = require('../common/util.js');
7273
* var bigquery = gcloud.bigquery();
7374
*
7475
* var job = bigquery.job('job-id');
76+
*
77+
* //-
78+
* // All jobs are event emitters. The status of each job is polled
79+
* // continuously, starting only after you register a "complete" listener.
80+
* //-
81+
* job.on('complete', function(metadata) {
82+
* // The job is complete.
83+
* });
84+
*
85+
* //-
86+
* // Be sure to register an error handler as well to catch any issues which
87+
* // impeded the job.
88+
* //-
89+
* job.on('error', function(err) {
90+
* // An error occurred during the job.
91+
* });
92+
*
93+
* //-
94+
* // To force the Job object to stop polling for updates, simply remove any
95+
* // "complete" listeners you've registered.
96+
* //
97+
* // The easiest way to do this is with `removeAllListeners()`.
98+
* //-
99+
* job.removeAllListeners();
75100
*/
76101
function Job(bigQuery, id) {
77102
var methods = {
@@ -126,6 +151,8 @@ function Job(bigQuery, id) {
126151
methods: methods
127152
});
128153

154+
events.EventEmitter.call(this);
155+
129156
this.bigQuery = bigQuery;
130157

131158
// The API endpoint for cancel is: .../bigquery/v2/project/projectId/...
@@ -139,9 +166,14 @@ function Job(bigQuery, id) {
139166
return reqOpts;
140167
}
141168
});
169+
170+
this.completeListeners = 0;
171+
this.hasActiveListeners = false;
172+
173+
this.listenForEvents_();
142174
}
143175

144-
nodeutil.inherits(Job, ServiceObject);
176+
modelo.inherits(Job, ServiceObject, events.EventEmitter);
145177

146178
/**
147179
* Cancel a job. Use {module:bigquery/job#getMetadata} to see if the cancel
@@ -156,35 +188,9 @@ nodeutil.inherits(Job, ServiceObject);
156188
* @example
157189
* job.cancel(function(err, apiResponse) {
158190
* // Check to see if the job completes successfully.
159-
* onJobComplete(function(err) {
160-
* if (!err) {
161-
* // Job cancelled successfully.
162-
* }
163-
* });
191+
* job.on('error', function(err) {});
192+
* job.on('complete', function(metadata) {});
164193
* });
165-
*
166-
* function onJobComplete(callback) {
167-
* // Start a loop to check the status of the operation.
168-
* checkJobStatus();
169-
*
170-
* function checkJobStatus() {
171-
* job.getMetadata(function(err, apiResponse) {
172-
* if (err) {
173-
* callback(err);
174-
* return;
175-
* }
176-
*
177-
* if (apiResponse.status.state !== 'DONE') {
178-
* // Job has not completed yet. Check again in 3 seconds.
179-
* setTimeout(checkJobStatus, 3000);
180-
* return;
181-
* }
182-
*
183-
* // Job completed sucessfully.
184-
* callback();
185-
* });
186-
* }
187-
* }
188194
*/
189195
Job.prototype.cancel = function(callback) {
190196
callback = callback || util.noop;
@@ -279,4 +285,70 @@ Job.prototype.getQueryResults = function(options, callback) {
279285
return this.bigQuery.query(options, callback);
280286
};
281287

288+
/**
289+
* Begin listening for events on the job. This method keeps track of how many
290+
* "complete" listeners are registered and removed, making sure polling is
291+
* handled automatically.
292+
*
293+
* As long as there is one active "complete" listener, the connection is open.
294+
* When there are no more listeners, the polling stops.
295+
*
296+
* @private
297+
*/
298+
Job.prototype.listenForEvents_ = function() {
299+
var self = this;
300+
301+
this.on('newListener', function(event) {
302+
if (event === 'complete') {
303+
self.completeListeners++;
304+
305+
if (!self.hasActiveListeners) {
306+
self.hasActiveListeners = true;
307+
self.startPolling_();
308+
}
309+
}
310+
});
311+
312+
this.on('removeListener', function(event) {
313+
if (event === 'complete' && --self.completeListeners === 0) {
314+
self.hasActiveListeners = false;
315+
}
316+
});
317+
};
318+
319+
/**
320+
* Poll `getMetadata` to check the operation's status. This runs a loop to ping
321+
* the API on an interval.
322+
*
323+
* Note: This method is automatically called once a "complete" event handler is
324+
* registered on the operation.
325+
*
326+
* @private
327+
*/
328+
Job.prototype.startPolling_ = function() {
329+
var self = this;
330+
331+
if (!this.hasActiveListeners) {
332+
return;
333+
}
334+
335+
this.getMetadata(function(err, metadata, apiResponse) {
336+
if (apiResponse.status && apiResponse.status.errors) {
337+
err = util.ApiError(apiResponse.status);
338+
}
339+
340+
if (err) {
341+
self.emit('error', err);
342+
return;
343+
}
344+
345+
if (metadata.status.state !== 'DONE') {
346+
setTimeout(self.startPolling_.bind(self), 500);
347+
return;
348+
}
349+
350+
self.emit('complete', metadata);
351+
});
352+
};
353+
282354
module.exports = Job;

lib/bigquery/table.js

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,10 @@ Table.mergeSchemaWithRows_ = function(schema, rows) {
276276
*
277277
* @example
278278
* var yourTable = dataset.table('your-table');
279-
* table.copy(yourTable, function(err, job, apiResponse) {});
279+
* table.copy(yourTable, function(err, job, apiResponse) {
280+
* // `job` is a Job object that can be used to check the status of the
281+
* // request.
282+
* });
280283
*
281284
* //-
282285
* // See the [`configuration.copy`](http://goo.gl/dKWIyS) object for all
@@ -387,11 +390,8 @@ Table.prototype.createReadStream = function() {
387390
* request.get(csvUrl)
388391
* .pipe(table.createWriteStream(metadata))
389392
* .on('complete', function(job) {
390-
* // job is a Job object, which you can use to check the status of the load
391-
* // operation.
392-
* job.getMetadata(function(err, metadata) {
393-
* // metadata.status
394-
* });
393+
* // `job` is a Job object that can be used to check the status of the
394+
* // request.
395395
* });
396396
*
397397
* //-
@@ -499,7 +499,10 @@ Table.prototype.createWriteStream = function(metadata) {
499499
* // If you wish to override this, or provide an array of destination files,
500500
* // you must provide an `options` object.
501501
* //-
502-
* table.export(exportedFile, function(err, job, apiResponse) {});
502+
* table.export(exportedFile, function(err, job, apiResponse) {
503+
* // `job` is a Job object that can be used to check the status of the
504+
* // request.
505+
* });
503506
*
504507
* //-
505508
* // If you need more customization, pass an `options` object.
@@ -737,7 +740,10 @@ Table.prototype.getRows = function(options, callback) {
737740
* //-
738741
* // Load data from a local file.
739742
* //-
740-
* table.import('./institutions.csv', function(err, job, apiResponse) {});
743+
* table.import('./institutions.csv', function(err, job, apiResponse) {
744+
* // `job` is a Job object that can be used to check the status of the
745+
* // request.
746+
* });
741747
*
742748
* //-
743749
* // You may also pass in metadata in the format of a Jobs resource. See

system-test/bigquery.js

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -203,38 +203,20 @@ describe('BigQuery', function() {
203203
});
204204

205205
it('should cancel a job', function(done) {
206-
var query = 'SELECT * FROM [publicdata:samples.github_nested]';
206+
var query = 'SELECT url FROM [publicdata:samples.github_nested] LIMIT 10';
207207

208208
bigquery.startQuery(query, function(err, job) {
209209
assert.ifError(err);
210210

211211
job.cancel(function(err) {
212212
assert.ifError(err);
213-
onJobComplete(done);
214-
});
215-
216-
function onJobComplete(callback) {
217-
// Start a loop to check the status of the operation.
218-
checkJobStatus();
219-
220-
function checkJobStatus() {
221-
job.getMetadata(function(err, apiResponse) {
222-
if (err) {
223-
callback(err);
224-
return;
225-
}
226-
227-
if (apiResponse.status.state !== 'DONE') {
228-
// Job has not completed yet. Check again in 3 seconds.
229-
setTimeout(checkJobStatus, 3000);
230-
return;
231-
}
232213

233-
// Job completed sucessfully.
234-
callback();
214+
job
215+
.on('error', done)
216+
.on('complete', function() {
217+
done();
235218
});
236-
}
237-
}
219+
});
238220
});
239221
});
240222

@@ -346,8 +328,12 @@ describe('BigQuery', function() {
346328
it('should import data from a file in your bucket', function(done) {
347329
table.import(file, function(err, job) {
348330
assert.ifError(err);
349-
assert(job instanceof Job);
350-
done();
331+
332+
job
333+
.on('error', done)
334+
.on('complete', function() {
335+
done();
336+
});
351337
});
352338
});
353339

@@ -395,7 +381,17 @@ describe('BigQuery', function() {
395381
});
396382

397383
it('should export data to a file in your bucket', function(done) {
398-
table.export(bucket.file('kitten-test-data-backup.json'), done);
384+
var file = bucket.file('kitten-test-data-backup.json');
385+
386+
table.export(file, function(err, job) {
387+
assert.ifError(err);
388+
389+
job
390+
.on('error', done)
391+
.on('complete', function() {
392+
done();
393+
});
394+
});
399395
});
400396
});
401397
});

0 commit comments

Comments
 (0)