Skip to content

Commit 63c16bb

Browse files
committed
Fix BigQuery Schema Provider test.
1 parent 2217cf4 commit 63c16bb

File tree

1 file changed

+25
-3
lines changed

1 file changed

+25
-3
lines changed

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,11 @@
4848
import org.apache.beam.sdk.testing.PAssert;
4949
import org.apache.beam.sdk.testing.TestPipeline;
5050
import org.apache.beam.sdk.transforms.Create;
51+
import org.apache.beam.sdk.transforms.MapElements;
5152
import org.apache.beam.sdk.values.PCollection;
5253
import org.apache.beam.sdk.values.PCollectionRowTuple;
5354
import org.apache.beam.sdk.values.Row;
55+
import org.apache.beam.sdk.values.TypeDescriptors;
5456
import org.junit.Before;
5557
import org.junit.Rule;
5658
import org.junit.Test;
@@ -211,7 +213,13 @@ public void testInputElementCount() throws Exception {
211213
public void testFailedRows() throws Exception {
212214
String tableSpec = "project:dataset.write_with_fail";
213215
BigQueryStorageWriteApiSchemaTransformConfiguration config =
214-
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
216+
BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
217+
.setTable(tableSpec)
218+
.setErrorHandling(
219+
BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
220+
.setOutput("FailedRows")
221+
.build())
222+
.build();
215223

216224
String failValue = "fail_me";
217225

@@ -234,7 +242,15 @@ public void testFailedRows() throws Exception {
234242
fakeDatasetService.setShouldFailRow(shouldFailRow);
235243

236244
PCollectionRowTuple result = runWithConfig(config, totalRows);
237-
PCollection<Row> failedRows = result.get("FailedRows");
245+
PCollection<Row> failedRows =
246+
result
247+
.get("FailedRows")
248+
.apply(
249+
"ExtractFailedRows",
250+
MapElements.into(TypeDescriptors.rows())
251+
.via((rowAndError) -> rowAndError.<Row>getValue("failed_row")))
252+
.setRowSchema(SCHEMA);
253+
;
238254

239255
PAssert.that(failedRows).containsInAnyOrder(expectedFailedRows);
240256
p.run().waitUntilFinish();
@@ -250,7 +266,13 @@ public void testFailedRows() throws Exception {
250266
public void testErrorCount() throws Exception {
251267
String tableSpec = "project:dataset.error_count";
252268
BigQueryStorageWriteApiSchemaTransformConfiguration config =
253-
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
269+
BigQueryStorageWriteApiSchemaTransformConfiguration.builder()
270+
.setTable(tableSpec)
271+
.setErrorHandling(
272+
BigQueryStorageWriteApiSchemaTransformConfiguration.ErrorHandling.builder()
273+
.setOutput("FailedRows")
274+
.build())
275+
.build();
254276

255277
Function<TableRow, Boolean> shouldFailRow =
256278
(Function<TableRow, Boolean> & Serializable) tr -> tr.get("name").equals("a");

0 commit comments

Comments
 (0)