Skip to content

Commit 5ea1f5f

Browse files
authored
feat(bigtable): Use direct access when supported. Guarded by enabling… (#13873)
Current limitations: 1. Does not use PeerInfo for checking transport 2. Does not have retry logic into Prime (only one attempt for success) 3. Guarded by enabling CBT_BIGTABLE_CONN_POOL
1 parent f166fc9 commit 5ea1f5f

3 files changed

Lines changed: 256 additions & 3 deletions

File tree

bigtable/client.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
155155

156156
// Create the feature flags metadata once
157157
ffMD := createFeatureFlagsMD(metricsTracerFactory.enabled, disableRetryInfo, enableDirectAccess)
158+
// Set direct Access to be true.
159+
directAccessMD := createFeatureFlagsMD(metricsTracerFactory.enabled, disableRetryInfo, true)
158160

159161
var connPool gtransport.ConnPool
160162
var connPoolErr error
@@ -165,6 +167,15 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
165167
if enableBigtableConnPool {
166168
fullInstanceName := fmt.Sprintf("projects/%s/instances/%s", project, instance)
167169

170+
directAccessDialer := func() (*btransport.BigtableConn, error) {
171+
directAccessOptions := append(o, internaloption.EnableDirectPath(true), internaloption.EnableDirectPathXds())
172+
grpcConn, err := gtransport.Dial(ctx, directAccessOptions...)
173+
if err != nil {
174+
return nil, err
175+
}
176+
return btransport.NewBigtableConn(grpcConn), nil
177+
}
178+
168179
btPool, err := btransport.NewBigtableChannelPool(ctx,
169180
defaultBigtableConnPoolSize,
170181
btopt.BigtableLoadBalancingStrategy(),
@@ -182,6 +193,8 @@ func NewClientWithConfig(ctx context.Context, project, instance string, config C
182193
btransport.WithFeatureFlagsMetadata(ffMD),
183194
btransport.WithMetricsReporterConfig(btopt.DefaultMetricsReporterConfig()),
184195
btransport.WithMeterProvider(metricsTracerFactory.otelMeterProvider),
196+
btransport.WithDirectAccessFeatureFlagsMetadata(directAccessMD),
197+
btransport.WithDirectAccessDialer(directAccessDialer),
185198
)
186199

187200
if err != nil {

bigtable/internal/transport/connpool.go

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,20 @@ func WithMeterProvider(mp metric.MeterProvider) BigtableChannelPoolOption {
105105
}
106106
}
107107

108+
// WithDirectAccessDialer provides the dialer for direct access
109+
func WithDirectAccessDialer(directAccessDialer func() (*BigtableConn, error)) BigtableChannelPoolOption {
110+
return func(p *BigtableChannelPool) {
111+
p.directAccessDialer = directAccessDialer
112+
}
113+
}
114+
115+
// WithDirectAccessFeatureFlagsMetadata provides the feature flags required for DirectAccess
116+
func WithDirectAccessFeatureFlagsMetadata(directAccessFeatureFlagsMD metadata.MD) BigtableChannelPoolOption {
117+
return func(p *BigtableChannelPool) {
118+
p.directAccessFeatureFlagsMD = directAccessFeatureFlagsMD
119+
}
120+
}
121+
108122
// WithLogger provides the logger for logging events
109123
func WithLogger(logger *log.Logger) BigtableChannelPoolOption {
110124
return func(p *BigtableChannelPool) {
@@ -328,6 +342,12 @@ type BigtableChannelPool struct {
328342
// configs
329343
metricsConfig btopt.MetricsReporterConfig
330344

345+
directAccessFeatureFlagsMD metadata.MD
346+
directAccessDialer func() (*BigtableConn, error)
347+
348+
// Add the cached gauge instrument
349+
daEligibleGauge metric.Int64Gauge
350+
331351
// background monitors
332352
monitors []Monitor
333353
}
@@ -371,12 +391,43 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
371391
opt(pool)
372392
}
373393

394+
// make the da guage lifetime, optional
395+
if pool.meterProvider != nil {
396+
meter := pool.meterProvider.Meter(clientMeterName)
397+
var err error
398+
pool.daEligibleGauge, err = meter.Int64Gauge(
399+
"direct_access/compatible",
400+
metric.WithDescription("Reports 1 if the environment is eligible for DirectPath, 0 otherwise. Based on a connection attempt at startup."),
401+
metric.WithUnit("1"),
402+
)
403+
if err != nil {
404+
btopt.Debugf(pool.logger, "bigtable_connpool: failed to create direct_access/compatible metric: %v", err)
405+
}
406+
}
407+
408+
// All standard dialers and feature flags
409+
factoryDial := dial
410+
factoryFeatureFlagsMD := pool.featureFlagsMD
411+
412+
var firstConn *BigtableConn
413+
414+
if pool.directAccessDialer != nil {
415+
var isDirectAccess bool
416+
if firstConn, isDirectAccess = pool.checkIfDirectAccessIsAvailable(); isDirectAccess {
417+
btopt.Debugf(pool.logger, "bigtable_connpool: Direct Access is available. Using Direct Access now.")
418+
factoryDial = pool.directAccessDialer
419+
factoryFeatureFlagsMD = pool.directAccessFeatureFlagsMD
420+
} else {
421+
btopt.Debugf(pool.logger, "bigtable_connpool: Direct Access is not available. Falling back to cloud path.")
422+
}
423+
}
424+
374425
// Initialize the connectionFactory
375426
pool.factory = &connectionFactory{
376-
dial: dial,
427+
dial: factoryDial,
377428
instanceName: pool.instanceName,
378429
appProfile: pool.appProfile,
379-
featureFlagsMD: pool.featureFlagsMD,
430+
featureFlagsMD: factoryFeatureFlagsMD,
380431
logger: pool.logger,
381432
}
382433

@@ -405,7 +456,15 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
405456
break
406457
}
407458

408-
entry, err := pool.factory.newEntry(ctx)
459+
var entry *connEntry
460+
var err error
461+
462+
if i == 0 && firstConn != nil {
463+
entry = &connEntry{conn: firstConn}
464+
} else {
465+
entry, err = pool.factory.newEntry(ctx)
466+
}
467+
409468
if err != nil {
410469
exitSignal = err
411470
break
@@ -449,6 +508,47 @@ func NewBigtableChannelPool(ctx context.Context, connPoolSize int, strategy btop
449508
return pool, nil
450509
}
451510

511+
// checkIfDirectAccessIsAvailable attempts to create a single connection using the directAccessDialer,
512+
// primes it, and checks if direct access was successful
513+
func (p *BigtableChannelPool) checkIfDirectAccessIsAvailable() (*BigtableConn, bool) {
514+
conn, err := p.directAccessDialer()
515+
if err != nil {
516+
btopt.Debugf(p.logger, "bigtable_connpool: Direct Access failed: %v", err)
517+
return nil, false
518+
}
519+
520+
err = conn.Prime(p.poolCtx, p.instanceName, p.appProfile, p.directAccessFeatureFlagsMD)
521+
if err != nil {
522+
btopt.Debugf(p.logger, "bigtable_connpool: Prime() failed during Direct Access check: %v", err)
523+
conn.Close()
524+
p.reportDirectAccessMetric(false)
525+
return nil, false
526+
}
527+
528+
if conn.isALTSConn.Load() {
529+
p.reportDirectAccessMetric(true)
530+
return conn, true
531+
}
532+
533+
// If not ALTS, discard
534+
conn.Close()
535+
p.reportDirectAccessMetric(false)
536+
return nil, false
537+
}
538+
539+
// reportDirectAccessMetric records the direct_access/compatible metric.
540+
func (p *BigtableChannelPool) reportDirectAccessMetric(isEligible bool) {
541+
// Check if the instrument was successfully created during pool initialization
542+
if p.daEligibleGauge == nil {
543+
return
544+
}
545+
val := int64(0)
546+
if isEligible {
547+
val = 1
548+
}
549+
p.daEligibleGauge.Record(p.poolCtx, val)
550+
}
551+
452552
func (p *BigtableChannelPool) recordClientStartUp(clientCreationTimestamp time.Time, transportType string) {
453553
if p.meterProvider == nil {
454554
return

bigtable/internal/transport/connpool_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,6 +1558,146 @@ func TestRemoveConnections(t *testing.T) {
15581558
})
15591559
}
15601560

1561+
func TestDirectAccessLogic(t *testing.T) {
1562+
ctx := context.Background()
1563+
fake := &fakeService{}
1564+
addr := setupTestServer(t, fake)
1565+
baseDialFunc := func() (*BigtableConn, error) { return dialBigtableserver(addr) }
1566+
1567+
t.Run("DirectAccessAvailable_ReuseConnection", func(t *testing.T) {
1568+
fake.reset()
1569+
// Mock direct access dialer
1570+
daDialCalled := false
1571+
var daConn *BigtableConn
1572+
daDial := func() (*BigtableConn, error) {
1573+
c, err := baseDialFunc()
1574+
if err != nil {
1575+
return nil, err
1576+
}
1577+
if !daDialCalled {
1578+
daConn = c
1579+
daDialCalled = true
1580+
}
1581+
// poor man's direct access
1582+
c.isALTSConn.Store(true)
1583+
return c, nil
1584+
}
1585+
1586+
poolSize := 3
1587+
fake.setPingCount(0)
1588+
opts := append(poolOpts(), WithDirectAccessDialer(daDial))
1589+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, baseDialFunc, time.Now(), opts...)
1590+
1591+
if err != nil {
1592+
t.Fatalf("Failed to create pool: %v", err)
1593+
}
1594+
defer pool.Close()
1595+
1596+
if !daDialCalled {
1597+
t.Error("Direct Access dialer was not called")
1598+
}
1599+
1600+
conns := pool.getConns()
1601+
if len(conns) != poolSize {
1602+
t.Errorf("Pool size got %d, want %d", len(conns), poolSize)
1603+
}
1604+
1605+
// Verify reuse: the first connection should be the one from daDial
1606+
if conns[0].conn != daConn {
1607+
t.Error("Pool did not reuse the Direct Access connection as the first entry")
1608+
}
1609+
1610+
// Verify the first connection is ALTS
1611+
if !conns[0].isALTSUsed() {
1612+
t.Error("Reused connection does not report ALTS usage")
1613+
}
1614+
})
1615+
1616+
t.Run("DirectAccess_NotAlts", func(t *testing.T) {
1617+
fake.reset()
1618+
var daConn *BigtableConn
1619+
daDial := func() (*BigtableConn, error) {
1620+
c, err := baseDialFunc()
1621+
if err != nil {
1622+
return nil, err
1623+
}
1624+
daConn = c
1625+
// poor man's alts
1626+
c.isALTSConn.Store(false)
1627+
return c, nil
1628+
}
1629+
1630+
poolSize := 2
1631+
opts := append(poolOpts(), WithDirectAccessDialer(daDial))
1632+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, baseDialFunc, time.Now(), opts...)
1633+
if err != nil {
1634+
t.Fatalf("Failed to create pool: %v", err)
1635+
}
1636+
defer pool.Close()
1637+
1638+
conns := pool.getConns()
1639+
if conns[0].conn == daConn {
1640+
t.Error("Pool incorrectly reused non-ALTS connection")
1641+
}
1642+
1643+
// Verify the discarded connection is closed
1644+
if !isConnClosed(daConn.ClientConn) {
1645+
t.Error("Discarded non-ALTS connection was not closed")
1646+
}
1647+
})
1648+
1649+
t.Run("DirectAccess_DialFail_Fallback", func(t *testing.T) {
1650+
fake.reset()
1651+
daDial := func() (*BigtableConn, error) {
1652+
return nil, errors.New("da dial failed")
1653+
}
1654+
1655+
poolSize := 1
1656+
opts := append(poolOpts(), WithDirectAccessDialer(daDial))
1657+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, baseDialFunc, time.Now(), opts...)
1658+
if err != nil {
1659+
t.Fatalf("Failed to create pool: %v", err)
1660+
}
1661+
defer pool.Close()
1662+
1663+
if pool.Num() != 1 {
1664+
t.Errorf("Pool size got %d, want 1", pool.Num())
1665+
}
1666+
})
1667+
1668+
t.Run("DirectAccess_PrimeFailed", func(t *testing.T) {
1669+
fake.reset()
1670+
// Let's make the first Prime call fail (DA check), and subsequent ones succeed (standard dial).
1671+
fake.setPingErr(status.Error(codes.Internal, "da prime fail"), nil, nil, nil)
1672+
1673+
var daConn *BigtableConn
1674+
daDial := func() (*BigtableConn, error) {
1675+
c, err := baseDialFunc()
1676+
if err != nil {
1677+
return nil, err
1678+
}
1679+
daConn = c
1680+
return c, nil
1681+
}
1682+
opts := append(poolOpts(), WithDirectAccessDialer(daDial))
1683+
poolSize := 1
1684+
pool, err := NewBigtableChannelPool(ctx, poolSize, btopt.RoundRobin, baseDialFunc, time.Now(), opts...)
1685+
if err != nil {
1686+
t.Fatalf("Failed to create pool: %v", err)
1687+
}
1688+
defer pool.Close()
1689+
1690+
// Verify fallback
1691+
if pool.getConns()[0].conn == daConn {
1692+
t.Error("Pool reused connection despite Prime failure")
1693+
}
1694+
1695+
if !isConnClosed(daConn.ClientConn) {
1696+
t.Error("Failed DA connection was not closed")
1697+
}
1698+
})
1699+
}
1700+
15611701
func TestConnPoolStatisticsVisitor(t *testing.T) {
15621702
ctx := context.Background()
15631703
poolSize := 3

0 commit comments

Comments
 (0)