-
Notifications
You must be signed in to change notification settings - Fork 173
feat(csharp/src/Drivers/Databricks): Add Activity-based distributed tracing to CloudFetch pipeline #3580
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(csharp/src/Drivers/Databricks): Add Activity-based distributed tracing to CloudFetch pipeline #3580
Conversation
…eline ## Summary Added Activity-based logging throughout the CloudFetch implementation to capture performance metrics in ADBC trace files instead of only outputting to system trace listeners (console, debug output, etc.). This enables users to collect detailed CloudFetch performance data when troubleshooting. ## Changes - Pass Activity context through the entire CloudFetch pipeline: - CloudFetchReader → CloudFetchDownloadManager → CloudFetchResultFetcher & CloudFetchDownloader - Replace all System.Diagnostics.Trace calls with Activity.AddEvent() calls - Include sanitized URLs in all CloudFetch logging events for debugging ## Events Captured CloudFetch now emits Activity events for: - URL fetching and refreshing (url_fetched, url_fetch_failed, url_refreshed_*) - Download lifecycle (download_start, download_complete, download_failed) - Download retries with error details - Content length and actual file sizes - LZ4 decompression metrics (time, compressed/decompressed sizes, compression ratio) - Download throughput (MB/s) and latency - Overall pipeline summary (total files, bytes downloaded, success/failure counts) ## Benefits - Performance metrics are now serialized to JSON trace files via FileActivityListener - Users can analyze CloudFetch behavior when collecting traces for support cases - All events include sanitized URLs to maintain security while providing debugging context - Metrics include download throughput, latency, compression ratios, and file sizes ## Test Plan - Build succeeds with 0 warnings, 0 errors - Existing CloudFetch functionality unchanged (Activity is optional parameter) - Metrics will be validated in E2E tests with FileActivityListener enabled 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
## Problem The previous implementation captured Activity.Current at construction time and passed it as a parameter through the CloudFetch pipeline. However, this didn't work because: 1. CloudFetchReader is constructed before any Activity exists 2. DatabricksCompositeReader.ReadNextRecordBatchAsync() wasn't wrapped in TraceActivityAsync 3. Even when Activity was captured, background tasks couldn't access it correctly As a result, no CloudFetch events were being logged to trace files. ## Solution Changed the approach to use Activity.Current directly when emitting events instead of capturing and passing Activity as a parameter: 1. Added TraceActivityAsync wrapper to DatabricksCompositeReader.ReadNextRecordBatchAsync() to ensure an Activity exists at the top level 2. Removed all parentActivity parameters from CloudFetch constructors 3. Changed all event logging from _parentActivity?.AddEvent to Activity.Current?.AddEvent ## Why This Works Activity.Current flows automatically through async operations via AsyncLocal. When ReadNextRecordBatchAsync() is called with TraceActivityAsync, that Activity becomes current. When CloudFetch components process items from their queues (which happens as a consequence of that call), Activity.Current references that same Activity. ## Changes - DatabricksCompositeReader: Added TraceActivityAsync wrapper - CloudFetchReader: Removed Activity.Current parameter passing - CloudFetchDownloadManager: Removed parentActivity parameter - CloudFetchResultFetcher: Removed _parentActivity field, use Activity.Current - CloudFetchDownloader: Removed _parentActivity field, use Activity.Current 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…ound task logging ## Problem CloudFetchDownloader runs background tasks that don't have access to Activity.Current because they execute in different execution contexts from the main reader thread. The previous fix attempted to use Activity.Current directly in background tasks, but Activity.Current is null in those contexts. ## Solution Capture Activity.Current when each DownloadResult is created (during URL fetching) and store it in the DownloadResult object. Background download tasks then use the captured Activity from downloadResult.Activity when emitting events. ## Why This Works - DownloadResults are created in CloudFetchResultFetcher.ProcessDirectResultsAsync() and FetchNextResultBatchAsync(), which run in the context of ReadNextRecordBatchAsync() - At that time, Activity.Current is valid (from the TraceActivityAsync wrapper) - The captured Activity is stored in each DownloadResult - Background download tasks access the captured Activity via downloadResult.Activity - Events are logged to the correct Activity even though downloads happen asynchronously ## Changes - IDownloadResult: Added Activity property to expose captured Activity context - DownloadResult: Capture Activity.Current in constructor, expose via Activity property - EndOfResultsGuard: Implement Activity property (returns null) - CloudFetchDownloader: Use downloadResult.Activity instead of Activity.Current for download-specific events (download_start, download_complete, etc.) - CloudFetchDownloader: Keep Activity.Current for pipeline-level events (download_summary, error_state_set, etc.) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
Refactor CloudFetch Activity-based logging to follow proper sibling architecture where both CloudFetchReader and CloudFetchDownloader create Activities as children of the statement, rather than nested. Changes: - Pass ITracingStatement to CloudFetchDownloader constructor - Implement IActivityTracer in CloudFetchDownloader to delegate tracing to statement - Wrap DownloadFilesAsync with TraceActivityAsync to create Activity for download loop - Use activity parameter (instead of Activity.Current) in lambda body - Remove Activity property from IDownloadResult and implementations - Fix totalFiles metric to exclude EndOfResultsGuard sentinel - Update tests to pass statement parameter to CloudFetchDownloader Benefits: - Proper Activity hierarchy: Statement → [Reader, Downloader] as siblings - All CloudFetch events logged to correct Activity context - Rich download metrics: sizes, timings, compression ratios, throughput - Accurate file count in download_summary event 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
…CloudFetch progress Wrap each DownloadFileAsync call with TraceActivityAsync to create child Activities that flush immediately upon download completion. Changes: - Wrap DownloadFileAsync body with TraceActivityAsync - Add Activity tags for offset, sanitized_url, expected_size_bytes - Replace Activity.Current with activity parameter in all events - Name child Activities 'DownloadFile' for clear identification Benefits: - Real-time progress monitoring (events flush per-file, not per-batch) - Better fault tolerance (completed downloads logged before crashes) - Improved debuggability with searchable Activity tags - Granular per-file metrics visible as downloads complete 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
… from DatabricksCompositeReader DatabricksCompositeReader already extends TracingReader which handles Activity tracing. The explicit TraceActivityAsync wrapper in ReadNextRecordBatchAsync was redundant and has been removed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
CurtHagenlocher
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks; looks good (and more straightforward in terms of the relationship between events and tags than the other tracing PR). I'd still like to get feedback from @birschick-bq but feel more comfortable checking this in without it.
| } | ||
|
|
||
| // IActivityTracer implementation - delegates to statement | ||
| ActivityTrace IActivityTracer.Trace => ((IActivityTracer)_statement).Trace; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the casts to IActivityTracer look superfluous here because _statement is already an IActivityTracer.
birschick-bq
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good Eric.
I have a suggestion to use AddException instead of AddEvent when in an catch block. This applies multiple times.
And a few other comments.
| activity?.AddEvent("cloudfetch.wait_for_downloads_error", [ | ||
| new("error_message", ex.Message) | ||
| ]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| activity?.AddEvent("cloudfetch.wait_for_downloads_error", [ | |
| new("error_message", ex.Message) | |
| ]); | |
| activity?.AddException(ex, [new("cloudfetch.wait_for_downloads_error", null)]); |
AddException is an Event with specific tags.
| Exception ex = t.Exception?.InnerException ?? new Exception("Unknown error"); | ||
| Trace.TraceError($"Download failed for file {SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}"); | ||
| string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink); | ||
| Activity.Current?.AddEvent("cloudfetch.download_failed", [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why Activity.Current? Is there a reason why you're not using activity from the current block?
| activity?.AddEvent("cloudfetch.download_loop_error", [ | ||
| new("error_message", ex.Message) | ||
| ]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| activity?.AddEvent("cloudfetch.download_loop_error", [ | |
| new("error_message", ex.Message) | |
| ]); | |
| activity?.AddException(ex, [new("cloudfetch.download_loop_error", null)]); |
| { | ||
| // Log the error and retry | ||
| Trace.TraceError($"Error downloading file {SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}"); | ||
| activity?.AddEvent("cloudfetch.download_retry", [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, you can use AddException with your tags instead of AddEvent.
| { | ||
| stopwatch.Stop(); | ||
| Trace.TraceError($"Error decompressing data for file {sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} ms"); | ||
| activity?.AddEvent("cloudfetch.decompression_error", [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AddException ...
| if (_error == null) | ||
| { | ||
| Trace.TraceError($"Setting error state: {ex.Message}"); | ||
| Activity.Current?.AddEvent("cloudfetch.error_state_set", [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AddException ...
Note: this may be a duplicate from the caller of SetError.
| catch (Exception ex) | ||
| { | ||
| Trace.TraceError($"Error completing with error: {ex.Message}"); | ||
| Activity.Current?.AddEvent("cloudfetch.complete_with_error_failed", [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AddException ...
| using System.Collections.Generic; | ||
| using System.Diagnostics; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| using System.Collections.Generic; | |
| using System.Diagnostics; |
| { | ||
| // Expected when cancellation is requested | ||
| Trace.TraceInformation("Download process was cancelled"); | ||
| activity?.AddEvent("cloudfetch.download_cancelled"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(optional) AddException
| var stopwatch = Stopwatch.StartNew(); | ||
|
|
||
| // Log download start | ||
| activity?.AddEvent("cloudfetch.download_start", [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(optional) You could use a new AddActivity (or this.TraceActivityAsync) to wrap the download steps. Easiest way would be to extract to a separate method and use this.TraceActivityAsync. This will get you the duration of the overall process without having to use your own timer.
There would be a new line in the trace file for the new activity.
But still okay to use separate events.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will probably address this in a following PR
…xt propagation - Modified SetError() and CompleteWithError() methods to accept optional Activity parameter - Updated all calls to these methods to pass the activity context instead of using Activity.Current - Applied dotnet format for indentation consistency Addresses review comments from PR apache#3580
Removed unused `System.Collections.Generic` and `System.Diagnostics` imports from the interfaces file as they are not needed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
birschick-bq
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Summary
This PR implements comprehensive Activity-based distributed tracing for the CloudFetch download pipeline in the Databricks C# driver, enabling real-time monitoring, structured logging, and improved observability.
Key Changes:
Architecture:
The implementation follows a hierarchical Activity structure:
Benefits:
Events Logged:
cloudfetch.download_start- File download initiatedcloudfetch.content_length- Actual file size from HTTP responsecloudfetch.download_retry- Retry attempt with reasoncloudfetch.url_refreshed_before_download- URL refreshed proactivelycloudfetch.url_refreshed_after_auth_error- URL refreshed after 401/403cloudfetch.decompression_complete- LZ4 decompression metricscloudfetch.download_complete- Download success with throughputcloudfetch.download_failed_all_retries- Final failure after all retriescloudfetch.download_summary- Overall batch statisticsTest Plan
{"Status":"Ok","HasRemoteParent":false,"Kind":"Client","OperationName":"DownloadFile","Duration":"00:00:00.5952467","StartTimeUtc":"2025-10-16T15:00:48.1657713Z","Id":"00-dc2baa073e36e8feab91170cb360e2f1-b71e0296e60abf8b-01","ParentId":"00-dc2baa073e36e8feab91170cb360e2f1-06801209b222d0e9-01","RootId":"dc2baa073e36e8feab91170cb360e2f1","TraceStateString":null,"SpanId":"b71e0296e60abf8b","TraceId":"dc2baa073e36e8feab91170cb360e2f1","Recorded":true,"IsAllDataRequested":true,"ActivityTraceFlags":"Recorded","ParentSpanId":"06801209b222d0e9","IdFormat":"W3C","TagObjects":{"cloudfetch.offset":134802,"cloudfetch.sanitized_url":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8","cloudfetch.expected_size_bytes":21839184},"Events":[{"Name":"cloudfetch.download_start","Timestamp":"2025-10-16T15:00:48.1649565+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"expected_size_bytes","Value":21839184},{"Key":"expected_size_kb","Value":21327.328125}]},{"Name":"cloudfetch.content_length","Timestamp":"2025-10-16T15:00:48.3370864+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"content_length_bytes","Value":6942292},{"Key":"content_length_mb","Value":6.6206855773925781}]},{"Name":"cloudfetch.decompression_complete","Timestamp":"2025-10-16T15:00:48.7599632+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"decompression_time_ms","Value":347},{"Key":"compressed_size_bytes","Value":6942292},{"Key":"compressed_size_kb","Value":6779.58203125},{"Key":"decompressed_size_bytes","Value":21839184},{"Key":"decompressed_size_kb","Value":21327.328125},{"Key":"compression_ratio","Value":3.1458175484407742}]},{"Name":"cloudfetch.download_complete","Timestamp":"2025-10-16T15:00:48.7599632+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"actual_size_bytes","Value":21839184},{"Key":"actual_size_kb","Value":21327.328125},{"Key":"latency_ms","Value":594},{"Key":"throughput_mbps","Value":35.063078909209281}]}],"Links":[],"Baggage":{}}🤖 Generated with Claude Code