4848import org .apache .beam .sdk .testing .PAssert ;
4949import org .apache .beam .sdk .testing .TestPipeline ;
5050import org .apache .beam .sdk .transforms .Create ;
51+ import org .apache .beam .sdk .transforms .MapElements ;
5152import org .apache .beam .sdk .values .PCollection ;
5253import org .apache .beam .sdk .values .PCollectionRowTuple ;
5354import org .apache .beam .sdk .values .Row ;
55+ import org .apache .beam .sdk .values .TypeDescriptors ;
5456import org .junit .Before ;
5557import org .junit .Rule ;
5658import org .junit .Test ;
@@ -211,7 +213,13 @@ public void testInputElementCount() throws Exception {
211213 public void testFailedRows () throws Exception {
212214 String tableSpec = "project:dataset.write_with_fail" ;
213215 BigQueryStorageWriteApiSchemaTransformConfiguration config =
214- BigQueryStorageWriteApiSchemaTransformConfiguration .builder ().setTable (tableSpec ).build ();
216+ BigQueryStorageWriteApiSchemaTransformConfiguration .builder ()
217+ .setTable (tableSpec )
218+ .setErrorHandling (
219+ BigQueryStorageWriteApiSchemaTransformConfiguration .ErrorHandling .builder ()
220+ .setOutput ("FailedRows" )
221+ .build ())
222+ .build ();
215223
216224 String failValue = "fail_me" ;
217225
@@ -234,7 +242,15 @@ public void testFailedRows() throws Exception {
234242 fakeDatasetService .setShouldFailRow (shouldFailRow );
235243
236244 PCollectionRowTuple result = runWithConfig (config , totalRows );
237- PCollection <Row > failedRows = result .get ("FailedRows" );
245+ PCollection <Row > failedRows =
246+ result
247+ .get ("FailedRows" )
248+ .apply (
249+ "ExtractFailedRows" ,
250+ MapElements .into (TypeDescriptors .rows ())
251+ .via ((rowAndError ) -> rowAndError .<Row >getValue ("failed_row" )))
252+ .setRowSchema (SCHEMA );
253+ ;
238254
239255 PAssert .that (failedRows ).containsInAnyOrder (expectedFailedRows );
240256 p .run ().waitUntilFinish ();
@@ -250,7 +266,13 @@ public void testFailedRows() throws Exception {
250266 public void testErrorCount () throws Exception {
251267 String tableSpec = "project:dataset.error_count" ;
252268 BigQueryStorageWriteApiSchemaTransformConfiguration config =
253- BigQueryStorageWriteApiSchemaTransformConfiguration .builder ().setTable (tableSpec ).build ();
269+ BigQueryStorageWriteApiSchemaTransformConfiguration .builder ()
270+ .setTable (tableSpec )
271+ .setErrorHandling (
272+ BigQueryStorageWriteApiSchemaTransformConfiguration .ErrorHandling .builder ()
273+ .setOutput ("FailedRows" )
274+ .build ())
275+ .build ();
254276
255277 Function <TableRow , Boolean > shouldFailRow =
256278 (Function <TableRow , Boolean > & Serializable ) tr -> tr .get ("name" ).equals ("a" );
0 commit comments