Skip to content

Commit 74e4421

Browse files
authored
fix: apply functions for routed streams (#4202)
Fixes #4129 - Current workflow for the pipelining - ingestion to the main stream -> apply functions of main stream with `apply_before_flattening` flag -> flatten data -> routing -> apply rest of the functions of the routed stream. `apply_before_flattening` functions will be ignored for the routed stream because data is already flattened before routing. - Fix of bug: For non-bulk apis for log ingestion, if the first record is routed (i.e. the routing condition matched), the rest of the records also are routed even if the condition does not match. - Fix of bug: For json log ingestion, routing was happening before flattening of stream data, this PR fixes that. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Enhanced stream processing functionality with improved control flow and routing logic. - Introduced structured management of stream transformations using HashMaps for better clarity and maintainability. - **Bug Fixes** - Improved robustness of stream processing by ensuring all relevant streams are processed without premature termination. - **Chores** - Renamed variables for clarity regarding their roles in stream processing. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 8a8c11a commit 74e4421

File tree

6 files changed

+254
-156
lines changed

6 files changed

+254
-156
lines changed

src/service/ingestion/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,8 @@ pub async fn get_stream_functions<'a>(
149149
if stream_after_functions_map.contains_key(&key)
150150
|| stream_before_functions_map.contains_key(&key)
151151
{
152-
return;
152+
// functions for this stream already fetched
153+
continue;
153154
}
154155
// let mut _local_trans: Vec<StreamTransform> = vec![];
155156
// let local_stream_vrl_map;

src/service/logs/bulk.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,9 @@ pub async fn ingest(
165165
} else {
166166
next_line_is_data = false;
167167

168-
let key = format!("{org_id}/{}/{stream_name}", StreamType::Logs);
168+
let main_stream_key = format!("{org_id}/{}/{stream_name}", StreamType::Logs);
169169
// Start row based transform before flattening the value
170-
if let Some(transforms) = stream_before_functions_map.get(&key) {
170+
if let Some(transforms) = stream_before_functions_map.get(&main_stream_key) {
171171
if !transforms.is_empty() {
172172
let mut ret_value = value.clone();
173173
ret_value = crate::service::ingestion::apply_stream_functions(
@@ -201,6 +201,7 @@ pub async fn ingest(
201201
// JSON Flattening
202202
let mut value = flatten::flatten_with_level(value, cfg.limit.ingest_flatten_level)?;
203203

204+
let mut routed_stream_name = stream_name.clone();
204205
// Start re-routing if exists
205206
if let Some(routing) = stream_routing_map.get(&stream_name) {
206207
if !routing.is_empty() {
@@ -214,14 +215,16 @@ pub async fn ingest(
214215
}
215216
}
216217
if is_routed && !val.is_empty() {
217-
stream_name = route.destination.clone();
218+
routed_stream_name = route.destination.clone();
218219
break;
219220
}
220221
}
221222
}
222223
}
223224
// End re-routing
224225

226+
let key = format!("{org_id}/{}/{routed_stream_name}", StreamType::Logs);
227+
225228
// Start row based transform
226229
if let Some(transforms) = stream_after_functions_map.get(&key) {
227230
if !transforms.is_empty() {
@@ -231,14 +234,14 @@ pub async fn ingest(
231234
ret_value,
232235
&stream_vrl_map,
233236
org_id,
234-
&stream_name,
237+
&routed_stream_name,
235238
&mut runtime,
236239
)?;
237240

238241
if ret_value.is_null() || !ret_value.is_object() {
239242
bulk_res.errors = true;
240243
add_record_status(
241-
stream_name.clone(),
244+
routed_stream_name.clone(),
242245
&doc_id,
243246
action.clone(),
244247
Some(value),
@@ -265,7 +268,7 @@ pub async fn ingest(
265268
local_val.insert("_id".to_string(), json::Value::String(doc_id.to_owned()));
266269
}
267270

268-
if let Some(fields) = user_defined_schema_map.get(&stream_name) {
271+
if let Some(fields) = user_defined_schema_map.get(&routed_stream_name) {
269272
local_val = crate::service::logs::refactor_map(local_val, fields);
270273
}
271274

@@ -276,7 +279,7 @@ pub async fn ingest(
276279
Err(_e) => {
277280
bulk_res.errors = true;
278281
add_record_status(
279-
stream_name.clone(),
282+
routed_stream_name.clone(),
280283
&doc_id,
281284
action.clone(),
282285
Some(value),
@@ -295,7 +298,7 @@ pub async fn ingest(
295298
bulk_res.errors = true;
296299
let failure_reason = Some(get_upto_discard_error().to_string());
297300
add_record_status(
298-
stream_name.clone(),
301+
routed_stream_name.clone(),
299302
&doc_id,
300303
action.clone(),
301304
Some(value),
@@ -311,7 +314,7 @@ pub async fn ingest(
311314
);
312315

313316
let fns_length = stream_before_functions_map
314-
.get(&key)
317+
.get(&main_stream_key)
315318
.map(|v| v.len())
316319
.unwrap_or_default()
317320
+ stream_after_functions_map
@@ -320,7 +323,7 @@ pub async fn ingest(
320323
.unwrap_or_default();
321324

322325
let (ts_data, fn_num) = json_data_by_stream
323-
.entry(stream_name.clone())
326+
.entry(routed_stream_name.clone())
324327
.or_insert((Vec::new(), None));
325328
ts_data.push((timestamp, local_val));
326329
*fn_num = Some(fns_length);

src/service/logs/ingest.rs

Lines changed: 75 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub async fn ingest(
6262
let mut need_usage_report = true;
6363

6464
// check stream
65-
let mut stream_name = if cfg.common.skip_formatting_stream_name {
65+
let stream_name = if cfg.common.skip_formatting_stream_name {
6666
get_formatted_stream_name(StreamParams::new(org_id, in_stream_name, StreamType::Logs))
6767
.await?
6868
} else {
@@ -73,15 +73,10 @@ pub async fn ingest(
7373
let min_ts = (Utc::now() - Duration::try_hours(cfg.limit.ingest_allowed_upto).unwrap())
7474
.timestamp_micros();
7575

76-
// Start Register Transforms for stream
7776
let mut runtime = crate::service::ingestion::init_functions_runtime();
78-
let (before_local_trans, after_local_trans, stream_vrl_map) =
79-
crate::service::ingestion::register_stream_functions(
80-
org_id,
81-
&StreamType::Logs,
82-
&stream_name,
83-
);
84-
// End Register Transforms for stream
77+
let mut stream_vrl_map: HashMap<String, VRLResultResolver> = HashMap::new();
78+
let mut stream_before_functions_map: HashMap<String, Vec<StreamTransform>> = HashMap::new();
79+
let mut stream_after_functions_map: HashMap<String, Vec<StreamTransform>> = HashMap::new();
8580

8681
let mut stream_params = vec![StreamParams::new(org_id, &stream_name, StreamType::Logs)];
8782

@@ -113,6 +108,16 @@ pub async fn ingest(
113108
.await;
114109
// End get user defined schema
115110

111+
// Start Register functions for stream
112+
crate::service::ingestion::get_stream_functions(
113+
&stream_params,
114+
&mut stream_before_functions_map,
115+
&mut stream_after_functions_map,
116+
&mut stream_vrl_map,
117+
)
118+
.await;
119+
// End Register functions for index
120+
116121
let json_req: Vec<json::Value>; // to hold json request because of borrow checker
117122
let (endpoint, usage_type, data) = match in_req {
118123
IngestionRequest::JSON(req) => {
@@ -171,15 +176,42 @@ pub async fn ingest(
171176
return Err(anyhow::anyhow!("Failed processing: {:?}", e));
172177
}
173178
};
179+
let mut routed_stream_name = stream_name.clone();
174180

175181
if let Some(extend) = extend_json.as_ref() {
176182
for (key, val) in extend.iter() {
177183
item[key] = val.clone();
178184
}
179185
}
180186

187+
let main_stream_key = format!("{org_id}/{}/{stream_name}", StreamType::Logs);
188+
// Start row based transform before flattening the value
189+
if let Some(transforms) = stream_before_functions_map.get(&main_stream_key) {
190+
if !transforms.is_empty() {
191+
item = match apply_functions(
192+
item,
193+
transforms,
194+
&stream_vrl_map,
195+
org_id,
196+
&routed_stream_name,
197+
&mut runtime,
198+
) {
199+
Ok(res) => res,
200+
Err(e) => {
201+
stream_status.status.failed += 1;
202+
stream_status.status.error = e.to_string();
203+
continue;
204+
}
205+
}
206+
}
207+
}
208+
// end row based transformation
209+
210+
// JSON Flattening
211+
let item = flatten::flatten_with_level(item, cfg.limit.ingest_flatten_level)?;
212+
181213
// Start re-routing if exists
182-
if let Some(routings) = stream_routing_map.get(&stream_name) {
214+
if let Some(routings) = stream_routing_map.get(&routed_stream_name) {
183215
if !routings.is_empty() {
184216
for route in routings {
185217
let mut is_routed = true;
@@ -191,30 +223,34 @@ pub async fn ingest(
191223
}
192224
}
193225
if !val.is_empty() && is_routed {
194-
stream_name = route.destination.clone();
226+
routed_stream_name = route.destination.clone();
195227
break;
196228
}
197229
}
198230
}
199231
}
200232
// End re-routing
201233

234+
let key = format!("{org_id}/{}/{routed_stream_name}", StreamType::Logs);
202235
// Start row based transform
203-
let mut res = match apply_functions(
204-
item,
205-
&before_local_trans,
206-
&after_local_trans,
207-
&stream_vrl_map,
208-
org_id,
209-
&stream_name,
210-
&mut runtime,
211-
) {
212-
Ok(res) => res,
213-
Err(e) => {
214-
stream_status.status.failed += 1;
215-
stream_status.status.error = e.to_string();
216-
continue;
236+
let mut res = if let Some(transforms) = stream_after_functions_map.get(&key) {
237+
match apply_functions(
238+
item,
239+
transforms,
240+
&stream_vrl_map,
241+
org_id,
242+
&routed_stream_name,
243+
&mut runtime,
244+
) {
245+
Ok(res) => res,
246+
Err(e) => {
247+
stream_status.status.failed += 1;
248+
stream_status.status.error = e.to_string();
249+
continue;
250+
}
217251
}
252+
} else {
253+
item
218254
};
219255
// end row based transform
220256

@@ -224,7 +260,7 @@ pub async fn ingest(
224260
_ => unreachable!(),
225261
};
226262

227-
if let Some(fields) = user_defined_schema_map.get(&stream_name) {
263+
if let Some(fields) = user_defined_schema_map.get(&routed_stream_name) {
228264
local_val = crate::service::logs::refactor_map(local_val, fields);
229265
}
230266

@@ -238,11 +274,19 @@ pub async fn ingest(
238274
}
239275
};
240276

277+
let function_no = stream_before_functions_map
278+
.get(&main_stream_key)
279+
.map(|v| v.len())
280+
.unwrap_or_default()
281+
+ stream_after_functions_map
282+
.get(&key)
283+
.map(|v| v.len())
284+
.unwrap_or_default();
241285
let (ts_data, fn_num) = json_data_by_stream
242-
.entry(stream_name.clone())
286+
.entry(routed_stream_name.clone())
243287
.or_insert((Vec::new(), None));
244288
ts_data.push((timestamp, local_val));
245-
*fn_num = need_usage_report.then_some(before_local_trans.len() + after_local_trans.len());
289+
*fn_num = need_usage_report.then_some(function_no);
246290
}
247291

248292
// if no data, fast return
@@ -312,30 +356,16 @@ pub async fn ingest(
312356

313357
pub fn apply_functions<'a>(
314358
item: json::Value,
315-
before_local_trans: &[StreamTransform],
316-
after_local_trans: &[StreamTransform],
359+
local_trans: &[StreamTransform],
317360
stream_vrl_map: &'a HashMap<String, VRLResultResolver>,
318361
org_id: &'a str,
319362
stream_name: &'a str,
320363
runtime: &mut Runtime,
321364
) -> Result<json::Value> {
322-
let cfg = get_config();
323365
let mut value = item;
324-
if !before_local_trans.is_empty() {
325-
value = crate::service::ingestion::apply_stream_functions(
326-
before_local_trans,
327-
value,
328-
stream_vrl_map,
329-
org_id,
330-
stream_name,
331-
runtime,
332-
)?;
333-
}
334-
value = flatten::flatten_with_level(value, cfg.limit.ingest_flatten_level)?;
335-
336-
if !after_local_trans.is_empty() {
366+
if !local_trans.is_empty() {
337367
value = crate::service::ingestion::apply_stream_functions(
338-
after_local_trans,
368+
local_trans,
339369
value,
340370
stream_vrl_map,
341371
org_id,

0 commit comments

Comments
 (0)