/// Extension trait that adds combinator methods to types implementing `AsyncHandler`.
///
/// Every method consumes `self` and returns a wrapper type combining it with the argument.
pub trait AsyncHandlerExt: AsyncHandler {
// Provided methods
/// Chains this handler with `next`.
fn chain<H>(self, next: H) -> Chain<Self, H>
where H: AsyncHandler + Unpin,
Self: Sized + Unpin { ... }
/// Wraps previous handlers into a middleware.
fn wrap<M>(self, middleware: M) -> Wrap<M, Self>
where M: Middleware + Unpin,
Self: Sized + Unpin { ... }
/// Loops while the body returns `Ok(())` or `Pending`.
fn while_ok<H>(self, body: H) -> WhileOk<Self, H>
where H: AsyncHandler + Unpin,
Self: Sized + Unpin { ... }
/// Handles some error when the previous handler returns `Err`.
fn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
where E: ErrorHandler + Unpin,
Self: Sized + Unpin { ... }
}
Expand description
Provided Methods§
Sourcefn chain<H>(self, next: H) -> Chain<Self, H>
fn chain<H>(self, next: H) -> Chain<Self, H>
Chains this handler with `next`.
§Examples
use std::{
io::Result as IOResult,
pin::Pin,
sync::Arc,
task::{
Context as FutureContext,
Poll
}
};
use futures::future::poll_fn;
use tokio::io::{
AsyncRead,
AsyncWrite
};
use sheave_core::handlers::{
AsyncHandler,
RtmpContext
};
/// First example handler; wraps a shared (`Arc`) reference to a read/write stream.
struct HandlerA<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

/// Second example handler; wraps a shared (`Arc`) reference to a read/write stream.
struct HandlerB<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerA<RW> {
    /// Completes immediately with `Ok(())`.
    ///
    /// The task context and RTMP context are intentionally unused in this example,
    /// hence the leading underscores (avoids `unused_variables` warnings and matches
    /// the naming used by the other examples).
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        // Something to handle.
        Poll::Ready(Ok(()))
    }
}
impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerB<RW> {
    /// Completes immediately with `Ok(())`.
    ///
    /// The task context and RTMP context are intentionally unused in this example,
    /// hence the leading underscores (avoids `unused_variables` warnings and matches
    /// the naming used by the other examples).
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        // Something to handle.
        Poll::Ready(Ok(()))
    }
}
#[tokio::main]
async fn main() -> IOResult<()> {
poll_fn(
|cx| {
use std::pin::pin;
use sheave_core::handlers::{
AsyncHandlerExt,
StreamWrapper,
VecStream
};
let stream = Arc::new(StreamWrapper::new(VecStream::default()));
pin!(
HandlerA(Arc::clone(&stream))
.chain(HandlerB(Arc::clone(&stream)))
).poll_handle(cx, &mut RtmpContext::default())
}
).await
}
Sourcefn wrap<M>(self, middleware: M) -> Wrap<M, Self>
fn wrap<M>(self, middleware: M) -> Wrap<M, Self>
Wraps previous handlers into a middleware.
§Examples
use std::{
io::Result as IOResult,
pin::Pin,
sync::Arc,
task::{
Context as FutureContext,
Poll
}
};
use futures::{
future::poll_fn,
ready
};
use tokio::io::{
AsyncRead,
AsyncWrite
};
use sheave_core::handlers::{
AsyncHandler,
Middleware,
RtmpContext
};
/// Example handler; wraps a shared (`Arc`) reference to a read/write stream.
struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);

impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
    /// Completes immediately with `Ok(())`.
    ///
    /// The task context and RTMP context are intentionally unused in this example,
    /// hence the leading underscores (avoids `unused_variables` warnings and matches
    /// the naming used by the other examples).
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        // Something to handle.
        Poll::Ready(Ok(()))
    }
}
/// Example middleware; holds a pinned mutable reference to some `Unpin` value.
struct SomethingMiddleware<'a, W>(Pin<&'a mut W>)
where
    W: Unpin;

impl<W> Middleware for SomethingMiddleware<'_, W>
where
    W: Unpin,
{
    /// Prints a message, polls the wrapped `handler`, and prints a second message
    /// once the handler has completed successfully.
    ///
    /// A pending handler yields `Poll::Pending`; a handler error is returned as-is,
    /// in which case the closing message is not printed.
    fn poll_handle_wrapped<H>(
        self: Pin<&mut Self>,
        cx: &mut FutureContext<'_>,
        rtmp_context: &mut RtmpContext,
        handler: Pin<&mut H>,
    ) -> Poll<IOResult<()>>
    where
        H: AsyncHandler + Unpin,
    {
        println!("Starts wrapping.");
        // `ready!` returns early with `Poll::Pending` while the handler is not done.
        let outcome = ready!(handler.poll_handle(cx, rtmp_context));
        if outcome.is_ok() {
            println!("Ends wrapping.");
        }
        Poll::Ready(outcome)
    }
}
#[tokio::main]
async fn main() {
let result = poll_fn(
|cx| {
use std::pin::pin;
use sheave_core::handlers::{
AsyncHandlerExt,
StreamWrapper,
VecStream
};
let stream = Arc::new(StreamWrapper::new(VecStream::default()));
pin!(
SomethingHandler(Arc::clone(&stream))
.wrap(SomethingMiddleware(stream.make_weak_pin()))
).poll_handle(cx, &mut RtmpContext::default())
}
).await;
assert!(result.is_ok())
}
Sourcefn while_ok<H>(self, body: H) -> WhileOk<Self, H>
fn while_ok<H>(self, body: H) -> WhileOk<Self, H>
Loops while the body returns `Ok(())` or `Pending`.
§Examples
use std::{
io::{
Error as IOError,
ErrorKind,
Result as IOResult
},
pin::Pin,
sync::Arc,
task::{
Context as FutureContext,
Poll
}
};
use futures::future::poll_fn;
use tokio::io::{
AsyncRead,
AsyncWrite
};
use sheave_core::handlers::{
AsyncHandler,
RtmpContext,
StreamWrapper
};
/// Example handler; wraps a shared (`Arc`) reference to a read/write stream.
struct SomethingHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW> AsyncHandler for SomethingHandler<RW>
where
    RW: AsyncRead + AsyncWrite + Unpin,
{
    /// Completes immediately with `Ok(())`; neither context argument is used.
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        // Real handling logic would go here.
        let done: IOResult<()> = Ok(());
        Poll::Ready(done)
    }
}
/// Example handler that always fails; wraps a shared (`Arc`) reference to a
/// read/write stream.
struct AnotherHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW> AsyncHandler for AnotherHandler<RW>
where
    RW: AsyncRead + AsyncWrite + Unpin,
{
    /// Completes immediately with an I/O error of kind `ErrorKind::Other`;
    /// neither context argument is used.
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        let failure: IOError = ErrorKind::Other.into();
        Poll::Ready(Err(failure))
    }
}
#[tokio::main]
async fn main() {
let result = poll_fn(
|cx| {
use std::pin::pin;
use sheave_core::handlers::{
AsyncHandlerExt,
VecStream
};
let stream = Arc::new(StreamWrapper::new(VecStream::default()));
pin!(
SomethingHandler(Arc::clone(&stream))
.while_ok(AnotherHandler(Arc::clone(&stream)))
).poll_handle(cx, &mut RtmpContext::default())
}
).await;
assert!(result.is_err())
}
Sourcefn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
fn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
Handles some error when the previous handler returns `Err`.
§Examples
use std::{
io::{
Error as IOError,
Result as IOResult
},
pin::Pin,
sync::Arc,
task::{
Context as FutureContext,
Poll
}
};
use futures::future::poll_fn;
use tokio::io::{
AsyncRead,
AsyncWrite
};
use sheave_core::handlers::{
AsyncHandler,
ErrorHandler,
RtmpContext
};
/// Example handler that always fails; wraps a shared (`Arc`) reference to a
/// read/write stream.
struct SomethingHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW> AsyncHandler for SomethingHandler<RW>
where
    RW: AsyncRead + AsyncWrite + Unpin,
{
    /// Completes immediately with an I/O error carrying the message
    /// "Something Wrong."; neither context argument is used.
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        let failure = IOError::other("Something Wrong.");
        Poll::Ready(Err(failure))
    }
}
/// Example error handler; holds a pinned mutable reference to some value.
struct SomethingWrongHandler<'a, RW>(Pin<&'a mut RW>);

impl<'a, RW> ErrorHandler for SomethingWrongHandler<'a, RW> {
    /// Prints the received `error` and completes immediately with `Ok(())`;
    /// neither context argument is used.
    fn poll_handle_error(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
        error: IOError,
    ) -> Poll<IOResult<()>> {
        println!("{error}");
        // Returning `Ok` here signals that the error was handled successfully.
        let handled: IOResult<()> = Ok(());
        Poll::Ready(handled)
    }
}
#[tokio::main]
async fn main() {
let result = poll_fn(
|cx| {
use std::pin::pin;
use sheave_core::handlers::{
AsyncHandlerExt,
StreamWrapper,
VecStream
};
let stream = Arc::new(StreamWrapper::new(VecStream::default()));
pin!(
SomethingHandler(Arc::clone(&stream))
.map_err(SomethingWrongHandler(stream.make_weak_pin()))
).poll_handle(cx, &mut RtmpContext::default())
}
).await;
assert!(result.is_ok())
}