Add REST API for reading audit logs across the cluster#8498
Conversation
Introduces a new `GET /audit/logs` endpoint that retrieves audit log entries from all peers in the cluster. The API supports filtering by time range (time_from/time_to), dynamic key=value field filters, and a built-in limit parameter to prevent reading too many logs at once. - Add `audit_reader` module in storage crate for efficient file-based log retrieval, selecting only files whose date range overlaps the query window - Add `GetAuditLog` internal gRPC RPC for cross-peer log retrieval - Add `GET /audit/logs` REST endpoint restricted to management access - Aggregate and sort results from all peers by timestamp (newest first) Made-with: Cursor
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds end-to-end audit log retrieval: a new internal gRPC method Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Suggested reviewers
🚥 Pre-merge checks | ✅ 3✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (3)
src/actix/api/audit_api.rs (1)
183-190: String-based timestamp sorting is acceptable for ISO-8601 but consider a comment.Lexicographic string comparison works correctly for ISO-8601 timestamps when they share the same format (RFC3339 with UTC). Since
AuditEventusesDateTime<Utc>, this is safe. A brief comment explaining this assumption would improve maintainability.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/actix/api/audit_api.rs` around lines 183 - 190, The sort currently compares timestamp JSON strings lexicographically in the closure used by all_entries.sort_by; because AuditEvent stores timestamps as DateTime<Utc> and the code expects RFC3339/ISO-8601 UTC strings, add a brief comment above this sort (referencing the sort_by closure and the "timestamp" field / AuditEvent::timestamp) stating that lexicographic comparison is safe here because timestamps are RFC3339/ISO-8601 in UTC and consistent across entries; this documents the assumption for future maintainers.src/tonic/api/qdrant_internal_api.rs (1)
157-161: Consider documenting or aligning thelimit=0behavior.When
limit == 0, this converts toNone, whichAuditLogQuery::newthen treats asDEFAULT_LIMIT(100). This differs from the REST endpoint, which validateslimitwithrange(min = 1)and rejects 0. The inconsistency may confuse internal callers.Consider either:
- Treating
limit=0as "use default" explicitly in the proto documentation, or- Returning an
invalid_argumenterror forlimit=0to match REST behavior🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/tonic/api/qdrant_internal_api.rs` around lines 157 - 161, Update the handling of req.limit in the qdrant_internal_api code: instead of silently converting limit == 0 to None (which causes AuditLogQuery::new to use DEFAULT_LIMIT), explicitly validate req.limit and return a tonic::Status::invalid_argument when req.limit == 0 to match the REST endpoint behavior; locate the conversion around the local variable limit (and references to AuditLogQuery::new) and replace the current if req.limit == 0 branch with an early return Err(tonic::Status::invalid_argument("limit must be >= 1")) (or alternatively, if you prefer the other approach, add/provide proto documentation indicating that limit=0 means "use default" and keep the existing conversion).lib/storage/src/audit_reader.rs (1)
141-152: File-level time filtering uses 1-day granularity for hourly files.For hourly rotation files (e.g.,
audit.2024-01-15-14.log), the filter still uses a 1-day window, which may include more files than necessary. Since entry-level filtering inmatches_queryhandles precision correctly, this is not a correctness bug—just suboptimal for large numbers of hourly files.Consider using hour-level granularity for hourly files in a follow-up optimization if performance becomes a concern.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/storage/src/audit_reader.rs` around lines 141 - 152, The file-level time filter in audit_reader.rs currently uses a 1-day window (using chrono::Duration::days(1)) around file_date when checking query.time_to and query.time_from; update the logic in the block that references file_date, query.time_to, and query.time_from to detect hourly-rotated filenames (e.g., pattern including an hour like audit.YYYY-MM-DD-HH.log) and use hour-level granularity (chrono::Duration::hours(1)) for those files instead of days(1), leaving daily files unchanged; keep entry-level precision in matches_query as-is and only change the coarse file-selection window for hourly files to avoid scanning extra files.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@lib/api/src/grpc/qdrant.rs`:
- Around line 13308-13309: Update the proto comment that currently reads
"Key=value filters applied to every JSON field" to specify that filters only
apply to dynamic top-level JSON fields (e.g., "Key=value filters applied to
dynamic top-level JSON fields only"), then regenerate the Rust bindings with
prost-build rather than editing lib/api/src/grpc/qdrant.rs directly; modify the
corresponding proto file where the map<string, string> field (tag = "3") is
defined, update the doc comment there, and run the project's proto
build/regeneration step so the new comment is reflected in the generated
qdrant.rs.
In `@lib/storage/src/audit_reader.rs`:
- Around line 56-71: The current loop reads audit files in ascending filename
order because log_files.sort() processes oldest files first, which can cause
results to hit query.limit before recent entries are read; change the ordering
so the newest files are processed first by sorting log_files in descending order
(e.g., use sort_by with b.cmp(a) or call reverse() after sort) before calling
filter_files_by_time_range and iterating to read_entries_from_file, ensuring the
loop that appends to results returns the most recent entries up to query.limit.
In `@src/actix/api/audit_api.rs`:
- Around line 133-134: The parsed timeout value (let timeout =
params.timeout.unwrap_or(DEFAULT_GRPC_TIMEOUT.as_secs()); and the params.timeout
validation) is computed but deliberately ignored (_ = timeout), which is
misleading; either wire this timeout through to your outbound gRPC calls (e.g.,
convert seconds to a Duration and set it as the request deadline/timeout on the
gRPC client used in the audit API call) so cross-peer requests honor
user-specified timeouts, or remove the timeout parameter and its validation from
the API to avoid exposing a non-functional option; update the code paths that
create/dispatch the gRPC request (the place where the audit API sends peer
requests) to accept and apply this Duration if you choose to implement it.
- Line 119: The code currently silences errors by using unwrap_or_default() on
the call to read_local_audit_logs in the handler, which can mask
I/O/configuration issues; change the handling so errors are propagated or at
least logged: replace unwrap_or_default() on read_local_audit_logs with proper
error handling inside the gRPC handler (e.g., map the error to a tonic::Status
and return Err from the handler or call process_logger/error! with the error and
continue only if partial results are acceptable), referencing the
read_local_audit_logs call and the local_entries binding so the handler either
returns the mapped error or logs the failure before using default results.
---
Nitpick comments:
In `@lib/storage/src/audit_reader.rs`:
- Around line 141-152: The file-level time filter in audit_reader.rs currently
uses a 1-day window (using chrono::Duration::days(1)) around file_date when
checking query.time_to and query.time_from; update the logic in the block that
references file_date, query.time_to, and query.time_from to detect
hourly-rotated filenames (e.g., pattern including an hour like
audit.YYYY-MM-DD-HH.log) and use hour-level granularity
(chrono::Duration::hours(1)) for those files instead of days(1), leaving daily
files unchanged; keep entry-level precision in matches_query as-is and only
change the coarse file-selection window for hourly files to avoid scanning extra
files.
In `@src/actix/api/audit_api.rs`:
- Around line 183-190: The sort currently compares timestamp JSON strings
lexicographically in the closure used by all_entries.sort_by; because AuditEvent
stores timestamps as DateTime<Utc> and the code expects RFC3339/ISO-8601 UTC
strings, add a brief comment above this sort (referencing the sort_by closure
and the "timestamp" field / AuditEvent::timestamp) stating that lexicographic
comparison is safe here because timestamps are RFC3339/ISO-8601 in UTC and
consistent across entries; this documents the assumption for future maintainers.
In `@src/tonic/api/qdrant_internal_api.rs`:
- Around line 157-161: Update the handling of req.limit in the
qdrant_internal_api code: instead of silently converting limit == 0 to None
(which causes AuditLogQuery::new to use DEFAULT_LIMIT), explicitly validate
req.limit and return a tonic::Status::invalid_argument when req.limit == 0 to
match the REST endpoint behavior; locate the conversion around the local
variable limit (and references to AuditLogQuery::new) and replace the current if
req.limit == 0 branch with an early return
Err(tonic::Status::invalid_argument("limit must be >= 1")) (or alternatively, if
you prefer the other approach, add/provide proto documentation indicating that
limit=0 means "use default" and keep the existing conversion).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: dfd69b2c-3cb7-4275-9753-71cb24a70836
📒 Files selected for processing (8)
lib/api/src/grpc/proto/qdrant_internal_service.protolib/api/src/grpc/qdrant.rslib/storage/src/audit_reader.rslib/storage/src/lib.rssrc/actix/api/audit_api.rssrc/actix/api/mod.rssrc/actix/mod.rssrc/tonic/api/qdrant_internal_api.rs
…ion granulatity (either hourly or daily)
…e, instead of manual parsing
There was a problem hiding this comment.
Actionable comments posted: 2
♻️ Duplicate comments (2)
lib/storage/src/audit_reader.rs (1)
59-60:⚠️ Potential issue | 🟠 MajorLimiting during forward scans drops the newest matches.
These loops fill the result set oldest-first: filenames are sorted ascending, and
reader.lines()starts at the beginning of each file. Oncequery.limit/remainingis hit, the reader stops before it ever reaches newer offsets or newer files. The later merge/sort insrc/common/audit.rscannot fix that, because those entries were never loaded.Please apply the limit only after traversing newest→oldest data for the selected window, or keep the last
remainingmatches while scanning each file.Also applies to: 66-73, 187-200
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@lib/storage/src/audit_reader.rs` around lines 59 - 60, The current scan orders files oldest→newest and applies query.limit/remaining as it goes, which drops newer matches; change the traversal to newest→oldest or retain the last N matches per file: reverse the file ordering returned by list_audit_files (e.g., sort and then .reverse() or sort_by(|a,b| b.cmp(a))) so log_files is iterated newest-first, and/or when processing reader.lines() for each file (the loop that checks query.limit / remaining) buffer the per-file matches and keep only the last `remaining` items before appending to results so the global limit is applied after newest-first traversal (adjust logic around the reader.lines() loop and the remaining/limit checks accordingly).src/common/audit.rs (1)
25-30:⚠️ Potential issue | 🟠 MajorDon't hide coordinator read failures.
unwrap_or_default()turns a local I/O/configuration failure into a success response with an incomplete result set, and unlike remote failures nothing is added tomissing_peersto tell the caller the coordinator's own logs were skipped.💡 Suggested change
let local_entries = cancel::blocking::spawn_cancel_on_drop(move |cancel| { read_local_audit_logs(&config, &query_clone, &cancel) }) .await - .map_err(|e| StorageError::service_error(format!("Failed to read local audit logs: {e}")))? - .unwrap_or_default(); + .map_err(|e| StorageError::service_error(format!("Failed to read local audit logs: {e}")))??;🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/common/audit.rs` around lines 25 - 30, The current call to cancel::blocking::spawn_cancel_on_drop(...).await...unwrap_or_default() hides coordinator-local failures by turning them into an empty success; change this so coordinator read failures are not swallowed: remove unwrap_or_default() and either propagate the error (e.g., return Err(StorageError::service_error(...)) when the awaited result is None) or explicitly record the coordinator in missing_peers so callers know local logs were skipped; update the handling around local_entries (the result of read_local_audit_logs) and the downstream missing_peers logic to reflect the chosen behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/actix/api/audit_api.rs`:
- Around line 46-56: The parse_filters function must fail-fast instead of
dropping bad input: change fn parse_filters(query_string: &str) ->
HashMap<String, String> to return a Result<HashMap<String,String>, Error> (or a
custom error) and propagate urlencoding::decode errors (don’t unwrap_or_default)
and return an error when a decoded filter lacks the required key=value pair;
update callers to map that error to a 400 Bad Request response for the audit
endpoint. Ensure you replace the unwrap_or_default() on urlencoding::decode with
proper error handling (map_err/ ? ), validate decoded.split_once('=') and return
an explicit parse error if missing, and apply the same change to any other
filter-parsing sites that use the same logic so malformed filter= params produce
a 400 instead of silently being ignored.
In `@src/common/audit.rs`:
- Around line 93-98: The current sort uses lexical comparison of the
TIMESTAMP_KEY string which misorders RFC3339 timestamps with optional fractional
seconds; update the sort in the all_entries.sort_by closure to parse each
entry's timestamp string into a chrono::DateTime (e.g. DateTime<Utc> via
chrono::DateTime::parse_from_rfc3339 or FromStr) and compare the parsed
DateTimes, falling back to a sensible default (like MIN or MAX datetime or the
string comparison) when parsing fails; ensure the comparison still yields
newest-first order (i.e., compare parsed_b.cmp(&parsed_a)).
---
Duplicate comments:
In `@lib/storage/src/audit_reader.rs`:
- Around line 59-60: The current scan orders files oldest→newest and applies
query.limit/remaining as it goes, which drops newer matches; change the
traversal to newest→oldest or retain the last N matches per file: reverse the
file ordering returned by list_audit_files (e.g., sort and then .reverse() or
sort_by(|a,b| b.cmp(a))) so log_files is iterated newest-first, and/or when
processing reader.lines() for each file (the loop that checks query.limit /
remaining) buffer the per-file matches and keep only the last `remaining` items
before appending to results so the global limit is applied after newest-first
traversal (adjust logic around the reader.lines() loop and the remaining/limit
checks accordingly).
In `@src/common/audit.rs`:
- Around line 25-30: The current call to
cancel::blocking::spawn_cancel_on_drop(...).await...unwrap_or_default() hides
coordinator-local failures by turning them into an empty success; change this so
coordinator read failures are not swallowed: remove unwrap_or_default() and
either propagate the error (e.g., return Err(StorageError::service_error(...))
when the awaited result is None) or explicitly record the coordinator in
missing_peers so callers know local logs were skipped; update the handling
around local_entries (the result of read_local_audit_logs) and the downstream
missing_peers logic to reflect the chosen behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 62fd7715-05c4-4314-8554-c208fede2f69
📒 Files selected for processing (4)
lib/storage/src/audit_reader.rssrc/actix/api/audit_api.rssrc/common/audit.rssrc/tonic/api/qdrant_internal_api.rs
| limit: query.limit as u64, | ||
| }; | ||
|
|
||
| let all_peers: Vec<_> = channel_service |
There was a problem hiding this comment.
channel_service.id_to_address contains the local peer (see channel_service.rs:87 where await_commit_on_all_peers filters it with .filter(|id| **id != this_peer_id)). Since read_local_audit_logs is already called on line 25, the gRPC call to self duplicates every local entry.
Tested on a 3-node cluster — single create_collection sent to Node2:
$ curl -s "http://localhost:6433/audit/logs?filter=method=create_collection"
{
"entries": [
{
"timestamp": "2026-03-24T18:05:36.610713Z",
"method": "create_collection",
"auth_type": "None",
"remote": "127.0.0.1",
"result": "ok"
},
{
"timestamp": "2026-03-24T18:05:36.610713Z",
"method": "create_collection",
"auth_type": "None",
"remote": "127.0.0.1",
"result": "ok"
}
]
}1 entry in the file, 2 in the response. Every local entry is duplicated.
| } | ||
|
|
||
| #[get("/audit/logs")] | ||
| async fn get_audit_logs( |
There was a problem hiding this comment.
log_files.sort() sorts ascending (oldest file first), then read_entries_from_file reads top-to-bottom and stops at query.limit. This collects the N oldest entries per node. But fetch_cluster_audit_logs sorts descending ("newest first") after merge — so the actually newest entries never make it past the per-node limit.
Tested on a single node with 21 entries in the file:
$ tail -1 /tmp/qdrant-node1/storage/audit/audit.2026-03-24.log | jq .timestamp
"2026-03-24T18:12:54.633194Z"
$ curl -s "http://localhost:6333/audit/logs?limit=1" | jq .result.entries[0].timestamp
"2026-03-24T18:01:11.493952Z"
``` The newest entry is from 18:12:54, but the API returns 18:01:11 — over 11 minutes older. The entries in between are lost because the reader stopped after collecting 5 oldest lines from the top of the file.| }) | ||
| .await | ||
| .map_err(|e| StorageError::service_error(format!("Failed to read local audit logs: {e}")))? | ||
| .unwrap_or_default(); |
There was a problem hiding this comment.
.unwrap_or_default() silently converts a local StorageError into an empty Vec. If the audit directory is unreadable, the response is "status": "ok" with no indication that local entries are missing.
There was a problem hiding this comment.
🧹 Nitpick comments (1)
src/common/audit.rs (1)
63-76: Consider logging malformed JSON entries.Lines 64-66 and 73-75 silently skip entries that fail JSON parsing. While defensive handling is appropriate, logging a warning would help diagnose data corruption or unexpected formats in audit logs.
💡 Optional: Add warning logs for parse failures
for entry_str in &local_entries { - if let Ok(val) = serde_json::from_str::<serde_json::Value>(entry_str) { - all_entries.push(val); + match serde_json::from_str::<serde_json::Value>(entry_str) { + Ok(val) => all_entries.push(val), + Err(e) => log::warn!("Skipping malformed local audit log entry: {e}"), } }Apply the same pattern for peer entries at lines 73-75.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/common/audit.rs` around lines 63 - 76, The JSON parsing currently ignores failures in the local_entries loop and the peer response loop; update both spots (where serde_json::from_str::<serde_json::Value>(entry_str) is called in the local_entries iteration and inside the loop over response.into_inner().entries) to log a warning when parsing fails—include the offending entry_str and, for the peer case, the peer_id/context—to aid debugging; use the project's logging facility (e.g., log::warn! or tracing::warn!) and keep pushing valid parsed values into all_entries as before.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/common/audit.rs`:
- Around line 63-76: The JSON parsing currently ignores failures in the
local_entries loop and the peer response loop; update both spots (where
serde_json::from_str::<serde_json::Value>(entry_str) is called in the
local_entries iteration and inside the loop over response.into_inner().entries)
to log a warning when parsing fails—include the offending entry_str and, for the
peer case, the peer_id/context—to aid debugging; use the project's logging
facility (e.g., log::warn! or tracing::warn!) and keep pushing valid parsed
values into all_entries as before.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: ba8154ce-f606-4c58-8543-d1a62a416b20
📒 Files selected for processing (3)
lib/collection/src/shards/channel_service.rssrc/actix/api/audit_api.rssrc/common/audit.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/actix/api/audit_api.rs
* Add REST API for reading audit logs across the cluster Introduces a new `GET /audit/logs` endpoint that retrieves audit log entries from all peers in the cluster. The API supports filtering by time range (time_from/time_to), dynamic key=value field filters, and a built-in limit parameter to prevent reading too many logs at once. - Add `audit_reader` module in storage crate for efficient file-based log retrieval, selecting only files whose date range overlaps the query window - Add `GetAuditLog` internal gRPC RPC for cross-peer log retrieval - Add `GET /audit/logs` REST endpoint restricted to management access - Aggregate and sort results from all peers by timestamp (newest first) Made-with: Cursor * [AI] Update filter_files_by_time_range make it aware of the log rotation granulatity (either hourly or daily) * [AI] For AuditLogParams, try to use DateTime types natively into serde, instead of manual parsing * [AI] Refactor API into separate file, propagate timeout, use spawn-blocking * [AI] introduce cancellation token * [AI] move timestamp to constant * small manual fixes * review fixes part 1 * review: switch to POST instead of GET * [AI] review: sorting update * [AI] use strict typing --------- Co-authored-by: Cursor Agent <[email protected]> Co-authored-by: Andrey Vasnetsov <[email protected]>
* Add REST API for reading audit logs across the cluster Introduces a new `GET /audit/logs` endpoint that retrieves audit log entries from all peers in the cluster. The API supports filtering by time range (time_from/time_to), dynamic key=value field filters, and a built-in limit parameter to prevent reading too many logs at once. - Add `audit_reader` module in storage crate for efficient file-based log retrieval, selecting only files whose date range overlaps the query window - Add `GetAuditLog` internal gRPC RPC for cross-peer log retrieval - Add `GET /audit/logs` REST endpoint restricted to management access - Aggregate and sort results from all peers by timestamp (newest first) Made-with: Cursor * [AI] Update filter_files_by_time_range make it aware of the log rotation granulatity (either hourly or daily) * [AI] For AuditLogParams, try to use DateTime types natively into serde, instead of manual parsing * [AI] Refactor API into separate file, propagate timeout, use spawn-blocking * [AI] introduce cancellation token * [AI] move timestamp to constant * small manual fixes * review fixes part 1 * review: switch to POST instead of GET * [AI] review: sorting update * [AI] use strict typing --------- Co-authored-by: Cursor Agent <[email protected]> Co-authored-by: Andrey Vasnetsov <[email protected]>
Promt:
AI generated from here
Summary
GET /audit/logsREST endpoint for reading audit log entries from all peers in the clusterGetAuditLogRPC onQdrantInternalfor cross-peer audit log retrievalaudit_readermodule in storage crate for efficient file-based log readingDetails
REST API (
GET /audit/logs)Query parameters:
time_from— ISO-8601 start time (inclusive)time_to— ISO-8601 end time (exclusive)filter=key=value— dynamic field filter (repeatable, e.g.filter=method=upsert_points&filter=result=ok)limit— max entries to return (default 100, max 10,000)timeout— timeout in seconds for cross-peer requestsThe endpoint requires management access. It reads local audit logs, fans out to all remote peers via internal gRPC, aggregates results sorted by timestamp (newest first), and truncates to the limit.
Efficient file selection
The audit reader parses dates from log filenames (
audit.YYYY-MM-DD.logfor daily,audit.YYYY-MM-DD-HH.logfor hourly rotation) and only reads files whose date range overlaps the requested time window.Filtering
Filters are dynamic key=value pairs matched against top-level JSON fields in audit entries. Keys are not hardcoded — any field present in the JSONL audit log entries can be filtered on (e.g.
method,result,auth_type,subject,collection,remote).Files changed
lib/storage/src/audit_reader.rssrc/actix/api/audit_api.rslib/api/src/grpc/proto/qdrant_internal_service.protoGetAuditLogRPC + request/response messageslib/api/src/grpc/qdrant.rssrc/tonic/api/qdrant_internal_api.rslib/storage/src/lib.rssrc/actix/api/mod.rssrc/actix/mod.rsTest plan
cargo build— clean, no warningscargo clippy— cleancargo +nightly fmt— cleanMade with Cursor