|
17 | 17 | package com.google.cloud.bigquery.storage.v1beta1.it; |
18 | 18 |
|
19 | 19 | import static com.google.common.truth.Truth.assertWithMessage; |
| 20 | +import static org.junit.Assert.assertArrayEquals; |
20 | 21 | import static org.junit.Assert.assertEquals; |
21 | 22 | import static org.junit.Assert.assertNotNull; |
22 | 23 | import static org.junit.Assert.assertNull; |
|
55 | 56 | import com.google.protobuf.TextFormat; |
56 | 57 | import com.google.protobuf.Timestamp; |
57 | 58 | import java.io.IOException; |
| 59 | +import java.math.BigDecimal; |
| 60 | +import java.nio.ByteBuffer; |
58 | 61 | import java.util.ArrayList; |
59 | 62 | import java.util.Arrays; |
60 | 63 | import java.util.Collections; |
61 | 64 | import java.util.Iterator; |
62 | 65 | import java.util.List; |
63 | 66 | import java.util.logging.Logger; |
| 67 | +import org.apache.avro.Conversions; |
| 68 | +import org.apache.avro.LogicalTypes; |
64 | 69 | import org.apache.avro.Schema; |
65 | 70 | import org.apache.avro.generic.GenericData; |
66 | 71 | import org.apache.avro.generic.GenericRecordBuilder; |
|
69 | 74 | import org.junit.BeforeClass; |
70 | 75 | import org.junit.Test; |
71 | 76 | import org.threeten.bp.Duration; |
| 77 | +import org.threeten.bp.Instant; |
| 78 | +import org.threeten.bp.LocalDate; |
| 79 | +import org.threeten.bp.LocalTime; |
| 80 | +import org.threeten.bp.ZoneOffset; |
| 81 | +import org.threeten.bp.ZonedDateTime; |
| 82 | +import org.threeten.bp.format.DateTimeFormatter; |
72 | 83 |
|
73 | 84 | /** Integration tests for BigQuery Storage API. */ |
74 | 85 | public class ITBigQueryStorageTest { |
@@ -502,6 +513,343 @@ public void testIngestionTimePartitionedTable() throws InterruptedException, IOE |
502 | 513 | assertEquals(2L, partitionFilteredRows.get(0).get("num_field")); |
503 | 514 | } |
504 | 515 |
|
| 516 | + @Test |
| 517 | + public void testBasicSqlTypes() throws InterruptedException, IOException { |
| 518 | + String table_name = "test_basic_sql_types"; |
| 519 | + String createTableStatement = |
| 520 | + String.format( |
| 521 | + " CREATE TABLE %s.%s " |
| 522 | + + " (int_field INT64 NOT NULL," |
| 523 | + + " num_field NUMERIC NOT NULL," |
| 524 | + + " float_field FLOAT64 NOT NULL," |
| 525 | + + " bool_field BOOL NOT NULL," |
| 526 | + + " str_field STRING NOT NULL," |
| 527 | + + " bytes_field BYTES NOT NULL) " |
| 528 | + + " OPTIONS( " |
| 529 | + + " description=\"a table with basic column types\" " |
| 530 | + + " ) " |
| 531 | + + "AS " |
| 532 | + + " SELECT " |
| 533 | + + " 17," |
| 534 | + + " CAST(1234.56 AS NUMERIC)," |
| 535 | + + " 6.547678," |
| 536 | + + " TRUE," |
| 537 | + + " \"String field value\"," |
| 538 | + + " b\"абвгд\"", |
| 539 | + DATASET, table_name); |
| 540 | + |
| 541 | + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); |
| 542 | + |
| 543 | + TableReference tableReference = |
| 544 | + TableReference.newBuilder() |
| 545 | + .setTableId(table_name) |
| 546 | + .setDatasetId(DATASET) |
| 547 | + .setProjectId(ServiceOptions.getDefaultProjectId()) |
| 548 | + .build(); |
| 549 | + |
| 550 | + List<GenericData.Record> rows = |
| 551 | + ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null); |
| 552 | + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); |
| 553 | + |
| 554 | + GenericData.Record record = rows.get(0); |
| 555 | + Schema avroSchema = record.getSchema(); |
| 556 | + |
| 557 | + String actualSchemaMessage = |
| 558 | + String.format( |
| 559 | + "Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); |
| 560 | + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); |
| 561 | + |
| 562 | + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); |
| 563 | + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); |
| 564 | + assertEquals(actualSchemaMessage, 6, avroSchema.getFields().size()); |
| 565 | + |
| 566 | + assertEquals( |
| 567 | + actualSchemaMessage, Schema.Type.LONG, avroSchema.getField("int_field").schema().getType()); |
| 568 | + assertEquals(rowAssertMessage, 17L, (long) record.get("int_field")); |
| 569 | + |
| 570 | + assertEquals( |
| 571 | + actualSchemaMessage, |
| 572 | + Schema.Type.BYTES, |
| 573 | + avroSchema.getField("num_field").schema().getType()); |
| 574 | + assertEquals( |
| 575 | + actualSchemaMessage, |
| 576 | + LogicalTypes.decimal(/* precision = */ 38, /* scale = */ 9), |
| 577 | + avroSchema.getField("num_field").schema().getLogicalType()); |
| 578 | + BigDecimal actual_num_field = |
| 579 | + new Conversions.DecimalConversion() |
| 580 | + .fromBytes( |
| 581 | + (ByteBuffer) record.get("num_field"), |
| 582 | + avroSchema, |
| 583 | + avroSchema.getField("num_field").schema().getLogicalType()); |
| 584 | + assertEquals( |
| 585 | + rowAssertMessage, |
| 586 | + BigDecimal.valueOf(/* unscaledVal = */ 1_234_560_000_000L, /* scale = */ 9), |
| 587 | + actual_num_field); |
| 588 | + |
| 589 | + assertEquals( |
| 590 | + actualSchemaMessage, |
| 591 | + Schema.Type.DOUBLE, |
| 592 | + avroSchema.getField("float_field").schema().getType()); |
| 593 | + assertEquals( |
| 594 | + rowAssertMessage, |
| 595 | + /* expected = */ 6.547678d, |
| 596 | + /* actual = */ (double) record.get("float_field"), |
| 597 | + /* delta = */ 0.0001); |
| 598 | + |
| 599 | + assertEquals( |
| 600 | + actualSchemaMessage, |
| 601 | + Schema.Type.BOOLEAN, |
| 602 | + avroSchema.getField("bool_field").schema().getType()); |
| 603 | + assertEquals(rowAssertMessage, true, record.get("bool_field")); |
| 604 | + |
| 605 | + assertEquals( |
| 606 | + actualSchemaMessage, |
| 607 | + Schema.Type.STRING, |
| 608 | + avroSchema.getField("str_field").schema().getType()); |
| 609 | + assertEquals(rowAssertMessage, new Utf8("String field value"), record.get("str_field")); |
| 610 | + |
| 611 | + assertEquals( |
| 612 | + actualSchemaMessage, |
| 613 | + Schema.Type.BYTES, |
| 614 | + avroSchema.getField("bytes_field").schema().getType()); |
| 615 | + assertArrayEquals( |
| 616 | + rowAssertMessage, |
| 617 | + Utf8.getBytesFor("абвгд"), |
| 618 | + ((ByteBuffer) (record.get("bytes_field"))).array()); |
| 619 | + } |
| 620 | + |
| 621 | + @Test |
| 622 | + public void testDateAndTimeSqlTypes() throws InterruptedException, IOException { |
| 623 | + String table_name = "test_date_and_time_sql_types"; |
| 624 | + String createTableStatement = |
| 625 | + String.format( |
| 626 | + " CREATE TABLE %s.%s " |
| 627 | + + " (date_field DATE NOT NULL," |
| 628 | + + " datetime_field DATETIME NOT NULL," |
| 629 | + + " time_field TIME NOT NULL," |
| 630 | + + " timestamp_field TIMESTAMP NOT NULL)" |
| 631 | + + " OPTIONS( " |
| 632 | + + " description=\"a table with date and time column types\" " |
| 633 | + + " ) " |
| 634 | + + "AS " |
| 635 | + + " SELECT " |
| 636 | + + " CAST(\"2019-05-31\" AS DATE)," |
| 637 | + + " CAST(\"2019-04-30 21:47:59.999999\" AS DATETIME)," |
| 638 | + + " CAST(\"21:47:59.999999\" AS TIME)," |
| 639 | + + " CAST(\"2019-04-30 19:24:19.123456 UTC\" AS TIMESTAMP)", |
| 640 | + DATASET, table_name); |
| 641 | + |
| 642 | + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); |
| 643 | + |
| 644 | + TableReference tableReference = |
| 645 | + TableReference.newBuilder() |
| 646 | + .setTableId(table_name) |
| 647 | + .setDatasetId(DATASET) |
| 648 | + .setProjectId(ServiceOptions.getDefaultProjectId()) |
| 649 | + .build(); |
| 650 | + |
| 651 | + List<GenericData.Record> rows = |
| 652 | + ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null); |
| 653 | + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); |
| 654 | + |
| 655 | + GenericData.Record record = rows.get(0); |
| 656 | + Schema avroSchema = record.getSchema(); |
| 657 | + |
| 658 | + String actualSchemaMessage = |
| 659 | + String.format( |
| 660 | + "Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); |
| 661 | + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); |
| 662 | + |
| 663 | + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); |
| 664 | + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); |
| 665 | + assertEquals(actualSchemaMessage, 4, avroSchema.getFields().size()); |
| 666 | + |
| 667 | + assertEquals( |
| 668 | + actualSchemaMessage, Schema.Type.INT, avroSchema.getField("date_field").schema().getType()); |
| 669 | + assertEquals( |
| 670 | + actualSchemaMessage, |
| 671 | + LogicalTypes.date(), |
| 672 | + avroSchema.getField("date_field").schema().getLogicalType()); |
| 673 | + assertEquals( |
| 674 | + rowAssertMessage, |
| 675 | + LocalDate.of(/* year = */ 2019, /* month = */ 5, /* dayOfMonth = */ 31), |
| 676 | + LocalDate.ofEpochDay((int) record.get("date_field"))); |
| 677 | + |
| 678 | + assertEquals( |
| 679 | + actualSchemaMessage, |
| 680 | + Schema.Type.STRING, |
| 681 | + avroSchema.getField("datetime_field").schema().getType()); |
| 682 | + assertEquals( |
| 683 | + actualSchemaMessage, |
| 684 | + "datetime", |
| 685 | + avroSchema.getField("datetime_field").schema().getObjectProp("logicalType")); |
| 686 | + assertEquals( |
| 687 | + rowAssertMessage, |
| 688 | + new Utf8("2019-04-30T21:47:59.999999"), |
| 689 | + (Utf8) record.get("datetime_field")); |
| 690 | + |
| 691 | + assertEquals( |
| 692 | + actualSchemaMessage, |
| 693 | + Schema.Type.LONG, |
| 694 | + avroSchema.getField("time_field").schema().getType()); |
| 695 | + assertEquals( |
| 696 | + actualSchemaMessage, |
| 697 | + LogicalTypes.timeMicros(), |
| 698 | + avroSchema.getField("time_field").schema().getLogicalType()); |
| 699 | + assertEquals( |
| 700 | + rowAssertMessage, |
| 701 | + LocalTime.of( |
| 702 | + /* hour = */ 21, |
| 703 | + /* minute = */ 47, |
| 704 | + /* second = */ 59, |
| 705 | + /* nanoOfSecond = */ 999_999_000), |
| 706 | + LocalTime.ofNanoOfDay(1_000L * (long) record.get("time_field"))); |
| 707 | + |
| 708 | + assertEquals( |
| 709 | + actualSchemaMessage, |
| 710 | + Schema.Type.LONG, |
| 711 | + avroSchema.getField("timestamp_field").schema().getType()); |
| 712 | + assertEquals( |
| 713 | + actualSchemaMessage, |
| 714 | + LogicalTypes.timestampMicros(), |
| 715 | + avroSchema.getField("timestamp_field").schema().getLogicalType()); |
| 716 | + ZonedDateTime expected_timestamp = |
| 717 | + ZonedDateTime.parse( |
| 718 | + "2019-04-30T19:24:19Z", DateTimeFormatter.ISO_INSTANT.withZone(ZoneOffset.UTC)) |
| 719 | + .withNano(123_456_000); |
| 720 | + long actual_timestamp_micros = (long) record.get("timestamp_field"); |
| 721 | + ZonedDateTime actual_timestamp = |
| 722 | + ZonedDateTime.ofInstant( |
| 723 | + Instant.ofEpochSecond( |
| 724 | + /* epochSecond = */ actual_timestamp_micros / 1_000_000, |
| 725 | + (actual_timestamp_micros % 1_000_000) * 1_000), |
| 726 | + ZoneOffset.UTC); |
| 727 | + assertEquals(rowAssertMessage, expected_timestamp, actual_timestamp); |
| 728 | + } |
| 729 | + |
| 730 | + @Test |
| 731 | + public void testGeographySqlType() throws InterruptedException, IOException { |
| 732 | + String table_name = "test_geography_sql_type"; |
| 733 | + String createTableStatement = |
| 734 | + String.format( |
| 735 | + " CREATE TABLE %s.%s " |
| 736 | + + " (geo_field GEOGRAPHY NOT NULL)" |
| 737 | + + " OPTIONS( " |
| 738 | + + " description=\"a table with a geography column type\" " |
| 739 | + + " ) " |
| 740 | + + "AS " |
| 741 | + + " SELECT ST_GEOGPOINT(1.1, 2.2)", |
| 742 | + DATASET, table_name); |
| 743 | + |
| 744 | + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); |
| 745 | + |
| 746 | + TableReference tableReference = |
| 747 | + TableReference.newBuilder() |
| 748 | + .setTableId(table_name) |
| 749 | + .setDatasetId(DATASET) |
| 750 | + .setProjectId(ServiceOptions.getDefaultProjectId()) |
| 751 | + .build(); |
| 752 | + |
| 753 | + List<GenericData.Record> rows = |
| 754 | + ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null); |
| 755 | + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); |
| 756 | + |
| 757 | + GenericData.Record record = rows.get(0); |
| 758 | + Schema avroSchema = record.getSchema(); |
| 759 | + |
| 760 | + String actualSchemaMessage = |
| 761 | + String.format( |
| 762 | + "Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); |
| 763 | + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); |
| 764 | + |
| 765 | + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); |
| 766 | + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); |
| 767 | + assertEquals(actualSchemaMessage, 1, avroSchema.getFields().size()); |
| 768 | + |
| 769 | + assertEquals( |
| 770 | + actualSchemaMessage, |
| 771 | + Schema.Type.STRING, |
| 772 | + avroSchema.getField("geo_field").schema().getType()); |
| 773 | + assertEquals( |
| 774 | + actualSchemaMessage, |
| 775 | + "GEOGRAPHY", |
| 776 | + avroSchema.getField("geo_field").schema().getObjectProp("sqlType")); |
| 777 | + assertEquals(rowAssertMessage, new Utf8("POINT(1.1 2.2)"), (Utf8) record.get("geo_field")); |
| 778 | + } |
| 779 | + |
| 780 | + @Test |
| 781 | + public void testStructAndArraySqlTypes() throws InterruptedException, IOException { |
| 782 | + String table_name = "test_struct_and_array_sql_types"; |
| 783 | + String createTableStatement = |
| 784 | + String.format( |
| 785 | + " CREATE TABLE %s.%s " |
| 786 | + + " (array_field ARRAY<INT64>," |
| 787 | + + " struct_field STRUCT<int_field INT64 NOT NULL, str_field STRING NOT NULL> NOT NULL)" |
| 788 | + + " OPTIONS( " |
| 789 | + + " description=\"a table with array and time column types\" " |
| 790 | + + " ) " |
| 791 | + + "AS " |
| 792 | + + " SELECT " |
| 793 | + + " [1, 2, 3]," |
| 794 | + + " (10, 'abc')", |
| 795 | + DATASET, table_name); |
| 796 | + |
| 797 | + RunQueryJobAndExpectSuccess(QueryJobConfiguration.newBuilder(createTableStatement).build()); |
| 798 | + |
| 799 | + TableReference tableReference = |
| 800 | + TableReference.newBuilder() |
| 801 | + .setTableId(table_name) |
| 802 | + .setDatasetId(DATASET) |
| 803 | + .setProjectId(ServiceOptions.getDefaultProjectId()) |
| 804 | + .build(); |
| 805 | + |
| 806 | + List<GenericData.Record> rows = |
| 807 | + ReadAllRows(/* tableReference = */ tableReference, /* filter = */ null); |
| 808 | + assertEquals("Actual rows read: " + rows.toString(), 1, rows.size()); |
| 809 | + |
| 810 | + GenericData.Record record = rows.get(0); |
| 811 | + Schema avroSchema = record.getSchema(); |
| 812 | + |
| 813 | + String actualSchemaMessage = |
| 814 | + String.format( |
| 815 | + "Unexpected schema. Actual schema:%n%s", avroSchema.toString(/* pretty = */ true)); |
| 816 | + String rowAssertMessage = String.format("Row not matching expectations: %s", record.toString()); |
| 817 | + |
| 818 | + assertEquals(actualSchemaMessage, Schema.Type.RECORD, avroSchema.getType()); |
| 819 | + assertEquals(actualSchemaMessage, "__root__", avroSchema.getName()); |
| 820 | + assertEquals(actualSchemaMessage, 2, avroSchema.getFields().size()); |
| 821 | + |
| 822 | + assertEquals( |
| 823 | + actualSchemaMessage, |
| 824 | + Schema.Type.ARRAY, |
| 825 | + avroSchema.getField("array_field").schema().getType()); |
| 826 | + assertEquals( |
| 827 | + actualSchemaMessage, |
| 828 | + Schema.Type.LONG, |
| 829 | + avroSchema.getField("array_field").schema().getElementType().getType()); |
| 830 | + assertArrayEquals( |
| 831 | + rowAssertMessage, |
| 832 | + new Long[] {1L, 2L, 3L}, |
| 833 | + ((GenericData.Array<Long>) record.get("array_field")).toArray(new Long[0])); |
| 834 | + |
| 835 | + // Validate the STRUCT field and its members. |
| 836 | + Schema structSchema = avroSchema.getField("struct_field").schema(); |
| 837 | + assertEquals(actualSchemaMessage, Schema.Type.RECORD, structSchema.getType()); |
| 838 | + GenericData.Record structRecord = (GenericData.Record) record.get("struct_field"); |
| 839 | + |
| 840 | + assertEquals( |
| 841 | + actualSchemaMessage, |
| 842 | + Schema.Type.LONG, |
| 843 | + structSchema.getField("int_field").schema().getType()); |
| 844 | + assertEquals(rowAssertMessage, 10L, (long) structRecord.get("int_field")); |
| 845 | + |
| 846 | + assertEquals( |
| 847 | + actualSchemaMessage, |
| 848 | + Schema.Type.STRING, |
| 849 | + structSchema.getField("str_field").schema().getType()); |
| 850 | + assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field")); |
| 851 | + } |
| 852 | + |
505 | 853 | /** |
506 | 854 | * Reads to the specified row offset within the stream. If the stream does not have the desired |
507 | 855 | * rows to read, it will read all of them. |
|
0 commit comments