Connection pooling #1071

Merged: JojiiOfficial merged 8 commits into dev from connection-pooling on Sep 19, 2025
@JojiiOfficial JojiiOfficial commented Sep 12, 2025

Contributor

Implements connection pooling for gRPC connections.
REST already supports connection pooling thanks to the underlying httpx library, so here only the default value is changed for consistency.

netlify Bot commented Sep 12, 2025

Deploy Preview for poetic-froyo-8baba7 ready!

Name Link
🔨 Latest commit 6b114e1
🔍 Latest deploy log https://app.netlify.com/projects/poetic-froyo-8baba7/deploys/68cd45d6f0919c000819358c
😎 Deploy Preview https://deploy-preview-1071--poetic-froyo-8baba7.netlify.app

     if self._grpc_snapshots_client_pool is None:
         self._init_grpc_snapshots_client()
-    return self._grpc_snapshots_client
+    assert self._grpc_snapshots_client_pool is not None

Contributor Author

Mypy wasn't happy with the types when I converted the variable into a list, so I added these assertions.
I tried # type: ignore but this doesn't get applied in the generated async client, leading to mypy throwing errors for the async client.
If there is a better approach to make mypy happy, that works in the async client as well, please let me know!
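
For readers unfamiliar with the pattern, here is a minimal sketch of assertion-based narrowing. It is not the PR's code: `StubPoolHolder`, `_init_pool`, and the string stand-ins for stubs are hypothetical. The idea is that `assert ... is not None` narrows an `Optional[list[...]]` attribute for mypy using an ordinary statement, so it does not rely on a `# type: ignore` comment being carried over into generated code.

```python
from typing import Optional


class StubPoolHolder:
    """Hypothetical stand-in for a remote client that lazily builds a stub pool."""

    def __init__(self, pool_size: int = 3) -> None:
        self._pool_size = pool_size
        # None until first use; mypy sees Optional[list[str]].
        self._stub_pool: Optional[list[str]] = None
        self._next_index = 0

    def _init_pool(self) -> None:
        # Strings stand in for real gRPC stubs in this sketch.
        self._stub_pool = [f"stub-{i}" for i in range(self._pool_size)]

    @property
    def stub(self) -> str:
        if self._stub_pool is None:
            self._init_pool()
        # mypy cannot see that _init_pool() assigned the attribute,
        # so the assert narrows Optional[list[str]] to list[str].
        assert self._stub_pool is not None
        index = self._next_index
        self._next_index = (index + 1) % len(self._stub_pool)
        return self._stub_pool[index]
```

One trade-off worth keeping in mind: assertions are skipped when Python runs with `-O`, so they help the type checker but are not a runtime guard.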

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

♻️ Duplicate comments (1)
qdrant_client/async_qdrant_remote.py (1)

278-281: About “race” on _grpc_client_next_index (context).

In asyncio, this method runs to completion between awaits, so intra-loop races are unlikely. Cross-thread use would be unsafe, though. If multi-threaded access is a goal, guard the rotation with a lock or constrain the class to single-threaded use in docs.

Would you like a minimal lock-based variant that keeps _next_grpc_client synchronous (no await in call-sites) but safe under threads?
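
A lock-based rotation of the kind offered above might look roughly like this. It is a sketch under assumptions: `RoundRobinIndex` and `count_indices` are hypothetical names, and the real client stores the index on the remote object rather than in a helper class.

```python
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


class RoundRobinIndex:
    """Hypothetical helper: hands out pool indices 0..size-1 in rotation."""

    def __init__(self, size: int) -> None:
        self._size = max(1, size)
        self._next = 0
        self._lock = threading.Lock()

    def next(self) -> int:
        # The read-modify-write of _next happens under the lock, so two
        # threads cannot interleave between reading and updating the index.
        with self._lock:
            current = self._next
            self._next = (current + 1) % self._size
            return current


def count_indices(size: int, calls: int, workers: int = 8) -> Counter:
    """Call next() `calls` times from a thread pool and tally the results."""
    rotator = RoundRobinIndex(size)
    with ThreadPoolExecutor(max_workers=workers) as executor:
        return Counter(executor.map(lambda _: rotator.next(), range(calls)))
```

Because `next()` stays synchronous, call sites need no `await`; the lock only serializes the short index update.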

🧹 Nitpick comments (1)
qdrant_client/async_qdrant_remote.py (1)

205-226: Close: make idempotent, GC-friendly, and resilient to cancellation.

  • Catch asyncio.CancelledError during shutdown.
  • Drop references and reset index so repeated close() calls are harmless and memory is freed promptly.

Apply:

@@
-        if len(self._grpc_channel_pool) > 0:
+        if len(self._grpc_channel_pool) > 0:
             for channel in self._grpc_channel_pool:
                 try:
                     await channel.close(grace=grpc_grace)
-                except AttributeError:
+                except AttributeError:
                     show_warning(
                         message="Unable to close grpc_channel. Connection was interrupted on the server side",
                         category=UserWarning,
                         stacklevel=4,
                     )
-                except RuntimeError:
+                except (RuntimeError, asyncio.CancelledError):
                     pass
@@
-        self._closed = True
+        # Drop references for GC and idempotency
+        self._grpc_points_client_pool = None
+        self._grpc_collections_client_pool = None
+        self._grpc_snapshots_client_pool = None
+        self._grpc_root_client_pool = None
+        self._grpc_channel_pool.clear()
+        self._grpc_client_next_index = 0
+        self._closed = True

Add import:

import asyncio
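
To illustrate only the idempotency part of this suggestion, here is a small self-contained sketch. `FakeChannel` and `PooledRemote` are hypothetical stand-ins, not classes from qdrant_client, and the sketch leaves out the HTTP client and the cancellation handling.

```python
import asyncio
from typing import Optional


class FakeChannel:
    """Hypothetical stand-in for a gRPC channel; only counts close() calls."""

    def __init__(self) -> None:
        self.close_calls = 0

    async def close(self, grace: Optional[float] = None) -> None:
        self.close_calls += 1


class PooledRemote:
    """Hypothetical holder of a channel pool with an idempotent close()."""

    def __init__(self, pool_size: int = 3) -> None:
        self._channel_pool = [FakeChannel() for _ in range(max(1, pool_size))]
        self._next_index = 0
        self._closed = False

    async def close(self, grpc_grace: Optional[float] = None) -> None:
        for channel in self._channel_pool:
            try:
                await channel.close(grace=grpc_grace)
            except RuntimeError:
                # Mirrors the quoted diff: a failure here is ignored.
                pass
        # Drop references so a second close() has nothing left to do.
        self._channel_pool.clear()
        self._next_index = 0
        self._closed = True


async def close_twice() -> tuple[list[int], bool]:
    remote = PooledRemote(pool_size=2)
    channels = list(remote._channel_pool)
    await remote.close()
    await remote.close()  # harmless: the pool is already empty
    return [c.close_calls for c in channels], remote._closed
```

Running `asyncio.run(close_twice())` closes each fake channel exactly once, even though `close()` is called twice.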
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2f210b and e63bb92.

📒 Files selected for processing (2)
  • qdrant_client/async_qdrant_remote.py (9 hunks)
  • qdrant_client/qdrant_remote.py (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • qdrant_client/qdrant_remote.py
🧰 Additional context used
🧬 Code graph analysis (1)
qdrant_client/async_qdrant_remote.py (8)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/grpc/collections_service_pb2_grpc.py (1)
  • CollectionsStub (8-81)
qdrant_client/grpc/snapshots_service_pb2_grpc.py (1)
  • SnapshotsStub (8-46)
qdrant_client/grpc/qdrant_pb2_grpc.py (1)
  • QdrantStub (8-21)
qdrant_client/qdrant_remote.py (7)
  • close (243-264)
  • _init_grpc_points_client (296-300)
  • _init_grpc_channel (277-294)
  • _init_grpc_collections_client (302-306)
  • _init_grpc_snapshots_client (308-312)
  • _init_grpc_root_client (314-318)
  • _next_grpc_client (320-323)
qdrant_client/async_qdrant_client.py (1)
  • close (148-155)
qdrant_client/common/client_warnings.py (1)
  • show_warning (7-8)
qdrant_client/connection.py (1)
  • get_channel (254-275)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Redirect rules - poetic-froyo-8baba7
  • GitHub Check: Header rules - poetic-froyo-8baba7
  • GitHub Check: Pages changed - poetic-froyo-8baba7
  • GitHub Check: Python 3.13.x on ubuntu-latest test
  • GitHub Check: Python 3.12.x on ubuntu-latest test
  • GitHub Check: Python 3.9.x on ubuntu-latest test
  • GitHub Check: Python 3.10.x on ubuntu-latest test
  • GitHub Check: Python 3.11.x on ubuntu-latest test
🔇 Additional comments (2)
qdrant_client/async_qdrant_remote.py (2)

74-84: Good addition: pool_size with a safe floor.

Defaulting to 3 and guarding with max(1, pool_size) looks solid.


256-276: Stub pool init mirrors channel pool correctly.

Looks good. Once the channel top-up fix is in, these comprehensions will always align with the actual pool.

@qdrant qdrant deleted a comment from coderabbitai Bot Sep 12, 2025
coderabbitai Bot commented Sep 17, 2025

Walkthrough

Adds .venv to .gitignore. Introduces gRPC channel pools for synchronous and asynchronous remotes: new pool_size constructor parameter (stored per-instance, default class constant DEFAULT_GRPC_POOL_SIZE = 3, normalized to ≥1), creation of per-instance channel pools and per-endpoint stub pools, round‑robin selection for stub accessors, and updated close() logic to close all channels in the pool. Propagates pool_size through QdrantClient and AsyncQdrantClient constructors and updates the async client generator to emit pool-aware close logic. HTTP/REST behavior and public signatures otherwise unchanged.
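
The pooling scheme in the walkthrough can be sketched generically as follows. This is an illustration under assumptions, not the merged implementation: `RoundRobinPool` and `DEFAULT_POOL_SIZE` are hypothetical names, and a plain factory callable stands in for channel and stub creation.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")

DEFAULT_POOL_SIZE = 3  # mirrors the default described in the walkthrough


class RoundRobinPool(Generic[T]):
    """Hypothetical pool: builds `pool_size` items and rotates through them."""

    def __init__(self, factory: Callable[[], T], pool_size: int = DEFAULT_POOL_SIZE) -> None:
        # Normalize to at least one item so the modulo below is never by zero.
        self._pool_size = max(1, pool_size)
        self._items: list[T] = [factory() for _ in range(self._pool_size)]
        self._next_index = 0

    def next(self) -> T:
        item = self._items[self._next_index]
        self._next_index = (self._next_index + 1) % len(self._items)
        return item

    def __len__(self) -> int:
        return len(self._items)
```

Each call to `next()` returns the following item in the pool and wraps around at the end, which spreads consecutive requests across the pooled connections.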

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 34.29%, which is insufficient; the required threshold is 80.00%. Resolution: you can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Title Check: ✅ Passed. The title "Connection pooling" is concise and directly reflects the primary change of the PR, which implements gRPC connection pooling and standardizes pooling defaults. The raw summary shows additions like pool_size parameters, DEFAULT_GRPC_POOL_SIZE = 3, and multi-channel gRPC pools across multiple files, which align with the title. This wording is clear enough for a teammate scanning history to understand the main intent.
  • Description Check: ✅ Passed. The description succinctly states the PR's intent to implement gRPC connection pooling and notes REST already had pooling, which matches the file-level changes adding pool_size, DEFAULT_GRPC_POOL_SIZE, and pool logic. It is directly related to the changeset and not off-topic or misleading. Although brief, the description adequately describes the main objective of the PR.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
tools/async_client_generator/transformers/remote/function_def_transformer.py (1)

40-64: Close should also clear stub/channel pools to prevent reuse after shutdown

After closing channels and HTTP, null-out stub pools and clear the channel pool so any post-close access fails fast (generated clients currently keep returning stubs backed by closed channels).

Apply:

 async def close(self, grpc_grace: Optional[float] = None, **kwargs: Any) -> None:
-    if len(self._grpc_channel_pool) > 0:
+    if len(self._grpc_channel_pool) > 0:
         for channel in self._grpc_channel_pool:
             try:
                 await channel.close(grace=grpc_grace)
             except AttributeError:
                 show_warning(
                     message="Unable to close grpc_channel. Connection was interrupted on the server side",
                     category=UserWarning,
                     stacklevel=4,
                 )
             except RuntimeError:
                 pass

     try:
         await self.http.aclose()
     except Exception:
         show_warning(
             message="Unable to close http connection. Connection was interrupted on the server side",
             category=UserWarning,
             stacklevel=4,
         )

-    self._closed = True
+    # Prevent reuse after close
+    try:
+        self._grpc_points_client_pool = None
+        self._grpc_collections_client_pool = None
+        self._grpc_snapshots_client_pool = None
+        self._grpc_root_client_pool = None
+        self._grpc_channel_pool = []
+        self._grpc_client_next_index = 0
+    except AttributeError:
+        # Older generated clients may miss these attributes
+        pass
+    self._closed = True
qdrant_client/async_qdrant_remote.py (1)

204-225: Prevent reuse of closed stubs/channels after close()

After closing channels, the stub pools remain non-None, so callers can still get stubs backed by closed channels. Clear pools and reset the index.

Apply:

 async def close(self, grpc_grace: Optional[float] = None, **kwargs: Any) -> None:
@@
-        try:
+        try:
             await self.http.aclose()
         except Exception:
             show_warning(
                 message="Unable to close http connection. Connection was interrupted on the server side",
                 category=UserWarning,
                 stacklevel=4,
             )
-        self._closed = True
+        # Prevent reuse after close
+        self._grpc_points_client_pool = None
+        self._grpc_collections_client_pool = None
+        self._grpc_snapshots_client_pool = None
+        self._grpc_root_client_pool = None
+        self._grpc_channel_pool = []
+        self._grpc_client_next_index = 0
+        self._closed = True
qdrant_client/qdrant_remote.py (1)

243-264: Prevent reuse of closed stubs/channels after close()

Same issue as async: properties will keep returning stubs backed by closed channels. Clear the pools and reset index.

Apply:

 def close(self, grpc_grace: Optional[float] = None, **kwargs: Any) -> None:
@@
-        try:
+        try:
             self.openapi_client.close()
         except Exception:
             show_warning(
                 message="Unable to close http connection. Connection was interrupted on the server side",
                 category=RuntimeWarning,
                 stacklevel=4,
             )
 
-        self._closed = True
+        # Prevent reuse after close
+        self._grpc_points_client_pool = None
+        self._grpc_collections_client_pool = None
+        self._grpc_snapshots_client_pool = None
+        self._grpc_root_client_pool = None
+        self._grpc_channel_pool = []
+        self._grpc_client_next_index = 0
+        self._closed = True
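
One way to get fail-fast behaviour after `close()` is sketched below. It assumes the accessor checks a `_closed` flag before handing out a stub; the quoted diffs do not show such a check, so treat `ClosableStubPool` and its `get()` method as hypothetical.

```python
from typing import Optional


class ClosableStubPool:
    """Hypothetical pool whose accessor refuses to hand out stubs after close()."""

    def __init__(self, pool_size: int = 3) -> None:
        self._stubs: Optional[list[str]] = [f"stub-{i}" for i in range(max(1, pool_size))]
        self._next_index = 0
        self._closed = False

    def get(self) -> str:
        if self._closed or self._stubs is None:
            # Fail fast instead of returning a stub backed by a closed channel.
            raise RuntimeError("client is closed")
        stub = self._stubs[self._next_index]
        self._next_index = (self._next_index + 1) % len(self._stubs)
        return stub

    def close(self) -> None:
        # Clear the pool and reset the index, as the review suggests.
        self._stubs = None
        self._next_index = 0
        self._closed = True
```

Note that if an accessor lazily re-creates a pool whenever it finds `None`, clearing the pool alone would trigger re-initialization rather than an error, which is why this sketch checks the flag explicitly.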
♻️ Duplicate comments (2)
qdrant_client/async_qdrant_remote.py (1)

283-286: About the previously raised “race” on _next_grpc_client()

In a single asyncio event loop, this method has no await points and executes atomically; practical risk is low. If cross-thread access is expected, consider a threading.Lock, since asyncio.Lock is not thread-safe.

Do you plan to use AsyncQdrantRemote across threads? If yes, I can provide a lock-based variant.
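
The single-event-loop argument can be checked with a small experiment. In this sketch, `AsyncSafeRotator`, `worker`, and `run_tasks` are hypothetical names; many tasks interleave at their `await` points, yet the rotation stays even because `next()` itself never awaits.

```python
import asyncio
from collections import Counter


class AsyncSafeRotator:
    """Hypothetical rotator whose next() is a plain (non-async) method."""

    def __init__(self, size: int) -> None:
        self._size = max(1, size)
        self._next = 0

    def next(self) -> int:
        # No await between the read and the write, so the event loop
        # cannot switch to another task in the middle of this method.
        current = self._next
        self._next = (current + 1) % self._size
        return current


async def worker(rotator: AsyncSafeRotator, picks: list[int]) -> None:
    await asyncio.sleep(0)  # yield so the tasks interleave
    picks.append(rotator.next())


async def run_tasks(size: int, tasks: int) -> Counter:
    rotator = AsyncSafeRotator(size)
    picks: list[int] = []
    await asyncio.gather(*(worker(rotator, picks) for _ in range(tasks)))
    return Counter(picks)
```

This guarantee holds only within one event loop on one thread; it says nothing about calls made from other threads.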

qdrant_client/qdrant_remote.py (1)

328-331: Thread-safety: protect round‑robin index with a lock

Sync client may be used from multiple threads; unsynchronized read/modify leads to skewed rotation. Add a lock and base modulo on actual pool length.

Apply:

+import threading
@@ class QdrantRemote(QdrantBase):
-        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_lock = threading.Lock()
@@
-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (self._grpc_client_next_index + 1) % self._pool_size
-        return current_index
+    def _next_grpc_client(self) -> int:
+        length = len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0
+        with self._grpc_client_lock:
+            current_index = self._grpc_client_next_index % length
+            self._grpc_client_next_index = (current_index + 1) % length
+            return current_index
🧹 Nitpick comments (4)
tools/async_client_generator/transformers/remote/function_def_transformer.py (1)

41-41: Nit: truthiness check is cleaner

Prefer if self._grpc_channel_pool: over len(self._grpc_channel_pool) > 0.

qdrant_client/async_qdrant_remote.py (2)

283-286: Round‑robin should use actual pool length, not configured pool_size

This avoids indexing errors if the pool is rebuilt to a different size in the future and keeps rotation correct.

Apply:

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (self._grpc_client_next_index + 1) % self._pool_size
-        return current_index
+    def _next_grpc_client(self) -> int:
+        length = len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0
+        current_index = self._grpc_client_next_index % length
+        self._grpc_client_next_index = (current_index + 1) % length
+        return current_index

241-257: Chain the original exception when wrapping initialization errors

Improves debuggability and satisfies linters.

Apply:

-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+        except Exception as e:
+            raise RuntimeError("Error initializing the grpc connection(s)") from e
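
A short sketch of what chaining buys you, using hypothetical helpers (`open_channel`, `init_channels`, `capture_error`) in place of the real channel setup:

```python
def open_channel() -> None:
    """Hypothetical stand-in for channel creation that fails."""
    raise ConnectionError("connection refused")


def init_channels() -> None:
    try:
        open_channel()
    except Exception as e:
        # `from e` stores the original error on __cause__, so the traceback
        # shows both exceptions and how they relate.
        raise RuntimeError("Error initializing the grpc connection(s)") from e


def capture_error() -> RuntimeError:
    try:
        init_channels()
    except RuntimeError as err:
        return err
    raise AssertionError("init_channels() should have raised")
```

The wrapped error's `__cause__` is the original `ConnectionError`, so callers and log handlers can still inspect the root failure without parsing the message string.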
qdrant_client/qdrant_remote.py (1)

301-302: Chain original exception in _init_grpc_channel()

Use exception chaining for context and to appease linters.

Apply:

-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+        except Exception as e:
+            raise RuntimeError("Error initializing the grpc connection(s)") from e
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 3d29466 and d2e8a21.

📒 Files selected for processing (4)
  • .gitignore (1 hunks)
  • qdrant_client/async_qdrant_remote.py (9 hunks)
  • qdrant_client/qdrant_remote.py (9 hunks)
  • tools/async_client_generator/transformers/remote/function_def_transformer.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
qdrant_client/qdrant_remote.py (6)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/grpc/collections_service_pb2_grpc.py (1)
  • CollectionsStub (8-81)
qdrant_client/grpc/snapshots_service_pb2_grpc.py (1)
  • SnapshotsStub (8-46)
qdrant_client/grpc/qdrant_pb2_grpc.py (1)
  • QdrantStub (8-21)
qdrant_client/async_qdrant_remote.py (7)
  • close (204-225)
  • _init_grpc_points_client (259-263)
  • _init_grpc_channel (238-257)
  • _init_grpc_collections_client (265-269)
  • _init_grpc_snapshots_client (271-275)
  • _init_grpc_root_client (277-281)
  • _next_grpc_client (283-286)
qdrant_client/connection.py (1)
  • get_channel (254-275)
qdrant_client/async_qdrant_remote.py (6)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/grpc/collections_service_pb2_grpc.py (1)
  • CollectionsStub (8-81)
qdrant_client/grpc/snapshots_service_pb2_grpc.py (1)
  • SnapshotsStub (8-46)
qdrant_client/grpc/qdrant_pb2_grpc.py (1)
  • QdrantStub (8-21)
qdrant_client/qdrant_remote.py (7)
  • close (243-264)
  • _init_grpc_points_client (304-308)
  • _init_grpc_channel (277-302)
  • _init_grpc_collections_client (310-314)
  • _init_grpc_snapshots_client (316-320)
  • _init_grpc_root_client (322-326)
  • _next_grpc_client (328-331)
qdrant_client/connection.py (1)
  • get_channel (254-275)
🪛 Ruff (0.12.2)
qdrant_client/qdrant_remote.py

301-301: Do not catch blind exception: Exception

(BLE001)


302-302: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


302-302: Avoid specifying long messages outside the exception class

(TRY003)

qdrant_client/async_qdrant_remote.py

256-256: Do not catch blind exception: Exception

(BLE001)


257-257: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


257-257: Avoid specifying long messages outside the exception class

(TRY003)

🔇 Additional comments (1)
.gitignore (1)

18-18: LGTM: ignore .venv

Good addition; prevents committing local virtual envs.

@JojiiOfficial JojiiOfficial requested a review from joein September 17, 2025 07:07

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

♻️ Duplicate comments (3)
qdrant_client/async_qdrant_remote.py (2)

285-288: Make round‑robin use actual pool length.

Avoid modulo by configured size; use the real pool length to prevent future mismatches.

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (self._grpc_client_next_index + 1) % self._pool_size
-        return current_index
+    def _next_grpc_client(self, pool_len: Optional[int] = None) -> int:
+        length = pool_len or len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0  # defensive; pools are initialized before use
+        current_index = self._grpc_client_next_index % length
+        self._grpc_client_next_index = (current_index + 1) % length
+        return current_index

297-301: Use local pool var and pass its length to the rotator (also keeps mypy happy).

@@
-        assert self._grpc_collections_client_pool is not None
-        return self._grpc_collections_client_pool[self._next_grpc_client()]
+        assert self._grpc_collections_client_pool is not None
+        pool = self._grpc_collections_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@
-        assert self._grpc_points_client_pool is not None
-        return self._grpc_points_client_pool[self._next_grpc_client()]
+        assert self._grpc_points_client_pool is not None
+        pool = self._grpc_points_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@
-        assert self._grpc_snapshots_client_pool is not None
-        return self._grpc_snapshots_client_pool[self._next_grpc_client()]
+        assert self._grpc_snapshots_client_pool is not None
+        pool = self._grpc_snapshots_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@
-        assert self._grpc_root_client_pool is not None
-        return self._grpc_root_client_pool[self._next_grpc_client()]
+        assert self._grpc_root_client_pool is not None
+        pool = self._grpc_root_client_pool
+        return pool[self._next_grpc_client(len(pool))]

Also applies to: 309-313, 321-325, 333-337

qdrant_client/qdrant_remote.py (1)

331-335: Thread-safety: protect round‑robin index and use actual pool length.

Concurrent callers can race on _grpc_client_next_index. Add a lock and compute modulo by the real pool length.

Apply (plus import/init below):

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (self._grpc_client_next_index + 1) % self._pool_size
-        return current_index
+    def _next_grpc_client(self, pool_len: Optional[int] = None) -> int:
+        length = pool_len or len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0
+        with self._grpc_client_lock:
+            current_index = self._grpc_client_next_index % length
+            self._grpc_client_next_index = (current_index + 1) % length
+            return current_index

Add at top and in init:

@@
-from typing import (
+from typing import (
@@
 )
+import threading
@@
-        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_lock = threading.Lock()
🧹 Nitpick comments (4)
qdrant_client/async_qdrant_remote.py (2)

243-259: Harden channel init: top-up to target size and preserve exception cause.

  • Initialize until the pool reaches the target size (defensive if pool becomes undersized later).
  • Chain the original exception for debuggability and to satisfy linters (BLE001/B904/TRY003).
-        try:
-            channel_pool = []
-            if len(self._grpc_channel_pool) == 0:
-                for _ in range(self._pool_size):
+        try:
+            channel_pool = []
+            if len(self._grpc_channel_pool) < self._pool_size:
+                missing = self._pool_size - len(self._grpc_channel_pool)
+                for _ in range(missing):
                     channel = get_channel(
                         host=self._host,
                         port=self._grpc_port,
                         ssl=self._https,
                         metadata=self._grpc_headers,
                         options=self._grpc_options,
                         compression=self._grpc_compression,
                         auth_token_provider=self._auth_token_provider,
                     )
                     channel_pool.append(channel)
-                self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+                # Apply atomically to avoid half-initialized state
+                self._grpc_channel_pool.extend(channel_pool)
+        except Exception as e:
+            raise RuntimeError("Error initializing the grpc connection(s)") from e
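
The top-up idea can be sketched in isolation like this. `top_up_pool` is a hypothetical helper that takes a generic factory instead of calling `get_channel`; it builds the missing items in a local list and only extends the shared pool once every creation has succeeded.

```python
from typing import Callable


def top_up_pool(pool: list[str], target_size: int, factory: Callable[[], str]) -> None:
    """Grow `pool` to `target_size`, leaving it untouched if any creation fails."""
    missing = target_size - len(pool)
    if missing <= 0:
        return
    try:
        # Build into a local list first; the shared pool is not modified yet.
        new_items = [factory() for _ in range(missing)]
    except Exception as e:
        raise RuntimeError("Error initializing the grpc connection(s)") from e
    # Only reached when every creation succeeded.
    pool.extend(new_items)
```

If the factory raises partway through, the pool keeps its previous contents, which is the half-initialized state the suggestion is trying to avoid.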

206-219: Nit: align warning category with sync variant or downgrade duplicate except.

Sync uses RuntimeWarning; async uses UserWarning and also swallows RuntimeError. Consider aligning categories and add a brief comment on why RuntimeError is suppressed.

qdrant_client/qdrant_remote.py (2)

343-347: Use local pool var and pass its length to the rotator.

Prevents mismatches and improves readability/type narrowing.

@@
-        assert self._grpc_collections_client_pool is not None
-        return self._grpc_collections_client_pool[self._next_grpc_client()]
+        assert self._grpc_collections_client_pool is not None
+        pool = self._grpc_collections_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@
-        assert self._grpc_points_client_pool is not None
-        return self._grpc_points_client_pool[self._next_grpc_client()]
+        assert self._grpc_points_client_pool is not None
+        pool = self._grpc_points_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@
-        assert self._grpc_snapshots_client_pool is not None
-        return self._grpc_snapshots_client_pool[self._next_grpc_client()]
+        assert self._grpc_snapshots_client_pool is not None
+        pool = self._grpc_snapshots_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@
-        assert self._grpc_root_client_pool is not None
-        return self._grpc_root_client_pool[self._next_grpc_client()]
+        assert self._grpc_root_client_pool is not None
+        pool = self._grpc_root_client_pool
+        return pool[self._next_grpc_client(len(pool))]

Also applies to: 355-359, 367-371, 379-383


284-305: Preserve exception cause; optional: top‑up pool defensively.

  • Re-raise with from e to keep the original traceback (ruff BLE001/B904/TRY003).
  • Optionally, initialize until the pool reaches _pool_size (defensive against undersized pool).
-        try:
-            channel_pool = []
-
-            if len(self._grpc_channel_pool) == 0:
-                for _ in range(self._pool_size):
+        try:
+            channel_pool = []
+            if len(self._grpc_channel_pool) < self._pool_size:
+                missing = self._pool_size - len(self._grpc_channel_pool)
+                for _ in range(missing):
                     channel = get_channel(
                         host=self._host,
                         port=self._grpc_port,
                         ssl=self._https,
                         metadata=self._grpc_headers,
                         options=self._grpc_options,
                         compression=self._grpc_compression,
                         # sync get_channel does not accept coroutine functions,
                         # but we can't check type here, since it'll get into async client as well
                         auth_token_provider=self._auth_token_provider,  # type: ignore
                     )
-                    channel_pool.append(channel)
-
-                # Apply the clients late to prevent half-initialized pools if a channel creation fails.
-                self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+                    channel_pool.append(channel)
+                # Apply atomically to avoid half-initialized state.
+                self._grpc_channel_pool.extend(channel_pool)
+        except Exception as e:
+            raise RuntimeError("Error initializing the grpc connection(s)") from e
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between d2e8a21 and d76da9e.

📒 Files selected for processing (2)
  • qdrant_client/async_qdrant_remote.py (10 hunks)
  • qdrant_client/qdrant_remote.py (10 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
qdrant_client/async_qdrant_remote.py (3)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/qdrant_remote.py (7)
  • close (246-267)
  • _init_grpc_points_client (307-311)
  • _init_grpc_channel (280-305)
  • _init_grpc_collections_client (313-317)
  • _init_grpc_snapshots_client (319-323)
  • _init_grpc_root_client (325-329)
  • _next_grpc_client (331-334)
qdrant_client/connection.py (1)
  • get_channel (254-275)
qdrant_client/qdrant_remote.py (4)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/async_qdrant_remote.py (7)
  • close (206-227)
  • _init_grpc_points_client (261-265)
  • _init_grpc_channel (240-259)
  • _init_grpc_collections_client (267-271)
  • _init_grpc_snapshots_client (273-277)
  • _init_grpc_root_client (279-283)
  • _next_grpc_client (285-288)
qdrant_client/common/client_warnings.py (1)
  • show_warning (7-8)
qdrant_client/connection.py (1)
  • get_channel (254-275)
🪛 Ruff (0.12.2)
qdrant_client/async_qdrant_remote.py

258-258: Do not catch blind exception: Exception

(BLE001)


259-259: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


259-259: Avoid specifying long messages outside the exception class

(TRY003)

qdrant_client/qdrant_remote.py

304-304: Do not catch blind exception: Exception

(BLE001)


305-305: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


305-305: Avoid specifying long messages outside the exception class

(TRY003)


@JojiiOfficial JojiiOfficial changed the title Connection pooling in GRPC Connection pooling Sep 17, 2025
Union[Callable[[], str], Callable[[], Awaitable[str]]]
] = None,
check_compatibility: bool = True,
pool_size: int = 3,

Member

why is it only 3?

httpx had a bigger default value

Member

though, I am not sure whether it is right to have the same amount of connections in grpc pool as we have in http pool by default..

Contributor Author

We decided to go with 3. I also added this default value here to make it consistent.

Member

I can't say I like the idea of a silent change of a default value from 100 to 3. It will affect lots of users + we might need to do some benchmarks to see the actual difference

Member

btw, I feel like pool_size should also be added to qdrant_client/qdrant_client.py

Member

I made a small attempt to benchmark it.
This code just uploads 200 batches of 64 vectors each, with 384 dimensions per vector.

from qdrant_client import AsyncQdrantClient, models
import time
import numpy as np
import asyncio
import uuid


if __name__ == "__main__":

    batches = []

    for i in range(200):
        batch = [
            models.PointStruct(id=str(uuid.uuid4()), vector=vector.tolist())
            for vector in np.random.random(size=(64, 384))
        ]
        batches.append(batch)

    async def main():
        cl = AsyncQdrantClient(
            pool_size=100,
            timeout=60,
        )
        if await cl.collection_exists("test"):
            await cl.delete_collection("test")
        await cl.create_collection(
            "test", vectors_config=models.VectorParams(size=384, distance=models.Distance.COSINE)
        )

        # Schedule every upsert up front so the requests run concurrently.
        tasks = [asyncio.create_task(cl.upsert("test", b)) for b in batches]
        a = time.perf_counter()
        await asyncio.gather(*tasks)
        print(time.perf_counter() - a)

    asyncio.run(main())

With pool_size=100 it is almost 1.5x faster than with pool_size=3, so unfortunately I don't think we can change the httpx default from 100 to 3.
Maybe we should just set the default value to None and create two class variables (similar to DEFAULT_GRPC_TIMEOUT).
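
A minimal sketch of that idea, assuming a hypothetical `RemoteSettings` class (not the actual qdrant_client code): `pool_size=None` means "not chosen by the caller", and each transport falls back to its own class-level default. The constant names and the REST value of 100 are illustrative, taken from the numbers quoted in this thread.

```python
from typing import Optional


class RemoteSettings:
    """Hypothetical holder for pool settings; not the real client class."""

    DEFAULT_REST_POOL_SIZE = 100  # httpx default quoted in this thread
    DEFAULT_GRPC_POOL_SIZE = 3  # gRPC default under discussion

    def __init__(self, pool_size: Optional[int] = None) -> None:
        # None is kept as-is so each transport can apply its own default.
        self._pool_size = pool_size

    def rest_pool_size(self) -> int:
        if self._pool_size is None:
            return self.DEFAULT_REST_POOL_SIZE
        return self._pool_size

    def grpc_pool_size(self) -> int:
        if self._pool_size is None:
            return self.DEFAULT_GRPC_POOL_SIZE
        return self._pool_size


defaults = RemoteSettings()
explicit = RemoteSettings(pool_size=10)
```

With this shape an explicit `pool_size` applies to both transports, while leaving it unset changes nothing for existing REST users.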

Contributor Author

I changed the default value to apply only to GRPC.

Note that in your benchmark the pool_size parameter is not applied if you address Qdrant on localhost.

@joein joein Sep 19, 2025

Member

Yeah, that's right.
I actually used Qdrant Cloud, but removed api_key and url from the example.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (7)
qdrant_client/async_qdrant_remote.py (3)

121-123: Good: use sanitized self._pool_size for httpx limits

Fixes prior bug where raw pool_size could be 0/negative.


286-289: Use actual pool length (and optionally a lock) in round‑robin

Defensive against any future divergence between configured size and realized pool.

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (self._grpc_client_next_index + 1) % self._pool_size
-        return current_index
+    def _next_grpc_client(self, pool_len: Optional[int] = None) -> int:
+        length = pool_len or len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0
+        # Optional if you introduced asyncio.Lock in __init__:
+        # async with self._grpc_client_lock:
+        current_index = self._grpc_client_next_index % length
+        self._grpc_client_next_index = (current_index + 1) % length
+        return current_index

298-302: Index using realized pool length; keep mypy happy with local var

Prevents theoretical IndexError if pool length diverges; local var preserves narrowing.

@@ def grpc_collections(self) -> grpc.CollectionsStub:
-        assert self._grpc_collections_client_pool is not None
-        return self._grpc_collections_client_pool[self._next_grpc_client()]
+        assert self._grpc_collections_client_pool is not None
+        pool = self._grpc_collections_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@ def grpc_points(self) -> grpc.PointsStub:
-        assert self._grpc_points_client_pool is not None
-        return self._grpc_points_client_pool[self._next_grpc_client()]
+        assert self._grpc_points_client_pool is not None
+        pool = self._grpc_points_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@ def grpc_snapshots(self) -> grpc.SnapshotsStub:
-        assert self._grpc_snapshots_client_pool is not None
-        return self._grpc_snapshots_client_pool[self._next_grpc_client()]
+        assert self._grpc_snapshots_client_pool is not None
+        pool = self._grpc_snapshots_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@ def grpc_root(self) -> grpc.QdrantStub:
-        assert self._grpc_root_client_pool is not None
-        return self._grpc_root_client_pool[self._next_grpc_client()]
+        assert self._grpc_root_client_pool is not None
+        pool = self._grpc_root_client_pool
+        return pool[self._next_grpc_client(len(pool))]

Also applies to: 310-314, 322-326, 334-337

qdrant_client/qdrant_remote.py (4)

133-136: Good: use sanitized self._pool_size for httpx limits

Fixes the earlier bug with raw pool_size.


345-349: Index using realized pool length; local var helps mypy

Keeps rotation correct even if the pool is rebuilt or topped up.

@@ def grpc_collections(self) -> grpc.CollectionsStub:
-        assert self._grpc_collections_client_pool is not None
-        return self._grpc_collections_client_pool[self._next_grpc_client()]
+        assert self._grpc_collections_client_pool is not None
+        pool = self._grpc_collections_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@ def grpc_points(self) -> grpc.PointsStub:
-        assert self._grpc_points_client_pool is not None
-        return self._grpc_points_client_pool[self._next_grpc_client()]
+        assert self._grpc_points_client_pool is not None
+        pool = self._grpc_points_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@ def grpc_snapshots(self) -> grpc.SnapshotsStub:
-        assert self._grpc_snapshots_client_pool is not None
-        return self._grpc_snapshots_client_pool[self._next_grpc_client()]
+        assert self._grpc_snapshots_client_pool is not None
+        pool = self._grpc_snapshots_client_pool
+        return pool[self._next_grpc_client(len(pool))]
@@ def grpc_root(self) -> grpc.QdrantStub:
-        assert self._grpc_root_client_pool is not None
-        return self._grpc_root_client_pool[self._next_grpc_client()]
+        assert self._grpc_root_client_pool is not None
+        pool = self._grpc_root_client_pool
+        return pool[self._next_grpc_client(len(pool))]

Also applies to: 357-361, 369-373, 381-385


333-336: Make round‑robin thread‑safe and length‑aware

Use the lock and actual pool length to avoid races and theoretical OOB.

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (self._grpc_client_next_index + 1) % self._pool_size
-        return current_index
+    def _next_grpc_client(self, pool_len: Optional[int] = None) -> int:
+        length = pool_len or len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0
+        with self._grpc_client_lock:
+            current_index = self._grpc_client_next_index % length
+            self._grpc_client_next_index = (current_index + 1) % length
+            return current_index

203-209: Add a lock for thread‑safe round‑robin

Multiple threads can read/update the index concurrently; protect it.

@@
-        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_lock = threading.Lock()

Additional import needed near other imports:

+import threading
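
The locking pattern suggested above can be sketched in isolation. This is a minimal illustration, assuming a hypothetical `RoundRobinPool` helper that is not part of qdrant_client; it reads and advances the index as one step under a `threading.Lock` and takes the modulo against the actual list length.

```python
import threading
from typing import Generic, List, TypeVar

T = TypeVar("T")


class RoundRobinPool(Generic[T]):
    """Hypothetical helper; illustrates the locking pattern only."""

    def __init__(self, items: List[T]) -> None:
        if not items:
            raise ValueError("pool must contain at least one item")
        self._items = list(items)
        self._next_index = 0
        self._lock = threading.Lock()

    def next(self) -> T:
        # Read and advance the index atomically so threads never share a slot update.
        with self._lock:
            index = self._next_index % len(self._items)
            self._next_index = (index + 1) % len(self._items)
            return self._items[index]


pool = RoundRobinPool(["channel-0", "channel-1", "channel-2"])
picks = [pool.next() for _ in range(4)]
```

Rejecting an empty list in the constructor avoids the need for a `length <= 0` branch on every call.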
🧹 Nitpick comments (4)
qdrant_client/async_qdrant_remote.py (2)

173-179: Consider guarding round‑robin index in async (low risk)

Async properties are synchronous (no awaits), so task interleaving is unlikely to corrupt the index. If you still want belt-and-suspenders safety for cross‑thread use, add a lock field now and use it in the rotator.

@@
-        self._grpc_client_next_index: int = 0
+        self._grpc_client_next_index: int = 0
+        # Optional: protect index if instance is shared across threads/loops
+        # import asyncio at top if you choose this path:
+        # import asyncio
+        # self._grpc_client_lock = asyncio.Lock()

244-260: Tighten exception handling; close partially created channels on failure

Avoid leaking channels if creation fails mid-loop and preserve tracebacks.

-    def _init_grpc_channel(self) -> None:
+    def _init_grpc_channel(self) -> None:
         if self._closed:
             raise RuntimeError("Client was closed. Please create a new QdrantClient instance.")
-        try:
-            channel_pool = []
+        channel_pool: list[grpc.Channel] = []
+        try:
             if len(self._grpc_channel_pool) == 0:
                 for _ in range(self._pool_size):
@@
                     )
                     channel_pool.append(channel)
                 self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+        except Exception as e:
+            # best‑effort cleanup for channels created before the failure
+            for ch in channel_pool:
+                try:
+                    await ch.close()
+                except Exception:
+                    pass
+            raise RuntimeError("Error initializing the grpc connection(s)") from e
qdrant_client/qdrant_remote.py (2)

286-307: Tighten exception handling; close partially created channels on failure

Avoid leaks on mid‑loop failure and preserve traceback chain.

-        try:
-            channel_pool = []
+        channel_pool: list[grpc.Channel] = []
+        try:
@@
-                self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+                self._grpc_channel_pool = channel_pool
+        except Exception as e:
+            for ch in channel_pool:
+                try:
+                    ch.close()
+                except Exception:
+                    pass
+            raise RuntimeError("Error initializing the grpc connection(s)") from e
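
A self-contained sketch of the cleanup-on-failure pattern, using a stand-in `FakeChannel` class and a hypothetical `build_pool` function rather than real gRPC channels. It assumes a synchronous `close()`; the point is only that channels created before the failure get closed and the original error stays attached.

```python
from typing import Callable, List


class FakeChannel:
    """Stand-in for a channel; only tracks whether close() was called."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def build_pool(factory: Callable[[], FakeChannel], size: int) -> List[FakeChannel]:
    created: List[FakeChannel] = []
    try:
        for _ in range(size):
            created.append(factory())
    except Exception as err:
        # Best-effort cleanup of channels created before the failure.
        for channel in created:
            try:
                channel.close()
            except Exception:
                pass
        raise RuntimeError("Error initializing the connection pool") from err
    return created


made: List[FakeChannel] = []


def flaky_factory() -> FakeChannel:
    # Simulate the third connection attempt failing.
    if len(made) == 2:
        raise OSError("simulated connection failure")
    channel = FakeChannel()
    made.append(channel)
    return channel


try:
    build_pool(flaky_factory, size=3)
except RuntimeError as exc:
    failure = exc
```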

210-214: Remove unused aio gRPC stub fields

rg shows only their declarations in qdrant_client/qdrant_remote.py (lines 210–213): self._aio_grpc_points_client, self._aio_grpc_collections_client, self._aio_grpc_snapshots_client, self._aio_grpc_root_client — delete them.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d76da9e and 0f51918.

📒 Files selected for processing (2)
  • qdrant_client/async_qdrant_remote.py (10 hunks)
  • qdrant_client/qdrant_remote.py (10 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
qdrant_client/async_qdrant_remote.py (7)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/grpc/collections_service_pb2_grpc.py (1)
  • CollectionsStub (8-81)
qdrant_client/grpc/snapshots_service_pb2_grpc.py (1)
  • SnapshotsStub (8-46)
qdrant_client/grpc/qdrant_pb2_grpc.py (1)
  • QdrantStub (8-21)
qdrant_client/qdrant_remote.py (7)
  • close (248-269)
  • _init_grpc_points_client (309-313)
  • _init_grpc_channel (282-307)
  • _init_grpc_collections_client (315-319)
  • _init_grpc_snapshots_client (321-325)
  • _init_grpc_root_client (327-331)
  • _next_grpc_client (333-336)
qdrant_client/common/client_warnings.py (1)
  • show_warning (7-8)
qdrant_client/connection.py (1)
  • get_channel (254-275)
qdrant_client/qdrant_remote.py (2)
qdrant_client/async_qdrant_remote.py (7)
  • close (207-228)
  • _init_grpc_points_client (262-266)
  • _init_grpc_channel (241-260)
  • _init_grpc_collections_client (268-272)
  • _init_grpc_snapshots_client (274-278)
  • _init_grpc_root_client (280-284)
  • _next_grpc_client (286-289)
qdrant_client/connection.py (1)
  • get_channel (254-275)
🪛 Ruff (0.12.2)
qdrant_client/async_qdrant_remote.py

259-259: Do not catch blind exception: Exception

(BLE001)


260-260: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


260-260: Avoid specifying long messages outside the exception class

(TRY003)

qdrant_client/qdrant_remote.py

306-306: Do not catch blind exception: Exception

(BLE001)


307-307: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


307-307: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Redirect rules - poetic-froyo-8baba7
  • GitHub Check: Header rules - poetic-froyo-8baba7
  • GitHub Check: Pages changed - poetic-froyo-8baba7
  • GitHub Check: Python 3.12.x on ubuntu-latest test
  • GitHub Check: Python 3.9.x on ubuntu-latest test
  • GitHub Check: Python 3.10.x on ubuntu-latest test
  • GitHub Check: Python 3.11.x on ubuntu-latest test
  • GitHub Check: Python 3.13.x on ubuntu-latest test
🔇 Additional comments (2)
qdrant_client/async_qdrant_remote.py (1)

83-85: Good: clamp pool_size to >= 1

Prevents invalid pools and keeps REST/gRPC consistent.
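
For illustration, a clamp of this kind could look like the sketch below. `sanitize_pool_size` is a hypothetical name, and keeping `None` as "unset" is an assumption rather than something this comment confirms. The clamp matters because the round-robin index is computed with `% pool_size`, and a modulo by zero raises `ZeroDivisionError` in Python.

```python
from typing import Optional


def sanitize_pool_size(pool_size: Optional[int]) -> Optional[int]:
    """Hypothetical helper: keep None as "unset", otherwise clamp to >= 1."""
    if pool_size is None:
        return None
    return max(1, pool_size)
```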

qdrant_client/qdrant_remote.py (1)

75-77: Good: clamp pool_size to >= 1

Prevents invalid pools and aligns defaults across transports.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
qdrant_client/qdrant_remote.py (1)

139-141: Bug fix confirmed: REST now uses sanitized pool size.

Switching httpx.Limits to self._pool_size prevents 0/negative usage. LGTM.

qdrant_client/async_qdrant_remote.py (1)

124-126: Bug fix confirmed: REST uses sanitized value.

Using self._pool_size is correct and consistent with sync.

🧹 Nitpick comments (10)
qdrant_client/qdrant_remote.py (6)

76-82: Good: input sanitization for pool_size.

Clamping to ≥1 avoids invalid pools. Consider also validating type (non‑int inputs) earlier if these can arrive from env/config.


209-215: Make round‑robin index thread‑safe.

Concurrent use of a single client instance can interleave the increment. Guard the index with a lock.

Apply:

@@
-import logging
+import logging
+import threading
@@ class QdrantRemote(QdrantBase):
-        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_rr_lock = threading.Lock()

351-356: Round‑robin should use actual pool length and lock.

Using configured size risks mismatch if the pool is rebuilt; also not thread‑safe. Use the lock and current pool length.

Apply:

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (
-            self._grpc_client_next_index + 1
-        ) % self._get_grpc_pool_size()
-        return current_index
+    def _next_grpc_client(self) -> int:
+        with self._grpc_rr_lock:
+            length = len(self._grpc_channel_pool) or self._get_grpc_pool_size()
+            idx = self._grpc_client_next_index % length
+            self._grpc_client_next_index = (idx + 1) % length
+            return idx

304-326: Tighten exception handling and release partially created channels.

Catching bare Exception and not closing locally created channels can leak resources on failures. Raise from the original error and close any channels created before the failure.

Apply:

-        try:
+        try:
             channel_pool = []
@@
-                self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+                self._grpc_channel_pool = channel_pool
+        except Exception as err:
+            # Best effort: close any channels created before the failure.
+            for ch in channel_pool:
+                try:
+                    ch.close()
+                except Exception:
+                    pass
+            raise RuntimeError("Error initializing the gRPC connection pool") from err

Notes:

  • Addresses BLE001/B904/TRY003 from static analysis.

48-48: Performance/behavior note: default REST vs gRPC pool sizes differ.

Given real‑world benchmarks showing pool_size=100 improves throughput ~1.5x vs 3, consider:

  • Keeping REST default (httpx) untouched unless user sets pool_size (current behavior).
  • Bumping DEFAULT_GRPC_POOL_SIZE based on benchmarks or making it None to mean “auto”.
    At minimum, document the asymmetry and recommend tuning in high‑concurrency workloads.

If you want, I can provide a quick aio/sync benchmark harness similar to the one shared in the PR to compare 3 vs 100 on your CI hardware.

Also applies to: 132-142, 288-299


48-48: Clarify DEFAULT_GRPC_POOL_SIZE=3 and REST pool_size behavior

DEFAULT_GRPC_POOL_SIZE is set to 3 in qdrant_client/qdrant_remote.py and async_qdrant_remote.py and is returned by _get_grpc_pool_size() when pool_size is unset. The REST client only applies httpx.Limits when pool_size is explicitly provided (localhost is a special-case that disables limits), so REST falls back to httpx's defaults otherwise. Add a short constructor docstring or docs note stating this; optionally align REST and gRPC defaults if consistency is desired.

qdrant_client/async_qdrant_remote.py (4)

57-57: Default gRPC pool size: clarify consistency stance.

Async sets DEFAULT_GRPC_POOL_SIZE=3. As with sync, document that REST keeps httpx defaults unless pool_size is provided.


258-274: Tighten exception handling and release partial channels.

Mirror the sync fix: don’t catch bare Exception without chaining, and close any created aio channels on failure.

Apply:

-        try:
+        try:
             channel_pool = []
@@
-                self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+                self._grpc_channel_pool = channel_pool
+        except Exception as err:
+            for ch in channel_pool:
+                try:
+                    await ch.close(grace=None)
+                except Exception:
+                    pass
+            raise RuntimeError("Error initializing the gRPC connection pool") from err

Notes:

  • Addresses BLE001/B904/TRY003 hints.
  • Since this method is sync, you can’t await; alternatively, drop the await and close best‑effort in finally later. If you prefer to keep it sync, do not await and just ignore close; or refactor _init_grpc_channel to async in the generator and adapt call sites.

Would you prefer we keep this method sync and skip best‑effort closes, or make it async in the generator?


300-305: Async RR: use actual pool length; lock likely unnecessary.

In asyncio (single thread), this method runs to completion without await, so race risk is minimal. Still, prefer modulo actual length for resilience.

Apply:

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (
-            self._grpc_client_next_index + 1
-        ) % self._get_grpc_pool_size()
-        return current_index
+    def _next_grpc_client(self) -> int:
+        length = len(self._grpc_channel_pool) or self._get_grpc_pool_size()
+        idx = self._grpc_client_next_index % length
+        self._grpc_client_next_index = (idx + 1) % length
+        return idx

57-57: Operational guidance: document tuning advice.

Recommend pool_size tuning in README/API docs (e.g., start at 32–128 for heavy parallel ingestion; keep httpx default unless constrained).

Also applies to: 84-88, 121-126

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0f51918 and 2e5e694.

📒 Files selected for processing (4)
  • qdrant_client/async_qdrant_client.py (2 hunks)
  • qdrant_client/async_qdrant_remote.py (11 hunks)
  • qdrant_client/qdrant_client.py (2 hunks)
  • qdrant_client/qdrant_remote.py (11 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • qdrant_client/qdrant_client.py
  • qdrant_client/async_qdrant_client.py
🧰 Additional context used
🧬 Code graph analysis (2)
qdrant_client/qdrant_remote.py (4)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/async_qdrant_remote.py (8)
  • close (210-231)
  • _get_grpc_pool_size (244-253)
  • _init_grpc_channel (255-274)
  • _init_grpc_points_client (276-280)
  • _init_grpc_collections_client (282-286)
  • _init_grpc_snapshots_client (288-292)
  • _init_grpc_root_client (294-298)
  • _next_grpc_client (300-305)
qdrant_client/common/client_warnings.py (1)
  • show_warning (7-8)
qdrant_client/connection.py (1)
  • get_channel (254-275)
qdrant_client/async_qdrant_remote.py (2)
qdrant_client/qdrant_remote.py (8)
  • close (254-275)
  • _get_grpc_pool_size (288-298)
  • _init_grpc_channel (300-325)
  • _init_grpc_points_client (327-331)
  • _init_grpc_collections_client (333-337)
  • _init_grpc_snapshots_client (339-343)
  • _init_grpc_root_client (345-349)
  • _next_grpc_client (351-356)
qdrant_client/connection.py (1)
  • get_channel (254-275)
🪛 Ruff (0.12.2)
qdrant_client/qdrant_remote.py

302-302: Avoid specifying long messages outside the exception class

(TRY003)


324-324: Do not catch blind exception: Exception

(BLE001)


325-325: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


325-325: Avoid specifying long messages outside the exception class

(TRY003)

qdrant_client/async_qdrant_remote.py

257-257: Avoid specifying long messages outside the exception class

(TRY003)


273-273: Do not catch blind exception: Exception

(BLE001)


274-274: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


274-274: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Redirect rules - poetic-froyo-8baba7
  • GitHub Check: Header rules - poetic-froyo-8baba7
  • GitHub Check: Pages changed - poetic-froyo-8baba7
  • GitHub Check: Python 3.12.x on ubuntu-latest test
  • GitHub Check: Python 3.13.x on ubuntu-latest test
  • GitHub Check: Python 3.9.x on ubuntu-latest test
  • GitHub Check: Python 3.11.x on ubuntu-latest test
  • GitHub Check: Python 3.10.x on ubuntu-latest test
🔇 Additional comments (7)
qdrant_client/qdrant_remote.py (2)

365-369: Minor: use local pool var for indexing (cleaner for mypy, consistent with length-based RR).

Keeps narrowing local and pairs with updated _next_grpc_client.
Apply (pattern repeated per property):

-        assert self._grpc_collections_client_pool is not None
-        return self._grpc_collections_client_pool[self._next_grpc_client()]
+        assert self._grpc_collections_client_pool is not None
+        pool = self._grpc_collections_client_pool
+        return pool[self._next_grpc_client()]

Also applies to: 377-381, 389-393, 401-405


254-265: Close(): unify warning category and consider logging unknown exceptions.

Using RuntimeWarning here while async uses UserWarning is inconsistent; also swallowing unexpected errors can hide issues.
Apply:

-                except AttributeError:
+                except AttributeError:
                     show_warning(
                         message="Unable to close grpc_channel. Connection was interrupted on the server side",
-                        category=RuntimeWarning,
+                        category=UserWarning,
                         stacklevel=4,
                     )
+                except Exception:
+                    logging.debug("Unexpected error while closing grpc_channel", exc_info=True)
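
A standalone sketch of that close pattern, assuming a hypothetical `close_all` helper and toy channel classes instead of the real client. It uses the standard `warnings.warn` in place of the project's `show_warning`, warns on the expected `AttributeError`, and logs anything unexpected at debug level instead of dropping it silently.

```python
import logging
import warnings
from typing import Iterable

logger = logging.getLogger(__name__)


def close_all(channels: Iterable[object]) -> int:
    """Hypothetical helper: close every channel, return how many closed cleanly."""
    closed = 0
    for channel in channels:
        try:
            channel.close()  # type: ignore[attr-defined]
            closed += 1
        except AttributeError:
            # Expected failure mode: surface it to the user as a warning.
            warnings.warn("Unable to close grpc_channel", UserWarning, stacklevel=2)
        except Exception:
            # Unexpected failure: keep closing the rest, but leave a trace.
            logger.debug("Unexpected error while closing grpc_channel", exc_info=True)
    return closed


class GoodChannel:
    def close(self) -> None:
        pass


class BrokenChannel:
    def close(self) -> None:
        raise ValueError("simulated failure")


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # object() has no close(), so it triggers the AttributeError branch.
    closed_count = close_all([GoodChannel(), object(), BrokenChannel(), GoodChannel()])
```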
qdrant_client/async_qdrant_remote.py (5)

84-88: Good: clamp pool_size to ≥1.

Matches sync; prevents invalid pools.


176-181: State init LGTM.

Pools and index initialized; clear separation per stub type.


210-222: Close(): minor consistency/logging nits.

Matching sync, consider logging unexpected exceptions; UserWarning category is fine.

-                except RuntimeError:
-                    pass
+                except RuntimeError:
+                    pass
+                except Exception:
+                    logging.debug("Unexpected error while closing aio grpc_channel", exc_info=True)

244-254: Pool size helper LGTM.

Encapsulates defaulting; good guidance to not access _pool_size directly.


314-317: Minor: narrow Optional pools locally before indexing.

Assign to a local var “pool” (post-assert) and index via that; pairs well with RR change and keeps mypy happy in generated code.
Apply (repeat pattern):

-        assert self._grpc_points_client_pool is not None
-        return self._grpc_points_client_pool[self._next_grpc_client()]
+        assert self._grpc_points_client_pool is not None
+        pool = self._grpc_points_client_pool
+        return pool[self._next_grpc_client()]

Also applies to: 326-329, 338-341, 350-353

Comment on lines +258 to +274
try:
channel_pool = []
if len(self._grpc_channel_pool) == 0:
for _ in range(self._get_grpc_pool_size()):
channel = get_channel(
host=self._host,
port=self._grpc_port,
ssl=self._https,
metadata=self._grpc_headers,
options=self._grpc_options,
compression=self._grpc_compression,
auth_token_provider=self._auth_token_provider,
)
channel_pool.append(channel)
self._grpc_channel_pool = channel_pool
except Exception as e:
raise RuntimeError(f"Error initializing the grpc connection(s): {e}")


🛠️ Refactor suggestion

🧩 Analysis chain

Generator note: changes must be applied in the transformer.

File is auto‑generated. Please update tools/async_client_generator to emit the improved RR and exception‑handling patterns so they persist.

Also applies to: 300-305, 314-353


🏁 Script executed:

#!/bin/bash
# Locate async_qdrant_remote.py and show header to verify auto-generated notice
file_path=$(fd async_qdrant_remote.py)
echo "Using file: $file_path"
head -n 20 "$file_path"

# Search for async_client_generator references in codebase
rg -n "async_client_generator" -C2 .

# List contents of tools directory if it exists
if [ -d tools ]; then
  echo "tools directory contents:"
  ls tools
else
  echo "tools directory not found"
fi

Length of output: 11216


Update generator to emit safe gRPC channel-pool initialization and robust exception handling

File is autogenerated — make the change in tools/async_client_generator so it persists: edit tools/async_client_generator/transformers/remote/function_def_transformer.py (and tools/async_client_generator/remote_generator.py if the remote snippet is composed there), then run tools/generate_async_client.sh to regenerate qdrant_client/async_qdrant_remote.py. Also applies to lines 300–305 and 314–353.

🧰 Tools
🪛 Ruff (0.12.2)

273-273: Do not catch blind exception: Exception

(BLE001)


274-274: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


274-274: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In qdrant_client/async_qdrant_remote.py around lines 258–274 (and similarly at
300–305 and 314–353), the autogenerated gRPC channel-pool initialization
swallows partial failures and can leak channels; update the generator
(tools/async_client_generator/transformers/remote/function_def_transformer.py
and tools/async_client_generator/remote_generator.py if used) so the emitted
code: (1) builds the pool incrementally, catching exceptions per-channel; (2) on
any failure closes/disposes any channels already created before re-raising; (3)
raises a clear RuntimeError including the original exception info (not just
string interpolation) so the traceback is preserved; and (4) guards against
concurrent initialization (e.g., check-and-set under a lock or double-checked
assignment) to avoid race conditions; after modifying the generator, run
tools/generate_async_client.sh to regenerate
qdrant_client/async_qdrant_remote.py.
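
Point (4), the check-and-set under a lock, can be sketched as follows. This assumes a threaded caller and a hypothetical `LazyPool` class; it is not the generator's output, only an illustration of double-checked initialization so the pool is built once even when several threads ask for it at the same time.

```python
import threading
from typing import Callable, List, Optional


class LazyPool:
    """Hypothetical lazily built pool; illustrates double-checked initialization."""

    def __init__(self, factory: Callable[[], object], size: int) -> None:
        self._factory = factory
        self._size = size
        self._pool: Optional[List[object]] = None
        self._init_lock = threading.Lock()

    def get(self) -> List[object]:
        pool = self._pool
        if pool is None:  # fast path: skip the lock once the pool exists
            with self._init_lock:
                pool = self._pool
                if pool is None:  # re-check: another thread may have built it
                    pool = [self._factory() for _ in range(self._size)]
                    self._pool = pool
        return pool


factory_calls: List[int] = []


def counting_factory() -> object:
    factory_calls.append(1)
    return object()


lazy = LazyPool(counting_factory, size=3)
threads = [threading.Thread(target=lazy.get) for _ in range(8)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```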

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
qdrant_client/qdrant_remote.py (2)

355-360: Make RR index thread‑safe and use actual pool length.
Avoids races and mis-modulo if pool size ever diverges from configured size.

Apply:

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (
-            self._grpc_client_next_index + 1
-        ) % self._get_grpc_pool_size()
-        return current_index
+    def _next_grpc_client(self) -> int:
+        # Protect against concurrent access
+        with self._grpc_rr_lock:  # type: ignore[attr-defined]
+            length = len(self._grpc_channel_pool)
+            if length <= 0:
+                return 0
+            current_index = self._grpc_client_next_index % length
+            self._grpc_client_next_index = (current_index + 1) % length
+            return current_index
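
To illustrate the suggestion above outside the client, here is a minimal standalone sketch of a lock-protected round-robin counter. `RoundRobinPool` and its attribute names are hypothetical stand-ins, not qdrant_client classes; the only points it demonstrates are that the read-modify-write of the index happens under a lock and that the modulo uses the actual pool length.

```python
import threading


class RoundRobinPool:
    """Hypothetical stand-in for the client's channel pool (not a qdrant_client class)."""

    def __init__(self, items):
        self._items = list(items)
        self._next_index = 0
        self._lock = threading.Lock()

    def next_index(self) -> int:
        # The counter is read and updated under the lock, so two threads
        # cannot observe the same value and both advance it by one.
        with self._lock:
            length = len(self._items)
            if length == 0:
                return 0
            current = self._next_index % length
            self._next_index = (current + 1) % length
            return current


pool = RoundRobinPool(["ch0", "ch1", "ch2"])
picks = [pool.next_index() for _ in range(7)]
```

With three items the indices cycle 0, 1, 2 and wrap around; using `len(self._items)` rather than a configured size keeps the modulo valid even if the two ever differ.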

213-219: Add locks for round‑robin and pool init to avoid races and leaks.
Concurrent threads can double‑init the pool and race the RR counter.

Apply:

@@
-        self._grpc_client_next_index: int = 0  # The next index to use
+        self._grpc_client_next_index: int = 0  # The next index to use
+        # Synchronization for multithreaded use
+        import threading  # local import acceptable; consider hoisting to module imports
+        self._grpc_rr_lock = threading.Lock()
+        self._grpc_init_lock = threading.Lock()
🧹 Nitpick comments (5)
qdrant_client/qdrant_remote.py (3)

258-269: grpc_grace is unused in sync close().
Either document as ignored for sync or remove the param to avoid confusion.


311-323: Auth token provider: reject async provider in sync client.
Passing an async callable to the sync interceptor will fail at runtime.

Consider validating and warning:

+        if callable(self._auth_token_provider) and getattr(self._auth_token_provider, "__await__", None):
+            show_warning("Async auth_token_provider is not supported in sync client.", category=UserWarning, stacklevel=4)
+            self._auth_token_provider = None
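
One caveat when validating the provider: an `async def` function object does not itself define `__await__`; only the coroutine object returned by calling it does. A possible detection helper, sketched here with the standard library's `inspect.iscoroutinefunction`, is shown below. The helper name `is_async_provider` and the sample providers are hypothetical and not part of qdrant_client.

```python
import inspect


def is_async_provider(provider) -> bool:
    """Return True if calling `provider` would produce a coroutine."""
    if provider is None:
        return False
    # Plain `async def` functions (and bound async methods).
    if inspect.iscoroutinefunction(provider):
        return True
    # Callable instances whose __call__ is declared with `async def`.
    call = getattr(provider, "__call__", None)
    return inspect.iscoroutinefunction(call)


def sync_token() -> str:
    return "token"


async def async_token() -> str:
    return "token"


class AsyncTokenProvider:
    async def __call__(self) -> str:
        return "token"
```

A sync client could call such a helper once at construction time and warn or reject the provider, instead of failing later inside the interceptor.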

259-269: Nit: hasattr guard is redundant.
Field is always set in init.

-        if hasattr(self, "_grpc_channel_pool") and len(self._grpc_channel_pool) > 0:
+        if self._grpc_channel_pool:
qdrant_client/async_qdrant_remote.py (2)

259-278: Preserve original traceback; avoid bare Exception.
Raise from the caught error. Keeps debugging intact.

Apply:

-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+        except Exception as e:
+            raise RuntimeError("Error initializing the grpc connection(s).") from e
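
For reference, a small standalone sketch of what `from e` changes. Both variants keep the original error in `__context__`, but only the explicit form sets `__cause__`, which is what makes the traceback report the original error as the direct cause. The function names are illustrative, and the `ConnectionError` merely simulates a failed channel creation.

```python
def init_without_chaining():
    try:
        raise ConnectionError("connect failed")  # simulated failure
    except Exception as e:
        # Original error is only interpolated into the message.
        raise RuntimeError(f"Error initializing the grpc connection(s): {e}")


def init_with_chaining():
    try:
        raise ConnectionError("connect failed")  # simulated failure
    except Exception as e:
        # Original error is attached as the explicit cause.
        raise RuntimeError("Error initializing the grpc connection(s).") from e


def capture(fn):
    """Run fn and return the RuntimeError it raises."""
    try:
        fn()
    except RuntimeError as err:
        return err
    return None


implicit = capture(init_without_chaining)
explicit = capture(init_with_chaining)
```

Here `explicit.__cause__` is the `ConnectionError`, while `implicit.__cause__` is `None`.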

Note: This file is autogenerated; update the generator to persist.

Would you like me to propose the corresponding change in tools/async_client_generator so regeneration keeps this behavior?


304-309: Use actual pool length for RR modulo (consistency with sync fix).
Prevents future drift if pool size differs at runtime.

Apply:

-    def _next_grpc_client(self) -> int:
-        current_index = self._grpc_client_next_index
-        self._grpc_client_next_index = (
-            self._grpc_client_next_index + 1
-        ) % self._get_grpc_pool_size()
-        return current_index
+    def _next_grpc_client(self) -> int:
+        length = len(self._grpc_channel_pool)
+        if length <= 0:
+            return 0
+        current_index = self._grpc_client_next_index % length
+        self._grpc_client_next_index = (current_index + 1) % length
+        return current_index

Since this file is autogenerated, please update the generator accordingly.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2e5e694 and 6b114e1.

📒 Files selected for processing (5)
  • qdrant_client/async_qdrant_client.py (3 hunks)
  • qdrant_client/async_qdrant_remote.py (11 hunks)
  • qdrant_client/qdrant_client.py (3 hunks)
  • qdrant_client/qdrant_remote.py (11 hunks)
  • tools/async_client_generator/transformers/remote/function_def_transformer.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • tools/async_client_generator/transformers/remote/function_def_transformer.py
  • qdrant_client/async_qdrant_client.py
  • qdrant_client/qdrant_client.py
🧰 Additional context used
🧬 Code graph analysis (2)
qdrant_client/qdrant_remote.py (6)
qdrant_client/grpc/points_service_pb2_grpc.py (1)
  • PointsStub (8-156)
qdrant_client/grpc/collections_service_pb2_grpc.py (1)
  • CollectionsStub (8-81)
qdrant_client/grpc/snapshots_service_pb2_grpc.py (1)
  • SnapshotsStub (8-46)
qdrant_client/grpc/qdrant_pb2_grpc.py (1)
  • QdrantStub (8-21)
qdrant_client/async_qdrant_remote.py (8)
  • close (214-235)
  • _get_grpc_pool_size (248-257)
  • _init_grpc_channel (259-278)
  • _init_grpc_points_client (280-284)
  • _init_grpc_collections_client (286-290)
  • _init_grpc_snapshots_client (292-296)
  • _init_grpc_root_client (298-302)
  • _next_grpc_client (304-309)
qdrant_client/connection.py (1)
  • get_channel (254-275)
qdrant_client/async_qdrant_remote.py (3)
qdrant_client/qdrant_remote.py (8)
  • close (258-279)
  • _get_grpc_pool_size (292-302)
  • _init_grpc_channel (304-329)
  • _init_grpc_points_client (331-335)
  • _init_grpc_collections_client (337-341)
  • _init_grpc_snapshots_client (343-347)
  • _init_grpc_root_client (349-353)
  • _next_grpc_client (355-360)
qdrant_client/common/client_warnings.py (1)
  • show_warning (7-8)
qdrant_client/connection.py (1)
  • get_channel (254-275)
🪛 Ruff (0.12.2)
qdrant_client/qdrant_remote.py

142-145: Avoid specifying long messages outside the exception class

(TRY003)


306-306: Avoid specifying long messages outside the exception class

(TRY003)


328-328: Do not catch blind exception: Exception

(BLE001)


329-329: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


329-329: Avoid specifying long messages outside the exception class

(TRY003)

qdrant_client/async_qdrant_remote.py

127-129: Avoid specifying long messages outside the exception class

(TRY003)


261-261: Avoid specifying long messages outside the exception class

(TRY003)


277-277: Do not catch blind exception: Exception

(BLE001)


278-278: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


278-278: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (8)
  • GitHub Check: Redirect rules - poetic-froyo-8baba7
  • GitHub Check: Header rules - poetic-froyo-8baba7
  • GitHub Check: Pages changed - poetic-froyo-8baba7
  • GitHub Check: Python 3.13.x on ubuntu-latest test
  • GitHub Check: Python 3.9.x on ubuntu-latest test
  • GitHub Check: Python 3.12.x on ubuntu-latest test
  • GitHub Check: Python 3.11.x on ubuntu-latest test
  • GitHub Check: Python 3.10.x on ubuntu-latest test
🔇 Additional comments (4)
qdrant_client/qdrant_remote.py (2)

76-82: Good: pool_size is normalized and kept internal.
Consistent min-clamp and “access via helper” comment are solid.


132-146: Good: REST limits now use sanitized pool size and are mutually exclusive with limits.
Prevents invalid httpx configs; clear error on conflicts.
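
As a rough sketch of the behavior praised above (not the PR's actual code): the pool size is clamped to a minimum, and supplying both a pool size and explicit limits is rejected. The function names, the minimum of 1, and the returned dict shape are assumptions made for illustration; in the real client the result would feed an httpx limits object.

```python
from typing import Any, Dict, Optional


def clamp_pool_size(pool_size: Optional[int]) -> Optional[int]:
    """Clamp to at least 1 (assumed minimum); None means 'not configured'."""
    if pool_size is None:
        return None
    return max(1, pool_size)


def resolve_rest_limits(
    pool_size: Optional[int], limits: Optional[Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
    """Hypothetical helper: pick one source of connection limits or fail loudly."""
    if pool_size is not None and limits is not None:
        raise ValueError("Specify either `pool_size` or `limits`, not both.")
    if limits is not None:
        return limits
    size = clamp_pool_size(pool_size)
    if size is None:
        return None
    return {"max_connections": size}
```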

qdrant_client/async_qdrant_remote.py (2)

124-129: Good: REST limits use sanitized pool size and reject conflicts.
Matches sync behavior; prevents invalid configs.


214-227: Good: async close() iterates pool and tolerates interrupted connections.
Solid cleanup path.

Comment on lines 304 to 330
     def _init_grpc_channel(self) -> None:
         if self._closed:
             raise RuntimeError("Client was closed. Please create a new QdrantClient instance.")
 
-        if self._grpc_channel is None:
-            self._grpc_channel = get_channel(
-                host=self._host,
-                port=self._grpc_port,
-                ssl=self._https,
-                metadata=self._grpc_headers,
-                options=self._grpc_options,
-                compression=self._grpc_compression,
-                # sync get_channel does not accept coroutine functions,
-                # but we can't check type here, since it'll get into async client as well
-                auth_token_provider=self._auth_token_provider,  # type: ignore
-            )
+        try:
+            channel_pool = []
+
+            if len(self._grpc_channel_pool) == 0:
+                for _ in range(self._get_grpc_pool_size()):
+                    channel = get_channel(
+                        host=self._host,
+                        port=self._grpc_port,
+                        ssl=self._https,
+                        metadata=self._grpc_headers,
+                        options=self._grpc_options,
+                        compression=self._grpc_compression,
+                        # sync get_channel does not accept coroutine functions,
+                        # but we can't check type here, since it'll get into async client as well
+                        auth_token_provider=self._auth_token_provider,  # type: ignore
+                    )
+                    channel_pool.append(channel)
+
+                # Apply the clients late to prevent half-initialized pools if a channel creation fails.
+                self._grpc_channel_pool = channel_pool
+        except Exception as e:
+            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")

⚠️ Potential issue

Make gRPC pool init thread‑safe; clean up on partial failure; preserve cause.
Currently unsynchronized; may leak channels and loses traceback.

Apply:

-    def _init_grpc_channel(self) -> None:
-        if self._closed:
-            raise RuntimeError("Client was closed. Please create a new QdrantClient instance.")
-
-        try:
-            channel_pool = []
-
-            if len(self._grpc_channel_pool) == 0:
-                for _ in range(self._get_grpc_pool_size()):
-                    channel = get_channel(
-                        host=self._host,
-                        port=self._grpc_port,
-                        ssl=self._https,
-                        metadata=self._grpc_headers,
-                        options=self._grpc_options,
-                        compression=self._grpc_compression,
-                        # sync get_channel does not accept coroutine functions,
-                        # but we can't check type here, since it'll get into async client as well
-                        auth_token_provider=self._auth_token_provider,  # type: ignore
-                    )
-                    channel_pool.append(channel)
-
-                # Apply the clients late to prevent half-initialized pools if a channel creation fails.
-                self._grpc_channel_pool = channel_pool
-        except Exception as e:
-            raise RuntimeError(f"Error initializing the grpc connection(s): {e}")
+    def _init_grpc_channel(self) -> None:
+        if self._closed:
+            raise RuntimeError("Client was closed. Please create a new QdrantClient instance.")
+        # Ensure only one thread performs initialization
+        with self._grpc_init_lock:  # type: ignore[attr-defined]
+            if self._grpc_channel_pool:  # already initialized
+                return
+            channel_pool: list[grpc.Channel] = []
+            try:
+                for _ in range(self._get_grpc_pool_size()):
+                    channel = get_channel(
+                        host=self._host,
+                        port=self._grpc_port,
+                        ssl=self._https,
+                        metadata=self._grpc_headers,
+                        options=self._grpc_options,
+                        compression=self._grpc_compression,
+                        # sync get_channel does not accept coroutine functions
+                        auth_token_provider=self._auth_token_provider,  # type: ignore
+                    )
+                    channel_pool.append(channel)
+                # Apply late to avoid half-initialized state
+                self._grpc_channel_pool = channel_pool
+            except Exception as e:
+                # Best-effort cleanup of partially created channels
+                for ch in channel_pool:
+                    try:
+                        ch.close()
+                    except Exception:
+                        pass
+                raise RuntimeError("Error initializing the grpc connection(s).") from e
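
The same pattern can be exercised in isolation. The sketch below uses a hypothetical `FakeChannel` and `ChannelPool` (neither is a gRPC or qdrant_client type) to show the three properties the suggestion aims for: initialization runs under a lock, the pool is assigned only after every channel was created, and channels created before a failure are closed before the error is re-raised with its cause.

```python
import threading
from typing import Callable, List


class FakeChannel:
    """Hypothetical stand-in for a gRPC channel; only records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class ChannelPool:
    def __init__(self, size: int, factory: Callable[[], FakeChannel]) -> None:
        self._size = max(1, size)
        self._factory = factory
        self._channels: List[FakeChannel] = []
        self._init_lock = threading.Lock()

    def ensure_initialized(self) -> None:
        # Only one thread at a time may build the pool.
        with self._init_lock:
            if self._channels:  # already initialized
                return
            created: List[FakeChannel] = []
            try:
                for _ in range(self._size):
                    created.append(self._factory())
                # Assign late so a failure never leaves a half-built pool.
                self._channels = created
            except Exception as e:
                # Best-effort cleanup of channels created before the failure.
                for ch in created:
                    try:
                        ch.close()
                    except Exception:
                        pass
                raise RuntimeError("Error initializing the channel pool.") from e
```

Taking the lock before the emptiness check keeps the sketch simple; a double-checked variant could skip the lock once the pool is populated.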

Committable suggestion skipped: line range outside the PR's diff.

🧰 Tools
🪛 Ruff (0.12.2)

306-306: Avoid specifying long messages outside the exception class

(TRY003)


328-328: Do not catch blind exception: Exception

(BLE001)


329-329: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


329-329: Avoid specifying long messages outside the exception class

(TRY003)

@JojiiOfficial JojiiOfficial merged commit b9fb492 into dev Sep 19, 2025
14 checks passed
@JojiiOfficial JojiiOfficial deleted the connection-pooling branch September 19, 2025 12:38
joein added a commit that referenced this pull request Nov 14, 2025
* Connection pooling in GRPC

* Undo auto formattings

* Make next_grpc_client() protected

* Prevent partial initialization of grpc clients on channel creation error

* also apply pool_size to rest client

* Fix using non clamped pool_size in httpx

* Review remarks

* fix: fix minor things (#1075)

---------

Co-authored-by: George <[email protected]>