1515// Because MasterKeepAliveConnection is default scope, we have to use this package. :-/
1616package org .apache .hadoop .hbase .client ;
1717
18- import com .google .api .client .internal .protorest .ProtoRestBlockingClient ;
19- import com .google .bigtable .anviltop .AnviltopServices .AnviltopService ;
20- import com .google .bigtable .anviltop .AnviltopServices .AnviltopService .BlockingInterface ;
2118import com .google .cloud .anviltop .hbase .AnviltopOptions ;
2219import com .google .cloud .anviltop .hbase .AnvilTopOptionsFactory ;
2320import com .google .cloud .anviltop .hbase .AnvilTopTable ;
24- import com .google .cloud .hadoop .hbase .AnviltopBlockingClient ;
21+ import com .google .cloud .hadoop .hbase .AnviltopAdminBlockingGrpcClient ;
22+ import com .google .cloud .hadoop .hbase .AnviltopAdminClient ;
23+ import com .google .cloud .hadoop .hbase .AnviltopBlockingGrpcClient ;
2524import com .google .cloud .hadoop .hbase .AnviltopClient ;
26- import com .google .net .rpc3 .client .RpcStubParameters ;
27- import com .google .net .rpc3 .impl .proto .HTTPOverRPC ;
2825
2926import org .apache .commons .logging .Log ;
3027import org .apache .commons .logging .LogFactory ;
3128import org .apache .hadoop .conf .Configuration ;
3229import org .apache .hadoop .hbase .HRegionLocation ;
3330import org .apache .hadoop .hbase .HTableDescriptor ;
3431import org .apache .hadoop .hbase .MasterNotRunningException ;
32+ import org .apache .hadoop .hbase .RegionLocations ;
3533import org .apache .hadoop .hbase .ServerName ;
3634import org .apache .hadoop .hbase .TableName ;
3735import org .apache .hadoop .hbase .ZooKeeperConnectionException ;
5250import java .util .concurrent .ThreadPoolExecutor ;
5351import java .util .concurrent .TimeUnit ;
5452
55- public class AnvilTopConnection implements HConnection , Closeable {
53+ public class AnvilTopConnection implements ClusterConnection , Closeable {
5654 private static final Log LOG = LogFactory .getLog (AnvilTopConnection .class );
5755
5856 private final Configuration conf ;
5957 private volatile boolean closed ;
6058 private volatile boolean aborted ;
6159 private volatile ExecutorService batchPool = null ;
6260 private AnviltopClient client ;
61+ private AnviltopAdminClient anviltopAdminClient ;
6362 private User user = null ;
6463 private volatile boolean cleanupPool = false ;
6564 private final AnviltopOptions options ;
@@ -80,32 +79,27 @@ public AnvilTopConnection(Configuration conf) throws IOException {
8079
8180 this .options = AnvilTopOptionsFactory .fromConfiguration (conf );
8281 this .client = getAnviltopClient (this .options );
82+ this .anviltopAdminClient = getAdminClient (this .options );
8383 }
8484
85- protected AnviltopClient getAnviltopClient (AnviltopOptions options )
85+ private AnviltopAdminClient getAdminClient (AnviltopOptions options )
8686 throws MalformedURLException {
8787 URL endpointUrl = new URL (options .getApiEndpoint ());
8888
89- // Needed(?) until we run on Apiary @ staging.googleapis.com
90- HTTPOverRPC .ClientInterface rpc3stub = getRpc3Stub (
91- endpointUrl .getHost (), endpointUrl .getPort ());
92-
93- return new AnviltopBlockingClient (
94- rpc3stub ,
95- getAnvilTopInterface (options .getApiEndpoint ()));
89+ return AnviltopAdminBlockingGrpcClient .createClient (
90+ "https" .equals (endpointUrl .getProtocol ()),
91+ endpointUrl .getHost (),
92+ endpointUrl .getPort ());
9693 }
9794
98- HTTPOverRPC .ClientInterface getRpc3Stub (String host , int port ) {
99- RpcStubParameters stubParam = new RpcStubParameters (String .format ("%s:%s" , host , port ));
100- return HTTPOverRPC .newStub (stubParam );
101- }
95+ protected AnviltopClient getAnviltopClient (AnviltopOptions options )
96+ throws MalformedURLException {
97+ URL endpointUrl = new URL (options .getApiEndpoint ());
10298
103- protected BlockingInterface getAnvilTopInterface (String apiEndpointUrl ) {
104- return AnviltopService .newBlockingStub (
105- ProtoRestBlockingClient .newBuilder ()
106- .setBaseUrl (apiEndpointUrl )
107- .build ()
108- );
99+ return AnviltopBlockingGrpcClient .createClient (
100+ "https" .equals (endpointUrl .getProtocol ()),
101+ endpointUrl .getHost (),
102+ endpointUrl .getPort ());
109103 }
110104
111105 @ Override
@@ -145,7 +139,7 @@ public HTableInterface getTable(TableName tableName, ExecutorService pool) throw
145139
146140 @ Override
147141 public Admin getAdmin () throws IOException {
148- throw new UnsupportedOperationException (); // TODO
142+ return new AnviltopAdmin ( options , this , anviltopAdminClient );
149143 }
150144
151145 @ Override
@@ -253,6 +247,12 @@ public HRegionLocation relocateRegion(TableName tableName, byte[] row) throws IO
253247 throw new UnsupportedOperationException (); // TODO
254248 }
255249
250+ @ Override
251+ public HRegionLocation relocateRegion (TableName tableName , byte [] row , int replicaId )
252+ throws IOException {
253+ throw new UnsupportedOperationException (); // TODO
254+ }
255+
256256 @ Override
257257 public HRegionLocation relocateRegion (byte [] tableName , byte [] row ) throws IOException {
258258 throw new UnsupportedOperationException (); // TODO
@@ -297,6 +297,18 @@ public List<HRegionLocation> locateRegions(TableName tableName, boolean useCache
297297 throw new UnsupportedOperationException (); // TODO
298298 }
299299
300+ @ Override
301+ public RegionLocations locateRegion (TableName tableName , byte [] row , boolean useCache ,
302+ boolean retry ) throws IOException {
303+ throw new UnsupportedOperationException (); // TODO
304+ }
305+
306+ @ Override
307+ public RegionLocations locateRegion (TableName tableName , byte [] row , boolean useCache ,
308+ boolean retry , int replicaId ) throws IOException {
309+ throw new UnsupportedOperationException (); // TODO
310+ }
311+
300312 @ Override
301313 public List <HRegionLocation > locateRegions (byte [] tableName , boolean useCache , boolean offlined )
302314 throws IOException {
@@ -420,6 +432,11 @@ public NonceGenerator getNonceGenerator() {
420432 throw new UnsupportedOperationException (); // TODO
421433 }
422434
435+ @ Override
436+ public AsyncProcess getAsyncProcess () {
437+ throw new UnsupportedOperationException (); // TODO
438+ }
439+
423440 @ Override
424441 public void abort (final String msg , Throwable t ) {
425442 if (t != null ) {
0 commit comments