sheave_core/net/
rtmp.rs

1use std::{
2    fmt::{
3        Debug,
4        Formatter,
5        Result as FormatResult
6    },
7    io::{
8        Error as IOError,
9        IoSlice,
10        IoSliceMut,
11        Result as IOResult
12    },
13    net::{
14        SocketAddr,
15        TcpStream as StdStream
16    },
17    pin::Pin,
18    task::{
19        Context as FutureContext,
20        Poll
21    },
22    time::Duration
23};
24use bytes::buf::BufMut;
25use pin_project_lite::pin_project;
26use tokio::{
27    io::{
28        AsyncRead,
29        AsyncWrite,
30        Interest,
31        ReadBuf,
32        Ready
33    },
34    net::{
35        TcpStream as TokioStream,
36        ToSocketAddrs,
37        tcp::{
38            OwnedReadHalf,
39            OwnedWriteHalf,
40            ReadHalf,
41            WriteHalf
42        }
43    }
44};
45
46pin_project! {
47    /// A stream for RTMP that wrapped Tokio's `TcpStream`.
48    ///
49    /// If you constructs this struct from some address, use `RtmpStream::connect("aaa.bbb.ccc.ddd:1935")`.
50    /// Or if you do it from already created std's TcpStream, use `RtmpStream::from_std(std_stream)`
51    pub struct RtmpStream {
52        #[pin]
53        tokio_stream: TokioStream
54    }
55}
56
57impl RtmpStream {
58    fn new(tokio_stream: TokioStream) -> Self {
59        Self { tokio_stream }
60    }
61
62    /// Opens a RTMP connection to a remote host.
63    ///
64    /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
65    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.connect)
66    pub async fn connect<A: ToSocketAddrs>(addr: A) -> IOResult<Self> {
67        TokioStream::connect(addr).await.map(Self::new)
68    }
69
70    /// Creates new RtmpStream from a `std::net::TcpStream`.
71    ///
72    /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
73    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.from_std)
74    pub fn from_std(std_stream: StdStream) -> IOResult<Self> {
75        TokioStream::from_std(std_stream).map(Self::new)
76    }
77
78    /// Turns a `sheave_core::net::rtmp::RtmpStream into `std::net::TcpStream`.
79    ///
80    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_std)
81    pub fn into_std(self) -> IOResult<StdStream> {
82        self.tokio_stream.into_std()
83    }
84
85    /// Returns the local address that this stream is bound to.
86    ///
87    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.local_addr)
88    pub fn local_addr(&self) -> IOResult<SocketAddr> {
89        self.tokio_stream.local_addr()
90    }
91
92    /// Returns the value of the `SO_ERROR` option.
93    pub fn take_error(&self) -> IOResult<Option<IOError>> {
94        self.tokio_stream.take_error()
95    }
96
97    /// Returns the remote address that this stream is connected to.
98    ///
99    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peer_addr)
100    pub fn peer_addr(&self) -> IOResult<SocketAddr> {
101        self.tokio_stream.peer_addr()
102    }
103
104    /// Attempts to receive data on the socket, without removing that data from the queue, registering the current task for wakeup if data is not yet available.
105    ///
106    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_peek)
107    pub fn poll_peek(&self, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<usize>> {
108        self.tokio_stream.poll_peek(cx, buf)
109    }
110
111    /// Waits for any of the requested ready states.
112    ///
113    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ready)
114    pub async fn ready(&self, interest: Interest) -> IOResult<Ready> {
115        self.tokio_stream.ready(interest).await
116    }
117
118    /// Waits for the socket to become readable.
119    ///
120    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.readable)
121    pub async fn readable(&self) -> IOResult<()> {
122        self.tokio_stream.readable().await
123    }
124
125    /// Polls for read readiness.
126    ///
127    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_read_ready)
128    pub fn poll_read_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
129        self.tokio_stream.poll_read_ready(cx)
130    }
131
132    /// Tries to read data from the stream into the provided buffer, returning how many bytes were read.
133    ///
134    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read)
135    pub fn try_read(&self, buf: &mut [u8]) -> IOResult<usize> {
136        self.tokio_stream.try_read(buf)
137    }
138
139    /// Tries to read data from the stream into the provided buffers, returning how many bytes were read.
140    ///
141    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_vectored)
142    pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> IOResult<usize> {
143        self.tokio_stream.try_read_vectored(bufs)
144    }
145
146    /// Tries to read data from the stream into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
147    ///
148    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_buf)
149    pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> IOResult<usize> {
150        self.tokio_stream.try_read_buf(buf)
151    }
152
153    /// Waits for the socket to become writable.
154    ///
155    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.writable)
156    pub async fn writable(&self) -> IOResult<()> {
157        self.tokio_stream.writable().await
158    }
159
160    /// Polls for write readiness.
161    ///
162    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_write_ready)
163    pub fn poll_write_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
164        self.tokio_stream.poll_write_ready(cx)
165    }
166
167    /// Tries to write several buffers to the stream, returning how many bytes were written.
168    ///
169    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write)
170    pub fn try_write(&self, buf: &[u8]) -> IOResult<usize> {
171        self.tokio_stream.try_write(buf)
172    }
173
174    /// Tries to write several buffers to the stream, returning how many bytes were written.
175    ///
176    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write_vectored)
177    pub fn try_write_vectored(&self, bufs: &[IoSlice<'_>]) -> IOResult<usize> {
178        self.tokio_stream.try_write_vectored(bufs)
179    }
180
181    /// Tries to read or write from the socket using a user-provided IO operation.
182    ///
183    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_io)
184    pub fn try_io<R>(&self, interest: Interest, f: impl FnOnce() -> IOResult<R>) -> IOResult<R> {
185        self.tokio_stream.try_io(interest, f)
186    }
187
188    /// Reads or writes from the socket using a user-provided IO operation.
189    ///
190    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.async_io)
191    pub async fn async_io<R>(&self, interest: Interest, f: impl FnMut() -> IOResult<R>) -> IOResult<R> {
192        self.tokio_stream.async_io(interest, f).await
193    }
194
195    /// Receives data on the socket from the remote address to which it is connected, without removing that data from the queue.
196    /// On success, returns the number of bytes peeked.
197    ///
198    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peek)
199    pub async fn peek(&self, buf: &mut [u8]) -> IOResult<usize> {
200        self.tokio_stream.peek(buf).await
201    }
202
203    /// Gets the value of the TCP_NODELAY option on this socket.
204    ///
205    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.nodelay)
206    pub fn nodelay(&self) -> IOResult<bool> {
207        self.tokio_stream.nodelay()
208    }
209
210    /// Sets the value of the TCP_NODELAY option on this socket.
211    ///
212    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay)
213    pub fn set_nodelay(&self, nodelay: bool) -> IOResult<()> {
214        self.tokio_stream.set_nodelay(nodelay)
215    }
216
217    /// Reads the linger duration for this socket by getting the SO_LINGER option.
218    ///
219    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.linger)
220    pub fn linger(&self) -> IOResult<Option<Duration>> {
221        self.tokio_stream.linger()
222    }
223
224    /// Sets the linger duration of this socket by setting the SO_LINGER option.
225    ///
226    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_linger)
227    pub fn set_linger(&self, dur: Option<Duration>) -> IOResult<()> {
228        self.tokio_stream.set_linger(dur)
229    }
230
231    /// Gets the value of the IP_TTL option for this socket.
232    ///
233    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ttl)
234    pub fn ttl(&self) -> IOResult<u32> {
235        self.tokio_stream.ttl()
236    }
237
238    /// Sets the value for the IP_TTL option on this socket.
239    ///
240    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_ttl)
241    pub fn set_ttl(&self, ttl: u32) -> IOResult<()> {
242        self.tokio_stream.set_ttl(ttl)
243    }
244
245    /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
246    ///
247    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.split)
248    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
249        self.tokio_stream.split()
250    }
251
252    /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
253    ///
254    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_split)
255    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
256        self.tokio_stream.into_split()
257    }
258}
259
260impl TryFrom<StdStream> for RtmpStream {
261    type Error = IOError;
262
263    fn try_from(std_stream: StdStream) -> IOResult<Self> {
264        Self::from_std(std_stream)
265    }
266}
267
268impl From<TokioStream> for RtmpStream {
269    fn from(tokio_stream: TokioStream) -> Self {
270        Self::new(tokio_stream)
271    }
272}
273
274impl AsyncRead for RtmpStream {
275    fn poll_read(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<()>> {
276        let this = self.project();
277        this.tokio_stream.poll_read(cx, buf)
278    }
279}
280
281impl AsyncWrite for RtmpStream {
282    fn poll_write(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &[u8]) -> Poll<IOResult<usize>> {
283        let this = self.project();
284        this.tokio_stream.poll_write(cx, buf)
285    }
286
287    fn poll_flush(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
288        let this = self.project();
289        this.tokio_stream.poll_flush(cx)
290    }
291
292    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
293        let this = self.project();
294        this.tokio_stream.poll_shutdown(cx)
295    }
296}
297
298impl Debug for RtmpStream {
299    fn fmt(&self, f: &mut Formatter<'_>) -> FormatResult {
300        self.tokio_stream.fmt(f)
301    }
302}
303
304#[cfg(unix)]
305mod sys {
306    use std::os::unix::prelude::*;
307    use super::RtmpStream;
308
309    impl AsRawFd for RtmpStream {
310        fn as_raw_fd(&self) -> RawFd {
311            self.tokio_stream.as_raw_fd()
312        }
313    }
314
315    impl AsFd for RtmpStream {
316        fn as_fd(&self) -> BorrowedFd<'_> {
317            self.tokio_stream.as_fd()
318        }
319    }
320}
321
// Compiled only on Windows: the impls below need the Windows-only socket traits, and a
// second `mod sys` next to the unix one would clash if this were also enabled for docs builds.
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
mod sys {
    // Taken from std: Tokio's `os` re-export is not reachable from outside the tokio crate.
    use std::os::windows::io::{
        AsRawSocket,
        AsSocket,
        BorrowedSocket,
        RawSocket
    };
    use super::RtmpStream;

    /// Exposes the raw socket of the wrapped Tokio stream.
    impl AsRawSocket for RtmpStream {
        fn as_raw_socket(&self) -> RawSocket {
            self.tokio_stream.as_raw_socket()
        }
    }

    /// Exposes a borrowed socket of the wrapped Tokio stream.
    impl AsSocket for RtmpStream {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            self.tokio_stream.as_socket()
        }
    }
}
345
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use super::RtmpStream;
    use std::os::wasi::prelude::*;

    /// Exposes the raw file descriptor of the wrapped Tokio stream.
    impl AsRawFd for RtmpStream {
        fn as_raw_fd(&self) -> RawFd {
            AsRawFd::as_raw_fd(&self.tokio_stream)
        }
    }

    /// Exposes a borrowed file descriptor of the wrapped Tokio stream.
    impl AsFd for RtmpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            AsFd::as_fd(&self.tokio_stream)
        }
    }
}