sheave_server/server.rs
1use std::{
2 future::Future,
3 io::{
4 Result as IOResult
5 },
6 marker::PhantomData,
7 pin::{
8 Pin,
9 pin
10 },
11 sync::Arc,
12 task::{
13 Context as FutureContext,
14 Poll
15 }
16};
17use tokio::io::{
18 AsyncRead,
19 AsyncWrite
20};
21use sheave_core::handlers::{
22 RtmpContext,
23 StreamWrapper,
24 HandlerConstructor
25};
26
27/// # The server instance of the Sheave
28///
29/// This consists of:
30///
31/// * Some stream instance which can both of read and write.
32/// * Context data in the server.
33/// * Some type parameter which implemented the [`HandlerConstructor`] trait.
34///
35/// The server wraps streams into [`Arc`] as a way of sharing streams among communication steps.
36/// And also wraps contexts because of the same purpose.
37///
38/// The server makes any foreign handler to be able to construct via the [`PhantomData`], where a type parameter of [`PhantomData`] requires to implement the [`HandlerConstructor`] trait.
39/// That is, its type parameter behaves as the constructor injection.
40///
41/// ## Examples
42///
43/// ```rust
44/// use std::{
45/// io::Result as IOResult,
46/// marker::PhantomData,
47/// pin::Pin,
48/// sync::Arc,
49/// task::{
50/// Context as FutureContext,
51/// Poll
52/// }
53/// };
54/// use tokio::io::{
55/// AsyncRead,
56/// AsyncWrite
57/// };
58/// use sheave_core::handlers::{
59/// AsyncHandler,
60/// HandlerConstructor,
61/// RtmpContext,
62/// StreamWrapper,
63/// VecStream
64/// };
65/// use sheave_server::Server;
66///
67/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
68///
69/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
70/// fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
71/// Poll::Ready(Ok(()))
72/// }
73/// }
74///
75/// impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for SomethingHandler<RW> {
76/// fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
77/// Self(stream)
78/// }
79/// }
80///
81/// #[tokio::main]
82/// async fn main() {
83/// let stream = VecStream::default();
84/// let rtmp_context = RtmpContext::default();
85/// let mut server = Server::new(stream, rtmp_context, PhantomData::<SomethingHandler<VecStream>>);
86/// let result = server.await;
87/// assert!(result.is_ok())
88/// }
89/// ```
90///
91/// [`Arc`]: std::sync::Arc
92/// [`PhantomData`]: std::marker::PhantomData
93/// [`HandlerConstructor`]: sheave_core::handlers::HandlerConstructor
94#[derive(Debug)]
95pub struct Server<RW, C>
96where
97 RW: AsyncRead + AsyncWrite + Unpin,
98 C: HandlerConstructor<StreamWrapper<RW>>
99{
100 stream: Arc<StreamWrapper<RW>>,
101 rtmp_context: Arc<RtmpContext>,
102 handler_constructor: PhantomData<C>
103}
104
105impl<RW, C> Server<RW, C>
106where
107 RW: AsyncRead + AsyncWrite + Unpin,
108 C: HandlerConstructor<StreamWrapper<RW>>
109{
110 /// Constructs a Server instance.
111 pub fn new(stream: RW, rtmp_context: RtmpContext, handler_constructor: PhantomData<C>) -> Self {
112 Self {
113 stream: Arc::new(StreamWrapper::new(stream)),
114 rtmp_context: Arc::new(rtmp_context),
115 handler_constructor
116 }
117 }
118}
119
120impl<RW, C> Future for Server<RW, C>
121where
122 RW: AsyncRead + AsyncWrite + Unpin,
123 C: HandlerConstructor<StreamWrapper<RW>>
124{
125 type Output = IOResult<()>;
126
127 fn poll(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
128 pin!(C::new(Arc::clone(&self.stream))).poll_handle(cx, self.rtmp_context.make_weak_mut())
129 }
130}