Support local `llama-server` executable in `Assistant::boot`

Héctor Ramón Jiménez 2024-07-11 19:17:46 +02:00
parent 5790f01c58
commit a787117001
GPG Key ID: 7CC46565708259A7
1 changed file with 211 additions and 151 deletions


@@ -1,3 +1,4 @@
+ use futures::channel::mpsc;
use futures::{SinkExt, Stream, StreamExt};
use serde::Deserialize;
use serde_json::json;
@@ -11,7 +12,7 @@ use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct Assistant {
model: Id,
- _container: Arc<Container>,
+ _server: Arc<Server>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -29,7 +30,25 @@ impl Assistant {
const HOST_PORT: u64 = 8080;
pub fn boot(file: File, backend: Backend) -> impl Stream<Item = Result<BootEvent, Error>> {
- iced::stream::try_channel(1, move |mut sender| async move {
+ #[derive(Clone)]
+ struct Sender(mpsc::Sender<BootEvent>);
+ impl Sender {
+ async fn log(&mut self, log: String) {
+ let _ = self.0.send(BootEvent::Logged(log)).await;
+ }
+ async fn progress(&mut self, stage: &'static str, percent: u64) {
+ let _ = self.0.send(BootEvent::Progressed { stage, percent }).await;
+ }
+ async fn finish(mut self, assistant: Assistant) {
+ let _ = self.0.send(BootEvent::Finished(assistant)).await;
+ }
+ }
+ iced::stream::try_channel(1, move |sender| async move {
+ let mut sender = Sender(sender);
let _ = fs::create_dir_all(Self::MODELS_DIR).await?;
let model_path = format!(
@@ -39,18 +58,12 @@ impl Assistant {
);
if fs::try_exists(&model_path).await? {
- let _ = sender
- .send(BootEvent::Logged(format!(
+ sender.progress("Verifying model...", 0).await;
+ sender
+ .log(format!(
"{filename} found. Verifying...",
filename = file.name
- )))
- .await;
- let _ = sender
- .send(BootEvent::Progressed {
- stage: "Verifying model...",
- percent: 0,
- })
+ ))
.await;
let metadata = reqwest::get(format!(
@@ -93,27 +106,20 @@ impl Assistant {
if new_progress > progress {
progress = new_progress;
- let _ = sender
- .send(BootEvent::Progressed {
- stage: "Verifying model...",
- percent: progress,
- })
- .await;
+ sender.progress("Verifying model...", progress).await;
}
}
let hash = format!("{hash:x}", hash = hasher.finalize());
if checksum == hash {
- let _ = sender
- .send(BootEvent::Logged(format!("Correct checksum! {hash}",)))
- .await;
+ sender.log(format!("Correct checksum! {hash}")).await;
} else {
- let _ = sender
- .send(BootEvent::Logged(format!(
+ sender
+ .log(format!(
"Invalid checksum. Deleting {filename}...",
filename = file.name
- )))
+ ))
.await;
fs::remove_file(&model_path).await?;
@@ -122,11 +128,11 @@ impl Assistant {
}
if !fs::try_exists(&model_path).await? {
- let _ = sender
- .send(BootEvent::Logged(format!(
+ sender
+ .log(format!(
"{filename} not found. Starting download...",
filename = file.name
- )))
+ ))
.await;
let mut model = io::BufWriter::new(fs::File::create(&model_path).await?);
@@ -158,19 +164,14 @@ impl Assistant {
if new_progress > progress {
progress = new_progress;
- let _ = sender
- .send(BootEvent::Logged(format!(
+ sender
+ .log(format!(
"Downloading {file}... {progress}%",
file = file.name,
- )))
+ ))
.await;
- let _ = sender
- .send(BootEvent::Progressed {
- stage: "Downloading model...",
- percent: progress,
- })
- .await;
+ sender.progress("Downloading model...", progress).await;
}
}
@@ -180,120 +181,139 @@ impl Assistant {
model.flush().await?;
}
- let _ = sender
- .send(BootEvent::Progressed {
- stage: "Preparing container...",
- percent: 0,
- })
- .await;
+ sender.progress("Detecting executor...", 0).await;
- let _ = sender
- .send(BootEvent::Logged(format!(
- "Launching {model} with llama.cpp...",
- model = file.model.name(),
- )))
- .await;
+ let (server, stdout, stderr) = if let Ok(version) =
+ process::Command::new("llama-server")
+ .arg("--version")
+ .output()
+ .await
+ {
+ sender
+ .log("Local llama-server binary found!".to_owned())
+ .await;
- let command = match backend {
- Backend::CPU => {
- format!(
- "create --rm -p {port}:80 -v {volume}:/models \
+ let mut lines = version.stdout.lines();
+ while let Some(line) = lines.next_line().await? {
+ sender.log(line).await;
+ }
+ sender
+ .log(format!(
+ "Launching {model} with local llama-server...",
+ model = file.model.name(),
+ ))
+ .await;
+ let mut server = Self::launch_with_executable("llama-server", &file, backend)?;
+ let stdout = server.stdout.take();
+ let stderr = server.stderr.take();
+ (Server::Process(server), stdout, stderr)
+ } else if let Ok(_docker) = process::Command::new("docker")
+ .arg("version")
+ .output()
+ .await
+ {
+ sender
+ .log(format!(
+ "Launching {model} with Docker...",
+ model = file.model.name(),
+ ))
+ .await;
+ sender.progress("Preparing container...", 0).await;
+ let command = match backend {
+ Backend::CPU => {
+ format!(
+ "create --rm -p {port}:80 -v {volume}:/models \
{container} --model models/{filename} --conversation \
--port 80 --host 0.0.0.0",
- filename = file.name,
- container = Self::LLAMA_CPP_CONTAINER_CPU,
- port = Self::HOST_PORT,
- volume = Self::MODELS_DIR,
- )
- }
- Backend::CUDA => {
- format!(
- "create --rm --gpus all -p {port}:80 -v {volume}:/models \
+ filename = file.name,
+ container = Self::LLAMA_CPP_CONTAINER_CPU,
+ port = Self::HOST_PORT,
+ volume = Self::MODELS_DIR,
+ )
+ }
+ Backend::CUDA => {
+ format!(
+ "create --rm --gpus all -p {port}:80 -v {volume}:/models \
{container} --model models/{filename} --conversation \
--port 80 --host 0.0.0.0 --gpu-layers 40",
- filename = file.name,
- container = Self::LLAMA_CPP_CONTAINER_CUDA,
- port = Self::HOST_PORT,
- volume = Self::MODELS_DIR,
- )
- }
- };
+ filename = file.name,
+ container = Self::LLAMA_CPP_CONTAINER_CUDA,
+ port = Self::HOST_PORT,
+ volume = Self::MODELS_DIR,
+ )
+ }
+ };
let mut docker = process::Command::new("docker")
.args(
command
.split(' ')
.map(str::trim)
.filter(|arg| !arg.is_empty()),
)
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
let mut docker = process::Command::new("docker")
.args(Self::parse_args(&command))
.kill_on_drop(true)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()?;
- let notify_progress = {
- let mut sender = sender.clone();
+ let notify_progress = {
+ let mut sender = sender.clone();
- let output = io::BufReader::new(docker.stderr.take().expect("piped stderr"));
+ let output = io::BufReader::new(docker.stderr.take().expect("piped stderr"));
+ async move {
+ let mut lines = output.lines();
+ while let Ok(Some(log)) = lines.next_line().await {
+ sender.log(log).await;
+ }
+ }
+ };
+ let _ = tokio::task::spawn(notify_progress);
+ let container = {
+ let output = io::BufReader::new(docker.stdout.take().expect("piped stdout"));
- async move {
- let mut lines = output.lines();
- while let Ok(Some(log)) = lines.next_line().await {
- let _ = sender.send(BootEvent::Logged(log)).await;
- }
+ lines
+ .next_line()
+ .await?
+ .ok_or_else(|| Error::DockerFailed("no container id returned by docker"))?
+ };
+ if !docker.wait().await?.success() {
+ return Err(Error::DockerFailed("failed to create container"));
+ }
+ sender.progress("Launching assistant...", 99).await;
+ let server = Server::Container(container.clone());
+ let _start = process::Command::new("docker")
+ .args(&["start", &container])
+ .output()
+ .await?;
+ let mut logs = process::Command::new("docker")
+ .args(&["logs", "-f", &container])
+ .stdout(std::process::Stdio::piped())
+ .stderr(std::process::Stdio::piped())
+ .spawn()?;
+ (server, logs.stdout.take(), logs.stderr.take())
+ } else {
+ return Err(Error::NoExecutorAvailable);
+ };
- let _ = tokio::task::spawn(notify_progress);
- let container = {
- let output = io::BufReader::new(docker.stdout.take().expect("piped stdout"));
- let mut lines = output.lines();
- lines
- .next_line()
- .await?
- .ok_or_else(|| Error::DockerFailed("no container id returned by docker"))?
- };
- if !docker.wait().await?.success() {
- return Err(Error::DockerFailed("failed to create container"));
- }
- let _ = sender
- .send(BootEvent::Progressed {
- stage: "Launching assistant...",
- percent: 99,
- })
- .await;
- let assistant = Self {
- model: file.model,
- _container: Arc::new(Container {
- id: container.clone(),
- }),
- };
- let _start = process::Command::new("docker")
- .args(&["start", &container])
- .output()
- .await?;
- let mut logs = process::Command::new("docker")
- .args(&["logs", "-f", &container])
- .stdout(std::process::Stdio::piped())
- .stderr(std::process::Stdio::piped())
- .spawn()?;
let mut lines = {
use futures::stream;
use tokio_stream::wrappers::LinesStream;
- let stdout = io::BufReader::new(logs.stdout.take().expect("piped stdout"));
- let stderr = io::BufReader::new(logs.stderr.take().expect("piped stderr"));
+ let stdout = io::BufReader::new(stdout.expect("piped stdout"));
+ let stderr = io::BufReader::new(stderr.expect("piped stderr"));
stream::select(
LinesStream::new(stdout.lines()),
@@ -302,22 +322,21 @@ impl Assistant {
};
while let Some(log) = lines.next().await.transpose()? {
- let message = log
- .split("\u{1b}")
- .last()
- .map(|log| log.chars().skip(4))
- .unwrap_or(log.chars().skip(0));
- let _ = sender.send(BootEvent::Logged(message.collect())).await;
if log.contains("HTTP server listening") {
- let _ = sender.send(BootEvent::Finished(assistant)).await;
+ sender
+ .finish(Assistant {
+ model: file.model.clone(),
+ _server: Arc::new(server),
+ })
+ .await;
return Ok(());
}
+ sender.log(log).await;
}
- Err(Error::DockerFailed("container stopped unexpectedly"))
+ Err(Error::ExecutorFailed("llama-server exited unexpectedly"))
})
}
@@ -433,6 +452,37 @@ impl Assistant {
pub fn name(&self) -> &str {
self.model.name()
}
+ fn launch_with_executable(
+ executable: &'static str,
+ file: &File,
+ backend: Backend,
+ ) -> Result<process::Child, Error> {
+ let gpu_flags = match backend {
+ Backend::CPU => "",
+ Backend::CUDA => "--gpu-layers 40",
+ };
+ let server = process::Command::new(executable)
+ .args(Self::parse_args(&format!(
+ "--model models/{filename} --conversation \
+ --port 80 --host 0.0.0.0 {gpu_flags}",
+ filename = file.name,
+ )))
+ .kill_on_drop(true)
+ .stdout(std::process::Stdio::piped())
+ .stderr(std::process::Stdio::piped())
+ .spawn()?;
+ Ok(server)
+ }
+ fn parse_args(command: &str) -> impl Iterator<Item = &str> {
+ command
+ .split(' ')
+ .map(str::trim)
+ .filter(|arg| !arg.is_empty())
+ }
}
#[derive(Debug, Clone)]
@@ -478,21 +528,27 @@ impl From<serde_json::Error> for ChatError {
}
}
- #[derive(Debug, Clone)]
- struct Container {
- id: String,
+ #[derive(Debug)]
+ enum Server {
+ Container(String),
+ Process(process::Child),
}
- impl Drop for Container {
+ impl Drop for Server {
fn drop(&mut self) {
use std::process;
let _ = process::Command::new("docker")
.args(&["stop", &self.id])
.stdin(process::Stdio::null())
.stdout(process::Stdio::null())
.stderr(process::Stdio::null())
.spawn();
match self {
Self::Container(id) => {
let _ = process::Command::new("docker")
.args(&["stop", id])
.stdin(process::Stdio::null())
.stdout(process::Stdio::null())
.stderr(process::Stdio::null())
.spawn();
}
Self::Process(_process) => {}
}
}
}
@@ -655,6 +711,10 @@ pub enum Error {
IOFailed(Arc<io::Error>),
#[error("docker operation failed: {0}")]
DockerFailed(&'static str),
#[error("executor failed: {0}")]
ExecutorFailed(&'static str),
#[error("no suitable executor was found: neither llama-server nor docker are installed")]
NoExecutorAvailable,
}
impl From<reqwest::Error> for Error {