|
19 | 19 | import com.google.api.core.ApiFunction; |
20 | 20 | import com.google.api.core.ApiFuture; |
21 | 21 | import com.google.api.core.ApiFutures; |
22 | | -import com.google.bigtable.admin.v2.CheckConsistencyResponse; |
23 | 22 | import com.google.bigtable.admin.v2.DeleteTableRequest; |
24 | 23 | import com.google.bigtable.admin.v2.DropRowRangeRequest; |
25 | | -import com.google.bigtable.admin.v2.GenerateConsistencyTokenRequest; |
26 | | -import com.google.bigtable.admin.v2.GenerateConsistencyTokenResponse; |
27 | 24 | import com.google.bigtable.admin.v2.GetTableRequest; |
28 | 25 | import com.google.bigtable.admin.v2.InstanceName; |
29 | 26 | import com.google.bigtable.admin.v2.ListTablesRequest; |
30 | 27 | import com.google.bigtable.admin.v2.TableName; |
31 | 28 | import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPage; |
32 | 29 | import com.google.cloud.bigtable.admin.v2.BaseBigtableTableAdminClient.ListTablesPagedResponse; |
33 | | -import com.google.cloud.bigtable.admin.v2.models.ConsistencyToken; |
34 | 30 | import com.google.cloud.bigtable.admin.v2.models.CreateTableRequest; |
35 | 31 | import com.google.cloud.bigtable.admin.v2.models.ModifyColumnFamiliesRequest; |
36 | 32 | import com.google.cloud.bigtable.admin.v2.models.Table; |
37 | 33 | import com.google.cloud.bigtable.admin.v2.stub.EnhancedBigtableTableAdminStub; |
38 | | -import com.google.common.annotations.VisibleForTesting; |
39 | 34 | import com.google.common.base.Preconditions; |
40 | 35 | import com.google.common.collect.Lists; |
41 | 36 | import com.google.common.util.concurrent.Futures; |
@@ -649,91 +644,57 @@ public ApiFuture<Void> dropAllRowsAsync(String tableId) { |
649 | 644 | } |
650 | 645 |
|
651 | 646 | /** |
652 | | - * Generates a token to verify the replication status of table mutations invoked before this call. |
653 | | - * Token expires in 90 days |
| 647 | + * Blocks until replication has caught up to the point this method was called. This allows callers |
| 648 | + * to make sure that their mutations have been replicated across all of their clusters. |
654 | 649 | * |
655 | | - * <p>Sample code: |
| 650 | + * <p>Sample code |
656 | 651 | * |
657 | 652 | * <pre>{@code |
658 | | - * ConsistencyToken consistencyToken = client.generateConsistencyToken("my-table"); |
| 653 | + * client.awaitReplication("my-table"); |
659 | 654 | * }</pre> |
660 | | - */ |
661 | | - @SuppressWarnings("WeakerAccess") |
662 | | - public ConsistencyToken generateConsistencyToken(String tableId) { |
663 | | - return awaitFuture(generateConsistencyTokenAsync(tableId)); |
664 | | - } |
665 | | - |
666 | | - /** |
667 | | - * Asynchornously generates a token to verify the replication status of table mutations invoked |
668 | | - * before this call. Token expires in 90 days |
669 | | - * |
670 | | - * <p>Sample code: |
671 | 655 | * |
672 | | - * <pre>{@code |
673 | | - * ApiFuture<ConsistencyToken> consistencyTokenFuture = client.generateConsistencyToken("my-table"); |
674 | | - * }</pre> |
| 656 | + * @throws com.google.api.gax.retrying.PollException when polling exceeds the total timeout |
675 | 657 | */ |
676 | | - // TODO(igorbernstein2): add sample code for waiting for the fetch consistency token |
677 | 658 | @SuppressWarnings("WeakerAccess") |
678 | | - public ApiFuture<ConsistencyToken> generateConsistencyTokenAsync(final String tableId) { |
679 | | - GenerateConsistencyTokenRequest request = GenerateConsistencyTokenRequest.newBuilder() |
680 | | - .setName(getTableName(tableId)) |
681 | | - .build(); |
682 | | - |
683 | | - return ApiFutures.transform( |
684 | | - stub.generateConsistencyTokenCallable().futureCall(request), |
685 | | - new ApiFunction<GenerateConsistencyTokenResponse, ConsistencyToken>() { |
686 | | - @Override |
687 | | - public ConsistencyToken apply(GenerateConsistencyTokenResponse proto) { |
688 | | - TableName tableName = TableName |
689 | | - .of(instanceName.getProject(), instanceName.getInstance(), tableId); |
690 | | - return ConsistencyToken.of(tableName, proto.getConsistencyToken()); |
691 | | - } |
692 | | - }, |
693 | | - MoreExecutors.directExecutor()); |
| 659 | + public void awaitReplication(String tableId) { |
| 660 | + TableName tableName = TableName |
| 661 | + .of(instanceName.getProject(), instanceName.getInstance(), tableId); |
| 662 | + awaitFuture(stub.awaitReplicationCallable().futureCall(tableName)); |
694 | 663 | } |
695 | 664 |
|
696 | 665 | /** |
697 | | - * Checks replication consistency for the specified token consistency token |
| 666 | + * Returns a future that is resolved when replication has caught up to the point this method was |
| 667 | + * called. This allows callers to make sure that their mutations have been replicated across all |
| 668 | + * of their clusters. |
698 | 669 | * |
699 | 670 | * <p>Sample code: |
700 | 671 | * |
701 | 672 | * <pre>{@code |
702 | | - * try(BigtableTableAdminClient client = BigtableTableAdminClient.create(InstanceName.of("[PROJECT]", "[INSTANCE]"))) { |
703 | | - * // Perform some mutations. |
| 673 | + * ApiFuture<Void> replicationFuture = client.awaitReplicationAsync("my-table"); |
704 | 674 | * |
705 | | - * ConsistencyToken token = client.generateConsistencyToken("table-id"); |
706 | | - * while(!client.isConsistent(token)) { |
707 | | - * Thread.sleep(100); |
708 | | - * } |
| 675 | + * ApiFutures.addCallback( |
| 676 | + * replicationFuture, |
| 677 | + * new ApiFutureCallback<Void>() { |
| 678 | + * public void onSuccess(Table table) { |
| 679 | + * System.out.println("All clusters are now consistent"); |
| 680 | + * } |
| 681 | + * |
| 682 | + * public void onFailure(Throwable t) { |
| 683 | + * t.printStackTrace(); |
| 684 | + * } |
| 685 | + * }, |
| 686 | + * MoreExecutors.directExecutor() |
| 687 | + * ); |
709 | 688 | * |
710 | | - * // Now all clusters are consistent |
711 | | - * } |
712 | 689 | * }</pre> |
713 | 690 | */ |
714 | 691 | @SuppressWarnings("WeakerAccess") |
715 | | - public boolean isConsistent(ConsistencyToken token) { |
716 | | - return awaitFuture(isConsistentAsync(token)); |
| 692 | + public ApiFuture<Void> awaitReplicationAsync(final String tableId) { |
| 693 | + TableName tableName = TableName |
| 694 | + .of(instanceName.getProject(), instanceName.getInstance(), tableId); |
| 695 | + return stub.awaitReplicationCallable().futureCall(tableName); |
717 | 696 | } |
718 | 697 |
|
719 | | - @VisibleForTesting |
720 | | - ApiFuture<Boolean> isConsistentAsync(ConsistencyToken token) { |
721 | | - ApiFuture<CheckConsistencyResponse> checkConsResp = stub.checkConsistencyCallable() |
722 | | - .futureCall(token.toProto(instanceName)); |
723 | | - |
724 | | - return ApiFutures.transform( |
725 | | - checkConsResp, |
726 | | - new ApiFunction<CheckConsistencyResponse, Boolean>() { |
727 | | - @Override |
728 | | - public Boolean apply(CheckConsistencyResponse input) { |
729 | | - return input.getConsistent(); |
730 | | - } |
731 | | - }, |
732 | | - MoreExecutors.directExecutor()); |
733 | | - } |
734 | | - |
735 | | - // TODO(igorbernstein2): add awaitConsist() & awaitConsistAsync() that generate & poll a token |
736 | | - |
737 | 698 | /** |
738 | 699 | * Helper method to construct the table name in format: projects/{project}/instances/{instance}/tables/{tableId} |
739 | 700 | */ |
|
0 commit comments