@eric-wang-1990
Contributor

@eric-wang-1990 eric-wang-1990 commented Nov 4, 2025

Summary

Implements RecyclableMemoryStream for LZ4 decompression to reduce memory pressure and fix stream corruption issues.

Before the change:

| Method | ReadDelayMs | Mean | Min | Max | Median | Peak Memory (MB) | Gen0 | Gen1 | Gen2 | Allocated |
|---|---|---|---|---|---|---|---|---|---|---|
| ExecuteLargeQuery | 5 | 15.95 s | 14.99 s | 16.64 s | 16.21 s | See previous console output | 364000.0000 | 63000.0000 | 38000.0000 | 2.73 GB |

In the .NET memory dump, set_Capacity from MemoryStream accounts for 5 GB.
image

After the change:

| Method | ReadDelayMs | Mean | Min | Max | Median | Peak Memory (MB) | Gen0 | Gen1 | Gen2 | Allocated |
|---|---|---|---|---|---|---|---|---|---|---|
| ExecuteLargeQuery | 5 | 16.36 s | 15.25 s | 17.95 s | 15.88 s | See previous console output | 411000.0000 | 26000.0000 | 16000.0000 | 1.27 GB |

In the .NET memory dump, set_Capacity no longer shows up as a source of memory allocation.
image

CloudFetch

This PR only touches the CloudFetch path for now.
The non-CloudFetch path, which uses the synchronous version of the LZ4 decode, will be addressed in a separate PR if necessary.

Changes

  • Modified Lz4Utilities.DecompressLz4Async to return MemoryStream instead of (buffer, length) tuple
  • Stream is now disposed by the caller (Arrow) after reading, not immediately in the utility method
  • Uses RecyclableMemoryStreamManager for pooled memory allocation
  • Decompression uses CopyToAsync with 80 KB chunks for efficient streaming
  • Fixed bug where buffer was recycled before Arrow finished reading it
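
The shape of the new API described in the list above can be sketched with base class library types only. This is a minimal sketch under stated assumptions, not the PR's code: `ChunkedDecompressSketch`, `DecompressAsync` and `Compress` are hypothetical names, `GZipStream` stands in for the LZ4 decoder, and a plain `MemoryStream` stands in for the stream that the PR obtains from a RecyclableMemoryStreamManager.

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Threading;
using System.Threading.Tasks;

public static class ChunkedDecompressSketch
{
    // 80 KB copy buffer (81920 bytes), matching the chunk size named in the PR.
    private const int CopyBufferSize = 81920;

    // Hypothetical stand-in for Lz4Utilities.DecompressLz4Async.
    // Assumption: GZipStream replaces the LZ4 decoder so no NuGet package is needed.
    public static async Task<MemoryStream> DecompressAsync(
        byte[] compressed, CancellationToken cancellationToken = default)
    {
        // Assumption: the PR would take this stream from a pooled manager instead.
        var output = new MemoryStream();
        try
        {
            using (var input = new MemoryStream(compressed, writable: false))
            using (var decoder = new GZipStream(input, CompressionMode.Decompress))
            {
                // Decompress chunk by chunk instead of materializing one large array.
                await decoder.CopyToAsync(output, CopyBufferSize, cancellationToken)
                    .ConfigureAwait(false);
            }

            output.Position = 0; // rewind so the caller can read from the start
            return output;       // not disposed here: the caller owns it now
        }
        catch
        {
            output.Dispose();    // on failure nobody else will dispose it
            throw;
        }
    }

    // Helper used only to produce input for the sketch.
    public static byte[] Compress(byte[] data)
    {
        using (var buffer = new MemoryStream())
        {
            using (var gzip = new GZipStream(buffer, CompressionMode.Compress, leaveOpen: true))
            {
                gzip.Write(data, 0, data.Length);
            }
            return buffer.ToArray();
        }
    }
}
```

A caller would wrap the returned stream in a `using` block and read it to completion before leaving the block, which is the role Arrow plays in this PR.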

Problem Fixed

Previously, the RecyclableMemoryStream was disposed immediately after decompression, returning its buffer to the pool. This caused "Unexpectedly reached end of stream" errors when Arrow tried to read from the buffer later, as it could have been overwritten by other operations.

Solution

The new API returns the stream itself without disposing it. Arrow reads from the stream and then disposes it when done, ensuring the buffer is only returned to the pool after reading completes.
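
The ownership rule above can be illustrated with a small sketch. It assumes `ArrayPool<byte>` as a stand-in for the recyclable stream pool, and `PooledResult` and `OwnershipSketch` are hypothetical names that do not appear in the PR. The point it shows is that the producer hands back an owner object, and the buffer returns to the pool only when the consumer disposes that owner.

```csharp
using System;
using System.Buffers;

// Hypothetical owner type: holds a pooled buffer and returns it only on Dispose.
public sealed class PooledResult : IDisposable
{
    private readonly ArrayPool<byte> _pool;
    private readonly byte[] _buffer;
    private bool _disposed;

    public PooledResult(ArrayPool<byte> pool, int length)
    {
        _pool = pool;
        _buffer = pool.Rent(length); // the rented array may be longer than requested
        Length = length;
    }

    // Number of valid bytes; the underlying array can be larger.
    public int Length { get; }

    public byte[] Bytes
    {
        get
        {
            // Reading after disposal would touch memory the pool may have reissued.
            if (_disposed) throw new ObjectDisposedException(nameof(PooledResult));
            return _buffer;
        }
    }

    public void Dispose()
    {
        if (_disposed) return;
        _disposed = true;
        _pool.Return(_buffer);
    }
}

public static class OwnershipSketch
{
    // Producer: fills the buffer and returns the owner WITHOUT disposing it.
    public static PooledResult Produce(ArrayPool<byte> pool, byte fill, int length)
    {
        var result = new PooledResult(pool, length);
        for (int i = 0; i < length; i++) result.Bytes[i] = fill;
        return result;
    }

    // Consumer: reads the valid bytes; the caller disposes the owner afterwards.
    public static int Sum(PooledResult result)
    {
        int sum = 0;
        for (int i = 0; i < result.Length; i++) sum += result.Bytes[i];
        return sum;
    }
}
```

Disposing inside `Produce` would reproduce the original bug in miniature: the consumer would read from a buffer that the pool is free to hand to another operation.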

Benefits

  • ✅ Reduces GC pressure by reusing memory streams from pool
  • ✅ Fixes "Unexpectedly reached end of stream" error
  • ✅ Memory is only returned to pool after Arrow completes reading
  • ✅ More efficient memory usage for high-volume CloudFetch operations
  • ✅ Uses CopyToAsync for chunk-by-chunk decompression (80 KB chunks)

Testing

  • Code compiles successfully on Mac for all target frameworks (netstandard2.0, net472, net8.0)
  • Needs testing on Windows with benchmark suite
  • Verify CloudFetch E2E benchmark completes without stream errors

🤖 Generated with Claude Code

…uce memory pressure

Changes:
- Modified Lz4Utilities.DecompressLz4Async to return MemoryStream instead of (buffer, length) tuple
- Stream is now disposed by the caller (Arrow) after reading, not immediately
- Uses RecyclableMemoryStream for pooled memory allocation
- Decompression uses CopyToAsync with 80KB chunks for efficient streaming
- Fixed bug where buffer was recycled before Arrow finished reading

Benefits:
- Reduces GC pressure by reusing memory streams from pool
- Fixes "Unexpectedly reached end of stream" error
- Memory is only returned to pool after Arrow completes reading
- More efficient memory usage for high-volume CloudFetch operations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@github-actions github-actions bot added this to the ADBC Libraries 21 milestone Nov 4, 2025
@eric-wang-1990 eric-wang-1990 changed the title feat(csharp): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure (HOLD ON) feat(csharp): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure Nov 4, 2025
eric-wang-1990 and others added 2 commits November 4, 2025 10:04
Address review feedback: Instead of conditionally including the Tests
project reference based on TargetFramework, use the same pattern as
the Tests project itself with conditional TargetFrameworks based on
the IsWindows property.

This ensures net472 is only targeted on Windows platforms where it's
supported, making the Tests project reference always compatible.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@eric-wang-1990 eric-wang-1990 changed the title (HOLD ON) feat(csharp): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure feat(csharp): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure Nov 5, 2025
@eric-wang-1990 eric-wang-1990 changed the title feat(csharp): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure feat(csharp/src/Drivers/Databricks): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure Nov 5, 2025
@eric-wang-1990 eric-wang-1990 changed the title feat(csharp/src/Drivers/Databricks): Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure feat(csharp/src/Drivers/Databricks): CloudFetch Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure Nov 5, 2025
@eric-wang-1990
Contributor Author

@CurtHagenlocher This is ready to merge. Please let me know if you have any concerns.

Contributor

@CurtHagenlocher CurtHagenlocher left a comment

Thanks, this looks really promising! I have concerns about the lifetime of the RecyclableMemoryStreamManager, especially since no limit is set on the pool size. Perhaps it can be associated with the lifetime of the driver, database or connection?

eric-wang-1990 added a commit to eric-wang-1990/arrow-adbc that referenced this pull request Nov 7, 2025
…tabase instance

Addresses PR apache#3683 review comment about static RecyclableMemoryStreamManager
living for process lifetime with unbounded growth potential.

Changes:
- Moved RecyclableMemoryStreamManager from static field in Lz4Utilities
  to instance field in DatabricksDatabase
- Updated Lz4Utilities.DecompressLz4Async to accept RecyclableMemoryStreamManager
  as parameter instead of using static instance
- Changed return type from MemoryStream to RecyclableMemoryStream to make
  disposal requirements explicit
- Updated CloudFetchDownloader to use IHiveServer2Statement to access
  Connection.RecyclableMemoryStreamManager
- Database-level pooling enables proper cleanup on disposal and efficient
  sharing across connection pools

Thread-safety: RecyclableMemoryStreamManager is thread-safe for concurrent
access from multiple connections, with each operation receiving its own
stream instance that is not shared between threads.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@eric-wang-1990 eric-wang-1990 force-pushed the feat/recyclable-memory-stream-lz4 branch 2 times, most recently from 3088aa4 to ef333d5 Compare November 7, 2025 02:12
eric-wang-1990 and others added 2 commits November 6, 2025 18:17
…tabase instance

Addresses PR apache#3683 review comment about static RecyclableMemoryStreamManager
living for process lifetime with unbounded growth potential.

Changes:
- Moved RecyclableMemoryStreamManager from static field in Lz4Utilities
  to instance field in DatabricksDatabase
- Updated Lz4Utilities.DecompressLz4Async to accept RecyclableMemoryStreamManager
  as parameter instead of using static instance
- Changed return type from MemoryStream to RecyclableMemoryStream to make
  disposal requirements explicit (addresses reviewer's concern about proper disposal)
- Updated CloudFetchDownloader to use IHiveServer2Statement to access
  Connection.RecyclableMemoryStreamManager
- Database-level pooling enables proper cleanup on disposal and efficient
  sharing across connection pools

Thread-safety: RecyclableMemoryStreamManager is thread-safe for concurrent
access from multiple connections, with each operation receiving its own
stream instance that is not shared between threads.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
…onstructor

Improves on the previous commit by making the manager a required part of
connection initialization when created via Database, while maintaining
backward compatibility for direct connection creation.

Changes:
- Added internal constructor that accepts RecyclableMemoryStreamManager
- Public constructor chains to internal constructor with null (creates own manager)
- DatabricksDatabase passes shared manager via internal constructor
- No public API changes - backward compatible with existing code
- Compile-time safety - Database must explicitly pass the manager

This ensures the manager is always set and prevents forgetting to initialize it.
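
The constructor arrangement described in this commit can be sketched as follows. It is an illustration under assumptions rather than the driver's code: `SketchDatabase` and `SketchConnection` are hypothetical stand-ins for DatabricksDatabase and DatabricksConnection, and `ArrayPool<byte>` replaces RecyclableMemoryStreamManager so that only base class library types are needed.

```csharp
using System;
using System.Buffers;

// Hypothetical stand-in for DatabricksConnection.
public sealed class SketchConnection
{
    // Public constructor: direct creation keeps working and gets a private pool.
    public SketchConnection(string uri) : this(uri, null)
    {
    }

    // Internal constructor: the database passes its shared pool explicitly.
    internal SketchConnection(string uri, ArrayPool<byte> sharedPool)
    {
        Uri = uri ?? throw new ArgumentNullException(nameof(uri));
        // Fall back to a pool owned by this connection when none is supplied.
        BufferPool = sharedPool ?? ArrayPool<byte>.Create();
    }

    public string Uri { get; }

    // Always set after construction, whichever constructor was used.
    public ArrayPool<byte> BufferPool { get; }
}

// Hypothetical stand-in for DatabricksDatabase.
public sealed class SketchDatabase
{
    // One pool per database instance, shared by every connection it creates.
    private readonly ArrayPool<byte> _sharedPool = ArrayPool<byte>.Create();

    public SketchConnection Connect(string uri)
    {
        return new SketchConnection(uri, _sharedPool);
    }
}
```

Connections created through the same `SketchDatabase` share one pool, while a connection built through the public constructor gets its own, which matches the backward-compatibility goal stated above.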

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@CurtHagenlocher
Contributor

Looks like this will need rebasing.

@eric-wang-1990 eric-wang-1990 force-pushed the feat/recyclable-memory-stream-lz4 branch from 938aebd to 0701d0c Compare November 7, 2025 19:20
This merge combines both memory management approaches:
- RecyclableMemoryStreamManager (for output streams)
- ArrayPool (for internal LZ4 decompression buffers)

Changes:
- DatabricksConnection: Added both RecyclableMemoryStreamManager and Lz4BufferPool properties
- DatabricksDatabase: Instantiates both pools and passes to connections
- Lz4Utilities: DecompressLz4Async now accepts both managers
- CloudFetchDownloader: Uses both for decompression operations
- Added CustomLZ4DecoderStream and CustomLZ4FrameReader from PR apache#3654
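
The second pool, used for internal scratch buffers, follows the usual rent and return pattern of `ArrayPool<byte>`. The sketch below assumes a plain stream-to-stream copy in place of the LZ4 frame decoding; `ScratchBufferSketch` and `CopyWithPooledBuffer` are hypothetical names that do not appear in the PR.

```csharp
using System;
using System.Buffers;
using System.IO;

public static class ScratchBufferSketch
{
    // Copies source to destination through a rented scratch buffer,
    // so repeated calls reuse arrays instead of allocating a new one each time.
    public static long CopyWithPooledBuffer(
        Stream source, Stream destination, ArrayPool<byte> pool, int bufferSize = 81920)
    {
        byte[] scratch = pool.Rent(bufferSize); // may be larger than bufferSize
        long total = 0;
        try
        {
            int read;
            while ((read = source.Read(scratch, 0, scratch.Length)) > 0)
            {
                // Only the first 'read' bytes are valid in this iteration.
                destination.Write(scratch, 0, read);
                total += read;
            }
        }
        finally
        {
            // Return the buffer even if a read or write throws.
            pool.Return(scratch);
        }
        return total;
    }
}
```

Unlike the output stream, the scratch buffer never leaves the method, so it can be returned to the pool immediately in the `finally` block.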
@eric-wang-1990
Contributor Author

@CurtHagenlocher With both changes combined, the new results further reduce allocated memory from 2.73 GB to 541 MB.

| Method | ReadDelayMs | Mean | Min | Max | Median | Peak Memory (MB) | Total Rows | Total Batches | Gen0 | Gen1 | Gen2 | Allocated |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ExecuteLargeQuery | 5 | 12.45 s | 10.43 s | 14.46 s | 12.46 s | 284.13 | 1,439,935 | 743 | 385000.0000 | 33000.0000 | 6000.0000 | 541.65 MB |

@CurtHagenlocher CurtHagenlocher merged commit da37188 into apache:main Nov 7, 2025
7 checks passed