feat: add auto upgrade (#1524)

Auto upgrade from v0.5.2 to v0.6.0. Moves from sled to sqlite for local mode without user intervention. There should be no noticeable changes to end users.
This commit is contained in:
Hengfei Yang 2023-09-18 00:26:43 +08:00 committed by GitHub
parent 69fb8b355a
commit 9adb340e5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 67 additions and 15 deletions

2
Cargo.lock generated
View File

@ -4334,7 +4334,7 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
name = "openobserve"
version = "0.5.3"
version = "0.6.0"
dependencies = [
"actix-cors",
"actix-multipart",

View File

@ -16,7 +16,7 @@ keywords = [
license = "Apache-2.0"
name = "openobserve"
repository = "https://github.com/openobserve/openobserve/"
version = "0.5.3"
version = "0.6.0"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

View File

@ -13,13 +13,34 @@
// limitations under the License.
use crate::common::{
infra::file_list as infra_file_list,
infra::{config::CONFIG, file_list as infra_file_list},
meta::{common::FileKey, stream::PartitionTimeLevel, StreamType},
utils::time::BASE_TIME,
utils::{file::get_file_meta, time::BASE_TIME},
};
use crate::job::{file_list, files};
use crate::service::{compact::stats::update_stats_from_file_list, db};
pub async fn run(prefix: &str) -> Result<(), anyhow::Error> {
if get_file_meta(&CONFIG.common.data_wal_dir).is_err() {
// there is no local wal files, no need upgrade
return Ok(());
}
// move files from wal for disk
if let Err(e) = files::disk::move_files_to_storage().await {
log::error!("Error moving disk files to remote: {}", e);
}
// move file_list from wal for disk
if let Err(e) = file_list::move_file_list_to_storage(false).await {
log::error!("Error moving disk files to remote: {}", e);
}
if get_file_meta(&CONFIG.common.data_stream_dir).is_err() {
// there is no local stream files, no need upgrade
return Ok(());
}
// load stream list
db::schema::cache().await?;
// load file list to db

View File

@ -6,7 +6,7 @@ use crate::common::{
db::{self, Db},
},
meta::alert::Trigger,
utils::json,
utils::{file::get_file_meta, json},
};
const ITEM_PREFIXES: [&str; 11] = [
@ -24,7 +24,6 @@ const ITEM_PREFIXES: [&str; 11] = [
];
pub async fn run() -> Result<(), anyhow::Error> {
println!("local mode is {}", CONFIG.common.local_mode);
if CONFIG.common.local_mode {
load_meta_from_sled().await
} else {
@ -33,6 +32,10 @@ pub async fn run() -> Result<(), anyhow::Error> {
}
pub async fn load_meta_from_sled() -> Result<(), anyhow::Error> {
if get_file_meta(&format!("{}db", CONFIG.sled.data_dir)).is_err() {
// there is no local db, no need upgrade
return Ok(());
}
let (src, dest) = if CONFIG.common.local_mode {
(Box::<db::sled::SledDb>::default(), db::default())
} else {

View File

@ -12,5 +12,28 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::common::infra::config::CONFIG;
pub mod file_list;
pub mod meta;
pub async fn check_upgrade(old_ver: &str, new_ver: &str) -> Result<(), anyhow::Error> {
if !CONFIG.common.local_mode || old_ver >= new_ver {
return Ok(());
}
log::info!("Upgrading from {} to {}", old_ver, new_ver);
match (old_ver, new_ver) {
(_, "v0.5.3") | (_, "v0.6.0") => upgrade_052_053().await,
_ => Ok(()),
}
}
async fn upgrade_052_053() -> Result<(), anyhow::Error> {
// migration for metadata
meta::run().await?;
// migration for file_list
file_list::run("").await?;
Ok(())
}

View File

@ -40,7 +40,7 @@ pub async fn run_move_file_to_s3() -> Result<(), anyhow::Error> {
interval.tick().await; // trigger the first run
loop {
interval.tick().await;
if let Err(e) = move_file_list_to_storage().await {
if let Err(e) = move_file_list_to_storage(true).await {
log::error!("Error moving file_list to remote: {}", e);
}
}
@ -49,7 +49,7 @@ pub async fn run_move_file_to_s3() -> Result<(), anyhow::Error> {
/*
* upload compressed file_list to storage & delete moved files from local
*/
async fn move_file_list_to_storage() -> Result<(), anyhow::Error> {
pub async fn move_file_list_to_storage(check_in_use: bool) -> Result<(), anyhow::Error> {
let data_dir = Path::new(&CONFIG.common.data_wal_dir)
.canonicalize()
.unwrap();
@ -77,7 +77,7 @@ async fn move_file_list_to_storage() -> Result<(), anyhow::Error> {
}
// check the file is using for write
if wal::check_in_use("", "", StreamType::Filelist, &file_name) {
if check_in_use && wal::check_in_use("", "", StreamType::Filelist, &file_name) {
continue;
}
log::info!("[JOB] convert file_list: {}", file);

View File

@ -48,7 +48,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
/*
* upload compressed files to storage & delete moved files from local
*/
async fn move_files_to_storage() -> Result<(), anyhow::Error> {
pub async fn move_files_to_storage() -> Result<(), anyhow::Error> {
let wal_dir = Path::new(&CONFIG.common.data_wal_dir)
.canonicalize()
.unwrap();

View File

@ -48,7 +48,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
/*
* upload compressed files to storage & delete moved files from local
*/
async fn move_files_to_storage() -> Result<(), anyhow::Error> {
pub async fn move_files_to_storage() -> Result<(), anyhow::Error> {
// need to clone here, to avoid thread boundry issues across awaits
let files = wal::MEMORY_FILES.list().clone();
// use multiple threads to upload files

View File

@ -17,8 +17,8 @@ use crate::common::{
meta::StreamType,
};
mod disk;
mod memory;
pub mod disk;
pub mod memory;
pub async fn run() -> Result<(), anyhow::Error> {
if !cluster::is_ingester(&cluster::LOCAL_NODE_ROLE) {

View File

@ -27,8 +27,8 @@ use crate::service::{compact::stats::update_stats_from_file_list, db, users};
mod alert_manager;
mod compact;
mod file_list;
mod files;
pub(crate) mod file_list;
pub(crate) mod files;
mod metrics;
mod prom;
mod stats;

View File

@ -145,6 +145,11 @@ async fn main() -> Result<(), anyhow::Error> {
.expect("cluster init failed");
// init infra
infra::init().await.expect("infra init failed");
// check version upgrade
let old_version = db::version::get().await.unwrap_or("v0.0.0".to_string());
migration::check_upgrade(&old_version, VERSION).await?;
// init job
job::init().await.expect("job init failed");