sheave_core::handlers

Trait AsyncHandlerExt

Source
/// Extension methods for handlers.
pub trait AsyncHandlerExt: AsyncHandler {
    // Provided methods
    /// Chains this handler with `next`.
    fn chain<H>(self, next: H) -> Chain<Self, H>
       where H: AsyncHandler + Unpin,
             Self: Sized + Unpin { ... }
    /// Wraps the preceding handlers in `middleware`.
    fn wrap<M>(self, middleware: M) -> Wrap<M, Self>
       where M: Middleware + Unpin,
             Self: Sized + Unpin { ... }
    /// Loops while `body` returns `Ok(())` or `Pending`.
    fn while_ok<H>(self, body: H) -> WhileOk<Self, H>
       where H: AsyncHandler + Unpin,
             Self: Sized + Unpin { ... }
    /// Hands the error to `error_handler` when the preceding handler returns `Err`.
    fn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
       where E: ErrorHandler + Unpin,
             Self: Sized + Unpin { ... }
}
Expand description

The extension methods for handlers.

Currently, the following extensions have been implemented.

Provided Methods§

Source

fn chain<H>(self, next: H) -> Chain<Self, H>
where H: AsyncHandler + Unpin, Self: Sized + Unpin,

Chains this handler with next.

§Examples
use std::{
    io::Result as IOResult,
    pin::Pin,
    sync::Arc,
    task::{
        Context as FutureContext,
        Poll
    }
};
use futures::future::poll_fn;
use tokio::io::{
    AsyncRead,
    AsyncWrite
};
use sheave_core::handlers::{
    AsyncHandler,
    RtmpContext
};

/// First example handler; wraps an `Arc` of the stream type it is generic over.
struct HandlerA<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;
/// Second example handler; wraps an `Arc` of the stream type it is generic over.
struct HandlerB<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerA<RW> {
    /// Stand-in for real handling logic: reports success on the first poll.
    ///
    /// Neither the task context nor the RTMP context is touched here, so both
    /// parameters carry a leading underscore to keep `unused_variables` quiet.
    fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        // Something to handle.

        Poll::Ready(Ok(()))
    }
}

impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerB<RW> {
    /// Stand-in for real handling logic: reports success on the first poll.
    ///
    /// Neither the task context nor the RTMP context is touched here, so both
    /// parameters carry a leading underscore to keep `unused_variables` quiet.
    fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        // Something to handle.

        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() -> IOResult<()> {
    poll_fn(
        |cx| {
            use std::pin::pin;
            use sheave_core::handlers::{
                AsyncHandlerExt,
                StreamWrapper,
                VecStream
            };

            let stream = Arc::new(StreamWrapper::new(VecStream::default()));
            pin!(
                HandlerA(Arc::clone(&stream))
                    .chain(HandlerB(Arc::clone(&stream)))
            ).poll_handle(cx, &mut RtmpContext::default())
        }
    ).await
}
Source

fn wrap<M>(self, middleware: M) -> Wrap<M, Self>
where M: Middleware + Unpin, Self: Sized + Unpin,

Wraps the preceding handlers in a middleware.

§Examples
use std::{
    io::Result as IOResult,
    pin::Pin,
    sync::Arc,
    task::{
        Context as FutureContext,
        Poll
    }
};
use futures::{
    future::poll_fn,
    ready
};
use tokio::io::{
    AsyncRead,
    AsyncWrite
};
use sheave_core::handlers::{
    AsyncHandler,
    Middleware,
    RtmpContext
};

/// Example handler; wraps an `Arc` of the stream type it is generic over.
struct SomethingHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
    /// Stand-in for real handling logic: reports success on the first poll.
    ///
    /// Neither the task context nor the RTMP context is touched here, so both
    /// parameters carry a leading underscore to keep `unused_variables` quiet.
    fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
        // Something to handle.

        Poll::Ready(Ok(()))
    }
}

/// Example middleware; holds a pinned mutable reference to a `W`.
struct SomethingMiddleware<'a, W>(Pin<&'a mut W>)
where
    W: Unpin;

impl<W> Middleware for SomethingMiddleware<'_, W>
where
    W: Unpin,
{
    /// Prints a line before and after polling the wrapped `handler`.
    ///
    /// If the handler is still pending, or fails, that outcome is returned
    /// as-is and the closing line is not printed.
    fn poll_handle_wrapped<H>(
        self: Pin<&mut Self>,
        cx: &mut FutureContext<'_>,
        rtmp_context: &mut RtmpContext,
        handler: Pin<&mut H>,
    ) -> Poll<IOResult<()>>
    where
        H: AsyncHandler + Unpin,
    {
        println!("Starts wrapping.");

        // `ready!` returns early with `Poll::Pending`; `?` then forwards an error.
        let inner_outcome = ready!(handler.poll_handle(cx, rtmp_context));
        inner_outcome?;

        println!("Ends wrapping.");
        Poll::Ready(Ok(()))
    }
}

#[tokio::main]
async fn main() {
    // `poll_fn` supplies the task context that `poll_handle` requires.
    let result = poll_fn(|cx| {
        use std::pin::pin;
        use sheave_core::handlers::{AsyncHandlerExt, StreamWrapper, VecStream};

        let stream = Arc::new(StreamWrapper::new(VecStream::default()));

        // Pin the handler-plus-middleware pair on the stack before polling it.
        let wrapped = pin!(
            SomethingHandler(Arc::clone(&stream)).wrap(SomethingMiddleware(stream.make_weak_pin()))
        );
        let mut rtmp_context = RtmpContext::default();
        wrapped.poll_handle(cx, &mut rtmp_context)
    })
    .await;

    assert!(result.is_ok());
}
Source

fn while_ok<H>(self, body: H) -> WhileOk<Self, H>
where H: AsyncHandler + Unpin, Self: Sized + Unpin,

Loops while the body returns Ok(()) or Pending.

§Examples
use std::{
    io::{
        Error as IOError,
        ErrorKind,
        Result as IOResult
    },
    pin::Pin,
    sync::Arc,
    task::{
        Context as FutureContext,
        Poll
    }
};
use futures::future::poll_fn;
use tokio::io::{
    AsyncRead,
    AsyncWrite
};
use sheave_core::handlers::{
    AsyncHandler,
    RtmpContext,
    StreamWrapper
};

/// Example handler; wraps an `Arc` of the stream type it is generic over.
struct SomethingHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW> AsyncHandler for SomethingHandler<RW>
where
    RW: AsyncRead + AsyncWrite + Unpin,
{
    /// Stand-in for real handling logic: reports success on the first poll.
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        // Real handling work would go here.
        let outcome: IOResult<()> = Ok(());
        Poll::Ready(outcome)
    }
}

/// Example handler that always fails; wraps an `Arc` of the stream type it is generic over.
struct AnotherHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW> AsyncHandler for AnotherHandler<RW>
where
    RW: AsyncRead + AsyncWrite + Unpin,
{
    /// Always completes immediately with an `ErrorKind::Other` I/O error.
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        let failure = IOError::from(ErrorKind::Other);
        Poll::Ready(Err(failure))
    }
}

#[tokio::main]
async fn main() {
    // `poll_fn` supplies the task context that `poll_handle` requires.
    let result = poll_fn(|cx| {
        use std::pin::pin;
        use sheave_core::handlers::{AsyncHandlerExt, VecStream};

        let stream = Arc::new(StreamWrapper::new(VecStream::default()));

        // The body handler returns an error; the assertion below expects it to surface.
        let looped = pin!(
            SomethingHandler(Arc::clone(&stream)).while_ok(AnotherHandler(Arc::clone(&stream)))
        );
        let mut rtmp_context = RtmpContext::default();
        looped.poll_handle(cx, &mut rtmp_context)
    })
    .await;

    assert!(result.is_err());
}
Source

fn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
where E: ErrorHandler + Unpin, Self: Sized + Unpin,

Handles the error when the preceding handler returns Err.

§Examples
use std::{
    io::{
        Error as IOError,
        Result as IOResult
    },
    pin::Pin,
    sync::Arc,
    task::{
        Context as FutureContext,
        Poll
    }
};
use futures::future::poll_fn;
use tokio::io::{
    AsyncRead,
    AsyncWrite
};
use sheave_core::handlers::{
    AsyncHandler,
    ErrorHandler,
    RtmpContext
};

/// Example handler that always fails; wraps an `Arc` of the stream type it is generic over.
struct SomethingHandler<RW>(Arc<RW>)
where
    RW: AsyncRead + AsyncWrite + Unpin;

impl<RW> AsyncHandler for SomethingHandler<RW>
where
    RW: AsyncRead + AsyncWrite + Unpin,
{
    /// Always completes immediately with an I/O error carrying a fixed message.
    fn poll_handle(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
    ) -> Poll<IOResult<()>> {
        let failure = IOError::other("Something Wrong.");
        Poll::Ready(Err(failure))
    }
}

/// Example error handler; holds a pinned mutable reference to an `RW`.
struct SomethingWrongHandler<'a, RW>(
    Pin<&'a mut RW>,
);

impl<RW> ErrorHandler for SomethingWrongHandler<'_, RW> {
    /// Prints the received error and reports that it was handled.
    fn poll_handle_error(
        self: Pin<&mut Self>,
        _cx: &mut FutureContext<'_>,
        _rtmp_context: &mut RtmpContext,
        error: IOError,
    ) -> Poll<IOResult<()>> {
        println!("{error}");

        // Returning `Ok` signals that the error has been dealt with successfully.
        let handled: IOResult<()> = Ok(());
        Poll::Ready(handled)
    }
}

#[tokio::main]
async fn main() {
    // `poll_fn` supplies the task context that `poll_handle` requires.
    let result = poll_fn(|cx| {
        use std::pin::pin;
        use sheave_core::handlers::{AsyncHandlerExt, StreamWrapper, VecStream};

        let stream = Arc::new(StreamWrapper::new(VecStream::default()));

        // The handler fails and the error handler returns `Ok`; the assertion
        // below expects the overall result to be `Ok`.
        let recovered = pin!(
            SomethingHandler(Arc::clone(&stream)).map_err(SomethingWrongHandler(stream.make_weak_pin()))
        );
        let mut rtmp_context = RtmpContext::default();
        recovered.poll_handle(cx, &mut rtmp_context)
    })
    .await;

    assert!(result.is_ok());
}

Implementors§