Conversation

@birschick-bq
No description provided.

namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
-     public class SparkConnection : HiveServer2Connection
+     internal class SparkConnection : HiveServer2Connection

Only SparkDriver needs to be public


-     internal SparkConnection(IReadOnlyDictionary<string, string> properties)
-         : base(properties)
+     internal SparkConnection(IReadOnlyDictionary<string, string>? properties, MockDataSourceBase<TCLIService.IAsync>? proxy = default)

Only SparkDriver needs to be public

namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
{
-     public class SparkDatabase : AdbcDatabase
+     internal class SparkDatabase : AdbcDatabase, IMockingDatabase<TCLIService.IAsync>

Only SparkDriver needs to be public

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
-     public abstract class HiveServer2Connection : AdbcConnection
+     internal abstract class HiveServer2Connection : MockingConnection<TCLIService.IAsync>

Only SparkDriver needs to be public

namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
{
-     public abstract class HiveServer2Statement : AdbcStatement
+     internal abstract class HiveServer2Statement : AdbcStatement

Only SparkDriver needs to be public

namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
{
-     public class ImpalaStatement : HiveServer2Statement
+     internal class ImpalaStatement : HiveServer2Statement

Only ImpalaDriver needs to be public

namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
{
-     public class ImpalaConnection : HiveServer2Connection
+     internal class ImpalaConnection : HiveServer2Connection

Only ImpalaDriver needs to be public

return connection;
}

private static IReadOnlyDictionary<TKey, TValue> MergeDictionaries<TKey, TValue>(params IReadOnlyDictionary<TKey, TValue>?[] dictionaries)

Amazingly, .NET doesn't ship a built-in way to merge two or more dictionaries when there are duplicate keys.
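For illustration, a minimal sketch of what such a helper typically boils down to, assuming last-wins semantics for duplicate keys (the PR's actual conflict rule isn't shown here):

```csharp
// Minimal sketch (assumed last-wins semantics): later dictionaries overwrite earlier entries.
private static IReadOnlyDictionary<TKey, TValue> MergeDictionaries<TKey, TValue>(
    params IReadOnlyDictionary<TKey, TValue>?[] dictionaries)
{
    var merged = new Dictionary<TKey, TValue>();
    foreach (IReadOnlyDictionary<TKey, TValue>? dictionary in dictionaries)
    {
        if (dictionary == null) continue;
        foreach (KeyValuePair<TKey, TValue> pair in dictionary)
        {
            merged[pair.Key] = pair.Value; // duplicate keys: the last dictionary wins
        }
    }
    return merged;
}
```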

/// <param name="connectionOptions">A dictionary of connection options.</param>
/// <param name="mock">An optional mocker server proxy implementation.</param>
/// <returns></returns>
protected AdbcConnection NewConnection(TConfig? testConfiguration = default, IReadOnlyDictionary<string, string>? connectionOptions = default, MockDataSourceBase<TMock>? mock = default)

Adds a new override of NewConnection where you can pass a mock.
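Roughly how a test might use it (a sketch; `MyRecordedDataSource` and its behavior are assumptions, not types from this PR):

```csharp
// Hypothetical test usage: route the connection through a canned proxy instead of a live cluster.
MockDataSourceBase<TCLIService.IAsync> mock = new MyRecordedDataSource();

using AdbcConnection connection = NewConnection(connectionOptions: null, mock: mock);
using AdbcStatement statement = connection.CreateStatement();
```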


public Task<TGetTablesResp> GetTables(TGetTablesReq req, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public Task<TGetTableTypesResp> GetTableTypes(TGetTableTypesReq req, CancellationToken cancellationToken = default)

This is a POC where the response to the GetTableTypes call is hard-coded.

The intention is to have a generic caching "replay" mechanism.
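A rough idea of where that could go, assuming a simple keyed store of recorded responses (the store and its key scheme are hypothetical, not part of this PR):

```csharp
// Hypothetical replay store: canned Thrift responses keyed by request type.
private readonly Dictionary<Type, object> _recordedResponses = new();

public Task<TGetTableTypesResp> GetTableTypes(TGetTableTypesReq req, CancellationToken cancellationToken = default)
{
    // Return the recorded response instead of calling the live service.
    if (_recordedResponses.TryGetValue(typeof(TGetTableTypesReq), out object? recorded))
    {
        return Task.FromResult((TGetTableTypesResp)recorded);
    }

    throw new NotImplementedException($"No recorded response for {nameof(TGetTableTypesReq)}.");
}
```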


public Task<TGetTypeInfoResp> GetTypeInfo(TGetTypeInfoReq req, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public Task<TOpenSessionResp> OpenSession(TOpenSessionReq req, CancellationToken cancellationToken = default)

This implementation should be fine to mock OpenSession. In fact, recording the values may not be desirable.

-     this.transport = protocol.Transport;
-     this.client = new TCLIService.Client(protocol);
+     this.client = DataSourceDriverProxy ?? await NewDataSourceDriverAsync();

Use the proxy if it exists.

this.sessionHandle = s0.SessionHandle;
}

internal async override Task<TCLIService.IAsync> NewDataSourceDriverAsync()

Make a factory for the actual connection available to the mock.
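For context, a sketch of why the mock wants that factory: it can lazily create the real client, forward a call, and keep the result for later replay. The delegate field and the recording step below are assumptions about MockDataSourceBase, not its actual API.

```csharp
// Hypothetical pass-through inside a mock proxy.
private readonly Func<Task<TCLIService.IAsync>> _realClientFactory;

public async Task<TGetTableTypesResp> GetTableTypes(TGetTableTypesReq req, CancellationToken cancellationToken = default)
{
    // Create (or reuse) the real client via the factory and forward the call.
    TCLIService.IAsync realClient = await _realClientFactory();
    TGetTableTypesResp response = await realClient.GetTableTypes(req, cancellationToken);

    // A generic record/replay mechanism would persist this response here for later playback.
    return response;
}
```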


this.transport?.Close();
-     this.client.Dispose();
+     if (this.client is IDisposable disposable) disposable.Dispose();

Need to be more generic here because we're not sure if it implements IDisposable.

birschick-bq added a commit that referenced this pull request Aug 12, 2024
birschick-bq added a commit that referenced this pull request Oct 31, 2024
birschick-bq added a commit that referenced this pull request Feb 25, 2025
@birschick-bq birschick-bq deleted the dev/birschick-bq/poc-proxy-server-unit-test branch August 13, 2025 20:12
birschick-bq pushed a commit that referenced this pull request Jan 16, 2026
…che#3870)

## Fix Critical Deadlocks and Race Conditions in Snowflake Record Reader

This PR addresses multiple critical concurrency issues in the Snowflake
driver's `recordReader` that could cause complete application hangs
under race conditions that arise in normal use.

### Issues Fixed

*1. Critical Deadlock: `Release()` Blocking Forever*

*Problem*: When `Release()` was called while producer goroutines were
blocked on channel sends, a permanent deadlock occurred:

* `Release()` cancels context and attempts to drain channels
* Producer goroutines blocked on `ch <- rec` cannot see the cancellation
* Channels never close because producers never exit
* `Release()` blocks forever on `for rec := range ch`

*Fix:* Added a `done` channel that signals when all producer goroutines
have completed. `Release()` now waits for this signal before attempting
to drain channels.

*2. Severe Deadlock: Non-Context-Aware Channel Sends*

*Problem:* Channel send operations at lines 694 and 732 checked context
before the send but not during:

```go
for rr.Next() && ctx.Err() == nil {  // Context checked here
    // ... 
    ch <- rec  // But send blocks here without checking context
}
```

*Fix:* Wrapped all channel sends in `select` statements with context
awareness:

```go
select {
case chs[0] <- rec:
    // Successfully sent
case <-ctx.Done():
    rec.Release()
    return ctx.Err()
}
```

*3. Critical Race Condition: Nil Channel Reads*

*Problem:* Channels were created asynchronously in goroutines after
`newRecordReader` returned. If `Next()` was called quickly after
creation, it could read from uninitialized (nil) channels, causing
infinite blocking.

*Fix:* Initialize all channels upfront before starting any goroutines:

```go
chs := make([]chan arrow.RecordBatch, len(batches))
for i := range chs {
    chs[i] = make(chan arrow.RecordBatch, bufferSize)
}
```

*4. Goroutine Leaks on Initialization Errors*

*Problem:* Error paths only cleaned up the first channel, potentially
leaking goroutines if initialization failed after starting concurrent
operations.

*Fix:* Moved all error-prone initialization (GetStream, NewReader)
before goroutine creation, and added proper cleanup on errors.

----------------------

#### Changes

* Added `done` channel to `reader` struct to signal goroutine completion
* Initialize all channels upfront to eliminate race conditions
* Use context-aware sends with `select` statements for all channel
operations
* Update `Release()` to wait on `done` channel before draining
* Reorganize initialization to handle errors before starting goroutines
* Signal completion by closing `done` channel after all producers finish

#### Reproduction Scenarios Prevented

*Deadlock #1:*

1. bufferSize = 1, producer generates 2 records quickly
2. Channel becomes full after first record
3. Producer blocks on send
4. Consumer calls Release() before Next()
5. Without fix: permanent deadlock
6. With fix: producer responds to cancellation, Release() completes

*Race Condition:*

1. Query returns 3 batches
2. First batch processes quickly
3. Next() advances to second channel
4. Without fix: reads from nil channel, blocks forever
5. With fix: channel already initialized, works correctly

See apache#3730