Skip to content

Commit 9faf370

Browse files
authored
fix: Don't try to Close backend connection when it's nil (#12124)
Unrelated to `state.NoOpClient`, `c.backendConn` is not set at all.
1 parent 2b9180f commit 9faf370

File tree

3 files changed

+24
-12
lines changed

3 files changed

+24
-12
lines changed

plugins/source/hackernews/resources/plugin/client.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
5353
if options.BackendOptions == nil {
5454
c.logger.Info().Msg("No backend options provided, using no state backend")
5555
stateClient = &state.NoOpClient{}
56+
c.backendConn = nil
5657
} else {
57-
conn, err := grpc.DialContext(ctx, options.BackendOptions.Connection,
58+
c.backendConn, err = grpc.DialContext(ctx, options.BackendOptions.Connection,
5859
grpc.WithTransportCredentials(insecure.NewCredentials()),
5960
grpc.WithDefaultCallOptions(
6061
grpc.MaxCallRecvMsgSize(maxMsgSize),
@@ -64,7 +65,7 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
6465
if err != nil {
6566
return fmt.Errorf("failed to dial grpc source plugin at %s: %w", options.BackendOptions.Connection, err)
6667
}
67-
stateClient, err = state.NewClient(ctx, conn, options.BackendOptions.TableName)
68+
stateClient, err = state.NewClient(ctx, c.backendConn, options.BackendOptions.TableName)
6869
if err != nil {
6970
return fmt.Errorf("failed to create state client: %w", err)
7071
}
@@ -79,16 +80,19 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
7980
return c.scheduler.Sync(ctx, schedulerClient, tt, res, scheduler.WithSyncDeterministicCQID(options.DeterministicCQID))
8081
}
8182

82-
func (c *Client) Tables(ctx context.Context, options plugin.TableOptions) (schema.Tables, error) {
83+
func (c *Client) Tables(_ context.Context, options plugin.TableOptions) (schema.Tables, error) {
8384
tt, err := c.tables.FilterDfs(options.Tables, options.SkipTables, options.SkipDependentTables)
8485
if err != nil {
8586
return nil, err
8687
}
8788
return tt, nil
8889
}
8990

90-
func (c *Client) Close(ctx context.Context) error {
91-
return c.backendConn.Close()
91+
func (c *Client) Close(_ context.Context) error {
92+
if c.backendConn != nil {
93+
return c.backendConn.Close()
94+
}
95+
return nil
9296
}
9397

9498
func getTables() []*schema.Table {
@@ -106,7 +110,7 @@ func getTables() []*schema.Table {
106110
return tables
107111
}
108112

109-
func Configure(ctx context.Context, logger zerolog.Logger, specBytes []byte, opts plugin.NewClientOptions) (plugin.Client, error) {
113+
func Configure(_ context.Context, logger zerolog.Logger, specBytes []byte, opts plugin.NewClientOptions) (plugin.Client, error) {
110114
if opts.NoConnection {
111115
return &Client{
112116
logger: logger,

plugins/source/shopify/resources/plugin/client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,9 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
5555
if options.BackendOptions == nil {
5656
c.logger.Info().Msg("No backend options provided, using no state backend")
5757
stateClient = &state.NoOpClient{}
58+
c.backendConn = nil
5859
} else {
59-
conn, err := grpc.DialContext(ctx, options.BackendOptions.Connection,
60+
c.backendConn, err = grpc.DialContext(ctx, options.BackendOptions.Connection,
6061
grpc.WithTransportCredentials(insecure.NewCredentials()),
6162
grpc.WithDefaultCallOptions(
6263
grpc.MaxCallRecvMsgSize(maxMsgSize),
@@ -66,7 +67,7 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
6667
if err != nil {
6768
return fmt.Errorf("failed to dial grpc source plugin at %s: %w", options.BackendOptions.Connection, err)
6869
}
69-
stateClient, err = state.NewClient(ctx, conn, options.BackendOptions.TableName)
70+
stateClient, err = state.NewClient(ctx, c.backendConn, options.BackendOptions.TableName)
7071
if err != nil {
7172
return fmt.Errorf("failed to create state client: %w", err)
7273
}
@@ -86,7 +87,10 @@ func (c *Client) Tables(_ context.Context, options plugin.TableOptions) (schema.
8687
}
8788

8889
func (c *Client) Close(_ context.Context) error {
89-
return c.backendConn.Close()
90+
if c.backendConn != nil {
91+
return c.backendConn.Close()
92+
}
93+
return nil
9094
}
9195

9296
func Configure(_ context.Context, logger zerolog.Logger, specBytes []byte, opts plugin.NewClientOptions) (plugin.Client, error) {

plugins/source/stripe/resources/plugin/client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,9 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
5252
if options.BackendOptions == nil {
5353
c.logger.Info().Msg("No backend options provided, using no state backend")
5454
stateClient = &state.NoOpClient{}
55+
c.backendConn = nil
5556
} else {
56-
conn, err := grpc.DialContext(ctx, options.BackendOptions.Connection,
57+
c.backendConn, err = grpc.DialContext(ctx, options.BackendOptions.Connection,
5758
grpc.WithTransportCredentials(insecure.NewCredentials()),
5859
grpc.WithDefaultCallOptions(
5960
grpc.MaxCallRecvMsgSize(maxMsgSize),
@@ -63,7 +64,7 @@ func (c *Client) Sync(ctx context.Context, options plugin.SyncOptions, res chan<
6364
if err != nil {
6465
return fmt.Errorf("failed to dial grpc source plugin at %s: %w", options.BackendOptions.Connection, err)
6566
}
66-
stateClient, err = state.NewClient(ctx, conn, options.BackendOptions.TableName)
67+
stateClient, err = state.NewClient(ctx, c.backendConn, options.BackendOptions.TableName)
6768
if err != nil {
6869
return fmt.Errorf("failed to create state client: %w", err)
6970
}
@@ -83,7 +84,10 @@ func (c *Client) Tables(_ context.Context, options plugin.TableOptions) (schema.
8384
}
8485

8586
func (c *Client) Close(_ context.Context) error {
86-
return c.backendConn.Close()
87+
if c.backendConn != nil {
88+
return c.backendConn.Close()
89+
}
90+
return nil
8791
}
8892

8993
func Configure(_ context.Context, logger zerolog.Logger, specBytes []byte, opts plugin.NewClientOptions) (plugin.Client, error) {

0 commit comments

Comments
 (0)