This commit is contained in:
Hengfei Yang 2024-09-25 18:29:22 +08:00 committed by GitHub
commit 75cc42f8d2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 25 additions and 9 deletions

4
Cargo.lock generated
View File

@ -1150,9 +1150,9 @@ checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0"
[[package]]
name = "awc"
version = "3.5.0"
version = "3.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe6b67e44fb95d1dc9467e3930383e115f9b4ed60ca689db41409284e967a12d"
checksum = "79049b2461279b886e46f1107efc347ebecc7b88d74d023dda010551a124967b"
dependencies = [
"actix-codec",
"actix-http",

View File

@ -66,7 +66,7 @@ anyhow.workspace = true
argon2.workspace = true
async-trait.workspace = true
async-recursion.workspace = true
awc = "3.4"
awc = "3.5"
base64.workspace = true
bitvec.workspace = true
blake3 = { version = "1.4", features = ["rayon"] }

View File

@ -469,6 +469,8 @@ pub struct Route {
pub timeout: u64,
#[env_config(name = "ZO_ROUTE_MAX_CONNECTIONS", default = 1024)]
pub max_connections: usize,
#[env_config(name = "ZO_ROUTE_CONNECTION_POOL_DISABLED", default = false)]
pub connection_pool_disabled: bool,
// zo1-openobserve-ingester.ziox-dev.svc.cluster.local
#[env_config(name = "ZO_INGESTER_SERVICE_URL", default = "")]
pub ingester_srv_url: String,
@ -764,7 +766,7 @@ pub struct Common {
pub traces_span_metrics_channel_buffer: usize,
#[env_config(
name = "ZO_RESULT_CACHE_ENABLED",
default = true,
default = false,
help = "Enable result cache for query results"
)]
pub result_cache_enabled: bool,

View File

@ -143,8 +143,9 @@ async fn check_keepalive(
req: ServiceRequest,
next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, actix_web::Error> {
let req_conn_type = req.head().connection_type();
let mut resp = next.call(req).await?;
if resp.status() >= StatusCode::BAD_REQUEST {
if resp.status() >= StatusCode::BAD_REQUEST || req_conn_type == ConnectionType::Close {
resp.response_mut()
.head_mut()
.set_connection_type(ConnectionType::Close);

View File

@ -158,10 +158,23 @@ async fn dispatch(
}
// send query
let resp = client
.request_from(new_url.value.clone(), req.head())
.send_stream(payload)
.await;
let cfg = get_config();
let resp = if cfg.route.connection_pool_disabled {
let client = awc::Client::builder()
.timeout(std::time::Duration::from_secs(cfg.route.timeout))
.disable_redirects()
.finish();
client
.request_from(new_url.value.clone(), req.head())
.insert_header((awc::http::header::CONNECTION, "close"))
.send_stream(payload)
.await
} else {
client
.request_from(new_url.value.clone(), req.head())
.send_stream(payload)
.await
};
if let Err(e) = resp {
log::error!(
"dispatch: {}, error: {}, took: {} ms",