sheave_core/
handlers.rs

1mod rtmp_context;
2mod inconsistent_sha;
3mod stream_wrapper;
4mod vec_stream;
5mod status;
6mod measure_acknowledgement;
7mod chain;
8mod while_ok;
9mod middlewares;
10mod map_err;
11mod stream_got_exhausted;
12mod client_type;
13
14use std::{
15    io::Result as IOResult,
16    pin::Pin,
17    sync::Arc,
18    task::{
19        Context as FutureContext,
20        Poll
21    }
22};
23use tokio::io::{
24    AsyncRead,
25    AsyncWrite
26};
27use self::{
28    chain::*,
29    while_ok::*,
30    middlewares::{
31        Wrap,
32        wrap
33    },
34    map_err::{
35        MapErr,
36        map_err,
37    }
38};
39pub use self::{
40    rtmp_context::*,
41    inconsistent_sha::*,
42    stream_wrapper::*,
43    vec_stream::*,
44    status::*,
45    middlewares::Middleware,
46    map_err::ErrorHandler,
47    measure_acknowledgement::*,
48    stream_got_exhausted::*,
49    client_type::*
50};
51
52/// The interface for handling RTMP connection steps with `Future`.
53///
54/// This trait unifies surfaces of handler APIs:
55///
56/// * `RtmpContext` is required.
57/// * Terminating with unit (`()`) is required.
58///
59/// The first requirement makes `RtmpContext` reusable for upper APIs.
60/// And the second requirement makes handlers return `Ok(())` when successfully terminates because currently they are run on `main`.
61///
62/// ```rust
63/// use std::{
64///     io::Result as IOResult,
65///     pin::Pin,
66///     sync::Arc,
67///     task::{
68///         Context as FutureContext,
69///         Poll
70///     }
71/// };
72/// use futures::future::poll_fn;
73/// use tokio::io::{
74///     AsyncRead,
75///     AsyncWrite
76/// };
77/// use sheave_core::handlers::{
78///     AsyncHandler,
79///     RtmpContext
80/// };
81///
82/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
83///
84/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
85///     fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
86///         // Something to handle
87///
88///         Poll::Ready(Ok(()))
89///     }
90/// }
91///
92/// #[tokio::main]
93/// async fn main() -> IOResult<()> {
94///     // Consider this is Tokio's `JoinHandle` which is run on `main`.
95///     poll_fn(
96///         |cx| {
97///             use std::{
98///                 pin::pin,
99///                 sync::Arc
100///             };
101///             use sheave_core::handlers::{
102///                 AsyncHandler,
103///                 VecStream,
104///                 StreamWrapper
105///             };
106///
107///             let stream = Arc::new(StreamWrapper::new(VecStream::default()));
108///             pin!(SomethingHandler(stream)).poll_handle(cx, &mut RtmpContext::default())
109///         }
110///     ).await
111/// }
112/// ```
113///
114/// [`RtmpContext`]: RtmpContext
115pub trait AsyncHandler {
116    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>>;
117}
118
119/// The extension methods for handlers.
120///
121/// Currently following extensions have been implemented.
122///
123/// * [`chain`]
124/// * [`wrap`]
125/// * [`while_ok`]
126/// * [`map_err`]
127///
128/// [`chain`]: AsyncHandlerExt::chain
129/// [`wrap`]: AsyncHandlerExt::wrap
130/// [`while_ok`]: AsyncHandlerExt::while_ok
131/// [`map_err`]: AsyncHandlerExt::map_err
132pub trait AsyncHandlerExt: AsyncHandler {
133    /// Chains this handler with `next`.
134    ///
135    /// # Examples
136    ///
137    /// ```rust
138    /// use std::{
139    ///     io::Result as IOResult,
140    ///     pin::Pin,
141    ///     sync::Arc,
142    ///     task::{
143    ///         Context as FutureContext,
144    ///         Poll
145    ///     }
146    /// };
147    /// use futures::future::poll_fn;
148    /// use tokio::io::{
149    ///     AsyncRead,
150    ///     AsyncWrite
151    /// };
152    /// use sheave_core::handlers::{
153    ///     AsyncHandler,
154    ///     RtmpContext
155    /// };
156    ///
157    /// struct HandlerA<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
158    /// struct HandlerB<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
159    ///
160    /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerA<RW> {
161    ///     fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
162    ///         // Something to handle.
163    ///
164    ///         Poll::Ready(Ok(()))
165    ///     }
166    /// }
167    ///
168    /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerB<RW> {
169    ///     fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
170    ///         // Something to handle.
171    ///
172    ///         Poll::Ready(Ok(()))
173    ///     }
174    /// }
175    ///
176    /// #[tokio::main]
177    /// async fn main() -> IOResult<()> {
178    ///     poll_fn(
179    ///         |cx| {
180    ///             use std::pin::pin;
181    ///             use sheave_core::handlers::{
182    ///                 AsyncHandlerExt,
183    ///                 StreamWrapper,
184    ///                 VecStream
185    ///             };
186    ///
187    ///             let stream = Arc::new(StreamWrapper::new(VecStream::default()));
188    ///             pin!(
189    ///                 HandlerA(Arc::clone(&stream))
190    ///                     .chain(HandlerB(Arc::clone(&stream)))
191    ///             ).poll_handle(cx, &mut RtmpContext::default())
192    ///         }
193    ///     ).await
194    /// }
195    /// ```
196    fn chain<H>(self, next: H) -> Chain<Self, H>
197    where
198        H: AsyncHandler + Unpin,
199        Self: Sized + Unpin
200    {
201        chain(self, next)
202    }
203
204    /// Wraps previous handlers into a middleware.
205    ///
206    /// # Examples
207    ///
208    /// ```rust
209    /// use std::{
210    ///     io::Result as IOResult,
211    ///     pin::Pin,
212    ///     sync::Arc,
213    ///     task::{
214    ///         Context as FutureContext,
215    ///         Poll
216    ///     }
217    /// };
218    /// use futures::{
219    ///     future::poll_fn,
220    ///     ready
221    /// };
222    /// use tokio::io::{
223    ///     AsyncRead,
224    ///     AsyncWrite
225    /// };
226    /// use sheave_core::handlers::{
227    ///     AsyncHandler,
228    ///     Middleware,
229    ///     RtmpContext
230    /// };
231    ///
232    /// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
233    ///
234    /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
235    ///     fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
236    ///         // Something to handle.
237    ///
238    ///         Poll::Ready(Ok(()))
239    ///     }
240    /// }
241    ///
242    /// struct SomethingMiddleware<'a, W: Unpin>(Pin<&'a mut W>);
243    ///
244    /// impl<W: Unpin> Middleware for SomethingMiddleware<'_, W> {
245    ///     fn poll_handle_wrapped<H: AsyncHandler + Unpin>(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, handler: Pin<&mut H>) -> Poll<IOResult<()>> {
246    ///         println!("Starts wrapping.");
247    ///         ready!(handler.poll_handle(cx, rtmp_context))?;
248    ///         println!("Ends wrapping.");
249    ///         Poll::Ready(Ok(()))
250    ///     }
251    /// }
252    ///
253    /// #[tokio::main]
254    /// async fn main() {
255    ///     let result = poll_fn(
256    ///         |cx| {
257    ///             use std::pin::pin;
258    ///             use sheave_core::handlers::{
259    ///                 AsyncHandlerExt,
260    ///                 StreamWrapper,
261    ///                 VecStream
262    ///             };
263    ///
264    ///             let stream = Arc::new(StreamWrapper::new(VecStream::default()));
265    ///             pin!(
266    ///                 SomethingHandler(Arc::clone(&stream))
267    ///                     .wrap(SomethingMiddleware(stream.make_weak_pin()))
268    ///             ).poll_handle(cx, &mut RtmpContext::default())
269    ///         }
270    ///     ).await;
271    ///     assert!(result.is_ok())
272    /// }
273    /// ```
274    fn wrap<M>(self, middleware: M) -> Wrap<M, Self>
275    where
276        M: Middleware + Unpin,
277        Self: Sized + Unpin
278    {
279        wrap(middleware, self)
280    }
281
282    /// Loops while the body returns `Ok(())` or `Pending`.
283    ///
284    /// # Examples
285    ///
286    /// ```rust
287    /// use std::{
288    ///     io::{
289    ///         Error as IOError,
290    ///         ErrorKind,
291    ///         Result as IOResult
292    ///     },
293    ///     pin::Pin,
294    ///     sync::Arc,
295    ///     task::{
296    ///         Context as FutureContext,
297    ///         Poll
298    ///     }
299    /// };
300    /// use futures::future::poll_fn;
301    /// use tokio::io::{
302    ///     AsyncRead,
303    ///     AsyncWrite
304    /// };
305    /// use sheave_core::handlers::{
306    ///     AsyncHandler,
307    ///     RtmpContext,
308    ///     StreamWrapper
309    /// };
310    ///
311    /// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
312    ///
313    /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
314    ///     fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
315    ///         // Something to handle.
316    ///
317    ///         Poll::Ready(Ok(()))
318    ///     }
319    /// }
320    ///
321    /// struct AnotherHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
322    ///
323    /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for AnotherHandler<RW> {
324    ///     fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
325    ///         Poll::Ready(Err(IOError::from(ErrorKind::Other)))
326    ///     }
327    /// }
328    ///
329    /// #[tokio::main]
330    /// async fn main() {
331    ///     let result = poll_fn(
332    ///         |cx| {
333    ///             use std::pin::pin;
334    ///             use sheave_core::handlers::{
335    ///                 AsyncHandlerExt,
336    ///                 VecStream
337    ///             };
338    ///
339    ///             let stream = Arc::new(StreamWrapper::new(VecStream::default()));
340    ///             pin!(
341    ///                 SomethingHandler(Arc::clone(&stream))
342    ///                     .while_ok(AnotherHandler(Arc::clone(&stream)))
343    ///             ).poll_handle(cx, &mut RtmpContext::default())
344    ///         }
345    ///     ).await;
346    ///     assert!(result.is_err())
347    /// }
348    /// ```
349    fn while_ok<H>(self, body: H) -> WhileOk<Self, H>
350    where
351        H: AsyncHandler + Unpin,
352        Self: Sized + Unpin
353    {
354        while_ok(self, body)
355    }
356
357    /// Handles some error when previous handler returns `Err`.
358    ///
359    /// # Examples
360    ///
361    /// ```rust
362    /// use std::{
363    ///     io::{
364    ///         Error as IOError,
365    ///         Result as IOResult
366    ///     },
367    ///     pin::Pin,
368    ///     sync::Arc,
369    ///     task::{
370    ///         Context as FutureContext,
371    ///         Poll
372    ///     }
373    /// };
374    /// use futures::future::poll_fn;
375    /// use tokio::io::{
376    ///     AsyncRead,
377    ///     AsyncWrite
378    /// };
379    /// use sheave_core::handlers::{
380    ///     AsyncHandler,
381    ///     ErrorHandler,
382    ///     RtmpContext
383    /// };
384    ///
385    /// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
386    ///
387    /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
388    ///     fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
389    ///         Poll::Ready(Err(IOError::other("Something Wrong.")))
390    ///     }
391    /// }
392    ///
393    /// struct SomethingWrongHandler<'a, RW>(Pin<&'a mut RW>);
394    ///
395    /// impl<RW> ErrorHandler for SomethingWrongHandler<'_, RW> {
396    ///     fn poll_handle_error(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
397    ///         println!("{error}");
398    ///
399    ///         // This `Ok` means that handled its error successfully.
400    ///         Poll::Ready(Ok(()))
401    ///     }
402    /// }
403    ///
404    /// #[tokio::main]
405    /// async fn main() {
406    ///     let result = poll_fn(
407    ///         |cx| {
408    ///             use std::pin::pin;
409    ///             use sheave_core::handlers::{
410    ///                 AsyncHandlerExt,
411    ///                 StreamWrapper,
412    ///                 VecStream
413    ///             };
414    ///
415    ///             let stream = Arc::new(StreamWrapper::new(VecStream::default()));
416    ///             pin!(
417    ///                 SomethingHandler(Arc::clone(&stream))
418    ///                     .map_err(SomethingWrongHandler(stream.make_weak_pin()))
419    ///             ).poll_handle(cx, &mut RtmpContext::default())
420    ///         }
421    ///     ).await;
422    ///     assert!(result.is_ok())
423    /// }
424    /// ```
425    fn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
426    where
427        E: ErrorHandler + Unpin,
428        Self: Sized + Unpin
429    {
430        map_err(self, error_handler)
431    }
432}
433
434impl<H: AsyncHandler> AsyncHandlerExt for H {}
435
436/// The interface for providing the way to construct any handler to clients/servers.
437///
438/// Servers / Clients pass streams and contexts to any handler they contain.
439/// Here we are necessary to be careful that some stream can't clone. (e.g. sockets)
440/// But we need to share these while handling RTMP communication steps.
441/// Therefore this provides the way of cloning stream instances via the (smart) pointer.
442///
443/// # Examples
444///
445/// ```rust
446/// use std::{
447///     future::Future,
448///     io::Result as IOResult,
449///     marker::PhantomData,
450///     pin::{
451///         Pin,
452///         pin
453///     },
454///     sync::Arc,
455///     task::{
456///         Context as FutureContext,
457///         Poll
458///     }
459/// };
460/// use tokio::io::{
461///     AsyncRead,
462///     AsyncWrite,
463///     ReadBuf
464/// };
465/// use sheave_core::handlers::{
466///     AsyncHandler,
467///     HandlerConstructor,
468///     RtmpContext
469/// };
470///
471/// struct SomethingStream;
472///
473/// impl AsyncRead for SomethingStream {
474///     fn poll_read(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _buf: &mut ReadBuf<'_>) -> Poll<IOResult<()>> {
475///         // Something to read.
476///
477///         Poll::Ready(Ok(()))
478///     }
479/// }
480///
481/// impl AsyncWrite for SomethingStream {
482///     fn poll_write(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, buf: &[u8]) -> Poll<IOResult<usize>> {
483///         // Something to write.
484///
485///         Poll::Ready(Ok(buf.len()))
486///     }
487///
488///     fn poll_flush(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
489///         // Something to flush.
490///
491///         Poll::Ready(Ok(()))
492///     }
493///
494///     fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
495///         // Something to shutdown.
496///
497///         Poll::Ready(Ok(()))
498///     }
499/// }
500///
501/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
502///
503/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
504///     fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
505///         // Something to handle.
506///
507///         Poll::Ready(Ok(()))
508///     }
509/// }
510///
511/// impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<RW> for SomethingHandler<RW> {
512///     fn new(stream: Arc<RW>) -> Self {
513///         Self(stream)
514///     }
515/// }
516///
517/// struct SomethingRunner<RW, C>
518/// where
519///     RW: AsyncRead + AsyncWrite + Unpin,
520///     C: HandlerConstructor<RW>
521/// {
522///     stream: Arc<RW>,
523///     rtmp_context: Arc<RtmpContext>,
524///     handler_constructor: PhantomData<C>
525/// }
526///
527/// impl<RW, C> SomethingRunner<RW, C>
528/// where
529///     RW: AsyncRead + AsyncWrite + Unpin,
530///     C: HandlerConstructor<RW>
531/// {
532///     pub fn new(stream: RW, rtmp_context: RtmpContext, handler_constructor: PhantomData<C>) -> Self {
533///         Self {
534///             stream: Arc::new(stream),
535///             rtmp_context: Arc::new(rtmp_context),
536///             handler_constructor
537///         }
538///     }
539/// }
540///
541/// impl<RW, C> Future for SomethingRunner<RW, C>
542/// where
543///     RW: AsyncRead + AsyncWrite + Unpin,
544///     C: HandlerConstructor<RW>
545/// {
546///     type Output = IOResult<()>;
547///
548///     fn poll(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
549///         pin!(C::new(Arc::clone(&self.stream))).poll_handle(cx, self.rtmp_context.make_weak_mut())
550///     }
551/// }
552///
553/// #[tokio::main]
554/// async fn main() {
555///     let stream = SomethingStream;
556///     let rtmp_context = RtmpContext::default();
557///     let handler_constructor = PhantomData::<SomethingHandler<SomethingStream>>;
558///     let runner = SomethingRunner::new(stream, rtmp_context, handler_constructor);
559///     let result = runner.await;
560///
561///     assert!(result.is_ok());
562/// }
563/// ```
564pub trait HandlerConstructor<RW: AsyncRead + AsyncWrite + Unpin>: AsyncHandler {
565    fn new(stream: Arc<RW>) -> Self;
566}