sheave_server/server.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133
mod message_id_provider;
use std::{
future::Future,
io::{
Result as IOResult
},
marker::PhantomData,
pin::{
Pin,
pin
},
sync::Arc,
task::{
Context as FutureContext,
Poll
}
};
use tokio::io::{
AsyncRead,
AsyncWrite
};
use sheave_core::handlers::{
RtmpContext,
StreamWrapper,
HandlerConstructor
};
pub use self::message_id_provider::*;
/// # The server instance of the Sheave
///
/// This consists of:
///
/// * A stream instance which can be both read from and written to.
/// * Context data of the server.
/// * A type parameter which implements the [`HandlerConstructor`] trait.
///
/// The server wraps the stream into an [`Arc`] as a way of sharing it among communication steps.
/// It also wraps the context into an [`Arc`] for the same purpose.
///
/// The server is able to construct any foreign handler via the [`PhantomData`], whose type parameter is required to implement the [`HandlerConstructor`] trait.
/// That is, the type parameter behaves as constructor injection.
///
/// The server itself is a [`Future`](std::future::Future): awaiting it constructs a handler of the injected type and polls that handler.
///
/// ## Examples
///
/// ```rust
/// use std::{
///     io::Result as IOResult,
///     marker::PhantomData,
///     pin::Pin,
///     sync::Arc,
///     task::{
///         Context as FutureContext,
///         Poll
///     }
/// };
/// use tokio::io::{
///     AsyncRead,
///     AsyncWrite
/// };
/// use sheave_core::handlers::{
///     AsyncHandler,
///     HandlerConstructor,
///     RtmpContext,
///     StreamWrapper,
///     VecStream
/// };
/// use sheave_server::Server;
///
/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
///
/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
///     fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
///         Poll::Ready(Ok(()))
///     }
/// }
///
/// impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for SomethingHandler<RW> {
///     fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
///         Self(stream)
///     }
/// }
///
/// #[tokio::main]
/// async fn main() {
///     let stream = VecStream::default();
///     let rtmp_context = RtmpContext::default();
///     let mut server = Server::new(stream, rtmp_context, PhantomData::<SomethingHandler<VecStream>>);
///     let result = server.await;
///     assert!(result.is_ok())
/// }
/// ```
///
/// [`Arc`]: std::sync::Arc
/// [`PhantomData`]: std::marker::PhantomData
/// [`HandlerConstructor`]: sheave_core::handlers::HandlerConstructor
#[derive(Debug)]
pub struct Server<RW, C>
where
RW: AsyncRead + AsyncWrite + Unpin,
C: HandlerConstructor<StreamWrapper<RW>>
{
/// The wrapped stream; a clone of this [`Arc`] is handed to every handler the server constructs.
stream: Arc<StreamWrapper<RW>>,
/// The RTMP context passed to the handler on every poll.
rtmp_context: Arc<RtmpContext>,
/// Zero-sized marker which only records the handler type `C` to construct; no handler value is stored.
handler_constructor: PhantomData<C>
}
impl<RW, C> Server<RW, C>
where
    RW: AsyncRead + AsyncWrite + Unpin,
    C: HandlerConstructor<StreamWrapper<RW>>
{
    /// Builds a server from a raw stream, an RTMP context, and a handler type marker.
    ///
    /// The stream is wrapped into a [`StreamWrapper`], and both it and the context
    /// are placed behind an [`Arc`] so that they can be shared with the handlers
    /// constructed later.
    /// `handler_constructor` carries no data; it only selects the handler type `C`.
    pub fn new(stream: RW, rtmp_context: RtmpContext, handler_constructor: PhantomData<C>) -> Self {
        let stream = Arc::new(StreamWrapper::new(stream));
        let rtmp_context = Arc::new(rtmp_context);

        Self { stream, rtmp_context, handler_constructor }
    }
}
impl<RW, C> Future for Server<RW, C>
where
    RW: AsyncRead + AsyncWrite + Unpin,
    C: HandlerConstructor<StreamWrapper<RW>>
{
    /// The result reported by the handler once it has finished.
    type Output = IOResult<()>;

    /// Drives the injected handler.
    ///
    /// Every call constructs a fresh handler of type `C` from a clone of the shared
    /// stream, pins it on the stack, and forwards the poll to it together with the
    /// server's RTMP context. The handler's result is returned as is.
    fn poll(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
        // The handler lives only for this call; it is rebuilt on the next poll.
        let handler = pin!(C::new(Arc::clone(&self.stream)));
        // NOTE(review): `make_weak_mut` is defined outside this file; presumably it yields
        // the `&mut RtmpContext` that `poll_handle` expects. Confirm its aliasing guarantees.
        handler.poll_handle(cx, self.rtmp_context.make_weak_mut())
    }
}