sheave_server/
server.rs

1use std::{
2    future::Future,
3    io::{
4        Result as IOResult
5    },
6    marker::PhantomData,
7    pin::{
8        Pin,
9        pin
10    },
11    sync::Arc,
12    task::{
13        Context as FutureContext,
14        Poll
15    }
16};
17use tokio::io::{
18    AsyncRead,
19    AsyncWrite
20};
21use sheave_core::handlers::{
22    RtmpContext,
23    StreamWrapper,
24    HandlerConstructor
25};
26
27/// # The server instance of the Sheave
28///
29/// This consists of:
30///
31/// * Some stream instance which can both of read and write.
32/// * Context data in the server.
33/// * Some type parameter which implemented the [`HandlerConstructor`] trait.
34///
35/// The server wraps streams into [`Arc`] as a way of sharing streams among communication steps.
36/// And also wraps contexts because of the same purpose.
37///
38/// The server makes any foreign handler to be able to construct via the [`PhantomData`], where a type parameter of [`PhantomData`] requires to implement the [`HandlerConstructor`] trait.
39/// That is, its type parameter behaves as the constructor injection.
40///
41/// ## Examples
42///
43/// ```rust
44/// use std::{
45///     io::Result as IOResult,
46///     marker::PhantomData,
47///     pin::Pin,
48///     sync::Arc,
49///     task::{
50///         Context as FutureContext,
51///         Poll
52///     }
53/// };
54/// use tokio::io::{
55///     AsyncRead,
56///     AsyncWrite
57/// };
58/// use sheave_core::handlers::{
59///     AsyncHandler,
60///     HandlerConstructor,
61///     RtmpContext,
62///     StreamWrapper,
63///     VecStream
64/// };
65/// use sheave_server::Server;
66///
67/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
68///
69/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
70///     fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
71///         Poll::Ready(Ok(()))
72///     }
73/// }
74///
75/// impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for SomethingHandler<RW> {
76///     fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
77///         Self(stream)
78///     }
79/// }
80///
81/// #[tokio::main]
82/// async fn main() {
83///     let stream = VecStream::default();
84///     let rtmp_context = RtmpContext::default();
85///     let mut server = Server::new(stream, rtmp_context, PhantomData::<SomethingHandler<VecStream>>);
86///     let result = server.await;
87///     assert!(result.is_ok())
88/// }
89/// ```
90///
91/// [`Arc`]: std::sync::Arc
92/// [`PhantomData`]: std::marker::PhantomData
93/// [`HandlerConstructor`]: sheave_core::handlers::HandlerConstructor
94#[derive(Debug)]
95pub struct Server<RW, C>
96where
97    RW: AsyncRead + AsyncWrite + Unpin,
98    C: HandlerConstructor<StreamWrapper<RW>>
99{
100    stream: Arc<StreamWrapper<RW>>,
101    rtmp_context: Arc<RtmpContext>,
102    handler_constructor: PhantomData<C>
103}
104
105impl<RW, C> Server<RW, C>
106where
107    RW: AsyncRead + AsyncWrite + Unpin,
108    C: HandlerConstructor<StreamWrapper<RW>>
109{
110    /// Constructs a Server instance.
111    pub fn new(stream: RW, rtmp_context: RtmpContext, handler_constructor: PhantomData<C>) -> Self {
112        Self {
113            stream: Arc::new(StreamWrapper::new(stream)),
114            rtmp_context: Arc::new(rtmp_context),
115            handler_constructor
116        }
117    }
118}
119
120impl<RW, C> Future for Server<RW, C>
121where
122    RW: AsyncRead + AsyncWrite + Unpin,
123    C: HandlerConstructor<StreamWrapper<RW>>
124{
125    type Output = IOResult<()>;
126
127    fn poll(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
128        pin!(C::new(Arc::clone(&self.stream))).poll_handle(cx, self.rtmp_context.make_weak_mut())
129    }
130}