Skip to content

Commit 287185a

Browse files
Subhra264taimingl
andauthored
fix: pipeline rbac fixes (#4976)
Fixes #4982 - Adds migration for pipelines for ofga tuple database. - Fixes some rbac permission logic for pipelines. Co-authored-by: Taiming Liu <[email protected]>
1 parent 9e7abd5 commit 287185a

File tree

5 files changed

+53
-10
lines changed

5 files changed

+53
-10
lines changed

src/common/infra/ofga.rs

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use o2_enterprise::enterprise::{
2121
common::infra::config::get_config as get_o2_config,
2222
openfga::{
2323
authorizer::authz::{
24-
get_index_creation_tuples, get_org_creation_tuples, get_user_role_tuple, update_tuples,
24+
add_tuple_for_pipeline, get_index_creation_tuples, get_org_creation_tuples,
25+
get_ownership_all_org_tuple, get_user_role_tuple, update_tuples,
2526
},
2627
meta::mapping::{NON_OWNING_ORG, OFGA_MODELS},
2728
},
@@ -44,6 +45,7 @@ pub async fn init() -> Result<(), anyhow::Error> {
4445
let mut init_tuples = vec![];
4546
let mut migrate_native_objects = false;
4647
let mut need_migrate_index_streams = false;
48+
let mut need_pipeline_migration = false;
4749
let mut existing_meta = match db::ofga::get_ofga_model().await {
4850
Ok(Some(model)) => Some(model),
4951
Ok(None) | Err(_) => {
@@ -110,9 +112,13 @@ pub async fn init() -> Result<(), anyhow::Error> {
110112
version_compare::Version::from(&existing_model.version).unwrap();
111113
let v0_0_4 = version_compare::Version::from("0.0.4").unwrap();
112114
let v0_0_5 = version_compare::Version::from("0.0.5").unwrap();
115+
let v0_0_6 = version_compare::Version::from("0.0.6").unwrap();
113116
if meta_version > v0_0_4 && existing_model_version < v0_0_5 {
114117
need_migrate_index_streams = true;
115118
}
119+
if meta_version > v0_0_5 && existing_model_version < v0_0_6 {
120+
need_pipeline_migration = true;
121+
}
116122
}
117123

118124
// 1. create a cluster lock
@@ -189,7 +195,26 @@ pub async fn init() -> Result<(), anyhow::Error> {
189195
}
190196
} else {
191197
for org_name in orgs {
192-
get_index_creation_tuples(org_name, &mut tuples).await;
198+
if need_migrate_index_streams {
199+
get_index_creation_tuples(org_name, &mut tuples).await;
200+
}
201+
if need_pipeline_migration {
202+
get_ownership_all_org_tuple(org_name, "pipelines", &mut tuples);
203+
match db::pipeline::list_by_org(org_name).await {
204+
Ok(pipelines) => {
205+
for pipeline in pipelines {
206+
add_tuple_for_pipeline(org_name, &pipeline.id, &mut tuples);
207+
}
208+
}
209+
Err(e) => {
210+
log::error!(
211+
"Failed to migrate RBAC for org {} pipelines: {}",
212+
org_name,
213+
e
214+
);
215+
}
216+
}
217+
}
193218
}
194219
}
195220

src/common/utils/auth.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,11 @@ impl FromRequest for AuthExtractor {
340340
.map_or(path_columns[1], |model| model.key),
341341
path_columns[2]
342342
)
343-
} else if method.eq("PUT") && path_columns[1] != "streams" || method.eq("DELETE") {
343+
} else if method.eq("PUT")
344+
&& path_columns[1] != "streams"
345+
&& path_columns[1] != "pipelines"
346+
|| method.eq("DELETE")
347+
{
344348
format!(
345349
"{}:{}",
346350
OFGA_MODELS
@@ -349,9 +353,6 @@ impl FromRequest for AuthExtractor {
349353
path_columns[3]
350354
)
351355
} else {
352-
if method.eq("POST") && path_columns[3].eq("pipelines") {
353-
method = "PUT".to_string();
354-
}
355356
format!(
356357
"{}:{}",
357358
OFGA_MODELS
@@ -401,7 +402,6 @@ impl FromRequest for AuthExtractor {
401402
|| path.contains("/format_query")
402403
|| path.contains("/prometheus/api/v1/series")
403404
|| path.contains("/traces/latest")
404-
|| (method.eq("LIST") && path.contains("pipelines"))
405405
|| path.contains("clusters")
406406
|| path.contains("query_manager")
407407
|| path.contains("/short")

src/handler/http/request/pipeline.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,16 @@ async fn list_pipelines(
7474
// Get List of allowed objects
7575
#[cfg(feature = "enterprise")]
7676
{
77+
use o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS;
78+
7779
let user_id = _req.headers().get("user_id").unwrap();
7880
match crate::handler::http::auth::validator::list_objects_for_user(
7981
&org_id,
8082
user_id.to_str().unwrap(),
8183
"GET",
82-
"logs",
84+
OFGA_MODELS
85+
.get("pipelines")
86+
.map_or("pipelines", |model| model.key),
8387
)
8488
.await
8589
{

src/service/pipeline/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,11 +173,11 @@ pub async fn list_pipelines(
173173
|| permitted
174174
.as_ref()
175175
.unwrap()
176-
.contains(&format!("pipelines:{}", pipeline.id))
176+
.contains(&format!("pipeline:{}", pipeline.id))
177177
|| permitted
178178
.as_ref()
179179
.unwrap()
180-
.contains(&format!("pipelines:_all_{}", org_id))
180+
.contains(&format!("pipeline:_all_{}", org_id))
181181
{
182182
result.push(pipeline)
183183
}

web/src/components/iam/roles/EditRole.vue

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ import {
326326
import { useQuasar } from "quasar";
327327
import type { AxiosPromise } from "axios";
328328
import streamService from "@/services/stream";
329+
import pipelineService from "@/services/pipelines";
329330
import alertService from "@/services/alerts";
330331
import reportService from "@/services/reports";
331332
import templateService from "@/services/alert_templates";
@@ -1251,6 +1252,7 @@ const getResourceEntities = (resource: Resource | Entity) => {
12511252
alert: getAlerts,
12521253
template: getTemplates,
12531254
destination: getDestinations,
1255+
pipeline: getPipelines,
12541256
enrichment_table: getEnrichmentTables,
12551257
function: getFunctions,
12561258
org: getOrgs,
@@ -1432,6 +1434,18 @@ const getTemplates = async () => {
14321434
});
14331435
};
14341436
1437+
const getPipelines = async () => {
1438+
const pipelines = await pipelineService.getPipelines(
1439+
store.state.selectedOrganization.identifier
1440+
);
1441+
1442+
updateResourceEntities("pipeline", ["name"], [...pipelines.data.list]);
1443+
1444+
return new Promise((resolve) => {
1445+
resolve(true);
1446+
});
1447+
};
1448+
14351449
const getAlerts = async () => {
14361450
const alerts = await alertService.list(
14371451
1,

0 commit comments

Comments
 (0)