fix: dispose of runtime when stream is actually finished (closes #1097) (#1110)

This commit is contained in:
Greg Johnston 2023-05-28 13:44:31 -04:00 committed by GitHub
parent 475566837e
commit 5d70275c3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 50 additions and 41 deletions

View File

@ -600,6 +600,7 @@ where
let res_options3 = default_res_options.clone();
let local_pool = get_leptos_pool();
let (tx, rx) = futures::channel::mpsc::channel(8);
let (runtime_tx, runtime_rx) = futures::channel::oneshot::channel();
let current_span = tracing::Span::current();
local_pool.spawn_pinned(move || async move {
@ -624,9 +625,17 @@ where
replace_blocks
);
runtime_tx.send(runtime).expect("should be able to send runtime");
forward_stream(&options, res_options2, bundle, runtime, scope, tx).await;
}.instrument(current_span));
async move { generate_response(res_options3, rx).await }
async move {
let runtime = runtime_rx
.await
.expect("runtime should be sent by renderer");
generate_response(res_options3, rx, runtime).await
}
})
}
}
@ -635,6 +644,7 @@ where
async fn generate_response(
res_options: ResponseOptions,
rx: Receiver<String>,
runtime: RuntimeId,
) -> Response<StreamBody<PinnedHtmlStream>> {
let mut stream = Box::pin(rx.map(|html| Ok(Bytes::from(html))));
@ -647,7 +657,11 @@ async fn generate_response(
let complete_stream =
futures::stream::iter([first_chunk.unwrap(), second_chunk.unwrap()])
.chain(stream);
.chain(stream)
.chain(futures::stream::once(async move {
runtime.dispose();
Ok(Default::default())
}));
let mut res = Response::new(StreamBody::new(
Box::pin(complete_stream) as PinnedHtmlStream
@ -689,8 +703,6 @@ async fn forward_stream(
let mut writable = res_options2.0.write();
*writable = new_res_parts;
runtime.dispose();
tx.close_channel();
}
@ -758,6 +770,8 @@ where
let full_path = format!("http://leptos.dev{path}");
let (tx, rx) = futures::channel::mpsc::channel(8);
let (runtime_tx, runtime_rx) =
futures::channel::oneshot::channel();
let local_pool = get_leptos_pool();
let current_span = tracing::Span::current();
local_pool.spawn_pinned(|| async move {
@ -777,10 +791,15 @@ where
add_context,
);
runtime_tx.send(runtime).expect("should be able to send runtime");
forward_stream(&options, res_options2, bundle, runtime, scope, tx).await;
}.instrument(current_span));
generate_response(res_options3, rx).await
let runtime = runtime_rx
.await
.expect("runtime should be sent by renderer");
generate_response(res_options3, rx, runtime).await
}
})
}

View File

@ -190,29 +190,27 @@ pub fn render_to_stream_with_prefix_undisposed_with_context_and_block_replacemen
// create the runtime
let runtime = create_runtime();
let (
(shell, pending_resources, pending_fragments, serializers),
scope,
disposer,
) = run_scope_undisposed(runtime, {
move |cx| {
// Add additional context items
additional_context(cx);
// the actual app body/template code
// this does NOT contain any of the data being loaded asynchronously in resources
let shell = view(cx).render_to_string(cx);
let ((shell, pending_resources, pending_fragments, serializers), scope, _) =
run_scope_undisposed(runtime, {
move |cx| {
// Add additional context items
additional_context(cx);
// the actual app body/template code
// this does NOT contain any of the data being loaded asynchronously in resources
let shell = view(cx).render_to_string(cx);
let resources = cx.pending_resources();
let pending_resources = serde_json::to_string(&resources).unwrap();
let resources = cx.pending_resources();
let pending_resources =
serde_json::to_string(&resources).unwrap();
(
shell,
pending_resources,
cx.pending_fragments(),
cx.serialization_resolvers(),
)
}
});
(
shell,
pending_resources,
cx.pending_fragments(),
cx.serialization_resolvers(),
)
}
});
let cx = Scope { runtime, id: scope };
let mut blocking_fragments = FuturesUnordered::new();
@ -280,12 +278,7 @@ pub fn render_to_stream_with_prefix_undisposed_with_context_and_block_replacemen
// TODO these should be combined again in a way that chains them appropriately
// such that individual resources can resolve before all fragments are done
.chain(fragments)
.chain(resources)
// dispose of the root scope
.chain(futures::stream::once(async move {
disposer.dispose();
Default::default()
}));
.chain(resources);
(stream, runtime, scope)
}

View File

@ -102,7 +102,7 @@ pub fn render_to_stream_in_order_with_prefix_undisposed_with_context(
serializers,
),
scope_id,
disposer,
_,
) = run_scope_undisposed(runtime, |cx| {
// add additional context
additional_context(cx);
@ -147,12 +147,7 @@ pub fn render_to_stream_in_order_with_prefix_undisposed_with_context(
)
})
.chain(rx)
.chain(render_serializers(serializers))
// dispose of the scope
.chain(futures::stream::once(async move {
disposer.dispose();
Default::default()
}));
.chain(render_serializers(serializers));
(stream, runtime, scope_id)
}

View File

@ -56,7 +56,7 @@ where
IV: IntoView + 'static,
{
let runtime = create_runtime();
run_scope(runtime, move |cx| {
let routes = run_scope(runtime, move |cx| {
let integration = ServerIntegration {
path: "http://leptos.rs/".to_string(),
};
@ -94,5 +94,7 @@ where
})
})
.collect()
})
});
runtime.dispose();
routes
}