feat(csharp/src/Drivers/Databricks): CloudFetch Use RecyclableMemoryStream for LZ4 decompression to reduce memory pressure #3683
Conversation
…uce memory pressure

Changes:
- Modified Lz4Utilities.DecompressLz4Async to return a MemoryStream instead of a (buffer, length) tuple
- The stream is now disposed by the caller (Arrow) after reading, not immediately
- Uses RecyclableMemoryStream for pooled memory allocation
- Decompression uses CopyAsync with 80KB chunks for efficient streaming
- Fixed a bug where the buffer was recycled before Arrow finished reading

Benefits:
- Reduces GC pressure by reusing memory streams from the pool
- Fixes the "Unexpectedly reached end of stream" error
- Memory is only returned to the pool after Arrow completes reading
- More efficient memory usage for high-volume CloudFetch operations

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
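A minimal sketch of what this first iteration of the helper looks like, assuming the K4os.Compression.LZ4.Streams decoder and the Microsoft.IO.RecyclableMemoryStream package referenced by the driver; the class name, the stream tag, and the exact decoder options are illustrative, not the driver's actual code:

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using K4os.Compression.LZ4.Streams;
using Microsoft.IO;

internal static class Lz4DecompressSketch
{
    // 80KB copy buffer: large enough to stream efficiently, small enough to stay off the LOH.
    private const int CopyBufferSize = 80 * 1024;

    public static async Task<MemoryStream> DecompressLz4Async(
        Stream compressedInput,
        RecyclableMemoryStreamManager memoryManager,
        CancellationToken cancellationToken)
    {
        // GetStream rents pooled buffers; they go back to the pool when the stream is disposed.
        MemoryStream output = memoryManager.GetStream("CloudFetch.Lz4");
        using (var decoder = LZ4Stream.Decode(compressedInput, leaveOpen: true))
        {
            await decoder.CopyToAsync(output, CopyBufferSize, cancellationToken).ConfigureAwait(false);
        }
        output.Position = 0;  // rewind so the consumer reads from the beginning
        return output;        // not disposed here: ownership passes to the caller (the Arrow reader)
    }
}
```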
Address review feedback: instead of conditionally including the Tests project reference based on TargetFramework, use the same pattern as the Tests project itself with conditional TargetFrameworks based on the IsWindows property. This ensures net472 is only targeted on Windows platforms where it is supported, making the Tests project reference always compatible.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
@CurtHagenlocher This is ready to merge; please let me know if you have any concerns.
CurtHagenlocher left a comment
Thanks, this looks really promising! I have concerns about the lifetime of the RecyclableMemoryStreamManager, especially since no limit is set on the pool size. Perhaps it can be associated with the lifetime of the driver, database or connection?
…tabase instance

Addresses PR apache#3683 review comment about the static RecyclableMemoryStreamManager living for the process lifetime with unbounded growth potential.

Changes:
- Moved RecyclableMemoryStreamManager from a static field in Lz4Utilities to an instance field in DatabricksDatabase
- Updated Lz4Utilities.DecompressLz4Async to accept a RecyclableMemoryStreamManager as a parameter instead of using the static instance
- Changed the return type from MemoryStream to RecyclableMemoryStream to make disposal requirements explicit (addresses the reviewer's concern about proper disposal)
- Updated CloudFetchDownloader to use IHiveServer2Statement to access Connection.RecyclableMemoryStreamManager
- Database-level pooling enables proper cleanup on disposal and efficient sharing across connection pools

Thread-safety: RecyclableMemoryStreamManager is thread-safe for concurrent access from multiple connections, with each operation receiving its own stream instance that is not shared between threads.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
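A rough sketch of the database-scoped manager this commit describes, assuming the 2.x Microsoft.IO.RecyclableMemoryStream API where the free-pool limits are settable properties; the class name and the concrete byte limits are illustrative assumptions, not values taken from the PR:

```csharp
using System;
using Microsoft.IO;

internal sealed class DatabricksDatabaseSketch : IDisposable
{
    // One manager per database instance, shared by every connection it creates.
    // RecyclableMemoryStreamManager is thread-safe, so concurrent connections
    // can rent streams from it without extra locking.
    internal RecyclableMemoryStreamManager MemoryStreamManager { get; } =
        new RecyclableMemoryStreamManager
        {
            // Example caps on how much freed memory the pools may retain;
            // these bound the growth the reviewer asked about.
            MaximumFreeSmallPoolBytes = 32 * 1024 * 1024,
            MaximumFreeLargePoolBytes = 64 * 1024 * 1024,
        };

    public void Dispose()
    {
        // No explicit free is required: tying the manager to the database's
        // lifetime means its pooled buffers become collectible when the
        // database is disposed, instead of living for the whole process.
    }
}
```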
Force-pushed from 3088aa4 to ef333d5
…onstructor

Improves on the previous commit by making the manager a required part of connection initialization when created via Database, while maintaining backward compatibility for direct connection creation.

Changes:
- Added an internal constructor that accepts a RecyclableMemoryStreamManager
- The public constructor chains to the internal constructor with null (creates its own manager)
- DatabricksDatabase passes the shared manager via the internal constructor
- No public API changes - backward compatible with existing code
- Compile-time safety - Database must explicitly pass the manager

This ensures the manager is always set and prevents forgetting to initialize it.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <[email protected]>
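A sketch of the constructor chaining this commit describes; types and parameter lists are simplified, and the real DatabricksConnection also takes connection properties:

```csharp
using Microsoft.IO;

public class DatabricksConnectionSketch
{
    internal RecyclableMemoryStreamManager RecyclableMemoryStreamManager { get; }

    // Public entry point: a connection created directly owns its own manager.
    public DatabricksConnectionSketch()
        : this(recyclableMemoryStreamManager: null)
    {
    }

    // Internal entry point: DatabricksDatabase passes its shared manager here,
    // so pooling spans every connection the database hands out.
    internal DatabricksConnectionSketch(RecyclableMemoryStreamManager? recyclableMemoryStreamManager)
    {
        RecyclableMemoryStreamManager =
            recyclableMemoryStreamManager ?? new RecyclableMemoryStreamManager();
    }
}
```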
Looks like this will need rebasing.
Force-pushed from 938aebd to 0701d0c
This merge combines both memory management approaches:
- RecyclableMemoryStreamManager (for output streams)
- ArrayPool (for internal LZ4 decompression buffers)

Changes:
- DatabricksConnection: Added both RecyclableMemoryStreamManager and Lz4BufferPool properties
- DatabricksDatabase: Instantiates both pools and passes them to connections
- Lz4Utilities: DecompressLz4Async now accepts both managers
- CloudFetchDownloader: Uses both for decompression operations
- Added CustomLZ4DecoderStream and CustomLZ4FrameReader from PR apache#3654
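An illustrative sketch of how the two pools split the work after this merge; the actual CustomLZ4DecoderStream/CustomLZ4FrameReader plumbing from PR apache#3654 is more involved, and the helper below only shows the general shape under those assumptions:

```csharp
using System.Buffers;
using System.IO;
using System.Threading.Tasks;
using K4os.Compression.LZ4.Streams;
using Microsoft.IO;

internal static class DualPoolDecompressSketch
{
    public static async Task<MemoryStream> DecompressAsync(
        Stream compressedInput,
        RecyclableMemoryStreamManager streamManager,
        ArrayPool<byte> bufferPool)
    {
        // Output stream comes from the RecyclableMemoryStream pool and outlives this method.
        MemoryStream output = streamManager.GetStream();
        // Transient copy buffer comes from the ArrayPool and is returned immediately.
        byte[] buffer = bufferPool.Rent(80 * 1024);
        try
        {
            using var decoder = LZ4Stream.Decode(compressedInput);
            int read;
            while ((read = await decoder.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false)) > 0)
            {
                await output.WriteAsync(buffer, 0, read).ConfigureAwait(false);
            }
        }
        finally
        {
            bufferPool.Return(buffer);
        }
        output.Position = 0;
        return output; // the consumer disposes this after reading, returning its buffers to the pool
    }
}
```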
@CurtHagenlocher With both commits, the new result further reduces memory from 2.73GB to 541MB.
Summary
Implements RecyclableMemoryStream for LZ4 decompression to reduce memory pressure and fix stream corruption issues.
Before the change:
In the .NET memory dump, set_Capacity on MemoryStream accounts for 5GB of allocations.
After the change:
In the .NET memory dump, we no longer see allocations attributed to set_Capacity.
CloudFetch
This change only touches the CloudFetch path for now.
The non-CloudFetch case, which uses the synchronous version of the LZ4 decode, will be addressed in another PR if necessary.
Changes
- Lz4Utilities.DecompressLz4Async now returns a MemoryStream instead of a (buffer, length) tuple
- Uses RecyclableMemoryStreamManager for pooled memory allocation
- Decompression uses CopyAsync with 80KB chunks for efficient streaming
Problem Fixed
Previously, the RecyclableMemoryStream was disposed immediately after decompression, returning its buffer to the pool. This caused "Unexpectedly reached end of stream" errors when Arrow tried to read from the buffer later, as it could have been overwritten by other operations.
Solution
The new API returns the stream itself without disposing it. Arrow reads from the stream and then disposes it when done, ensuring the buffer is only returned to the pool after reading completes.
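For illustration, a hypothetical caller-side fragment showing this ownership rule; it reuses the Lz4DecompressSketch helper from the sketch above, and downloadedContent, memoryManager, and ProcessBatch are placeholders rather than the driver's actual members (ArrowStreamReader and RecordBatch are the Apache.Arrow types):

```csharp
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Ipc;
using Microsoft.IO;

internal static class CallerSideSketch
{
    // Placeholder consumer; the real driver feeds batches into the ADBC result reader.
    private static void ProcessBatch(RecordBatch batch) { }

    public static async Task ReadDownloadedFileAsync(
        Stream downloadedContent,
        RecyclableMemoryStreamManager memoryManager,
        CancellationToken cancellationToken)
    {
        // The pooled stream stays alive until the Arrow reader has consumed every batch;
        // only then is it disposed, returning its buffers to the pool.
        using (MemoryStream decompressed = await Lz4DecompressSketch.DecompressLz4Async(
                   downloadedContent, memoryManager, cancellationToken))
        using (var reader = new ArrowStreamReader(decompressed))
        {
            RecordBatch batch;
            while ((batch = await reader.ReadNextRecordBatchAsync(cancellationToken)) != null)
            {
                ProcessBatch(batch);
            }
        } // disposing here returns the pooled memory, not before Arrow has finished reading
    }
}
```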
Benefits
- Reduces GC pressure by reusing memory streams from the pool
- Fixes the "Unexpectedly reached end of stream" error, since memory is only returned to the pool after Arrow completes reading
- More efficient memory usage for high-volume CloudFetch operations
Testing
🤖 Generated with Claude Code