Skip to content

Conversation

@eric-wang-1990
Copy link
Contributor

@eric-wang-1990 eric-wang-1990 commented Oct 16, 2025

Summary

This PR implements comprehensive Activity-based distributed tracing for the CloudFetch download pipeline in the Databricks C# driver, enabling real-time monitoring, structured logging, and improved observability.

Key Changes:

  • Add Activity-based tracing to CloudFetchDownloader with TraceActivityAsync
  • Create child Activities per individual file download for real-time progress visibility
  • Replace all Trace.TraceInformation/Error calls with Activity.AddEvent for structured logging
  • Add Activity tags for searchable metadata (offset, URL, file sizes)
  • Implement proper Activity context flow through async/await chains
  • Update CloudFetchDownloadManager to pass statement for tracing context
  • Fix all tests to include statement parameter in CloudFetchDownloader constructor

Architecture:

The implementation follows a hierarchical Activity structure:

Statement Activity (parent)
  ├─ DownloadFilesAsync Activity (overall batch)
  │   ├─ DownloadFile Activity (file 1) - flushes when complete
  │   ├─ DownloadFile Activity (file 2) - flushes when complete
  │   └─ ...
  └─ ReadNextRecordBatchAsync Activity (reader operations)

Benefits:

  • Real-time progress monitoring: Events flush immediately as each file completes (not batched)
  • Better fault tolerance: Completed downloads are logged before process crashes
  • Improved debuggability: Searchable Activity tags enable filtering by offset, URL, size
  • Granular metrics: Per-file download times, throughput, compression ratios visible in logs
  • OpenTelemetry-compatible: Activities follow System.Diagnostics.Activity standard

Events Logged:

  • cloudfetch.download_start - File download initiated
  • cloudfetch.content_length - Actual file size from HTTP response
  • cloudfetch.download_retry - Retry attempt with reason
  • cloudfetch.url_refreshed_before_download - URL refreshed proactively
  • cloudfetch.url_refreshed_after_auth_error - URL refreshed after 401/403
  • cloudfetch.decompression_complete - LZ4 decompression metrics
  • cloudfetch.download_complete - Download success with throughput
  • cloudfetch.download_failed_all_retries - Final failure after all retries
  • cloudfetch.download_summary - Overall batch statistics

Test Plan

  • ✅ All existing CloudFetchDownloader E2E tests pass (7 test methods)
  • ✅ Build succeeds with 0 warnings
  • Manual testing: Query with CloudFetch enabled and verify Activity events in logs
  • Verified Activity context flows correctly through async/await chains
  • Confirmed child Activities flush independently upon completion

{"Status":"Ok","HasRemoteParent":false,"Kind":"Client","OperationName":"DownloadFile","Duration":"00:00:00.5952467","StartTimeUtc":"2025-10-16T15:00:48.1657713Z","Id":"00-dc2baa073e36e8feab91170cb360e2f1-b71e0296e60abf8b-01","ParentId":"00-dc2baa073e36e8feab91170cb360e2f1-06801209b222d0e9-01","RootId":"dc2baa073e36e8feab91170cb360e2f1","TraceStateString":null,"SpanId":"b71e0296e60abf8b","TraceId":"dc2baa073e36e8feab91170cb360e2f1","Recorded":true,"IsAllDataRequested":true,"ActivityTraceFlags":"Recorded","ParentSpanId":"06801209b222d0e9","IdFormat":"W3C","TagObjects":{"cloudfetch.offset":134802,"cloudfetch.sanitized_url":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8","cloudfetch.expected_size_bytes":21839184},"Events":[{"Name":"cloudfetch.download_start","Timestamp":"2025-10-16T15:00:48.1649565+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"expected_size_bytes","Value":21839184},{"Key":"expected_size_kb","Value":21327.328125}]},{"Name":"cloudfetch.content_length","Timestamp":"2025-10-16T15:00:48.3370864+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"content_length_bytes","Value":6942292},{"Key":"content_length_mb","Value":6.6206855773925781}]},{"Name":"cloudfetch.decompression_complete","Timestamp":"2025-10-16T15:00:48.7599632+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"decompression_time_ms","Value":347},{"Key":"compressed_size_bytes","Value":6942292},{"Key":"compressed_size_kb","Value":6779.58203125},{"Key":"decompressed_size_bytes","Value":21839184},{"Key":"decompressed_size_kb","Value":21327.328125},{"Key":"compression_ratio","Value":3.1458175484407742}]},{"Name":"cloudfetch.download_complete","Timestamp":"2025-10-16T15:00:48.7599632+00:00","Tags":[{"Key":"offset","Value":134802},{"Key":"sanitized_url","Value":"https://root-benchmarking-prod-aws-us-west-2.s3.us-west-2.amazonaws.com/26Z_71145fb7-bc14-4719-91af-0cdfc92c8fc8"},{"Key":"actual_size_bytes","Value":21839184},{"Key":"actual_size_kb","Value":21327.328125},{"Key":"latency_ms","Value":594},{"Key":"throughput_mbps","Value":35.063078909209281}]}],"Links":[],"Baggage":{}}

🤖 Generated with Claude Code

eric-wang-1990 and others added 5 commits October 15, 2025 23:50
…eline

## Summary
Added Activity-based logging throughout the CloudFetch implementation to capture
performance metrics in ADBC trace files instead of only outputting to system trace
listeners (console, debug output, etc.). This enables users to collect detailed
CloudFetch performance data when troubleshooting.

## Changes
- Pass Activity context through the entire CloudFetch pipeline:
  - CloudFetchReader → CloudFetchDownloadManager → CloudFetchResultFetcher & CloudFetchDownloader
- Replace all System.Diagnostics.Trace calls with Activity.AddEvent() calls
- Include sanitized URLs in all CloudFetch logging events for debugging

## Events Captured
CloudFetch now emits Activity events for:
- URL fetching and refreshing (url_fetched, url_fetch_failed, url_refreshed_*)
- Download lifecycle (download_start, download_complete, download_failed)
- Download retries with error details
- Content length and actual file sizes
- LZ4 decompression metrics (time, compressed/decompressed sizes, compression ratio)
- Download throughput (MB/s) and latency
- Overall pipeline summary (total files, bytes downloaded, success/failure counts)

## Benefits
- Performance metrics are now serialized to JSON trace files via FileActivityListener
- Users can analyze CloudFetch behavior when collecting traces for support cases
- All events include sanitized URLs to maintain security while providing debugging context
- Metrics include download throughput, latency, compression ratios, and file sizes

## Test Plan
- Build succeeds with 0 warnings, 0 errors
- Existing CloudFetch functionality unchanged (Activity is optional parameter)
- Metrics will be validated in E2E tests with FileActivityListener enabled

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
## Problem
The previous implementation captured Activity.Current at construction time and
passed it as a parameter through the CloudFetch pipeline. However, this didn't
work because:
1. CloudFetchReader is constructed before any Activity exists
2. DatabricksCompositeReader.ReadNextRecordBatchAsync() wasn't wrapped in TraceActivityAsync
3. Even when Activity was captured, background tasks couldn't access it correctly

As a result, no CloudFetch events were being logged to trace files.

## Solution
Changed the approach to use Activity.Current directly when emitting events instead
of capturing and passing Activity as a parameter:
1. Added TraceActivityAsync wrapper to DatabricksCompositeReader.ReadNextRecordBatchAsync()
   to ensure an Activity exists at the top level
2. Removed all parentActivity parameters from CloudFetch constructors
3. Changed all event logging from _parentActivity?.AddEvent to Activity.Current?.AddEvent

## Why This Works
Activity.Current flows automatically through async operations via AsyncLocal.
When ReadNextRecordBatchAsync() is called with TraceActivityAsync, that Activity
becomes current. When CloudFetch components process items from their queues
(which happens as a consequence of that call), Activity.Current references that
same Activity.

## Changes
- DatabricksCompositeReader: Added TraceActivityAsync wrapper
- CloudFetchReader: Removed Activity.Current parameter passing
- CloudFetchDownloadManager: Removed parentActivity parameter
- CloudFetchResultFetcher: Removed _parentActivity field, use Activity.Current
- CloudFetchDownloader: Removed _parentActivity field, use Activity.Current

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…ound task logging

## Problem
CloudFetchDownloader runs background tasks that don't have access to Activity.Current
because they execute in different execution contexts from the main reader thread.
The previous fix attempted to use Activity.Current directly in background tasks,
but Activity.Current is null in those contexts.

## Solution
Capture Activity.Current when each DownloadResult is created (during URL fetching)
and store it in the DownloadResult object. Background download tasks then use the
captured Activity from downloadResult.Activity when emitting events.

## Why This Works
- DownloadResults are created in CloudFetchResultFetcher.ProcessDirectResultsAsync()
  and FetchNextResultBatchAsync(), which run in the context of ReadNextRecordBatchAsync()
- At that time, Activity.Current is valid (from the TraceActivityAsync wrapper)
- The captured Activity is stored in each DownloadResult
- Background download tasks access the captured Activity via downloadResult.Activity
- Events are logged to the correct Activity even though downloads happen asynchronously

## Changes
- IDownloadResult: Added Activity property to expose captured Activity context
- DownloadResult: Capture Activity.Current in constructor, expose via Activity property
- EndOfResultsGuard: Implement Activity property (returns null)
- CloudFetchDownloader: Use downloadResult.Activity instead of Activity.Current for
  download-specific events (download_start, download_complete, etc.)
- CloudFetchDownloader: Keep Activity.Current for pipeline-level events
  (download_summary, error_state_set, etc.)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Refactor CloudFetch Activity-based logging to follow proper sibling
architecture where both CloudFetchReader and CloudFetchDownloader
create Activities as children of the statement, rather than nested.

Changes:
- Pass ITracingStatement to CloudFetchDownloader constructor
- Implement IActivityTracer in CloudFetchDownloader to delegate tracing to statement
- Wrap DownloadFilesAsync with TraceActivityAsync to create Activity for download loop
- Use activity parameter (instead of Activity.Current) in lambda body
- Remove Activity property from IDownloadResult and implementations
- Fix totalFiles metric to exclude EndOfResultsGuard sentinel
- Update tests to pass statement parameter to CloudFetchDownloader

Benefits:
- Proper Activity hierarchy: Statement → [Reader, Downloader] as siblings
- All CloudFetch events logged to correct Activity context
- Rich download metrics: sizes, timings, compression ratios, throughput
- Accurate file count in download_summary event

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…CloudFetch progress

Wrap each DownloadFileAsync call with TraceActivityAsync to create
child Activities that flush immediately upon download completion.

Changes:
- Wrap DownloadFileAsync body with TraceActivityAsync
- Add Activity tags for offset, sanitized_url, expected_size_bytes
- Replace Activity.Current with activity parameter in all events
- Name child Activities 'DownloadFile' for clear identification

Benefits:
- Real-time progress monitoring (events flush per-file, not per-batch)
- Better fault tolerance (completed downloads logged before crashes)
- Improved debuggability with searchable Activity tags
- Granular per-file metrics visible as downloads complete

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
… from DatabricksCompositeReader

DatabricksCompositeReader already extends TracingReader which handles
Activity tracing. The explicit TraceActivityAsync wrapper in
ReadNextRecordBatchAsync was redundant and has been removed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@eric-wang-1990 eric-wang-1990 changed the title feat(csharp/databricks): Add Activity-based distributed tracing to CloudFetch pipeline feat(csharp/src/Drivers/databricks): Add Activity-based distributed tracing to CloudFetch pipeline Oct 16, 2025
@eric-wang-1990 eric-wang-1990 changed the title feat(csharp/src/Drivers/databricks): Add Activity-based distributed tracing to CloudFetch pipeline feat(csharp/src/Drivers/Databricks): Add Activity-based distributed tracing to CloudFetch pipeline Oct 16, 2025
Copy link
Contributor

@CurtHagenlocher CurtHagenlocher left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks; looks good (and more straightforward in terms of the relationship between events and tags than the other tracing PR). I'd still like to get feedback from @birschick-bq but feel more comfortable checking this in without it.

}

// IActivityTracer implementation - delegates to statement
ActivityTrace IActivityTracer.Trace => ((IActivityTracer)_statement).Trace;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the casts to IActivityTracer look superfluous here because _statement is already an IActivityTracer.

Copy link
Contributor

@birschick-bq birschick-bq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good Eric.

I have a suggestion to use AddException instead of AddEvent when in an catch block. This applies multiple times.

And a few other comments.

Comment on lines 242 to 244
activity?.AddEvent("cloudfetch.wait_for_downloads_error", [
new("error_message", ex.Message)
]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
activity?.AddEvent("cloudfetch.wait_for_downloads_error", [
new("error_message", ex.Message)
]);
activity?.AddException(ex, [new("cloudfetch.wait_for_downloads_error", null)]);

AddException is an Event with specific tags.

Exception ex = t.Exception?.InnerException ?? new Exception("Unknown error");
Trace.TraceError($"Download failed for file {SanitizeUrl(downloadResult.Link.FileLink)}: {ex.Message}");
string sanitizedUrl = SanitizeUrl(downloadResult.Link.FileLink);
Activity.Current?.AddEvent("cloudfetch.download_failed", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why Activity.Current? Is there a reason why you're not using activity from the current block?

Comment on lines 338 to 340
activity?.AddEvent("cloudfetch.download_loop_error", [
new("error_message", ex.Message)
]);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
activity?.AddEvent("cloudfetch.download_loop_error", [
new("error_message", ex.Message)
]);
activity?.AddException(ex, [new("cloudfetch.download_loop_error", null)]);

{
// Log the error and retry
Trace.TraceError($"Error downloading file {SanitizeUrl(url)} (attempt {retry + 1}/{_maxRetries}): {ex.Message}");
activity?.AddEvent("cloudfetch.download_retry", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, you can use AddException with your tags instead of AddEvent.

{
stopwatch.Stop();
Trace.TraceError($"Error decompressing data for file {sanitizedUrl}: {ex.Message}. Elapsed time: {stopwatch.ElapsedMilliseconds} ms");
activity?.AddEvent("cloudfetch.decompression_error", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddException ...

if (_error == null)
{
Trace.TraceError($"Setting error state: {ex.Message}");
Activity.Current?.AddEvent("cloudfetch.error_state_set", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddException ...
Note: this may be a duplicate from the caller of SetError.

catch (Exception ex)
{
Trace.TraceError($"Error completing with error: {ex.Message}");
Activity.Current?.AddEvent("cloudfetch.complete_with_error_failed", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AddException ...

Comment on lines 19 to 20
using System.Collections.Generic;
using System.Diagnostics;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using System.Collections.Generic;
using System.Diagnostics;

{
// Expected when cancellation is requested
Trace.TraceInformation("Download process was cancelled");
activity?.AddEvent("cloudfetch.download_cancelled");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) AddException

var stopwatch = Stopwatch.StartNew();

// Log download start
activity?.AddEvent("cloudfetch.download_start", [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(optional) You could use a new AddActivity (or this.TraceActivityAsync) to wrap the download steps. Easiest way would be to extract to a separate method and use this.TraceActivityAsync. This will get you the duration of the overall process without having to use your own timer.

There would be a new line in the trace file for the new activity.

But still okay to use separate events.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will probably address this in a following PR

eric-wang-1990 and others added 2 commits October 17, 2025 14:46
…xt propagation

- Modified SetError() and CompleteWithError() methods to accept optional Activity parameter
- Updated all calls to these methods to pass the activity context instead of using Activity.Current
- Applied dotnet format for indentation consistency

Addresses review comments from PR apache#3580
Removed unused `System.Collections.Generic` and `System.Diagnostics`
imports from the interfaces file as they are not needed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
Copy link
Contributor

@birschick-bq birschick-bq left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@CurtHagenlocher CurtHagenlocher merged commit 6fa0db2 into apache:main Oct 20, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants