-
Notifications
You must be signed in to change notification settings - Fork 711
feat: apply vrl functions over hits as array #4058
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughThe changes primarily enhance the Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant SearchFunction as Search Function
participant ResultHandler as Result Handler
Client ->> SearchFunction: Send Search Request (with query_fn)
SearchFunction ->> SearchFunction: Check if query_fn starts with #ResultArray#
alt If it does
SearchFunction ->> SearchFunction: Remove #ResultArray# prefix
SearchFunction ->> ResultHandler: Process results as array
else
SearchFunction ->> ResultHandler: Process results as objects
end
ResultHandler -->> SearchFunction: Return processed results
SearchFunction -->> Client: Send response
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (invoked as PR comments)
Additionally, you can add CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- src/service/search/cluster/http.rs (3 hunks)
- src/service/search/cluster/super_cluster.rs (2 hunks)
Additional context used
Path-based instructions (2)
src/service/search/cluster/http.rs (1)
Pattern
**/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestionsMake sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
src/service/search/cluster/super_cluster.rs (1)
Pattern
**/*.rs: You are a smart rustlang pull request reviewer.
You are going to review all the rustlang files.
Be concise, and add a brief explanation to your suggestionsMake sure the responses are not too verbose and keep the suggestions to the point i.e. actionable.
Additional comments not posted (1)
src/service/search/cluster/http.rs (1)
49-49: Ensure thread safety when modifyingquery_fn.The
query_fnis now mutable, which might introduce concurrency issues. Ensure that this change does not affect thread safety.
| let apply_over_hits = query_fn.trim().starts_with("#ResultArray#"); | ||
| if apply_over_hits { | ||
| query_fn = query_fn.trim().replace("#ResultArray#", ""); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor conditional logic for clarity.
The conditional logic for apply_over_hits can be refactored for better readability and maintainability.
let apply_over_hits = query_fn.trim().starts_with("#ResultArray#");
if apply_over_hits {
query_fn = query_fn.trim().replace("#ResultArray#", "");
}let query_fn = query_fn.trim();
let apply_over_hits = query_fn.starts_with("#ResultArray#");
let query_fn = if apply_over_hits {
query_fn.replace("#ResultArray#", "")
} else {
query_fn.to_string()
};| Some(program) => { | ||
| if apply_over_hits { | ||
| let ret_val = crate::service::ingestion::apply_vrl_fn( | ||
| &mut runtime, | ||
| &VRLResultResolver { | ||
| program: program.program.clone(), | ||
| fields: program.fields.clone(), | ||
| }, | ||
| &json::Value::Object(hit.clone()), | ||
| &json::Value::Array( | ||
| json_rows | ||
| .into_iter() | ||
| .filter(|v| !v.is_empty()) | ||
| .map(json::Value::Object) | ||
| .collect(), | ||
| ), | ||
| &sql.org_id, | ||
| &sql.stream_name, | ||
| ); | ||
| (!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap()) | ||
| }) | ||
| .collect(), | ||
| ret_val | ||
| .as_array() | ||
| .unwrap() | ||
| .iter() | ||
| .filter_map(|v| { | ||
| (!v.is_null()).then_some(flatten::flatten(v.clone()).unwrap()) | ||
| }) | ||
| .collect() | ||
| } else { | ||
| json_rows | ||
| .into_iter() | ||
| .filter(|v| !v.is_empty()) | ||
| .filter_map(|hit| { | ||
| let ret_val = crate::service::ingestion::apply_vrl_fn( | ||
| &mut runtime, | ||
| &VRLResultResolver { | ||
| program: program.program.clone(), | ||
| fields: program.fields.clone(), | ||
| }, | ||
| &json::Value::Object(hit.clone()), | ||
| &sql.org_id, | ||
| &sql.stream_name, | ||
| ); | ||
| (!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap()) | ||
| }) | ||
| .collect() | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors when applying VRL function.
Ensure that errors during the application of the VRL function are handled gracefully.
if apply_over_hits {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Array(
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.map(json::Value::Object)
.collect(),
),
&sql.org_id,
&sql.stream_name,
);
ret_val
.as_array()
.unwrap()
.iter()
.filter_map(|v| {
(!v.is_null()).then_some(flatten::flatten(v.clone()).unwrap())
})
.collect()
} else {
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.filter_map(|hit| {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Object(hit.clone()),
&sql.org_id,
&sql.stream_name,
);
(!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap())
})
.collect()
}if apply_over_hits {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Array(
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.map(json::Value::Object)
.collect(),
),
&sql.org_id,
&sql.stream_name,
);
match ret_val.as_array() {
Some(array) => array.iter()
.filter_map(|v| (!v.is_null()).then_some(flatten::flatten(v.clone()).unwrap()))
.collect(),
None => {
log::error!("[trace_id {trace_id}] search->vrl: apply err: {:?}", ret_val);
result.function_error = "VRL function application error".to_string();
Vec::new()
}
}
} else {
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.filter_map(|hit| {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Object(hit.clone()),
&sql.org_id,
&sql.stream_name,
);
(!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap())
})
.collect()
}| let apply_over_hits = query_fn.trim().starts_with("#ResultArray#"); | ||
| if apply_over_hits { | ||
| query_fn = query_fn.trim().replace("#ResultArray#", ""); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor conditional logic for clarity.
The conditional logic for apply_over_hits can be refactored for better readability and maintainability.
let apply_over_hits = query_fn.trim().starts_with("#ResultArray#");
if apply_over_hits {
query_fn = query_fn.trim().replace("#ResultArray#", "");
}let query_fn = query_fn.trim();
let apply_over_hits = query_fn.starts_with("#ResultArray#");
let query_fn = if apply_over_hits {
query_fn.replace("#ResultArray#", "")
} else {
query_fn.to_string()
};| Some(program) => { | ||
| if apply_over_hits { | ||
| let ret_val = crate::service::ingestion::apply_vrl_fn( | ||
| &mut runtime, | ||
| &VRLResultResolver { | ||
| program: program.program.clone(), | ||
| fields: program.fields.clone(), | ||
| }, | ||
| &json::Value::Object(hit.clone()), | ||
| &json::Value::Array( | ||
| json_rows | ||
| .into_iter() | ||
| .filter(|v| !v.is_empty()) | ||
| .map(json::Value::Object) | ||
| .collect(), | ||
| ), | ||
| &sql.org_id, | ||
| &sql.stream_name, | ||
| ); | ||
| (!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap()) | ||
| }) | ||
| .collect(), | ||
| ret_val | ||
| .as_array() | ||
| .unwrap() | ||
| .iter() | ||
| .filter_map(|v| { | ||
| (!v.is_null()).then_some(flatten::flatten(v.clone()).unwrap()) | ||
| }) | ||
| .collect() | ||
| } else { | ||
| json_rows | ||
| .into_iter() | ||
| .filter(|v| !v.is_empty()) | ||
| .filter_map(|hit| { | ||
| let ret_val = crate::service::ingestion::apply_vrl_fn( | ||
| &mut runtime, | ||
| &VRLResultResolver { | ||
| program: program.program.clone(), | ||
| fields: program.fields.clone(), | ||
| }, | ||
| &json::Value::Object(hit.clone()), | ||
| &sql.org_id, | ||
| &sql.stream_name, | ||
| ); | ||
| (!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap()) | ||
| }) | ||
| .collect() | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handle potential errors when applying VRL function.
Ensure that errors during the application of the VRL function are handled gracefully.
if apply_over_hits {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Array(
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.map(json::Value::Object)
.collect(),
),
&sql.org_id,
&sql.stream_name,
);
ret_val
.as_array()
.unwrap()
.iter()
.filter_map(|v| {
(!v.is_null()).then_some(flatten::flatten(v.clone()).unwrap())
})
.collect()
} else {
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.filter_map(|hit| {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Object(hit.clone()),
&sql.org_id,
&sql.stream_name,
);
(!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap())
})
.collect()
}if apply_over_hits {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Array(
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.map(json::Value::Object)
.collect(),
),
&sql.org_id,
&sql.stream_name,
);
match ret_val.as_array() {
Some(array) => array.iter()
.filter_map(|v| (!v.is_null()).then_some(flatten::flatten(v.clone()).unwrap()))
.collect(),
None => {
log::error!("[trace_id {trace_id}] search->vrl: apply err: {:?}", ret_val);
result.function_error = "VRL function application error".to_string();
Vec::new()
}
}
} else {
json_rows
.into_iter()
.filter(|v| !v.is_empty())
.filter_map(|hit| {
let ret_val = crate::service::ingestion::apply_vrl_fn(
&mut runtime,
&VRLResultResolver {
program: program.program.clone(),
fields: program.fields.clone(),
},
&json::Value::Object(hit.clone()),
&sql.org_id,
&sql.stream_name,
);
(!ret_val.is_null()).then_some(flatten::flatten(ret_val).unwrap())
})
.collect()
}<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced search functionality to accommodate dynamic query processing. - Improved handling of results based on specific query formats. - **Bug Fixes** - Corrected response handling for queries with the `#ResultArray#` prefix, ensuring accurate results. - **Refactor** - Streamlined logic in the search function for better performance and adaptability to varying input queries. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
Summary by CodeRabbit
New Features
Bug Fixes
#ResultArray#prefix, ensuring accurate results.Refactor