feat: rebac with openfga (#2452)

This commit is contained in:
Ashish Kolhe 2024-01-11 20:40:17 +05:30 committed by GitHub
parent 7435dfd40f
commit da08a7f5e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 95 additions and 56 deletions

View File

@ -114,7 +114,7 @@ pub struct AuthExtractor {
pub method: String,
pub o2_type: String,
pub org_id: String,
pub is_ingestion_ep: bool,
pub bypass_check: bool,
}
impl FromRequest for AuthExtractor {
@ -143,7 +143,7 @@ impl FromRequest for AuthExtractor {
method,
o2_type: format!("stream:{org_id}"),
org_id,
is_ingestion_ep: true,
bypass_check: true,
}));
}
}
@ -170,32 +170,64 @@ impl FromRequest for AuthExtractor {
.unwrap_or(&path_columns[url_len - 1]),
path_columns[url_len - 2]
)
} else if url_len == 3 {
if method.eq("PUT") || method.eq("DELETE") {
format!(
"{}:{}",
o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS
.get(path_columns[1])
.unwrap_or(&path_columns[1]),
path_columns[2]
)
} else {
format!(
"{}:{}",
o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS
.get(path_columns[2])
.unwrap_or(&path_columns[2]),
path_columns[0]
)
}
} else if method.eq("PUT") || method.eq("DELETE") {
if path_columns[url_len - 1].eq("delete_fields") {
method = "DELETE".to_string();
}
format!(
"{}:{}",
o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS
.get(path_columns[url_len - 2])
.unwrap_or(&path_columns[url_len - 2]),
path_columns[url_len - 1]
.get(path_columns[1])
.unwrap_or(&path_columns[1]),
path_columns[2]
)
} else {
format!(
"{}:{}",
o2_enterprise::enterprise::openfga::meta::mapping::OFGA_MODELS
.get(path_columns[url_len - 1])
.unwrap_or(&path_columns[url_len - 1]),
path_columns[url_len - 3]
.get(path_columns[1])
.unwrap_or(&path_columns[1]),
path_columns[2]
)
};
if let Some(auth_header) = req.headers().get("Authorization") {
if let Ok(auth_str) = auth_header.to_str() {
if (method.eq("POST") && path_columns[1].eq("_search"))
|| path.contains("/prometheus/api/v1/query")
{
return ready(Ok(AuthExtractor {
auth: auth_str.to_owned(),
method: "".to_string(),
o2_type: "".to_string(),
org_id: "".to_string(),
bypass_check: true, // bypass check permissions
}));
}
return ready(Ok(AuthExtractor {
auth: auth_str.to_owned(),
method,
o2_type: object_type,
org_id,
is_ingestion_ep: false,
bypass_check: false,
}));
}
}
@ -213,7 +245,7 @@ impl FromRequest for AuthExtractor {
method: "".to_string(),
o2_type: "".to_string(),
org_id: "".to_string(),
is_ingestion_ep: true, // bypass check permissions
bypass_check: true, // bypass check permissions
}));
}
}

View File

@ -87,6 +87,7 @@ pub async fn token_validator(
if user.is_some() {
// / Hack for prometheus, need support POST and check the header
let mut req = req;
if req.method().eq(&Method::POST) && !req.headers().contains_key("content-type")
{
req.headers_mut().insert(
@ -99,7 +100,7 @@ pub async fn token_validator(
header::HeaderValue::from_str(&res.0.user_email).unwrap(),
);
if auth_info.is_ingestion_ep || check_permissions(user_id, auth_info).await {
if auth_info.bypass_check || check_permissions(user_id, auth_info).await {
Ok(req)
} else {
Err((ErrorForbidden("Unauthorized Access"), req))

View File

@ -73,7 +73,7 @@ pub async fn validator(
header::HeaderName::from_static("user_id"),
header::HeaderValue::from_str(&res.user_email).unwrap(),
);
if auth_info.is_ingestion_ep || check_permissions(user_id, auth_info).await {
if auth_info.bypass_check || check_permissions(user_id, auth_info).await {
Ok(req)
} else {
Err((ErrorForbidden("Unauthorized Access"), req))

View File

@ -142,7 +142,7 @@ pub async fn update_function(
(status = 200, description = "Success", content_type = "application/json", body = StreamFunctionsList),
)
)]
#[get("/{org_id}/{stream_name}/functions")]
#[get("/{org_id}/streams/{stream_name}/functions")]
async fn list_stream_functions(
path: web::Path<(String, String)>,
req: HttpRequest,
@ -181,7 +181,7 @@ async fn list_stream_functions(
(status = 404, description = "NotFound", content_type = "application/json", body = HttpResponse),
)
)]
#[delete("/{org_id}/{stream_name}/functions/{name}")]
#[delete("/{org_id}/streams/{stream_name}/functions/{name}")]
async fn delete_stream_function(
path: web::Path<(String, String, String)>,
req: HttpRequest,
@ -222,7 +222,7 @@ async fn delete_stream_function(
(status = 400, description = "Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[post("/{org_id}/{stream_name}/functions/{name}")]
#[put("/{org_id}/streams/{stream_name}/functions/{name}")]
pub async fn add_function_to_stream(
path: web::Path<(String, String, String)>,
stream_order: web::Json<StreamOrder>,

View File

@ -18,7 +18,7 @@ use std::{
io::{Error, ErrorKind},
};
use actix_web::{delete, get, http, post, web, HttpRequest, HttpResponse, Responder};
use actix_web::{delete, get, http, put, web, HttpRequest, HttpResponse, Responder};
use config::meta::stream::StreamType;
use crate::{
@ -50,7 +50,7 @@ use crate::{
(status = 400, description = "Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[get("/{org_id}/{stream_name}/schema")]
#[get("/{org_id}/streams/{stream_name}/schema")]
async fn schema(
path: web::Path<(String, String)>,
req: HttpRequest,
@ -90,7 +90,7 @@ async fn schema(
(status = 400, description = "Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[post("/{org_id}/{stream_name}/settings")]
#[put("/{org_id}/streams/{stream_name}/settings")]
async fn settings(
path: web::Path<(String, String)>,
settings: web::Json<StreamSettings>,
@ -145,7 +145,7 @@ async fn settings(
(status = 400, description = "Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[post("/{org_id}/{stream_name}/delete_fields")]
#[put("/{org_id}/streams/{stream_name}/delete_fields")]
async fn delete_fields(
path: web::Path<(String, String)>,
fields: web::Json<StreamDeleteFields>,
@ -200,7 +200,7 @@ async fn delete_fields(
(status = 400, description = "Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[delete("/{org_id}/{stream_name}")]
#[delete("/{org_id}/streams/{stream_name}")]
async fn delete(
path: web::Path<(String, String)>,
req: HttpRequest,

View File

@ -25,10 +25,11 @@ use crate::{
common::{
infra::config::STREAM_FUNCTIONS,
meta::{
authz::Authz,
functions::{
FunctionList, StreamFunctionsList, StreamOrder, StreamTransform, Transform,
},
http::HttpResponse as MetaHttpResponse, authz::Authz,
http::HttpResponse as MetaHttpResponse,
},
utils::auth::{remove_ownership, set_ownership},
},

View File

@ -27,6 +27,7 @@ use crate::{
infra::{cache::stats, config::STREAM_SCHEMAS},
meta::{
self,
authz::Authz,
http::HttpResponse as MetaHttpResponse,
prom,
stream::{PartitionTimeLevel, Stream, StreamProperty, StreamSettings, StreamStats},
@ -264,6 +265,8 @@ pub async fn delete_stream(
);
};
crate::common::utils::auth::remove_ownership(org_id, "streams", Authz::new(stream_name)).await;
Ok(HttpResponse::Ok().json(MetaHttpResponse::message(
StatusCode::OK.into(),
"stream deleted".to_string(),

View File

@ -69,7 +69,7 @@ def test_e2e_templatescreation(create_session, base_url):
payload = {"body": "invalid", "ise2e": True, "name": template_name}
# create template under alerts
resp_get_alltemplates = session.post(
f"{url}api/{org_id}/alerts/templates/{template_name}", json=payload
f"{url}api/{org_id}/alerts/templates", json=payload
)
print(resp_get_alltemplates.content)
@ -120,23 +120,25 @@ def test_e2e_createdestination(create_session, base_url):
# createtemplate
resp_create_destinations = session.post(
f"{url}api/{org_id}/alerts/templates/pytesttemplate",
f"{url}api/{org_id}/alerts/templates",
json=payload,
headers=headers,
)
print(resp_create_destinations.content)
destination_name = "py-destinations"
payload = {
"url": "www",
"method": "post",
"skip_tls_verify": skip_tls_verify_value,
"template": "pytesttemplate",
"headers": {"test": "test"},
"name":"py-destinations"
}
destination_name = "py-destinations"
# create destination
resp_create_destinations = session.post(
f"{url}api/{org_id}/alerts/destinations/{destination_name}",
f"{url}api/{org_id}/alerts/destinations",
json=payload,
headers=headers,
)
@ -189,7 +191,7 @@ def test_e2e_createdestination(create_session, base_url):
alert_name = "py-alert"
is_real_time = False
payload = {
"name": "pytest-alert",
"name": alert_name,
"stream_type": "logs",
"stream_name": "newpy_tests",
"is_real_time": is_real_time,
@ -219,7 +221,7 @@ def test_e2e_createdestination(create_session, base_url):
"description": "",
}
resp_create_alert = session.post(
f"{url}api/{org_id}/{stream_name}/alerts/{alert_name}",
f"{url}api/{org_id}/{stream_name}/alerts",
json=payload,
headers=headers,
)

View File

@ -178,7 +178,7 @@ def test_e2e_allfunctionstreams(create_session, base_url):
stream_name = "test"
resp_get_streamfunction = session.get(
f"{base_url}api/{org_id}/{stream_name}/functions"
f"{base_url}api/{org_id}/streams/{stream_name}/functions"
)
print(resp_get_streamfunction.content)
@ -213,7 +213,7 @@ def test_e2e_addDeleteStreamFunction(create_session, base_url):
), f"Expected 200, but got {resp_create_function.status_code} {resp_create_function.content}"
resp_get_streamfunction = session.get(
f"{base_url}api/{org_id}/{stream_name}/functions"
f"{base_url}api/{org_id}/streams/{stream_name}/functions"
)
print(resp_get_streamfunction.content)
@ -222,8 +222,8 @@ def test_e2e_addDeleteStreamFunction(create_session, base_url):
), f"Get all functions streams list 200, but got {resp_get_streamfunction.status_code} {resp_get_streamfunction.content}"
payload = {"order": 2}
resp_add_streamfunction = session.post(
f"{base_url}api/{org_id}/{stream_name}/functions/pytestfunction",
resp_add_streamfunction = session.put(
f"{base_url}api/{org_id}/streams/{stream_name}/functions/pytestfunction",
json=payload,
)
@ -232,7 +232,7 @@ def test_e2e_addDeleteStreamFunction(create_session, base_url):
resp_add_streamfunction.status_code == 200
), f"Add stream to function 200, but got {resp_add_streamfunction.status_code} {resp_add_streamfunction.content}"
resp_delete_streamfunction = session.delete(
f"{base_url}api/{org_id}/{stream_name}/functions/pytestfunction"
f"{base_url}api/{org_id}/streams/{stream_name}/functions/pytestfunction"
)
assert (
resp_delete_streamfunction.status_code == 200

View File

@ -166,7 +166,7 @@ def test_e2e_getschema(create_session, base_url):
url = base_url
org_id = "default"
resp_get_streamschema = session.get(f"{url}api/{org_id}/newpy_tests/schema")
resp_get_streamschema = session.get(f"{url}api/{org_id}/streams/newpy_tests/schema")
print(resp_get_streamschema.content)
assert (
@ -181,7 +181,7 @@ def test_e2e_deleteinvalidstreams(create_session, base_url):
url = base_url
org_id = "default"
resp_get_deletestreams = session.delete(f"{url}api/{org_id}/invalidstream")
resp_get_deletestreams = session.delete(f"{url}api/{org_id}/streams/invalidstream")
print(resp_get_deletestreams.content)
assert (
@ -195,7 +195,7 @@ def test_e2e_incorrectstreamesettings(create_session, base_url):
session = create_session
url = base_url
org_id = "default"
resp_get_streamssettings = session.post(f"{url}api/{org_id}/newpy_tests/settings")
resp_get_streamssettings = session.put(f"{url}api/{org_id}/streams/newpy_tests/settings")
print(resp_get_streamssettings.content)
assert (
@ -210,7 +210,7 @@ def test_e2e_deletevalidstreams(create_session, base_url):
url = base_url
org_id = "default"
resp_get_allstreams = session.delete(f"{url}api/{org_id}/newpy_tests")
resp_get_allstreams = session.delete(f"{url}api/{org_id}/streams/newpy_tests")
print(resp_get_allstreams.content)
assert (

View File

@ -5,7 +5,7 @@ def test_e2e_organisations(create_session, base_url):
url = base_url
org_id = "default"
resp_get_allorgs = session.get(f"{url}api/{org_id}/organizations")
resp_get_allorgs = session.get(f"{url}api/organizations")
print(resp_get_allorgs.content)
assert (
@ -50,7 +50,7 @@ def test_e2e_passcode(create_session, base_url):
url = base_url
org_id = "default"
resp_get_allorgs = session.get(f"{url}api/{org_id}/organizations/passcode")
resp_get_allorgs = session.get(f"{url}api/{org_id}/passcode")
print(resp_get_allorgs.content)
assert (

View File

@ -15,7 +15,7 @@ def test_e2e_rumtoken(create_session, base_url):
session = create_session
org_id = "default"
resp_get_rumtoken = session.get(f"{base_url}api/{org_id}/organizations/rumtoken")
resp_get_rumtoken = session.get(f"{base_url}api/{org_id}/rumtoken")
# get rumtoken
print(resp_get_rumtoken.content)
@ -92,7 +92,7 @@ def test_e2e_rumingestinglogs(create_session, base_url):
rum_org = "default"
logs_url = f"{base_url}rum/v1/{rum_org}/logs"
ip_address = "182.70.14.246"
resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/organizations/rumtoken")
resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/rumtoken")
# get rumtoken
print(resp_get_rumtoken.content)
@ -193,7 +193,7 @@ def test_e2e_rumingestinglogs(create_session, base_url):
# session = create_session
# url = base_url
# org_id = "e2e"
# resp_generate_rumtoken = session.post(f"{base_url}api/{org_id}/organizations/rumtoken")
# resp_generate_rumtoken = session.post(f"{base_url}api/{org_id}/rumtoken")
# print(resp_generate_rumtoken.content)
# rum_token = resp_generate_rumtoken.json()["data"]["rum_token"]
@ -207,7 +207,7 @@ def test_e2e_rumingestinglogs(create_session, base_url):
# session = create_session
# url = base_url
# org_id = "e2e"
# resp_update_rumtoken = session.put(f"{base_url}api/{org_id}/organizations/rumtoken")
# resp_update_rumtoken = session.put(f"{base_url}api/{org_id}/rumtoken")
# print(resp_update_rumtoken.content)
# rum_token = resp_update_rumtoken.json()["data"]["rum_token"]
@ -224,7 +224,7 @@ def test_e2e_rumdataingestandsearch(create_session, base_url):
rum_org = "default"
ip_address = "182.70.14.246"
headers = {"X-Forwarded-For": ip_address, "Content-Type": "application/json"}
resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/organizations/rumtoken")
resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/rumtoken")
# get rumtoken
print(resp_get_rumtoken.content)
@ -318,7 +318,7 @@ def test_e2e_rumverifygeodata(create_session, base_url):
logs_url = f"{base_url}rum/v1/{rum_org}/logs"
ip_address = "182.70.14.246"
headers = {"X-Forwarded-For": ip_address, "Content-Type": "application/json"}
resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/organizations/rumtoken")
resp_get_rumtoken = session.get(f"{base_url}api/{rum_org}/rumtoken")
# get rumtoken
print(resp_get_rumtoken.content)

View File

@ -45,12 +45,12 @@ var index = {
type: string,
data: any
) => {
let url = `/api/${org_identifier}/${stream_name}/settings`;
let url = `/api/${org_identifier}/streams/${stream_name}/settings`;
if (type != "") {
url += "?type=" + type;
}
return http().post(url, data);
return http().put(url, data);
},
};

View File

@ -64,7 +64,7 @@ const jstransform = {
stream_type: string
) => {
return http().get(
`/api/${org_identifier}/${stream_name}/functions?type=${stream_type}`
`/api/${org_identifier}/streams/${stream_name}/functions?type=${stream_type}`
);
},
apply_stream_function: (
@ -74,8 +74,8 @@ const jstransform = {
function_name: string,
data: any
) => {
return http().post(
`/api/${org_identifier}/${stream_name}/functions/${function_name}?type=${stream_type}`,
return http().put(
`/api/${org_identifier}/streams/${stream_name}/functions/${function_name}?type=${stream_type}`,
data
);
},
@ -86,7 +86,7 @@ const jstransform = {
function_name: string
) => {
return http().delete(
`/api/${org_identifier}/${stream_name}/functions/${function_name}?type=${stream_type}`
`/api/${org_identifier}/streams/${stream_name}/functions/${function_name}?type=${stream_type}`
);
},
create_enrichment_table: (

View File

@ -33,7 +33,7 @@ const stream = {
},
schema: (org_identifier: string, stream_name: string, type: string) => {
let url = `/api/${org_identifier}/${stream_name}/schema`;
let url = `/api/${org_identifier}/streams/${stream_name}/schema`;
if (type != "") {
url += "?type=" + type;
@ -47,12 +47,12 @@ const stream = {
type: string,
data: any
) => {
let url = `/api/${org_identifier}/${stream_name}/settings`;
let url = `/api/${org_identifier}/streams/${stream_name}/settings`;
if (type != "") {
url += "?type=" + type;
}
return http().post(url, data);
return http().put(url, data);
},
fieldValues: ({
@ -112,12 +112,12 @@ const stream = {
stream_type: string
) => {
return http().delete(
`/api/${org_identifier}/${stream_name}?type=${stream_type}`
`/api/${org_identifier}/streams/${stream_name}?type=${stream_type}`
);
},
deleteFields: (org_identifier: string, stream_name: string, fields: []) => {
return http().post(`/api/${org_identifier}/${stream_name}/delete_fields`, {
return http().put(`/api/${org_identifier}/streams/${stream_name}/delete_fields`, {
fields,
});
},