I have a complex application that uses Apache Flink to stream events from Kafka to BigQuery; before streaming starts, the application queries the dataset to get the latest offset for each topic/partition. This is what the code running the query looks like:
// Query BigQuery for the latest committed offsets, retrying on failure and
// adding a random pause so the parallel instances don't all hit API limits at once.
var response = retryUntilSuccess(maxFetchOffsetAttempts, maxFetchOffsetDelay) { () =>
  Thread.sleep(Random.nextInt(maxFetchOffsetDelay.toMillis.toInt))
  bigQuery.query(
    QueryRequest
      .newBuilder(offsetsQuery)
      .setUseLegacySql(false)
      .setDefaultDataset(config.dataset)
      .build()
  )
}

// Poll until the query job completes, then read the result.
while (!response.jobCompleted()) {
  logger.warn(
    s"Offsets for ${record.tableName} are not ready, will retry in 1 second, jobId: ${response.getJobId()}")
  Thread.sleep(1000)
  response = bigQuery.getQueryResults(response.getJobId())
}
val result = response.getResult()
This is written in Scala, but it is mostly a translation of the example in the Javadocs; the retryUntilSuccess function retries the request multiple times, and there is a random wait (currently up to 30 seconds) before each request to cope with the fact that many queries fire in parallel and may hit API limits.
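For reference, here is a simplified sketch of a retryUntilSuccess helper with the same signature as the one used above; it is illustrative only, not the exact implementation in the application:

import scala.concurrent.duration.FiniteDuration
import scala.util.{Failure, Success, Try}

// Illustrative sketch: run the block, retry up to maxAttempts times on failure,
// sleeping `delay` between attempts, and rethrow the last error when exhausted.
def retryUntilSuccess[T](maxAttempts: Int, delay: FiniteDuration)(block: () => T): T =
  Try(block()) match {
    case Success(value) => value
    case Failure(_) if maxAttempts > 1 =>
      Thread.sleep(delay.toMillis)
      retryUntilSuccess(maxAttempts - 1, delay)(block)
    case Failure(error) => throw error
  }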
When I run this code I almost always get the following exception:
java.lang.NullPointerException
at com.google.cloud.bigquery.JobId.fromPb(JobId.java:111)
at com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:635)
at com.google.cloud.bigquery.BigQueryImpl.getQueryResults(BigQueryImpl.java:619)
at io.chumps.dataprocessing.sinks.BigQuerySink$$anon$1.startingOffsets$lzycompute(BigQuerySink.scala:89)
at io.chumps.dataprocessing.sinks.BigQuerySink$$anon$1.startingOffsets(BigQuerySink.scala:73)
at io.chumps.dataprocessing.sinks.BigQuerySink$$anon$1.open(BigQuerySink.scala:136)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:376)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:256)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:745)
The line in our code where this exception happens is the call to getQueryResults, and I am sure I am not passing a null there, as I can see the job id in the logs right before the failure; I also checked that the response doesn't contain any errors by calling getExecutionErrors. I say it "almost always fails" because out of a few dozen attempts it managed to run a couple of times without hitting the exception.
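To make the failure mode concrete, the checks around the failing call look roughly like this (simplified from the snippet above; the logging lines and their messages are illustrative):

// Illustrative: the job id is non-null in the logs right before the call,
// and the response reports no execution errors, yet getQueryResults still
// throws the NullPointerException shown above.
val jobId = response.getJobId()
logger.info(s"Offsets query submitted, jobId: $jobId")

val errors = response.getExecutionErrors()
logger.info(s"Execution errors: $errors") // empty in all the failing runs

response = bigQuery.getQueryResults(jobId) // NPE is thrown inside this call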
Could this be a bug in the way API limits are handled by the client? Do you have any advice on how to work around this issue?
Thanks!