feat: support multiple etcd hosts (#1243)

This commit is contained in:
Hengfei Yang 2023-07-24 10:07:57 +08:00 committed by GitHub
parent 339787b015
commit 97d1e44440
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 89 additions and 25 deletions

64
Cargo.lock generated
View File

@ -2487,16 +2487,16 @@ dependencies = [
[[package]] [[package]]
name = "etcd-client" name = "etcd-client"
version = "0.10.4" version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4319dc0fb739a6e84cb8678b8cf50c9bcfa4712ae826b33ecf00cc0850550a58" checksum = "f4b0ea5ef6dc2388a4b1669fa32097249bc03a15417b97cb75e38afb309e4a89"
dependencies = [ dependencies = [
"http", "http",
"prost", "prost",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic 0.9.2",
"tonic-build", "tonic-build 0.9.2",
"tower", "tower",
"tower-service", "tower-service",
] ]
@ -4050,8 +4050,8 @@ dependencies = [
"time 0.3.23", "time 0.3.23",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tonic", "tonic 0.8.3",
"tonic-build", "tonic-build 0.8.4",
"tracing", "tracing",
"tracing-opentelemetry", "tracing-opentelemetry",
"tracing-subscriber", "tracing-subscriber",
@ -4112,7 +4112,7 @@ dependencies = [
"serde", "serde",
"thiserror", "thiserror",
"tokio", "tokio",
"tonic", "tonic 0.8.3",
] ]
[[package]] [[package]]
@ -4128,8 +4128,8 @@ dependencies = [
"protobuf", "protobuf",
"serde", "serde",
"serde_json", "serde_json",
"tonic", "tonic 0.8.3",
"tonic-build", "tonic-build 0.8.4",
] ]
[[package]] [[package]]
@ -6211,9 +6211,7 @@ dependencies = [
"pin-project", "pin-project",
"prost", "prost",
"prost-derive", "prost-derive",
"rustls-pemfile",
"tokio", "tokio",
"tokio-rustls 0.23.4",
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"tower", "tower",
@ -6223,6 +6221,37 @@ dependencies = [
"tracing-futures", "tracing-futures",
] ]
[[package]]
name = "tonic"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3082666a3a6433f7f511c7192923fa1fe07c69332d3c6a2e6bb040b569199d5a"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.2",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"rustls-pemfile",
"tokio",
"tokio-rustls 0.24.1",
"tokio-stream",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]] [[package]]
name = "tonic-build" name = "tonic-build"
version = "0.8.4" version = "0.8.4"
@ -6236,6 +6265,19 @@ dependencies = [
"syn 1.0.109", "syn 1.0.109",
] ]
[[package]]
name = "tonic-build"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6fdaae4c2c638bb70fe42803a26fbd6fc6ac8c72f5c59f67ecc2a2dcabf4b07"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn 1.0.109",
]
[[package]] [[package]]
name = "tower" name = "tower"
version = "0.4.13" version = "0.4.13"

View File

@ -77,7 +77,7 @@ datafusion = { version = "27", features = ["simd"] }
dotenv_config = "0.1" dotenv_config = "0.1"
dotenvy = "0.15" dotenvy = "0.15"
env_logger = "0.10" env_logger = "0.10"
etcd-client = { version = "0.10", features = ["tls"] } etcd-client = { version = "0.11", features = ["tls"] }
flate2 = { version = "1.0", features = ["zlib"] } flate2 = { version = "1.0", features = ["zlib"] }
futures = "0.3" futures = "0.3"
get_if_addrs = "0.5" get_if_addrs = "0.5"

View File

@ -193,8 +193,8 @@ pub async fn register() -> Result<()> {
id: node_id, id: node_id,
uuid: LOCAL_NODE_UUID.clone(), uuid: LOCAL_NODE_UUID.clone(),
name: CONFIG.common.instance_name.clone(), name: CONFIG.common.instance_name.clone(),
http_addr: format!("http://{}:{}", get_local_node_ip(), CONFIG.http.port), http_addr: format!("http://{}:{}", get_local_http_ip(), CONFIG.http.port),
grpc_addr: format!("http://{}:{}", get_local_node_ip(), CONFIG.grpc.port), grpc_addr: format!("http://{}:{}", get_local_grpc_ip(), CONFIG.grpc.port),
role: LOCAL_NODE_ROLE.clone(), role: LOCAL_NODE_ROLE.clone(),
cpu_num: sys_info::cpu_num().unwrap() as u64, cpu_num: sys_info::cpu_num().unwrap() as u64,
status: NodeStatus::Prepare, status: NodeStatus::Prepare,
@ -404,6 +404,24 @@ fn load_local_node_uuid() -> String {
Uuid::new_v4().to_string() Uuid::new_v4().to_string()
} }
#[inline(always)]
pub fn get_local_http_ip() -> String {
if !CONFIG.http.addr.is_empty() {
CONFIG.http.addr.clone()
} else {
get_local_node_ip()
}
}
#[inline(always)]
pub fn get_local_grpc_ip() -> String {
if !CONFIG.grpc.addr.is_empty() {
CONFIG.grpc.addr.clone()
} else {
get_local_node_ip()
}
}
#[inline(always)] #[inline(always)]
pub fn get_local_node_ip() -> String { pub fn get_local_node_ip() -> String {
for adapter in get_if_addrs::get_if_addrs().unwrap() { for adapter in get_if_addrs::get_if_addrs().unwrap() {

View File

@ -125,6 +125,8 @@ pub struct Auth {
pub struct Http { pub struct Http {
#[env_config(name = "ZO_HTTP_PORT", default = 5080)] #[env_config(name = "ZO_HTTP_PORT", default = 5080)]
pub port: u16, pub port: u16,
#[env_config(name = "ZO_HTTP_ADDR", default = "")]
pub addr: String,
#[env_config(name = "ZO_HTTP_IPV6_ENABLED", default = false)] #[env_config(name = "ZO_HTTP_IPV6_ENABLED", default = false)]
pub ipv6_enabled: bool, pub ipv6_enabled: bool,
} }
@ -133,6 +135,8 @@ pub struct Http {
pub struct Grpc { pub struct Grpc {
#[env_config(name = "ZO_GRPC_PORT", default = 5081)] #[env_config(name = "ZO_GRPC_PORT", default = 5081)]
pub port: u16, pub port: u16,
#[env_config(name = "ZO_GRPC_ADDR", default = "")]
pub addr: String,
#[env_config(name = "ZO_GRPC_TIMEOUT", default = 600)] #[env_config(name = "ZO_GRPC_TIMEOUT", default = 600)]
pub timeout: u64, pub timeout: u64,
#[env_config(name = "ZO_GRPC_ORG_HEADER_KEY", default = "zinc-org-id")] #[env_config(name = "ZO_GRPC_ORG_HEADER_KEY", default = "zinc-org-id")]

View File

@ -17,18 +17,17 @@ use async_once::AsyncOnce;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use etcd_client::{ use etcd_client::{
Compare, CompareOp, DeleteOptions, EventType, GetOptions, SortOrder, SortTarget, TxnOp, Certificate, Compare, CompareOp, DeleteOptions, EventType, GetOptions, Identity, SortOrder,
SortTarget, TlsOptions, TxnOp,
}; };
use std::sync::atomic::{AtomicU8, Ordering}; use std::sync::{
use std::sync::Arc; atomic::{AtomicU8, Ordering},
use tokio::sync::mpsc; Arc,
use tokio::task::JoinHandle; };
use tokio::time; use tokio::{sync::mpsc, task::JoinHandle, time};
use tonic::transport::{Certificate, ClientTlsConfig, Identity};
use super::{Event, EventData}; use super::{Event, EventData};
use crate::common::infra::cluster; use crate::common::infra::{cluster, config::CONFIG, errors::*};
use crate::common::infra::{config::CONFIG, errors::*};
/// max operations in txn request /// max operations in txn request
pub const MAX_OPS_PER_TXN: usize = 120; // etcd hard coded limit is 128 pub const MAX_OPS_PER_TXN: usize = 120; // etcd hard coded limit is 128
@ -424,13 +423,14 @@ pub async fn connect_etcd() -> Option<etcd_client::Client> {
let client_cert = tokio::fs::read(&CONFIG.etcd.cert_file).await.unwrap(); let client_cert = tokio::fs::read(&CONFIG.etcd.cert_file).await.unwrap();
let client_key = tokio::fs::read(&CONFIG.etcd.key_file).await.unwrap(); let client_key = tokio::fs::read(&CONFIG.etcd.key_file).await.unwrap();
let client_identity = Identity::from_pem(client_cert, client_key); let client_identity = Identity::from_pem(client_cert, client_key);
let tls = ClientTlsConfig::new() let tls = TlsOptions::new()
.domain_name(&CONFIG.etcd.domain_name) .domain_name(&CONFIG.etcd.domain_name)
.ca_certificate(server_root_ca_cert) .ca_certificate(server_root_ca_cert)
.identity(client_identity); .identity(client_identity);
opts = opts.with_tls(tls); opts = opts.with_tls(tls);
} }
let client = etcd_client::Client::connect([&CONFIG.etcd.addr], Some(opts)) let addrs = CONFIG.etcd.addr.split(',').collect::<Vec<&str>>();
let client = etcd_client::Client::connect(addrs, Some(opts))
.await .await
.expect("Etcd connect failed"); .expect("Etcd connect failed");