test(csharp/src/Apache.Arrow.Adbc): proof of concept - unit test - proxy server injection #1
Conversation
 namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 {
-    public class SparkConnection : HiveServer2Connection
+    internal class SparkConnection : HiveServer2Connection
Only SparkDriver needs to be public
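Making these types internal implies the unit tests have to reach them some other way. A minimal sketch of one common approach, assuming (not shown in this diff) that the driver project exposes its internals to the test assembly via InternalsVisibleTo:

```csharp
// Hypothetical entry in the driver project (e.g. an AssemblyInfo.cs); the test
// assembly name below is illustrative, not taken from this PR.
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Apache.Arrow.Adbc.Tests.Drivers.Apache")]
```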
-        internal SparkConnection(IReadOnlyDictionary<string, string> properties)
             : base(properties)
+        internal SparkConnection(IReadOnlyDictionary<string, string>? properties, MockDataSourceBase<TCLIService.IAsync>? proxy = default)
Only SparkDriver needs to be public
 namespace Apache.Arrow.Adbc.Drivers.Apache.Spark
 {
-    public class SparkDatabase : AdbcDatabase
+    internal class SparkDatabase : AdbcDatabase, IMockingDatabase<TCLIService.IAsync>
Only SparkDriver needs to be public
 namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 {
-    public abstract class HiveServer2Connection : AdbcConnection
+    internal abstract class HiveServer2Connection : MockingConnection<TCLIService.IAsync>
Only SparkDriver needs to be public
 namespace Apache.Arrow.Adbc.Drivers.Apache.Hive2
 {
-    public abstract class HiveServer2Statement : AdbcStatement
+    internal abstract class HiveServer2Statement : AdbcStatement
Only SparkDriver needs to be public
 namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
 {
-    public class ImpalaStatement : HiveServer2Statement
+    internal class ImpalaStatement : HiveServer2Statement
Only ImpalaDriver needs to be public
 namespace Apache.Arrow.Adbc.Drivers.Apache.Impala
 {
-    public class ImpalaConnection : HiveServer2Connection
+    internal class ImpalaConnection : HiveServer2Connection
Only ImpalaDriver needs to be public
             return connection;
         }

+        private static IReadOnlyDictionary<TKey, TValue> MergeDictionaries<TKey, TValue>(params IReadOnlyDictionary<TKey, TValue>?[] dictionaries)
Amazingly, .NET has no built-in way to merge two or more dictionaries when there are duplicate keys.
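A minimal sketch of what such a helper might look like, assuming later dictionaries win on duplicate keys (the overwrite policy is a guess, not taken from the PR):

```csharp
using System.Collections.Generic;

internal static class DictionaryHelper
{
    // Merges any number of (possibly null) dictionaries into one read-only view.
    // Later dictionaries overwrite earlier ones on duplicate keys (assumed policy).
    public static IReadOnlyDictionary<TKey, TValue> MergeDictionaries<TKey, TValue>(
        params IReadOnlyDictionary<TKey, TValue>?[] dictionaries)
        where TKey : notnull
    {
        var merged = new Dictionary<TKey, TValue>();
        foreach (var dictionary in dictionaries)
        {
            if (dictionary == null) continue;
            foreach (var pair in dictionary)
            {
                merged[pair.Key] = pair.Value;
            }
        }
        return merged;
    }
}
```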
         /// <param name="connectionOptions">A dictionary of connection options.</param>
+        /// <param name="mock">An optional mocker server proxy implementation.</param>
         /// <returns></returns>
+        protected AdbcConnection NewConnection(TConfig? testConfiguration = default, IReadOnlyDictionary<string, string>? connectionOptions = default, MockDataSourceBase<TMock>? mock = default)
Adds a new version of NewConnection where you can pass a mock.
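A hedged sketch of how a test might call this; the mock type name below is illustrative, not from the PR:

```csharp
// Hypothetical test usage: inject a mock proxy so no live Spark endpoint is needed.
var mock = new MockSparkDataSource();   // assumed subclass of MockDataSourceBase<TCLIService.IAsync>
AdbcConnection connection = NewConnection(mock: mock);
// Statements created from this connection are now served by the mock's
// TCLIService implementation instead of a real HiveServer2/Spark endpoint.
```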
+        public Task<TGetTablesResp> GetTables(TGetTablesReq req, CancellationToken cancellationToken = default) => throw new NotImplementedException();
+
+        public Task<TGetTableTypesResp> GetTableTypes(TGetTableTypesReq req, CancellationToken cancellationToken = default)
This is a POC with a "hard-coded" response to the GetTableTypes call.
The intention is to have a generic caching "replay" mechanism.
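A minimal sketch of what the hard-coded member might look like inside the mock proxy; the Thrift property names are assumed from the generated TCLIService bindings, and the real POC may populate more of the response:

```csharp
// Hypothetical hard-coded mock of GetTableTypes (field names assumed from the
// Thrift-generated TCLIService bindings; values are illustrative only).
public Task<TGetTableTypesResp> GetTableTypes(TGetTableTypesReq req, CancellationToken cancellationToken = default)
{
    var response = new TGetTableTypesResp
    {
        Status = new TStatus { StatusCode = TStatusCode.SUCCESS_STATUS },
        // A generic "replay" mechanism would return a recorded operation handle
        // (and, later, recorded FetchResults data) here instead of fixed values.
    };
    return Task.FromResult(response);
}
```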
+        public Task<TGetTypeInfoResp> GetTypeInfo(TGetTypeInfoReq req, CancellationToken cancellationToken = default) => throw new NotImplementedException();
+
+        public Task<TOpenSessionResp> OpenSession(TOpenSessionReq req, CancellationToken cancellationToken = default)
This implementation should be fine for mocking OpenSession. In fact, recording the values may not be desirable.
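For comparison, a minimal sketch of a mocked OpenSession that fabricates a session handle rather than replaying a recording (property names again assumed from the Thrift-generated bindings):

```csharp
// Hypothetical mocked OpenSession: returns a synthetic session handle so the
// connection can proceed without a live server. Required fields not shown here
// (e.g. the server protocol version) would also need to be set.
public Task<TOpenSessionResp> OpenSession(TOpenSessionReq req, CancellationToken cancellationToken = default)
{
    var response = new TOpenSessionResp
    {
        Status = new TStatus { StatusCode = TStatusCode.SUCCESS_STATUS },
        SessionHandle = new TSessionHandle
        {
            SessionId = new THandleIdentifier
            {
                Guid = Guid.NewGuid().ToByteArray(),
                Secret = Guid.NewGuid().ToByteArray(),
            },
        },
    };
    return Task.FromResult(response);
}
```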
-            this.transport = protocol.Transport;
-            this.client = new TCLIService.Client(protocol);
+            this.client = DataSourceDriverProxy ?? await NewDataSourceDriverAsync();
Use the proxy if it exists.
             this.sessionHandle = s0.SessionHandle;
         }

+        internal async override Task<TCLIService.IAsync> NewDataSourceDriverAsync()
Make a factory for the actual connection available to the mock.
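A hedged sketch of how a mock might consume such a factory, creating the real Thrift client only when it needs to capture a live response (type and member names below are illustrative, not from the PR):

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical recording proxy: holds a factory for the real client and creates
// it lazily the first time a call needs to be recorded for later replay.
internal sealed class RecordingProxy
{
    private readonly Func<Task<TCLIService.IAsync>> _realClientFactory;
    private TCLIService.IAsync? _realClient;

    internal RecordingProxy(Func<Task<TCLIService.IAsync>> realClientFactory)
        => _realClientFactory = realClientFactory;

    private async Task<TCLIService.IAsync> GetRealClientAsync()
        => _realClient ??= await _realClientFactory();
}
```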
             this.transport?.Close();
-            this.client.Dispose();
+            if (this.client is IDisposable disposable) disposable.Dispose();
We need to be more generic here because we're not sure whether the client implements IDisposable.
…che#3870)

## Fix Critical Deadlocks and Race Conditions in Snowflake Record Reader

This PR addresses multiple critical concurrency issues in the Snowflake driver's `recordReader` that could cause complete application hangs under normal racing conditions.

### Issues Fixed

*1. Critical Deadlock: `Release()` Blocking Forever*

*Problem:* When `Release()` was called while producer goroutines were blocked on channel sends, a permanent deadlock occurred:

* `Release()` cancels context and attempts to drain channels
* Producer goroutines blocked on `ch <- rec` cannot see the cancellation
* Channels never close because producers never exit
* `Release()` blocks forever on `for rec := range ch`

*Fix:* Added a `done` channel that signals when all producer goroutines have completed. `Release()` now waits for this signal before attempting to drain channels.

*2. Severe Deadlock: Non-Context-Aware Channel Sends*

*Problem:* Channel send operations at lines 694 and 732 checked context before the send but not during:

```go
for rr.Next() && ctx.Err() == nil { // Context checked here
    // ...
    ch <- rec // But send blocks here without checking context
}
```

*Fix:* Wrapped all channel sends in `select` statements with context awareness:

```go
select {
case chs[0] <- rec:
    // Successfully sent
case <-ctx.Done():
    rec.Release()
    return ctx.Err()
}
```

*3. Critical Race Condition: Nil Channel Reads*

*Problem:* Channels were created asynchronously in goroutines after `newRecordReader` returned. If `Next()` was called quickly after creation, it could read from uninitialized (nil) channels, causing infinite blocking.

*Fix:* Initialize all channels upfront before starting any goroutines:

```go
chs := make([]chan arrow.RecordBatch, len(batches))
for i := range chs {
    chs[i] = make(chan arrow.RecordBatch, bufferSize)
}
```

*4. Goroutine Leaks on Initialization Errors*

*Problem:* Error paths only cleaned up the first channel, potentially leaking goroutines if initialization failed after starting concurrent operations.

*Fix:* Moved all error-prone initialization (GetStream, NewReader) before goroutine creation, and added proper cleanup on errors.

---

#### Changes

* Added `done` channel to `reader` struct to signal goroutine completion
* Initialize all channels upfront to eliminate race conditions
* Use context-aware sends with `select` statements for all channel operations
* Update `Release()` to wait on `done` channel before draining
* Reorganize initialization to handle errors before starting goroutines
* Signal completion by closing `done` channel after all producers finish

#### Reproduction Scenarios Prevented

*Deadlock #1:*

1. bufferSize = 1, producer generates 2 records quickly
2. Channel becomes full after first record
3. Producer blocks on send
4. Consumer calls Release() before Next()
5. Without fix: permanent deadlock
6. With fix: producer responds to cancellation, Release() completes

*Race Condition:*

1. Query returns 3 batches
2. First batch processes quickly
3. Next() advances to second channel
4. Without fix: reads from nil channel, blocks forever
5. With fix: channel already initialized, works correctly

See apache#3730
No description provided.