// sheave_core/handlers.rs
1mod rtmp_context;
2mod inconsistent_sha;
3mod stream_wrapper;
4mod vec_stream;
5mod status;
6mod measure_acknowledgement;
7mod chain;
8mod while_ok;
9mod middlewares;
10mod map_err;
11mod stream_got_exhausted;
12mod client_type;
13
14use std::{
15 io::Result as IOResult,
16 pin::Pin,
17 sync::Arc,
18 task::{
19 Context as FutureContext,
20 Poll
21 }
22};
23use tokio::io::{
24 AsyncRead,
25 AsyncWrite
26};
27use self::{
28 chain::*,
29 while_ok::*,
30 middlewares::{
31 Wrap,
32 wrap
33 },
34 map_err::{
35 MapErr,
36 map_err,
37 }
38};
39pub use self::{
40 rtmp_context::*,
41 inconsistent_sha::*,
42 stream_wrapper::*,
43 vec_stream::*,
44 status::*,
45 middlewares::Middleware,
46 map_err::ErrorHandler,
47 measure_acknowledgement::*,
48 stream_got_exhausted::*,
49 client_type::*
50};
51
52/// The interface for handling RTMP connection steps with `Future`.
53///
54/// This trait unifies surfaces of handler APIs:
55///
56/// * `RtmpContext` is required.
57/// * Terminating with unit (`()`) is required.
58///
59/// The first requirement makes `RtmpContext` reusable for upper APIs.
60/// And the second requirement makes handlers return `Ok(())` when successfully terminates because currently they are run on `main`.
61///
62/// ```rust
63/// use std::{
64/// io::Result as IOResult,
65/// pin::Pin,
66/// sync::Arc,
67/// task::{
68/// Context as FutureContext,
69/// Poll
70/// }
71/// };
72/// use futures::future::poll_fn;
73/// use tokio::io::{
74/// AsyncRead,
75/// AsyncWrite
76/// };
77/// use sheave_core::handlers::{
78/// AsyncHandler,
79/// RtmpContext
80/// };
81///
82/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
83///
84/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
85/// fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
86/// // Something to handle
87///
88/// Poll::Ready(Ok(()))
89/// }
90/// }
91///
92/// #[tokio::main]
93/// async fn main() -> IOResult<()> {
94/// // Consider this is Tokio's `JoinHandle` which is run on `main`.
95/// poll_fn(
96/// |cx| {
97/// use std::{
98/// pin::pin,
99/// sync::Arc
100/// };
101/// use sheave_core::handlers::{
102/// AsyncHandler,
103/// VecStream,
104/// StreamWrapper
105/// };
106///
107/// let stream = Arc::new(StreamWrapper::new(VecStream::default()));
108/// pin!(SomethingHandler(stream)).poll_handle(cx, &mut RtmpContext::default())
109/// }
110/// ).await
111/// }
112/// ```
113///
114/// [`RtmpContext`]: RtmpContext
115pub trait AsyncHandler {
116 fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>>;
117}
118
119/// The extension methods for handlers.
120///
121/// Currently following extensions have been implemented.
122///
123/// * [`chain`]
124/// * [`wrap`]
125/// * [`while_ok`]
126/// * [`map_err`]
127///
128/// [`chain`]: AsyncHandlerExt::chain
129/// [`wrap`]: AsyncHandlerExt::wrap
130/// [`while_ok`]: AsyncHandlerExt::while_ok
131/// [`map_err`]: AsyncHandlerExt::map_err
132pub trait AsyncHandlerExt: AsyncHandler {
133 /// Chains this handler with `next`.
134 ///
135 /// # Examples
136 ///
137 /// ```rust
138 /// use std::{
139 /// io::Result as IOResult,
140 /// pin::Pin,
141 /// sync::Arc,
142 /// task::{
143 /// Context as FutureContext,
144 /// Poll
145 /// }
146 /// };
147 /// use futures::future::poll_fn;
148 /// use tokio::io::{
149 /// AsyncRead,
150 /// AsyncWrite
151 /// };
152 /// use sheave_core::handlers::{
153 /// AsyncHandler,
154 /// RtmpContext
155 /// };
156 ///
157 /// struct HandlerA<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
158 /// struct HandlerB<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
159 ///
160 /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerA<RW> {
161 /// fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
162 /// // Something to handle.
163 ///
164 /// Poll::Ready(Ok(()))
165 /// }
166 /// }
167 ///
168 /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandlerB<RW> {
169 /// fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
170 /// // Something to handle.
171 ///
172 /// Poll::Ready(Ok(()))
173 /// }
174 /// }
175 ///
176 /// #[tokio::main]
177 /// async fn main() -> IOResult<()> {
178 /// poll_fn(
179 /// |cx| {
180 /// use std::pin::pin;
181 /// use sheave_core::handlers::{
182 /// AsyncHandlerExt,
183 /// StreamWrapper,
184 /// VecStream
185 /// };
186 ///
187 /// let stream = Arc::new(StreamWrapper::new(VecStream::default()));
188 /// pin!(
189 /// HandlerA(Arc::clone(&stream))
190 /// .chain(HandlerB(Arc::clone(&stream)))
191 /// ).poll_handle(cx, &mut RtmpContext::default())
192 /// }
193 /// ).await
194 /// }
195 /// ```
196 fn chain<H>(self, next: H) -> Chain<Self, H>
197 where
198 H: AsyncHandler + Unpin,
199 Self: Sized + Unpin
200 {
201 chain(self, next)
202 }
203
204 /// Wraps previous handlers into a middleware.
205 ///
206 /// # Examples
207 ///
208 /// ```rust
209 /// use std::{
210 /// io::Result as IOResult,
211 /// pin::Pin,
212 /// sync::Arc,
213 /// task::{
214 /// Context as FutureContext,
215 /// Poll
216 /// }
217 /// };
218 /// use futures::{
219 /// future::poll_fn,
220 /// ready
221 /// };
222 /// use tokio::io::{
223 /// AsyncRead,
224 /// AsyncWrite
225 /// };
226 /// use sheave_core::handlers::{
227 /// AsyncHandler,
228 /// Middleware,
229 /// RtmpContext
230 /// };
231 ///
232 /// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
233 ///
234 /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
235 /// fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
236 /// // Something to handle.
237 ///
238 /// Poll::Ready(Ok(()))
239 /// }
240 /// }
241 ///
242 /// struct SomethingMiddleware<'a, W: Unpin>(Pin<&'a mut W>);
243 ///
244 /// impl<W: Unpin> Middleware for SomethingMiddleware<'_, W> {
245 /// fn poll_handle_wrapped<H: AsyncHandler + Unpin>(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, handler: Pin<&mut H>) -> Poll<IOResult<()>> {
246 /// println!("Starts wrapping.");
247 /// ready!(handler.poll_handle(cx, rtmp_context))?;
248 /// println!("Ends wrapping.");
249 /// Poll::Ready(Ok(()))
250 /// }
251 /// }
252 ///
253 /// #[tokio::main]
254 /// async fn main() {
255 /// let result = poll_fn(
256 /// |cx| {
257 /// use std::pin::pin;
258 /// use sheave_core::handlers::{
259 /// AsyncHandlerExt,
260 /// StreamWrapper,
261 /// VecStream
262 /// };
263 ///
264 /// let stream = Arc::new(StreamWrapper::new(VecStream::default()));
265 /// pin!(
266 /// SomethingHandler(Arc::clone(&stream))
267 /// .wrap(SomethingMiddleware(stream.make_weak_pin()))
268 /// ).poll_handle(cx, &mut RtmpContext::default())
269 /// }
270 /// ).await;
271 /// assert!(result.is_ok())
272 /// }
273 /// ```
274 fn wrap<M>(self, middleware: M) -> Wrap<M, Self>
275 where
276 M: Middleware + Unpin,
277 Self: Sized + Unpin
278 {
279 wrap(middleware, self)
280 }
281
282 /// Loops while the body returns `Ok(())` or `Pending`.
283 ///
284 /// # Examples
285 ///
286 /// ```rust
287 /// use std::{
288 /// io::{
289 /// Error as IOError,
290 /// ErrorKind,
291 /// Result as IOResult
292 /// },
293 /// pin::Pin,
294 /// sync::Arc,
295 /// task::{
296 /// Context as FutureContext,
297 /// Poll
298 /// }
299 /// };
300 /// use futures::future::poll_fn;
301 /// use tokio::io::{
302 /// AsyncRead,
303 /// AsyncWrite
304 /// };
305 /// use sheave_core::handlers::{
306 /// AsyncHandler,
307 /// RtmpContext,
308 /// StreamWrapper
309 /// };
310 ///
311 /// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
312 ///
313 /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
314 /// fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
315 /// // Something to handle.
316 ///
317 /// Poll::Ready(Ok(()))
318 /// }
319 /// }
320 ///
321 /// struct AnotherHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
322 ///
323 /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for AnotherHandler<RW> {
324 /// fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
325 /// Poll::Ready(Err(IOError::from(ErrorKind::Other)))
326 /// }
327 /// }
328 ///
329 /// #[tokio::main]
330 /// async fn main() {
331 /// let result = poll_fn(
332 /// |cx| {
333 /// use std::pin::pin;
334 /// use sheave_core::handlers::{
335 /// AsyncHandlerExt,
336 /// VecStream
337 /// };
338 ///
339 /// let stream = Arc::new(StreamWrapper::new(VecStream::default()));
340 /// pin!(
341 /// SomethingHandler(Arc::clone(&stream))
342 /// .while_ok(AnotherHandler(Arc::clone(&stream)))
343 /// ).poll_handle(cx, &mut RtmpContext::default())
344 /// }
345 /// ).await;
346 /// assert!(result.is_err())
347 /// }
348 /// ```
349 fn while_ok<H>(self, body: H) -> WhileOk<Self, H>
350 where
351 H: AsyncHandler + Unpin,
352 Self: Sized + Unpin
353 {
354 while_ok(self, body)
355 }
356
357 /// Handles some error when previous handler returns `Err`.
358 ///
359 /// # Examples
360 ///
361 /// ```rust
362 /// use std::{
363 /// io::{
364 /// Error as IOError,
365 /// Result as IOResult
366 /// },
367 /// pin::Pin,
368 /// sync::Arc,
369 /// task::{
370 /// Context as FutureContext,
371 /// Poll
372 /// }
373 /// };
374 /// use futures::future::poll_fn;
375 /// use tokio::io::{
376 /// AsyncRead,
377 /// AsyncWrite
378 /// };
379 /// use sheave_core::handlers::{
380 /// AsyncHandler,
381 /// ErrorHandler,
382 /// RtmpContext
383 /// };
384 ///
385 /// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
386 ///
387 /// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
388 /// fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
389 /// Poll::Ready(Err(IOError::other("Something Wrong.")))
390 /// }
391 /// }
392 ///
393 /// struct SomethingWrongHandler<'a, RW>(Pin<&'a mut RW>);
394 ///
395 /// impl<RW> ErrorHandler for SomethingWrongHandler<'_, RW> {
396 /// fn poll_handle_error(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
397 /// println!("{error}");
398 ///
399 /// // This `Ok` means that handled its error successfully.
400 /// Poll::Ready(Ok(()))
401 /// }
402 /// }
403 ///
404 /// #[tokio::main]
405 /// async fn main() {
406 /// let result = poll_fn(
407 /// |cx| {
408 /// use std::pin::pin;
409 /// use sheave_core::handlers::{
410 /// AsyncHandlerExt,
411 /// StreamWrapper,
412 /// VecStream
413 /// };
414 ///
415 /// let stream = Arc::new(StreamWrapper::new(VecStream::default()));
416 /// pin!(
417 /// SomethingHandler(Arc::clone(&stream))
418 /// .map_err(SomethingWrongHandler(stream.make_weak_pin()))
419 /// ).poll_handle(cx, &mut RtmpContext::default())
420 /// }
421 /// ).await;
422 /// assert!(result.is_ok())
423 /// }
424 /// ```
425 fn map_err<E>(self, error_handler: E) -> MapErr<Self, E>
426 where
427 E: ErrorHandler + Unpin,
428 Self: Sized + Unpin
429 {
430 map_err(self, error_handler)
431 }
432}
433
434impl<H: AsyncHandler> AsyncHandlerExt for H {}
435
436/// The interface for providing the way to construct any handler to clients/servers.
437///
438/// Servers / Clients pass streams and contexts to any handler they contain.
439/// Here we are necessary to be careful that some stream can't clone. (e.g. sockets)
440/// But we need to share these while handling RTMP communication steps.
441/// Therefore this provides the way of cloning stream instances via the (smart) pointer.
442///
443/// # Examples
444///
445/// ```rust
446/// use std::{
447/// future::Future,
448/// io::Result as IOResult,
449/// marker::PhantomData,
450/// pin::{
451/// Pin,
452/// pin
453/// },
454/// sync::Arc,
455/// task::{
456/// Context as FutureContext,
457/// Poll
458/// }
459/// };
460/// use tokio::io::{
461/// AsyncRead,
462/// AsyncWrite,
463/// ReadBuf
464/// };
465/// use sheave_core::handlers::{
466/// AsyncHandler,
467/// HandlerConstructor,
468/// RtmpContext
469/// };
470///
471/// struct SomethingStream;
472///
473/// impl AsyncRead for SomethingStream {
474/// fn poll_read(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _buf: &mut ReadBuf<'_>) -> Poll<IOResult<()>> {
475/// // Something to read.
476///
477/// Poll::Ready(Ok(()))
478/// }
479/// }
480///
481/// impl AsyncWrite for SomethingStream {
482/// fn poll_write(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, buf: &[u8]) -> Poll<IOResult<usize>> {
483/// // Something to write.
484///
485/// Poll::Ready(Ok(buf.len()))
486/// }
487///
488/// fn poll_flush(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
489/// // Something to flush.
490///
491/// Poll::Ready(Ok(()))
492/// }
493///
494/// fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
495/// // Something to shutdown.
496///
497/// Poll::Ready(Ok(()))
498/// }
499/// }
500///
501/// struct SomethingHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<RW>);
502///
503/// impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for SomethingHandler<RW> {
504/// fn poll_handle(self: Pin<&mut Self>, _cx: &mut FutureContext<'_>, _rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
505/// // Something to handle.
506///
507/// Poll::Ready(Ok(()))
508/// }
509/// }
510///
511/// impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<RW> for SomethingHandler<RW> {
512/// fn new(stream: Arc<RW>) -> Self {
513/// Self(stream)
514/// }
515/// }
516///
517/// struct SomethingRunner<RW, C>
518/// where
519/// RW: AsyncRead + AsyncWrite + Unpin,
520/// C: HandlerConstructor<RW>
521/// {
522/// stream: Arc<RW>,
523/// rtmp_context: Arc<RtmpContext>,
524/// handler_constructor: PhantomData<C>
525/// }
526///
527/// impl<RW, C> SomethingRunner<RW, C>
528/// where
529/// RW: AsyncRead + AsyncWrite + Unpin,
530/// C: HandlerConstructor<RW>
531/// {
532/// pub fn new(stream: RW, rtmp_context: RtmpContext, handler_constructor: PhantomData<C>) -> Self {
533/// Self {
534/// stream: Arc::new(stream),
535/// rtmp_context: Arc::new(rtmp_context),
536/// handler_constructor
537/// }
538/// }
539/// }
540///
541/// impl<RW, C> Future for SomethingRunner<RW, C>
542/// where
543/// RW: AsyncRead + AsyncWrite + Unpin,
544/// C: HandlerConstructor<RW>
545/// {
546/// type Output = IOResult<()>;
547///
548/// fn poll(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<Self::Output> {
549/// pin!(C::new(Arc::clone(&self.stream))).poll_handle(cx, self.rtmp_context.make_weak_mut())
550/// }
551/// }
552///
553/// #[tokio::main]
554/// async fn main() {
555/// let stream = SomethingStream;
556/// let rtmp_context = RtmpContext::default();
557/// let handler_constructor = PhantomData::<SomethingHandler<SomethingStream>>;
558/// let runner = SomethingRunner::new(stream, rtmp_context, handler_constructor);
559/// let result = runner.await;
560///
561/// assert!(result.is_ok());
562/// }
563/// ```
564pub trait HandlerConstructor<RW: AsyncRead + AsyncWrite + Unpin>: AsyncHandler {
565 fn new(stream: Arc<RW>) -> Self;
566}