fix streaming requests and clarify in docs
This commit is contained in:
parent
94cb4c0ec3
commit
25120c0e9f
|
@ -50,7 +50,6 @@ pub fn HomePage() -> impl IntoView {
|
|||
<RkyvExample/>
|
||||
<FileUpload/>
|
||||
<FileWatcher/>
|
||||
<StreamingValues/>
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -507,47 +506,3 @@ pub fn CustomErrorTypes() -> impl IntoView {
|
|||
</p>
|
||||
}
|
||||
}
|
||||
|
||||
#[component]
|
||||
pub fn StreamingValues() -> impl IntoView {
|
||||
use futures::StreamExt;
|
||||
|
||||
/// You can create server functions that accept streaming values by using the encoding
|
||||
/// `Streaming` (with type `ByteStream`) or encoding `StreamingText` (with type `TextStream`)
|
||||
#[server(input = StreamingText, output = StreamingText)]
|
||||
pub async fn streaming(input: TextStream) -> Result<TextStream, ServerFnError> {
|
||||
println!("inside streaming() fn");
|
||||
Ok(TextStream::from(input.into_inner().map(|text| format!("{}!!!", text.unwrap_or_else(|e| e.to_string())))))
|
||||
}
|
||||
|
||||
let mut count = 0;
|
||||
let (tx, rx) = futures::channel::mpsc::unbounded();
|
||||
let (result, set_result) = create_signal("Click me...".to_string());
|
||||
|
||||
|
||||
if cfg!(feature = "hydrate") {
|
||||
spawn_local(async move {
|
||||
logging::log!("calling streaming server fn");
|
||||
match streaming(TextStream::new(rx)).await {
|
||||
Ok(res) => {
|
||||
logging::log!("after calling streaming()");
|
||||
let mut stream = res.into_inner();
|
||||
while let Some(chunk) = stream.next().await {
|
||||
set_result(chunk.unwrap_or_else(|e| e.to_string()));
|
||||
}
|
||||
}, Err(e) => logging::log!("{e}") }
|
||||
})
|
||||
}
|
||||
|
||||
view! {
|
||||
<h3>Streaming arguments and responses</h3>
|
||||
<button
|
||||
on:click=move |_| {
|
||||
count += 1;
|
||||
tx.unbounded_send(Ok(count.to_string())).expect("couldn't send into channel");
|
||||
}
|
||||
>
|
||||
{result}
|
||||
</button>
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,15 @@ use std::{fmt::Debug, pin::Pin};
|
|||
/// An encoding that represents a stream of bytes.
|
||||
///
|
||||
/// A server function that uses this as its output encoding should return [`ByteStream`].
|
||||
///
|
||||
/// ## Browser Support for Streaming Input
|
||||
///
|
||||
/// Browser fetch requests do not currently support full request duplexing, which
|
||||
/// means that that they do begin handling responses until the full request has been sent.
|
||||
/// This means that if you use a streaming input encoding, the input stream needs to
|
||||
/// end before the output will begin.
|
||||
///
|
||||
/// Streaming requests are only allowed over HTTP2 or HTTP3.
|
||||
pub struct Streaming;
|
||||
|
||||
impl Encoding for Streaming {
|
||||
|
@ -49,6 +58,15 @@ where
|
|||
/// A stream of bytes.
|
||||
///
|
||||
/// A server function can return this type if its output encoding is [`Streaming`].
|
||||
///
|
||||
/// ## Browser Support for Streaming Input
|
||||
///
|
||||
/// Browser fetch requests do not currently support full request duplexing, which
|
||||
/// means that that they do begin handling responses until the full request has been sent.
|
||||
/// This means that if you use a streaming input encoding, the input stream needs to
|
||||
/// end before the output will begin.
|
||||
///
|
||||
/// Streaming requests are only allowed over HTTP2 or HTTP3.
|
||||
pub struct ByteStream<CustErr = NoCustomError>(
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, ServerFnError<CustErr>>> + Send>>,
|
||||
);
|
||||
|
@ -115,10 +133,14 @@ where
|
|||
///
|
||||
/// A server function that uses this as its output encoding should return [`TextStream`].
|
||||
///
|
||||
/// **Note**: Browser fetch requests do not currently support full request duplexing, which
|
||||
/// ## Browser Support for Streaming Input
|
||||
///
|
||||
/// Browser fetch requests do not currently support full request duplexing, which
|
||||
/// means that that they do begin handling responses until the full request has been sent.
|
||||
/// This means that if you use streaming text as an input encoding, the input stream needs to
|
||||
/// This means that if you use a streaming input encoding, the input stream needs to
|
||||
/// end before the output will begin.
|
||||
///
|
||||
/// Streaming requests are only allowed over HTTP2 or HTTP3.
|
||||
pub struct StreamingText;
|
||||
|
||||
impl Encoding for StreamingText {
|
||||
|
@ -130,10 +152,14 @@ impl Encoding for StreamingText {
|
|||
///
|
||||
/// A server function can return this type if its output encoding is [`StreamingText`].
|
||||
///
|
||||
/// **Note**: Browser fetch requests do not currently support full request duplexing, which
|
||||
/// ## Browser Support for Streaming Input
|
||||
///
|
||||
/// Browser fetch requests do not currently support full request duplexing, which
|
||||
/// means that that they do begin handling responses until the full request has been sent.
|
||||
/// This means that if you use streaming text as an input encoding, the input stream needs to
|
||||
/// This means that if you use a streaming input encoding, the input stream needs to
|
||||
/// end before the output will begin.
|
||||
///
|
||||
/// Streaming requests are only allowed over HTTP2 or HTTP3.
|
||||
pub struct TextStream<CustErr = NoCustomError>(
|
||||
Pin<Box<dyn Stream<Item = Result<String, ServerFnError<CustErr>>> + Send>>,
|
||||
);
|
||||
|
|
|
@ -3,11 +3,11 @@ use crate::{client::get_server_url, error::ServerFnError};
|
|||
use bytes::Bytes;
|
||||
use futures::{Stream, StreamExt};
|
||||
pub use gloo_net::http::Request;
|
||||
use js_sys::Uint8Array;
|
||||
use js_sys::{Reflect, Uint8Array};
|
||||
use send_wrapper::SendWrapper;
|
||||
use wasm_bindgen::JsValue;
|
||||
use wasm_streams::ReadableStream;
|
||||
use web_sys::{FormData, UrlSearchParams};
|
||||
use web_sys::{FormData, Headers, RequestInit, UrlSearchParams};
|
||||
|
||||
/// A `fetch` request made in the browser.
|
||||
#[derive(Debug)]
|
||||
|
@ -143,17 +143,36 @@ impl<CustErr> ClientReq<CustErr> for BrowserRequest {
|
|||
content_type: &str,
|
||||
body: impl Stream<Item = Bytes> + 'static,
|
||||
) -> Result<Self, ServerFnError<CustErr>> {
|
||||
let stream = ReadableStream::from_stream(body.map(|bytes| {
|
||||
let data = Uint8Array::from(bytes.as_ref());
|
||||
let data = JsValue::from(data);
|
||||
Ok(data) as Result<JsValue, JsValue>
|
||||
}));
|
||||
Ok(Self(SendWrapper::new(
|
||||
Request::post(path)
|
||||
.header("Content-Type", content_type)
|
||||
.header("Accept", accepts)
|
||||
.body(stream.into_raw())
|
||||
.map_err(|e| ServerFnError::Request(e.to_string()))?,
|
||||
)))
|
||||
let req = streaming_request(path, accepts, content_type, body)
|
||||
.map_err(|e| ServerFnError::Request(format!("{e:?}")))?;
|
||||
Ok(Self(SendWrapper::new(req)))
|
||||
}
|
||||
}
|
||||
|
||||
fn streaming_request(
|
||||
path: &str,
|
||||
accepts: &str,
|
||||
content_type: &str,
|
||||
body: impl Stream<Item = Bytes> + 'static,
|
||||
) -> Result<Request, JsValue> {
|
||||
let stream = ReadableStream::from_stream(body.map(|bytes| {
|
||||
let data = Uint8Array::from(bytes.as_ref());
|
||||
let data = JsValue::from(data);
|
||||
Ok(data) as Result<JsValue, JsValue>
|
||||
}))
|
||||
.into_raw();
|
||||
let headers = Headers::new()?;
|
||||
headers.append("Content-Type", content_type)?;
|
||||
headers.append("Accept", accepts)?;
|
||||
let mut init = RequestInit::new();
|
||||
init.headers(&headers).method("POST").body(Some(&stream));
|
||||
|
||||
// Chrome requires setting `duplex: "half"` on streaming requests
|
||||
Reflect::set(
|
||||
&init,
|
||||
&JsValue::from_str("duplex"),
|
||||
&JsValue::from_str("half"),
|
||||
)?;
|
||||
let req = web_sys::Request::new_with_str_and_init(path, &init)?;
|
||||
Ok(Request::from(req))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue