Contributor

@hengfeiyang hengfeiyang commented Oct 20, 2025

PR Type

Bug fix, Enhancement


Description

  • Make AuthExtractor async, return Future

  • Extract auth from session stored in DB

  • Add async extract_auth_str helper

  • Improve session cache miss handling


Diagram Walkthrough

flowchart LR
  Req["HttpRequest"] -- async FromRequest --> AuthExtractor["AuthExtractor"]
  AuthExtractor -- calls --> extract_auth_str["extract_auth_str (async)"]
  extract_auth_str -- session key --> SessionDB["service::db::session::get"]
  SessionDB -- cache miss --> DB["KV DB"]
  SessionDB -- cache hit --> USER_SESSIONS["In-memory cache"]
  AuthExtractor -- produces --> AuthContext["AuthExtractor instance"]

File Walkthrough

Relevant files

Enhancement (4 files)

  • auth.rs: Make auth extraction async; support session tokens; refactor extractor (+564/-512)
  • mod.rs: Simplify policy extraction with as_deref (+1/-1)
  • query_optimization_recommendation.rs: Adjust ingester usage and feature gate (+2/-5)
  • resource_extractor.rs: Use basic-auth-only extractor for ratelimit rules (+2/-2)

Bug fix (3 files)

  • chat.rs: Use async auth extraction in chat stream (+1/-1)
  • mod.rs: Use async auth extraction in logout handler (+1/-1)
  • session.rs: Read sessions from DB on cache miss and cache results (+16/-11)

@hengfeiyang hengfeiyang requested a review from Loaki07 October 20, 2025 11:27
@github-actions github-actions bot added the ☢️ Bug Something isn't working label Oct 20, 2025
@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 Security concerns

Token handling:
The new async extract_auth_str reads session tokens from cookies and falls back to DB; some paths use unwrap on header/cookie parsing which can lead to denial-of-service via panics on malformed inputs. Additionally, logging a session_id on cache miss (log::warn in session get) may expose sensitive identifiers in logs. Consider reducing log verbosity or redacting session IDs, and replace unwraps with safe handling.
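
One way to follow the redaction suggestion above is to log only a short prefix and the length of the session ID. The sketch below is illustrative only: `redact_session_id` and the `VISIBLE` constant are hypothetical names that do not appear in the PR, and the masking format is an assumption.

```rust
/// Hypothetical helper (not part of the PR): returns a log-safe label
/// for a session ID so the full identifier never reaches the logs.
fn redact_session_id(session_id: &str) -> String {
    // Number of leading characters kept for correlation (an assumption).
    const VISIBLE: usize = 4;

    let total = session_id.chars().count();
    if total <= VISIBLE * 2 {
        // Too short to reveal any part without exposing most of the ID.
        return "[redacted]".to_string();
    }

    // Collect by chars so multi-byte input cannot split a code point.
    let prefix: String = session_id.chars().take(VISIBLE).collect();
    format!("{prefix}***(len={total})")
}
```

A cache-miss warning could then log `redact_session_id(session_id)` instead of the raw value, which keeps enough of the prefix to correlate log lines while hiding the rest.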

⚡ Recommended focus areas for review

Possible Panic

Several uses of unwrap/unwrap_or_default on header/cookie decoding and to_str conversions in the new async extractor paths may panic or silently mask errors (e.g., regex/auth cookie/base64 decode). Consider graceful error handling instead of unwrap to avoid crashing on malformed inputs.

        Box::pin(async move {
            let start = std::time::Instant::now();

            let query =
                web::Query::<HashMap<String, String>>::from_query(req.query_string()).unwrap();
            let stream_type = get_stream_type_from_request(&query);

            let folder = get_folder(&query);

            let mut method = req.method().to_string();
            let local_path = req.path().to_string();
            let path = match local_path
                .strip_prefix(format!("{}/api/", config::get_config().common.base_uri).as_str())
            {
                Some(path) => path,
                None => &local_path,
            };

            let path_columns = path.split('/').collect::<Vec<&str>>();
            let url_len = path_columns.len();
            let org_id = if url_len > 1 && path_columns[0].eq(V2_API_PREFIX) {
                path_columns[1].to_string()
            } else {
                path_columns[0].to_string()
            };

            // This is case for ingestion endpoints where we need to check
            // permissions on the stream
            if method.eq("POST") && INGESTION_EP.contains(&path_columns[url_len - 1]) {
                if let Some(auth_header) = req.headers().get("Authorization")
                    && let Ok(auth_str) = auth_header.to_str()
                {
                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method,
                        o2_type: format!("stream:{org_id}"),
                        org_id,
                        bypass_check: true,
                        parent_id: folder,
                    });
                }
                return Err(actix_web::error::ErrorUnauthorized("Unauthorized Access"));
            }

            // get ofga object type from the url
            // depends on the url path count
            let object_type = if url_len == 1 {
                // for organization entity itself, get requires the list
                // permissions, and the object is a special format string
                if path_columns[0].eq("organizations") {
                    if method.eq("GET") {
                        method = "LIST".to_string();
                    };

                    "org:##user_id##".to_string()
                } else if path_columns[0].eq("invites") && method.eq("GET") {
                    let auth_str = extract_auth_str(&req).await;
                    // because the /invites route is checked by user_id,
                    // and does not return any other info, we can bypass the auth
                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method: "GET".to_string(),
                        o2_type: "".to_string(),
                        org_id: "".to_string(),
                        bypass_check: true, // bypass check permissions
                        parent_id: "".to_string(),
                    });
                } else {
                    path_columns[0].to_string()
                }
            } else if url_len == 2 || (url_len > 2 && path_columns[1].eq("settings")) {
                // for settings, the post/delete require PUT permissions, GET needs LIST permissions
                // also the special settings exception is for 3-part urls for logo /text
                // which are of path /org/settings/logo , which need permission of operating
                // on permission in general
                if path_columns[1].eq("settings") {
                    if method.eq("POST") || method.eq("DELETE") {
                        method = "PUT".to_string();
                    }
                } else if method.eq("GET") {
                    method = "LIST".to_string();
                }

                if path_columns[0].eq("invites") && method.eq("DELETE") {
                    let auth_str = extract_auth_str(&req).await;
                    // because the delete /invites/token route is checked by user_id,
                    // and does not return any other info, we can bypass the auth
                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method: "DELETE".to_string(),
                        o2_type: "".to_string(),
                        org_id: "".to_string(),
                        bypass_check: true, // bypass check permissions
                        parent_id: "".to_string(),
                    });
                }

                // this will take format of settings:{org_id} or pipelines:{org_id} etc
                let key = if path_columns[1].eq("invites") {
                    "users"
                } else if (path_columns[1].eq("rename")
                    || path_columns[1].eq("extend_trial_period"))
                    && method.eq("PUT")
                {
                    "organizations"
                } else {
                    path_columns[1]
                };

                // for organization api changes we need perms on _all_{org}
                let entity = match (key, path_columns[1]) {
                    ("organizations", "extend_trial_period") => "_all__meta".to_string(),
                    ("organizations", "organizations") => path_columns[0].to_string(),
                    ("organizations", _) => format!("_all_{}", path_columns[0]),
                    _ => path_columns[0].to_string(),
                };

                format!(
                    "{}:{}",
                    OFGA_MODELS.get(key).map_or(key, |model| model.key),
                    entity
                )
            } else if path_columns[1].eq("groups") || path_columns[1].eq("roles") {
                // for groups or roles, path will be of format /org/roles/id , so we need
                // to check permission on role:org/id for permissions on that specific role
                format!(
                    "{}:{org_id}/{}",
                    OFGA_MODELS
                        .get(path_columns[1])
                        .map_or(path_columns[1], |model| model.key),
                    path_columns[2]
                )
            } else if url_len == 3 {
                // Handle /v2 alert apis
                if path_columns[0].eq(V2_API_PREFIX) && path_columns[2].eq("alerts") {
                    if method.eq("GET") {
                        method = "LIST".to_string();
                    }
                    format!(
                        "{}:{}",
                        OFGA_MODELS.get("alert_folders").unwrap().key,
                        folder
                    )
                } else if path_columns[1] == "re_patterns" && path_columns[2] == "test" {
                    // specifically for testing re_patterns we need get permissions
                    // on re patterns
                    method = "LIST".to_string();
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[0]
                    )
                }
                // these are cases where the entity is "sub-entity" of some other entity,
                // for example, alerts are on route /org/stream/alerts
                // or templates are on route /org/alerts/templates and so on
                // users/roles is one of the special exception here
                else if path_columns[2].eq("alerts")
                    || path_columns[2].eq("templates")
                    || path_columns[2].eq("destinations")
                    || path.ends_with("users/roles")
                {
                    if method.eq("GET") {
                        method = "LIST".to_string();
                    }
                    if method.eq("PUT") || method.eq("DELETE") || path_columns[1].eq("search_jobs")
                    {
                        // for put/delete actions i.e. updations, we need permissions
                        // on that particular "sub-entity", and this will take form of
                        // alert:templates or alerts:destinations or stream:alerts
                        // search jobs also fall under this 3 length case
                        format!(
                            "{}:{}",
                            OFGA_MODELS
                                .get(path_columns[1])
                                .map_or(path_columns[1], |model| model.key),
                            path_columns[2]
                        )
                    } else {
                        // otherwise for listing/creating we need permissions on that "sub-entity"
                        // in general such as org:templates or org:destinations or org:alerts
                        format!(
                            "{}:{}",
                            OFGA_MODELS
                                .get(path_columns[2])
                                .map_or(path_columns[2], |model| model.key),
                            path_columns[0]
                        )
                    }
                } else if path_columns[2].starts_with("_values")
                    || path_columns[2].starts_with("_around")
                {
                    if method.eq("POST") {
                        // For _around search, the rbac check will be "GET"
                        method = "GET".to_string();
                    }
                    // special case of _values/_around , where we need permission on that stream,
                    // as it is part of search, but still 3-part route
                    format!(
                        "{}:{}",
                        OFGA_MODELS.get("streams").unwrap().key,
                        path_columns[1]
                    )
                } else if path_columns[1].starts_with("rename") {
                    // Org rename
                    format!(
                        "{}:{}",
                        OFGA_MODELS.get("organizations").unwrap().key,
                        org_id
                    )
                } else if path_columns[1].eq("invites") && method.eq("DELETE") {
                    // this is specifically for deleting an existing invite
                    let key = "users";
                    let entity = path_columns[0].to_string();
                    format!(
                        "{}:{}",
                        OFGA_MODELS.get(key).map_or(key, |model| model.key),
                        entity
                    )
                } else if (method.eq("PUT") && !path_columns[1].starts_with("ratelimit"))
                    || method.eq("DELETE")
                    || path_columns[1].eq("reports")
                    || path_columns[1].eq("savedviews")
                    || path_columns[1].eq("functions")
                    || path_columns[1].eq("service_accounts")
                    || path_columns[1].eq("cipher_keys")
                {
                    // Similar to the alerts/templates etc, but for other entities such as specific
                    // pipeline, specific stream, specific alert/destination etc.
                    // and these are not "sub-entities" under some other entities, hence
                    // a separate else-if clause
                    // Similarly, for the put/delete or any operation on these
                    // entities, we need access to that particular item
                    // so url will be of form /org/reports/name or /org/functions/name etc.
                    // and this will take form name:reports or name:function
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[2]
                    )
                } else if method.eq("GET")
                    && (path_columns[1].eq("dashboards")
                        || path_columns[1].eq("folders")
                        || path_columns[1].eq("actions"))
                {
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[2] // dashboard id
                    )
                } else {
                    // for things like dashboards and folders etc,
                    // this will take form org:dashboard or org:folders

                    // handle ratelimit:org
                    if method.eq("GET") && path_columns[1].starts_with("ratelimit") {
                        method = "LIST".to_string();
                    }

                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[0]
                    )
                }
            } else if url_len == 4 {
                // Handle /v2 alert apis
                if path_columns[0].eq(V2_API_PREFIX) {
                    if path_columns[2].eq("alerts") {
                        format!(
                            "{}:{}",
                            OFGA_MODELS
                                .get(path_columns[2])
                                .map_or(path_columns[2], |model| model.key),
                            path_columns[3]
                        )
                    } else {
                        if method.eq("GET") {
                            method = "LIST".to_string();
                        }
                        let ofga_type = if path_columns[3].eq("alerts") {
                            "alert_folders"
                        } else {
                            "folders"
                        };
                        format!(
                            "{}:{}",
                            OFGA_MODELS
                                .get(ofga_type)
                                .map_or(ofga_type, |model| model.key),
                            path_columns[1]
                        )
                    }
                }
                // this is for specific sub-items like specific alert, destination etc.
                // and sub-items such as schema, stream settings, or enabling/triggering reports
                else if method.eq("PUT") && path_columns[1].eq("reports") {
                    // for report enable/trigger, we need permissions on that specific
                    // report, so this will be name:reports
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[2]
                    )
                } else if method.eq("PUT")
                    && path_columns[1] != "streams"
                    && path_columns[1] != "pipelines"
                    || method.eq("DELETE") && path_columns[3] != "annotations"
                {
                    // for put on non-stream, non-pipeline such as specific
                    // alert/template/destination or delete on any such
                    // (stream/pipeline delete are not 4-part routes)
                    // this will take form of name:alert or name:destination or name:template etc
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[2])
                            .map_or(path_columns[2], |model| model.key),
                        path_columns[3]
                    )
                } else if method.eq("GET")
                    && path_columns[1].eq("folders")
                    && path_columns[2].eq("name")
                {
                    // To search with folder name, you need GET permission on all folders
                    format!(
                        "{}:_all_{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[0]
                    )
                } else if method.eq("GET")
                    && path_columns[1].eq("actions")
                    && path_columns[2].eq("download")
                {
                    // To access actions download name, you need GET permission on actions
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[3]
                    )
                } else if method.eq("GET")
                    && (path_columns[2].eq("templates")
                        || path_columns[2].eq("destinations")
                        || path_columns[2].eq("alerts"))
                {
                    // To access templates, you need GET permission on the template
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[2])
                            .map_or(path_columns[2], |model| model.key),
                        path_columns[3]
                    )
                } else {
                    // for other get/put requests on any entities such as templates,
                    // alerts, enable pipeline, update dashboard etc, we need permission
                    // on that entity in general, this will take form of
                    // alerts:destinations or roles:role_name or stream_name:alerts etc
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[2]
                    )
                }
            } else if method.eq("PUT") || method.eq("DELETE") || method.eq("PATCH") {
                // this block is for all other urls
                // specifically checking PUT /org_id/streams/stream_name/delete_fields
                // even though method is put, we actually need to check delete permissions
                if path_columns[url_len - 1].eq("delete_fields") {
                    method = "DELETE".to_string();
                }

                if method.eq("PATCH") {
                    method = "PUT".to_string();
                }

                // Handle /v2 folders apis
                if path_columns[0].eq(V2_API_PREFIX) && path_columns[2].eq("folders") {
                    let ofga_type = if path_columns[3].eq("alerts") {
                        "alert_folders"
                    } else {
                        "folders"
                    };
                    if url_len == 6 {
                        // Should check for all_org permissions
                        format!(
                            "{}:{}",
                            OFGA_MODELS.get(ofga_type).unwrap().key,
                            path_columns[1]
                        )
                    } else {
                        format!(
                            "{}:{}",
                            OFGA_MODELS.get(ofga_type).unwrap().key,
                            path_columns[4]
                        )
                    }
                }
                //  this is specifically for enabling alerts
                else if path_columns[url_len - 1].eq("enable") {
                    // this will take form name:alert
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[2])
                            .map_or(path_columns[2], |model| model.key),
                        path_columns[3]
                    )
                } else {
                    // This is specifically for triggering the alert on url
                    // /org_id/stream_name/alerts/alert_name/trigger
                    // and will take form stream_name:alerts
                    format!(
                        "{}:{}",
                        OFGA_MODELS
                            .get(path_columns[1])
                            .map_or(path_columns[1], |model| model.key),
                        path_columns[2]
                    )
                }
            } else {
                // This is the final catch-all for what did not fit in above cases,
                // and for the prometheus urls this will be ignored below.
                format!(
                    "{}:{}",
                    OFGA_MODELS
                        .get(path_columns[1])
                        .map_or(path_columns[1], |model| model.key),
                    path_columns[2]
                )
            };

            // Check if the ws request is using internal grpc token
            if method.eq("GET")
                && path.contains("/ws")
                && let Some(auth_header) = req.headers().get("Authorization")
                && auth_header
                    .to_str()
                    .unwrap()
                    .eq(&get_config().grpc.internal_grpc_token)
            {
                return Ok(AuthExtractor {
                    auth: auth_header.to_str().unwrap().to_string(),
                    method,
                    o2_type: format!("stream:{org_id}"),
                    org_id,
                    bypass_check: true,
                    parent_id: folder,
                });
            }

            let auth_str = extract_auth_str(&req).await;

            // if let Some(auth_header) = req.headers().get("Authorization") {
            if !auth_str.is_empty() {
                if (method.eq("POST") && url_len > 1 && path_columns[1].starts_with("_search"))
                || (method.eq("POST")
                    && url_len > 1
                    && path_columns[1].starts_with("result_schema"))
                || (method.eq("POST") && url_len > 1 && path.ends_with("actions/upload"))
                || path.contains("/prometheus/api/v1/query")
                || path.contains("/resources")
                || path.contains("/format_query")
                || path.contains("/prometheus/api/v1/series")
                || path.contains("/traces/latest")
                || path.contains("clusters")
                || path.contains("query_manager")
                || path.contains("/short")
                || path.contains("/ws")
                || path.contains("/_values_stream")
                // bulk enable of pipelines and alerts
                || path.contains("/bulk/enable")
                {
                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method: "".to_string(),
                        o2_type: "".to_string(),
                        org_id: "".to_string(),
                        bypass_check: true, // bypass check permissions
                        parent_id: folder,
                    });
                }
                if object_type.starts_with("stream") {
                    let object_type = match stream_type {
                        Some(stream_type) => {
                            if stream_type.eq(&StreamType::EnrichmentTables) {
                                // since enrichment tables have separate permissions
                                let stream_type_str = format!("{stream_type}");

                                object_type.replace(
                                    "stream:",
                                    format!(
                                        "{}:",
                                        OFGA_MODELS
                                            .get(stream_type_str.as_str())
                                            .map_or(stream_type_str.as_str(), |model| model.key)
                                    )
                                    .as_str(),
                                )
                            } else {
                                object_type.replace("stream:", format!("{stream_type}:").as_str())
                            }
                        }
                        None => object_type,
                    };
                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method,
                        o2_type: object_type,
                        org_id,
                        bypass_check: false,
                        parent_id: folder,
                    });
                }
                if object_type.contains("dashboard") && url_len > 1 {
                    let object_type = if method.eq("POST") || method.eq("LIST") {
                        format!(
                            "{}:{}",
                            OFGA_MODELS
                                .get(path_columns[1])
                                .map_or("dfolder", |model| model.parent),
                            folder.as_str(),
                        )
                    } else {
                        object_type
                    };
                    // Currently, we have a patch api for dashboard move,
                    // which can not be handled by the middleware layer,
                    // so we need to bypass the check here
                    if method.eq("PATCH") {
                        return Ok(AuthExtractor {
                            auth: auth_str.to_owned(),
                            method: "".to_string(),
                            o2_type: "".to_string(),
                            org_id: "".to_string(),
                            bypass_check: true, // bypass check permissions
                            parent_id: folder,
                        });
                    }

                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method,
                        o2_type: object_type,
                        org_id,
                        bypass_check: false,
                        parent_id: folder,
                    });
                }

                if method.eq("PATCH") && object_type.eq("alert:move") {
                    return Ok(AuthExtractor {
                        auth: auth_str.to_owned(),
                        method: "".to_string(),
                        o2_type: "".to_string(),
                        org_id: "".to_string(),
                        bypass_check: true, // bypass check permissions
                        parent_id: folder,
                    });
                }

                return Ok(AuthExtractor {
                    auth: auth_str.to_owned(),
                    method,
                    o2_type: object_type,
                    org_id,
                    bypass_check: false,
                    parent_id: folder,
                });
            }
            log::info!(
                "AuthExtractor::from_request took {} ms",
                start.elapsed().as_millis()
            );
            Err(actix_web::error::ErrorUnauthorized("Unauthorized Access"))
        })
    }

    #[cfg(not(feature = "enterprise"))]
    fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
        let req = req.clone();
        Box::pin(async move {
            let auth_str = if let Some(cookie) = req.cookie("auth_tokens") {
                let val = config::utils::base64::decode_raw(cookie.value()).unwrap_or_default();
                let auth_tokens: AuthTokens =
                    json::from_str(std::str::from_utf8(&val).unwrap_or_default())
                        .unwrap_or_default();
                let access_token = auth_tokens.access_token;
                if access_token.starts_with("Basic") || access_token.starts_with("Bearer") {
                    access_token
                } else {
                    format!("Bearer {access_token}")
                }
            } else if let Some(auth_header) = req.headers().get("Authorization") {
                if let Ok(auth_str) = auth_header.to_str() {
                    auth_str.to_owned()
                } else {
                    "".to_string()
                }
            } else {
                "".to_string()
            };

            // if let Some(auth_header) = req.headers().get("Authorization") {
            if !auth_str.is_empty() {
                return Ok(AuthExtractor {
                    auth: auth_str.to_owned(),
                    method: "".to_string(),
                    o2_type: "".to_string(),
                    org_id: "".to_string(),
                    bypass_check: true, // bypass check permissions
                    parent_id: "".to_string(),
                });
            }

            Err(actix_web::error::ErrorUnauthorized("Unauthorized Access"))
        })
    }
}

#[cfg(feature = "enterprise")]
pub async fn extract_auth_str(req: &HttpRequest) -> String {
    let auth_ext_cookie = |req: &HttpRequest| -> String {
        req.cookie("auth_ext")
            .map(|cookie| {
                let val = config::utils::base64::decode_raw(cookie.value()).unwrap_or_default();
                std::str::from_utf8(&val).unwrap_or_default().to_string()
            })
            .unwrap_or_default()
    };

    if let Some(cookie) = req.cookie("auth_tokens") {
        let val = config::utils::base64::decode_raw(cookie.value()).unwrap_or_default();
        let auth_tokens: AuthTokens =
            json::from_str(std::str::from_utf8(&val).unwrap_or_default()).unwrap_or_default();
        let access_token = auth_tokens.access_token;
        if access_token.is_empty() {
            // If cookie was set but access token is still empty
            // we check auth_ext cookie to get the token.
            auth_ext_cookie(req)
        } else if access_token.starts_with("Basic") || access_token.starts_with("Bearer") {
            access_token
        } else if access_token.starts_with("session") {
            let session_key = access_token.strip_prefix("session ").unwrap().to_string();
            match crate::service::db::session::get(&session_key).await {
                Ok(token) => {
                    format!("Bearer {token}")
                }
                Err(_) => access_token,
            }
        } else {
            format!("Bearer {access_token}")
        }
    } else if let Some(cookie) = req.cookie("auth_ext") {
        let val = config::utils::base64::decode_raw(cookie.value()).unwrap_or_default();
        std::str::from_utf8(&val).unwrap_or_default().to_string()
    } else if let Some(auth_header) = req.headers().get("Authorization") {
        if let Ok(auth_str) = auth_header.to_str() {
            auth_str.to_owned()
        } else {

Cache Consistency

On cache miss, the value read from DB is inserted into USER_SESSIONS without TTL or validation. Verify thread-safety and eviction policy; also ensure that empty or malformed values are not cached inadvertently.

pub async fn get(session_id: &str) -> Result<String, anyhow::Error> {
    if let Some(val) = USER_SESSIONS.get(session_id) {
        return Ok(val.to_string());
    }

    // get from db
    log::warn!("Cache miss for user session, read from db: {}", session_id);
    let val = db::get(&format!("{USER_SESSION_KEY}{session_id}")).await?;
    // json format: convert bytes to string and trim any quotes
    let val = String::from_utf8(val.to_vec())
        .unwrap()
        .trim_matches('"')
        .to_string();
    // cache it in memory
    if !val.is_empty() {
        USER_SESSIONS.insert(session_id.to_string(), val.to_string());
    }
    Ok(val)
}
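
One possible way to address the TTL concern is sketched below. It assumes a hypothetical `TtlSessionCache` type built only on the standard library; it is not the project's `USER_SESSIONS` implementation, and the real eviction policy may differ. Entries older than the TTL are treated as misses, and empty tokens are never stored.

```rust
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::{Duration, Instant};

/// Hypothetical TTL-aware session cache (illustration only, not USER_SESSIONS).
pub struct TtlSessionCache {
    ttl: Duration,
    entries: RwLock<HashMap<String, (String, Instant)>>,
}

impl TtlSessionCache {
    pub fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            entries: RwLock::new(HashMap::new()),
        }
    }

    /// Returns the cached token only while it is younger than the TTL.
    /// A poisoned lock is treated as a cache miss.
    pub fn get(&self, session_id: &str) -> Option<String> {
        let entries = self.entries.read().ok()?;
        let (token, stored_at) = entries.get(session_id)?;
        if stored_at.elapsed() < self.ttl {
            Some(token.clone())
        } else {
            None
        }
    }

    /// Stores a non-empty token; returns whether anything was cached.
    pub fn insert(&self, session_id: &str, token: &str) -> bool {
        if token.is_empty() {
            return false;
        }
        match self.entries.write() {
            Ok(mut entries) => {
                entries.insert(session_id.to_string(), (token.to_string(), Instant::now()));
                true
            }
            Err(_) => false,
        }
    }
}
```

Expired entries are only ignored here, not removed; a production cache would also need a cleanup or size bound, which this sketch leaves out.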

Behavior Change

Replacing Ingester::default() with Ingester {} and returning into_inner() may alter initialization/config behavior and response type conversion. Confirm Ingester has a stable default state without Default and that the returned type matches callers’ expectations.

    })
}

fn ingest_recommendations(
    &self,
    recommendations: Vec<OptimiserRecommendation>,
) -> Pin<Box<dyn Future<Output = Result<IngestionResponse, anyhow::Error>> + Send>> {
    Box::pin(async move {
        let ingester = Ingester {};
        let request = IngestionRequest {
            org_id: META_ORG_ID.to_string(),
            stream_type: StreamType::Logs.to_string(),
            stream_name: "query_recommendations".to_string(),
            data: Some(IngestionData {
                data: serde_json::to_vec_pretty(&recommendations).map_err(|e| {
                    anyhow::anyhow!("Recommendation serialization failed. Error={:?}", e)
                })?,
            }),
            ingestion_type: Some(IngestionType::Json as i32),
            metadata: None,
        };
        Ok(ingester
            .ingest(tonic::Request::new(request))
            .await?
            .into_inner())
    })

@github-actions
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category    Suggestion    Impact
Possible issue
Prevent panic on ws auth parsing

Calling to_str().unwrap() twice on the Authorization header risks a panic whenever the
value contains bytes outside visible ASCII. Parse once, handle errors gracefully, and
avoid multiple conversions. Reject invalid headers with Unauthorized instead of panicking.

src/common/utils/auth.rs [688-704]

-if method.eq("GET")
-    && path.contains("/ws")
-    && let Some(auth_header) = req.headers().get("Authorization")
-    && auth_header
-        .to_str()
-        .unwrap()
-        .eq(&get_config().grpc.internal_grpc_token)
-{
-    return Ok(AuthExtractor {
-        auth: auth_header.to_str().unwrap().to_string(),
-        method,
-        o2_type: format!("stream:{org_id}"),
-        org_id,
-        bypass_check: true,
-        parent_id: folder,
-    });
+if method.eq("GET") && path.contains("/ws") {
+    if let Some(auth_header) = req.headers().get("Authorization") {
+        if let Ok(auth_str) = auth_header.to_str() {
+            if auth_str == get_config().grpc.internal_grpc_token {
+                return Ok(AuthExtractor {
+                    auth: auth_str.to_string(),
+                    method,
+                    o2_type: format!("stream:{org_id}"),
+                    org_id,
+                    bypass_check: true,
+                    parent_id: folder,
+                });
+            }
+        } else {
+            return Err(actix_web::error::ErrorUnauthorized("Unauthorized Access"));
+        }
+    }
 }
Suggestion importance[1-10]: 7

Why: This targets the ws internal token fast-path that currently uses unwrap twice; replacing with safe parsing is accurate and avoids potential panics on malformed headers, improving stability.

Impact: Medium

Handle invalid UTF-8 safely

Avoid unwrap() on potentially invalid UTF-8 from storage to prevent panics. Handle
decode errors and unexpected formats gracefully, and only cache successfully decoded
non-empty values.

src/service/db/session.rs [35-46]

 let val = db::get(&format!("{USER_SESSION_KEY}{session_id}")).await?;
-// json format: convert bytes to string and trim any quotes
-let val = String::from_utf8(val.to_vec())
-    .unwrap()
-    .trim_matches('"')
-    .to_string();
-// cache it in memory
-if !val.is_empty() {
-    USER_SESSIONS.insert(session_id.to_string(), val.to_string());
+let decoded = match String::from_utf8(val.to_vec()) {
+    Ok(s) => s.trim_matches('"').to_string(),
+    Err(_) => return Err(anyhow::anyhow!("Invalid UTF-8 in session value")),
+};
+if !decoded.is_empty() {
+    USER_SESSIONS.insert(session_id.to_string(), decoded.clone());
 }
-Ok(val)
+Ok(decoded)
Suggestion importance[1-10]: 7

Why: The change correctly replaces an unwrap on UTF-8 conversion with error handling and preserves caching semantics; it increases robustness without altering core logic.

Impact: Medium

Avoid panics on header parsing

Calling to_str().unwrap() on a header value can panic when the value contains bytes
outside visible ASCII. Replace unwraps with explicit error handling, and copy the
header string into an owned value before returning it. Reject values that fail
to_str() with Unauthorized instead of panicking.

src/common/utils/auth.rs [268-281]

-if let Some(auth_header) = req.headers().get("Authorization")
-    && let Ok(auth_str) = auth_header.to_str()
-{
-    return Ok(AuthExtractor {
-        auth: auth_str.to_owned(),
-        method,
-        o2_type: format!("stream:{org_id}"),
-        org_id,
-        bypass_check: true,
-        parent_id: folder,
-    });
+if let Some(auth_header) = req.headers().get("Authorization") {
+    if let Ok(auth_str) = auth_header.to_str() {
+        return Ok(AuthExtractor {
+            auth: auth_str.to_owned(),
+            method,
+            o2_type: format!("stream:{org_id}"),
+            org_id,
+            bypass_check: true,
+            parent_id: folder,
+        });
+    } else {
+        return Err(actix_web::error::ErrorUnauthorized("Unauthorized Access"));
+    }
 }
 return Err(actix_web::error::ErrorUnauthorized("Unauthorized Access"));
Suggestion importance[1-10]: 6

Why: The snippet matches the new async branch handling ingestion auth and avoids unwrap panics by handling invalid header values; it's a sound robustness improvement but not critical.

Impact: Low

Contributor

@greptile-apps greptile-apps bot left a comment

Greptile Overview

Summary

This PR adds a fallback that reads a user session from the database when it is missing from the in-memory cache, improving session reliability across distributed deployments.

Key Changes:

  • Converted extract_auth_str to async to support database lookups for session tokens when cache misses occur
  • Refactored FromRequest trait implementation for AuthExtractor to use async futures instead of synchronous Ready futures
  • Added extract_basic_auth_str synchronous variant for code paths that cannot be async (rate limiting middleware)
  • Updated all callers of extract_auth_str to await the async version
  • Included minor code improvements: as_deref() simplification and Ingester {} direct instantiation

Implementation:
Session retrieval now follows a cache-aside pattern: check the in-memory cache first and, on a miss, fetch from the database and populate the cache. This keeps sessions accessible after the cache is cleared, and in multi-node deployments where cache synchronization may lag.
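
The cache-aside flow described above can be sketched roughly as follows. This is a simplified, synchronous illustration: `SessionCache` and the `load` closure are hypothetical stand-ins for `USER_SESSIONS` and the async `db::get` call, and invalid UTF-8 is returned as an error rather than unwrapped.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory cache standing in for USER_SESSIONS.
#[derive(Default)]
pub struct SessionCache {
    entries: HashMap<String, String>,
}

impl SessionCache {
    /// Cache-aside lookup: `load` runs only on a cache miss, and its
    /// result is cached only when it decodes to a non-empty string.
    pub fn get_or_load<F>(&mut self, session_id: &str, load: F) -> Result<String, String>
    where
        F: FnOnce(&str) -> Result<Vec<u8>, String>,
    {
        // 1. Cache hit: return without touching the backing store.
        if let Some(token) = self.entries.get(session_id) {
            return Ok(token.clone());
        }
        // 2. Cache miss: read the raw bytes from the backing store.
        let raw = load(session_id)?;
        // 3. Decode without panicking, then trim the JSON quotes.
        let token = String::from_utf8(raw)
            .map_err(|e| format!("invalid UTF-8 in session value: {e}"))?
            .trim_matches('"')
            .to_string();
        // 4. Populate the cache for later lookups.
        if !token.is_empty() {
            self.entries.insert(session_id.to_string(), token.clone());
        }
        Ok(token)
    }
}
```

Passing the loader as a closure keeps the sketch testable without a database; the PR's actual `get` function calls the database directly and is async.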

Confidence Score: 4/5

  • This PR is safe to merge with minor considerations around error handling
  • The implementation correctly adds DB fallback for session caching with proper async conversion. The main concern is the use of .unwrap() on line 38 of session.rs which could panic if the DB returns invalid UTF-8, though this is unlikely in practice. All async conversions are properly implemented and the synchronous variant maintains backward compatibility for rate limiting middleware.
  • Pay attention to src/service/db/session.rs line 38 which uses .unwrap() on UTF-8 conversion

Important Files Changed

File Analysis

| Filename | Score | Overview |
| --- | --- | --- |
| src/common/utils/auth.rs | 4/5 | Converted extract_auth_str to async to read user sessions from DB when cache misses occur; converted FromRequest implementation to async future; added extract_basic_auth_str for synchronous paths |
| src/service/db/session.rs | 5/5 | Implements cache-aside pattern with DB fallback for session retrieval - returns cached session or fetches from DB on cache miss |

Sequence Diagram

sequenceDiagram
    participant Client
    participant AuthExtractor
    participant extract_auth_str
    participant Cache as USER_SESSIONS Cache
    participant DB as Database
    participant Handler

    Client->>AuthExtractor: HTTP Request with auth cookie
    AuthExtractor->>extract_auth_str: Extract auth from request (async)
    extract_auth_str->>extract_auth_str: Check auth_tokens cookie
    alt session token found
        extract_auth_str->>Cache: Check cache for session
        alt cache hit
            Cache-->>extract_auth_str: Return cached bearer token
        else cache miss
            extract_auth_str->>DB: session::get(session_key)
            DB-->>extract_auth_str: Return bearer token from DB
            extract_auth_str->>Cache: Cache the token
        end
        extract_auth_str-->>AuthExtractor: Return "Bearer {token}"
    else basic/bearer token
        extract_auth_str-->>AuthExtractor: Return token as-is
    end
    AuthExtractor->>Handler: Continue with authenticated request
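
The branching in the diagram above can also be expressed as a small pure function, which keeps the prefix handling free of `unwrap()`. The `TokenKind` enum and `classify_access_token` helper below are hypothetical names used for illustration and are not part of the PR. One assumed behavior difference: a value that starts with `session` but lacks the trailing space is treated as a raw token instead of panicking.

```rust
/// How an `access_token` cookie value should be handled (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum TokenKind {
    /// No token in the cookie; the caller may fall back to `auth_ext`.
    Empty,
    /// Already a full header value ("Basic ..." or "Bearer ...").
    Header(String),
    /// "session <key>": the key must be resolved via the cache or DB.
    SessionKey(String),
    /// Bare token; the caller would prefix it with "Bearer ".
    Raw(String),
}

pub fn classify_access_token(access_token: &str) -> TokenKind {
    if access_token.is_empty() {
        TokenKind::Empty
    } else if access_token.starts_with("Basic") || access_token.starts_with("Bearer") {
        TokenKind::Header(access_token.to_string())
    } else if let Some(key) = access_token.strip_prefix("session ") {
        // `strip_prefix` returns None when the prefix is absent, so no unwrap is needed.
        TokenKind::SessionKey(key.to_string())
    } else {
        TokenKind::Raw(access_token.to_string())
    }
}
```

Separating classification from the async session lookup also makes the cookie handling easy to unit test without a request or a database.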

7 files reviewed, 1 comment


@testdino-playwright-reporter

⚠️ Test Run Unstable

Author: hengfeiyang | Branch: fix/auth-session | Commit: e64ee3a

Testdino Test Results

| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
| --- | --- | --- | --- | --- | --- | --- | --- |
| All tests passed | 364 | 344 | 0 | 19 | 1 | 95% | 4m 41s |

@testdino-playwright-reporter

⚠️ Test Run Unstable

Author: hengfeiyang | Branch: fix/auth-session | Commit: 6d5971a

Testdino Test Results

| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
| --- | --- | --- | --- | --- | --- | --- | --- |
| All tests passed | 364 | 343 | 0 | 19 | 2 | 94% | 4m 43s |

@testdino-playwright-reporter

⚠️ Test Run Unstable

Author: hengfeiyang | Branch: fix/auth-session | Commit: 6d5971a

Testdino Test Results

| Status | Total | Passed | Failed | Skipped | Flaky | Pass Rate | Duration |
| --- | --- | --- | --- | --- | --- | --- | --- |
| All tests passed | 364 | 344 | 0 | 19 | 1 | 95% | 4m 38s |

@hengfeiyang hengfeiyang merged commit e850ea2 into main Oct 20, 2025
32 checks passed
@hengfeiyang hengfeiyang deleted the fix/auth-session branch October 20, 2025 13:04
Labels

☢️ Bug Something isn't working Review effort 3/5
