From da08a7f5e840ce58be15520fc5681ca741be1155 Mon Sep 17 00:00:00 2001 From: Ashish Kolhe <35160958+oasisk@users.noreply.github.com> Date: Thu, 11 Jan 2024 20:40:17 +0530 Subject: [PATCH] feat: rebac with openfga (#2452) --- src/common/utils/auth.rs | 52 +++++++++++++++---- src/handler/http/auth/token.rs | 3 +- src/handler/http/auth/validator.rs | 2 +- src/handler/http/request/functions/mod.rs | 6 +-- src/handler/http/request/stream/mod.rs | 10 ++-- src/service/functions.rs | 3 +- src/service/stream.rs | 3 ++ tests/api-testing/tests/test_alerts.py | 16 +++--- tests/api-testing/tests/test_functions.py | 10 ++-- tests/api-testing/tests/test_logs.py | 8 +-- tests/api-testing/tests/test_organisations.py | 4 +- tests/api-testing/tests/test_rum.py | 12 ++--- web/src/services/index.ts | 4 +- web/src/services/jstransform.ts | 8 +-- web/src/services/stream.ts | 10 ++-- 15 files changed, 95 insertions(+), 56 deletions(-) diff --git a/src/common/utils/auth.rs b/src/common/utils/auth.rs index 80ef75c3c..2caeba662 100644 --- a/src/common/utils/auth.rs +++ b/src/common/utils/auth.rs @@ -114,7 +114,7 @@ pub struct AuthExtractor { pub method: String, pub o2_type: String, pub org_id: String, - pub is_ingestion_ep: bool, + pub bypass_check: bool, } impl FromRequest for AuthExtractor { @@ -143,7 +143,7 @@ impl FromRequest for AuthExtractor { method, o2_type: format!("stream:{org_id}"), org_id, - is_ingestion_ep: true, + bypass_check: true, })); } } @@ -170,32 +170,64 @@ impl FromRequest for AuthExtractor { .unwrap_or(&path_columns[url_len - 1]), path_columns[url_len - 2] ) + } else if url_len == 3 { + if method.eq("PUT") || method.eq("DELETE") { + format!( + "{}:{}", + o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS + .get(path_columns[1]) + .unwrap_or(&path_columns[1]), + path_columns[2] + ) + } else { + format!( + "{}:{}", + o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS + .get(path_columns[2]) + .unwrap_or(&path_columns[2]), + path_columns[0] + ) + } } else if method.eq("PUT") || method.eq("DELETE") { + if path_columns[url_len - 1].eq("delete_fields") { + method = "DELETE".to_string(); + } format!( "{}:{}", o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS - .get(path_columns[url_len - 2]) - .unwrap_or(&path_columns[url_len - 2]), - path_columns[url_len - 1] + .get(path_columns[1]) + .unwrap_or(&path_columns[1]), + path_columns[2] ) } else { format!( "{}:{}", o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS - .get(path_columns[url_len - 1]) - .unwrap_or(&path_columns[url_len - 1]), - path_columns[url_len - 3] + .get(path_columns[1]) + .unwrap_or(&path_columns[1]), + path_columns[2] ) }; if let Some(auth_header) = req.headers().get("Authorization") { if let Ok(auth_str) = auth_header.to_str() { + if (method.eq("POST") && path_columns[1].eq("_search")) + || path.contains("/prometheus/api/v1/query") + { + return ready(Ok(AuthExtractor { + auth: auth_str.to_owned(), + method: "".to_string(), + o2_type: "".to_string(), + org_id: "".to_string(), + bypass_check: true, // bypass check permissions + })); + } return ready(Ok(AuthExtractor { auth: auth_str.to_owned(), method, o2_type: object_type, org_id, - is_ingestion_ep: false, + bypass_check: false, })); } } @@ -213,7 +245,7 @@ impl FromRequest for AuthExtractor { method: "".to_string(), o2_type: "".to_string(), org_id: "".to_string(), - is_ingestion_ep: true, // bypass check permissions + bypass_check: true, // bypass check permissions })); } } diff --git a/src/handler/http/auth/token.rs b/src/handler/http/auth/token.rs index 501e80deb..851ad1456 100644 --- a/src/handler/http/auth/token.rs +++ b/src/handler/http/auth/token.rs @@ -87,6 +87,7 @@ pub async fn token_validator( if user.is_some() { // / Hack for prometheus, need support POST and check the header let mut req = req; + if req.method().eq(&Method::POST) && !req.headers().contains_key("content-type") { req.headers_mut().insert( @@ -99,7 +100,7 @@ pub async fn token_validator( header::HeaderValue::from_str(&res.0.user_email).unwrap(), ); - if auth_info.is_ingestion_ep || check_permissions(user_id, auth_info).await { + if auth_info.bypass_check || check_permissions(user_id, auth_info).await { Ok(req) } else { Err((ErrorForbidden("Unauthorized Access"), req)) diff --git a/src/handler/http/auth/validator.rs b/src/handler/http/auth/validator.rs index d05ac0a7b..7f50e2c33 100644 --- a/src/handler/http/auth/validator.rs +++ b/src/handler/http/auth/validator.rs @@ -73,7 +73,7 @@ pub async fn validator( header::HeaderName::from_static("user_id"), header::HeaderValue::from_str(&res.user_email).unwrap(), ); - if auth_info.is_ingestion_ep || check_permissions(user_id, auth_info).await { + if auth_info.bypass_check || check_permissions(user_id, auth_info).await { Ok(req) } else { Err((ErrorForbidden("Unauthorized Access"), req)) diff --git a/src/handler/http/request/functions/mod.rs b/src/handler/http/request/functions/mod.rs index 4726c8357..550bd654e 100644 --- a/src/handler/http/request/functions/mod.rs +++ b/src/handler/http/request/functions/mod.rs @@ -142,7 +142,7 @@ pub async fn update_function( (status = 200, description = "Success", content_type = "application/json", body = StreamFunctionsList), ) )] -#[get("/{org_id}/{stream_name}/functions")] +#[get("/{org_id}/streams/{stream_name}/functions")] async fn list_stream_functions( path: web::Path<(String, String)>, req: HttpRequest, @@ -181,7 +181,7 @@ async fn list_stream_functions( (status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse), ) )] -#[delete("/{org_id}/{stream_name}/functions/{name}")] +#[delete("/{org_id}/streams/{stream_name}/functions/{name}")] async fn delete_stream_function( path: web::Path<(String, String, String)>, req: HttpRequest, @@ -222,7 +222,7 @@ async fn delete_stream_function( (status = 400, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] -#[post("/{org_id}/{stream_name}/functions/{name}")] +#[put("/{org_id}/streams/{stream_name}/functions/{name}")] pub async fn add_function_to_stream( path: web::Path<(String, String, String)>, stream_order: web::Json, diff --git a/src/handler/http/request/stream/mod.rs b/src/handler/http/request/stream/mod.rs index 6e668ee7a..489d3267f 100644 --- a/src/handler/http/request/stream/mod.rs +++ b/src/handler/http/request/stream/mod.rs @@ -18,7 +18,7 @@ use std::{ io::{Error, ErrorKind}, }; -use actix_web::{delete, get, http, post, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{delete, get, http, put, web, HttpRequest, HttpResponse, Responder}; use config::meta::stream::StreamType; use crate::{ @@ -50,7 +50,7 @@ use crate::{ (status = 400, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] -#[get("/{org_id}/{stream_name}/schema")] +#[get("/{org_id}/streams/{stream_name}/schema")] async fn schema( path: web::Path<(String, String)>, req: HttpRequest, @@ -90,7 +90,7 @@ async fn schema( (status = 400, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] -#[post("/{org_id}/{stream_name}/settings")] +#[put("/{org_id}/streams/{stream_name}/settings")] async fn settings( path: web::Path<(String, String)>, settings: web::Json, @@ -145,7 +145,7 @@ async fn settings( (status = 400, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] -#[post("/{org_id}/{stream_name}/delete_fields")] +#[put("/{org_id}/streams/{stream_name}/delete_fields")] async fn delete_fields( path: web::Path<(String, String)>, fields: web::Json, @@ -200,7 +200,7 @@ async fn delete_fields( (status = 400, description = "Failure", content_type = "application/json", body = HttpResponse), ) )] -#[delete("/{org_id}/{stream_name}")] +#[delete("/{org_id}/streams/{stream_name}")] async fn delete( path: web::Path<(String, String)>, req: HttpRequest, diff --git a/src/service/functions.rs b/src/service/functions.rs index 1ca568c6d..74d17f873 100644 --- a/src/service/functions.rs +++ b/src/service/functions.rs @@ -25,10 +25,11 @@ use crate::{ common::{ infra::config::STREAM_FUNCTIONS, meta::{ + authz::Authz, functions::{ FunctionList, StreamFunctionsList, StreamOrder, StreamTransform, Transform, }, - http::HttpResponse as MetaHttpResponse, authz::Authz, + http::HttpResponse as MetaHttpResponse, }, utils::auth::{remove_ownership, set_ownership}, }, diff --git a/src/service/stream.rs b/src/service/stream.rs index b6c58a176..b738a410d 100644 --- a/src/service/stream.rs +++ b/src/service/stream.rs @@ -27,6 +27,7 @@ use crate::{ infra::{cache::stats, config::STREAM_SCHEMAS}, meta::{ self, + authz::Authz, http::HttpResponse as MetaHttpResponse, prom, stream::{PartitionTimeLevel, Stream, StreamProperty, StreamSettings, StreamStats}, @@ -264,6 +265,8 @@ pub async fn delete_stream( ); }; + crate::common::utils::auth::remove_ownership(org_id, "streams", Authz::new(stream_name)).await; + Ok(HttpResponse::Ok().json(MetaHttpResponse::message( StatusCode::OK.into(), "stream deleted".to_string(), diff --git a/tests/api-testing/tests/test_alerts.py b/tests/api-testing/tests/test_alerts.py index 644698423..4633ae4b2 100644 --- a/tests/api-testing/tests/test_alerts.py +++ b/tests/api-testing/tests/test_alerts.py @@ -69,7 +69,7 @@ def test_e2e_templatescreation(create_session, base_url): payload = {"body": "invalid", "ise2e": True, "name": template_name} # create template under alerts resp_get_alltemplates = session.post( - f"{url}api/{org_id}/alerts/templates/{template_name}", json=payload + f"{url}api/{org_id}/alerts/templates", json=payload ) print(resp_get_alltemplates.content) @@ -120,23 +120,25 @@ def test_e2e_createdestination(create_session, base_url): # createtemplate resp_create_destinations = session.post( - f"{url}api/{org_id}/alerts/templates/pytesttemplate", + f"{url}api/{org_id}/alerts/templates", json=payload, headers=headers, ) print(resp_create_destinations.content) - + + destination_name = "py-destinations" payload = { "url": "www", "method": "post", "skip_tls_verify": skip_tls_verify_value, "template": "pytesttemplate", "headers": {"test": "test"}, + "name":"py-destinations" } - destination_name = "py-destinations" + # create destination resp_create_destinations = session.post( - f"{url}api/{org_id}/alerts/destinations/{destination_name}", + f"{url}api/{org_id}/alerts/destinations", json=payload, headers=headers, ) @@ -189,7 +191,7 @@ def test_e2e_createdestination(create_session, base_url): alert_name = "py-alert" is_real_time = False payload = { - "name": "pytest-alert", + "name": alert_name, "stream_type": "logs", "stream_name": "newpy_tests", "is_real_time": is_real_time, @@ -219,7 +221,7 @@ def test_e2e_createdestination(create_session, base_url): "description": "", } resp_create_alert = session.post( - f"{url}api/{org_id}/{stream_name}/alerts/{alert_name}", + f"{url}api/{org_id}/{stream_name}/alerts", json=payload, headers=headers, ) diff --git a/tests/api-testing/tests/test_functions.py b/tests/api-testing/tests/test_functions.py index cd117312a..225e9c5e3 100644 --- a/tests/api-testing/tests/test_functions.py +++ b/tests/api-testing/tests/test_functions.py @@ -178,7 +178,7 @@ def test_e2e_allfunctionstreams(create_session, base_url): stream_name = "test" resp_get_streamfunction = session.get( - f"{base_url}api/{org_id}/{stream_name}/functions" + f"{base_url}api/{org_id}/streams/{stream_name}/functions" ) print(resp_get_streamfunction.content) @@ -213,7 +213,7 @@ def test_e2e_addDeleteStreamFunction(create_session, base_url): ), f"Expected 200, but got {resp_create_function.status_code} {resp_create_function.content}" resp_get_streamfunction = session.get( - f"{base_url}api/{org_id}/{stream_name}/functions" + f"{base_url}api/{org_id}/streams/{stream_name}/functions" ) print(resp_get_streamfunction.content) @@ -222,8 +222,8 @@ def test_e2e_addDeleteStreamFunction(create_session, base_url): ), f"Get all functions streams list 200, but got {resp_get_streamfunction.status_code} {resp_get_streamfunction.content}" payload = {"order": 2} - resp_add_streamfunction = session.post( - f"{base_url}api/{org_id}/{stream_name}/functions/pytestfunction", + resp_add_streamfunction = session.put( + f"{base_url}api/{org_id}/streams/{stream_name}/functions/pytestfunction", json=payload, ) @@ -232,7 +232,7 @@ def test_e2e_addDeleteStreamFunction(create_session, base_url): resp_add_streamfunction.status_code == 200 ), f"Add stream to function 200, but got {resp_add_streamfunction.status_code} {resp_add_streamfunction.content}" resp_delete_streamfunction = session.delete( - f"{base_url}api/{org_id}/{stream_name}/functions/pytestfunction" + f"{base_url}api/{org_id}/streams/{stream_name}/functions/pytestfunction" ) assert ( resp_delete_streamfunction.status_code == 200 diff --git a/tests/api-testing/tests/test_logs.py b/tests/api-testing/tests/test_logs.py index 00bbcbdbf..171279640 100644 --- a/tests/api-testing/tests/test_logs.py +++ b/tests/api-testing/tests/test_logs.py @@ -166,7 +166,7 @@ def test_e2e_getschema(create_session, base_url): url = base_url org_id = "default" - resp_get_streamschema = session.get(f"{url}api/{org_id}/newpy_tests/schema") + resp_get_streamschema = session.get(f"{url}api/{org_id}/streams/newpy_tests/schema") print(resp_get_streamschema.content) assert ( @@ -181,7 +181,7 @@ def test_e2e_deleteinvalidstreams(create_session, base_url): url = base_url org_id = "default" - resp_get_deletestreams = session.delete(f"{url}api/{org_id}/invalidstream") + resp_get_deletestreams = session.delete(f"{url}api/{org_id}/streams/invalidstream") print(resp_get_deletestreams.content) assert ( @@ -195,7 +195,7 @@ def test_e2e_incorrectstreamesettings(create_session, base_url): session = create_session url = base_url org_id = "default" - resp_get_streamssettings = session.post(f"{url}api/{org_id}/newpy_tests/settings") + resp_get_streamssettings = session.put(f"{url}api/{org_id}/streams/newpy_tests/settings") print(resp_get_streamssettings.content) assert ( @@ -210,7 +210,7 @@ def test_e2e_deletevalidstreams(create_session, base_url): url = base_url org_id = "default" - resp_get_allstreams = session.delete(f"{url}api/{org_id}/newpy_tests") + resp_get_allstreams = session.delete(f"{url}api/{org_id}/streams/newpy_tests") print(resp_get_allstreams.content) assert ( diff --git a/tests/api-testing/tests/test_organisations.py b/tests/api-testing/tests/test_organisations.py index 53c258436..4c2b62313 100644 --- a/tests/api-testing/tests/test_organisations.py +++ b/tests/api-testing/tests/test_organisations.py @@ -5,7 +5,7 @@ def test_e2e_organisations(create_session, base_url): url = base_url org_id = "default" - resp_get_allorgs = session.get(f"{url}api/{org_id}/organizations") + resp_get_allorgs = session.get(f"{url}api/organizations") print(resp_get_allorgs.content) assert ( @@ -50,7 +50,7 @@ def test_e2e_passcode(create_session, base_url): url = base_url org_id = "default" - resp_get_allorgs = session.get(f"{url}api/{org_id}/organizations/passcode") + resp_get_allorgs = session.get(f"{url}api/{org_id}/passcode") print(resp_get_allorgs.content) assert ( diff --git a/tests/api-testing/tests/test_rum.py b/tests/api-testing/tests/test_rum.py index a92c91964..5c332060a 100644 --- a/tests/api-testing/tests/test_rum.py +++ b/tests/api-testing/tests/test_rum.py @@ -15,7 +15,7 @@ def test_e2e_rumtoken(create_session, base_url): session = create_session org_id = "default" - resp_get_rumtoken = session.get(f"{base_url}api/{org_id}/organizations/rumtoken") + resp_get_rumtoken = session.get(f"{base_url}api/{org_id}/rumtoken") # get rumtoken print(resp_get_rumtoken.content) @@ -92,7 +92,7 @@ def test_e2e_rumingestinglogs(create_session, base_url): rum_org = "default" logs_url = f"{base_url}rum/v1/{rum_org}/logs" ip_address = "182.70.14.246" - resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/organizations/rumtoken") + resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/rumtoken") # get rumtoken print(resp_get_rumtoken.content) @@ -193,7 +193,7 @@ def test_e2e_rumingestinglogs(create_session, base_url): # session = create_session # url = base_url # org_id = "e2e" -# resp_generate_rumtoken = session.post(f"{base_url}api/{org_id}/organizations/rumtoken") +# resp_generate_rumtoken = session.post(f"{base_url}api/{org_id}/rumtoken") # print(resp_generate_rumtoken.content) # rum_token = resp_generate_rumtoken.json()["data"]["rum_token"] @@ -207,7 +207,7 @@ def test_e2e_rumingestinglogs(create_session, base_url): # session = create_session # url = base_url # org_id = "e2e" -# resp_update_rumtoken = session.put(f"{base_url}api/{org_id}/organizations/rumtoken") +# resp_update_rumtoken = session.put(f"{base_url}api/{org_id}/rumtoken") # print(resp_update_rumtoken.content) # rum_token = resp_update_rumtoken.json()["data"]["rum_token"] @@ -224,7 +224,7 @@ def test_e2e_rumdataingestandsearch(create_session, base_url): rum_org = "default" ip_address = "182.70.14.246" headers = {"X-Forwarded-For": ip_address, "Content-Type": "application/json"} - resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/organizations/rumtoken") + resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/rumtoken") # get rumtoken print(resp_get_rumtoken.content) @@ -318,7 +318,7 @@ def test_e2e_rumverifygeodata(create_session, base_url): logs_url = f"{base_url}rum/v1/{rum_org}/logs" ip_address = "182.70.14.246" headers = {"X-Forwarded-For": ip_address, "Content-Type": "application/json"} - resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/organizations/rumtoken") + resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/rumtoken") # get rumtoken print(resp_get_rumtoken.content) diff --git a/web/src/services/index.ts b/web/src/services/index.ts index b428dc30f..18c49f97b 100644 --- a/web/src/services/index.ts +++ b/web/src/services/index.ts @@ -45,12 +45,12 @@ var index = { type: string, data: any ) => { - let url = `/api/${org_identifier}/${stream_name}/settings`; + let url = `/api/${org_identifier}/streams/${stream_name}/settings`; if (type != "") { url += "?type=" + type; } - return http().post(url, data); + return http().put(url, data); }, }; diff --git a/web/src/services/jstransform.ts b/web/src/services/jstransform.ts index 6d16e90e4..0bf6a3ac1 100644 --- a/web/src/services/jstransform.ts +++ b/web/src/services/jstransform.ts @@ -64,7 +64,7 @@ const jstransform = { stream_type: string ) => { return http().get( - `/api/${org_identifier}/${stream_name}/functions?type=${stream_type}` + `/api/${org_identifier}/streams/${stream_name}/functions?type=${stream_type}` ); }, apply_stream_function: ( @@ -74,8 +74,8 @@ const jstransform = { function_name: string, data: any ) => { - return http().post( - `/api/${org_identifier}/${stream_name}/functions/${function_name}?type=${stream_type}`, + return http().put( + `/api/${org_identifier}/streams/${stream_name}/functions/${function_name}?type=${stream_type}`, data ); }, @@ -86,7 +86,7 @@ const jstransform = { function_name: string ) => { return http().delete( - `/api/${org_identifier}/${stream_name}/functions/${function_name}?type=${stream_type}` + `/api/${org_identifier}/streams/${stream_name}/functions/${function_name}?type=${stream_type}` ); }, create_enrichment_table: ( diff --git a/web/src/services/stream.ts b/web/src/services/stream.ts index 9a1a2e390..d13fac663 100644 --- a/web/src/services/stream.ts +++ b/web/src/services/stream.ts @@ -33,7 +33,7 @@ const stream = { }, schema: (org_identifier: string, stream_name: string, type: string) => { - let url = `/api/${org_identifier}/${stream_name}/schema`; + let url = `/api/${org_identifier}/streams/${stream_name}/schema`; if (type != "") { url += "?type=" + type; @@ -47,12 +47,12 @@ const stream = { type: string, data: any ) => { - let url = `/api/${org_identifier}/${stream_name}/settings`; + let url = `/api/${org_identifier}/streams/${stream_name}/settings`; if (type != "") { url += "?type=" + type; } - return http().post(url, data); + return http().put(url, data); }, fieldValues: ({ @@ -112,12 +112,12 @@ const stream = { stream_type: string ) => { return http().delete( - `/api/${org_identifier}/${stream_name}?type=${stream_type}` + `/api/${org_identifier}/streams/${stream_name}?type=${stream_type}` ); }, deleteFields: (org_identifier: string, stream_name: string, fields: []) => { - return http().post(`/api/${org_identifier}/${stream_name}/delete_fields`, { + return http().put(`/api/${org_identifier}/streams/${stream_name}/delete_fields`, { fields, }); },