sheave_core/net/
rtmp.rs

1use std::{
2    fmt::{
3        Debug,
4        Formatter,
5        Result as FormatResult
6    },
7    io::{
8        Error as IOError,
9        IoSlice,
10        IoSliceMut,
11        Result as IOResult
12    },
13    net::{
14        SocketAddr,
15        TcpStream as StdStream
16    },
17    pin::Pin,
18    task::{
19        Context as FutureContext,
20        Poll
21    },
22    time::Duration
23};
24use bytes::buf::BufMut;
25use pin_project_lite::pin_project;
26use tokio::{
27    io::{
28        AsyncRead,
29        AsyncWrite,
30        Interest,
31        ReadBuf,
32        Ready
33    },
34    net::{
35        TcpStream as TokioStream,
36        ToSocketAddrs,
37        tcp::{
38            OwnedReadHalf,
39            OwnedWriteHalf,
40            ReadHalf,
41            WriteHalf
42        }
43    }
44};
45
46pin_project! {
47    /// A stream for RTMP that wrapped Tokio's `TcpStream`.
48    ///
49    /// If you constructs this struct from some address,  use `RtmpStream::connect("aaa.bbb.ccc.ddd:1935")`.
50    /// If you do it from already created std's TCPStream. use `RtmpStream::from_std(std_stream)`
51    pub struct RtmpStream {
52        #[pin]
53        tokio_stream: TokioStream
54    }
55}
56
57impl RtmpStream {
58    fn new(tokio_stream: TokioStream) -> Self {
59        Self { tokio_stream }
60    }
61
62    /// Opens a RTMP connection to a remote host.
63    /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
64    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.connect)
65    pub async fn connect<A: ToSocketAddrs>(addr: A) -> IOResult<Self> {
66        TokioStream::connect(addr).await.map(Self::new)
67    }
68
69    /// Creates new RtmpStream from a `std::net::TcpStream`.
70    /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
71    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.from_std)
72    pub fn from_std(std_stream: StdStream) -> IOResult<Self> {
73        TokioStream::from_std(std_stream).map(Self::new)
74    }
75
76    /// Turns a `sheave_core::net::rtmp::RtmpStream into `std::net::TcpStream`.
77    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_std)
78    pub fn into_std(self) -> IOResult<StdStream> {
79        self.tokio_stream.into_std()
80    }
81
82    /// Returns the local address that this stream is bound to.
83    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.local_addr)
84    pub fn local_addr(&self) -> IOResult<SocketAddr> {
85        self.tokio_stream.local_addr()
86    }
87
88    /// Returns the value of the `SO_ERROR` option.
89    pub fn take_error(&self) -> IOResult<Option<IOError>> {
90        self.tokio_stream.take_error()
91    }
92
93    /// Returns the remote address that this stream is connected to.
94    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peer_addr)
95    pub fn peer_addr(&self) -> IOResult<SocketAddr> {
96        self.tokio_stream.peer_addr()
97    }
98
99    /// Attempts to receive data on the socket, without removing that data from the queue, registering the current task for wakeup if data is not yet available.
100    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_peek)
101    pub fn poll_peek(&self, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<usize>> {
102        self.tokio_stream.poll_peek(cx, buf)
103    }
104
105    /// Waits for any of the requested ready states.
106    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ready)
107    pub async fn ready(&self, interest: Interest) -> IOResult<Ready> {
108        self.tokio_stream.ready(interest).await
109    }
110
111    /// Waits for the socket to become readable.
112    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.readable)
113    pub async fn readable(&self) -> IOResult<()> {
114        self.tokio_stream.readable().await
115    }
116
117    /// Polls for read readiness.
118    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_read_ready)
119    pub fn poll_read_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
120        self.tokio_stream.poll_read_ready(cx)
121    }
122
123    /// Tries to read data from the stream into the provided buffer, returning how many bytes were read.
124    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read)
125    pub fn try_read(&self, buf: &mut [u8]) -> IOResult<usize> {
126        self.tokio_stream.try_read(buf)
127    }
128
129    /// Tries to read data from the stream into the provided buffers, returning how many bytes were read.
130    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_vectored)
131    pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> IOResult<usize> {
132        self.tokio_stream.try_read_vectored(bufs)
133    }
134
135    /// Tries to read data from the stream into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
136    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_buf)
137    pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> IOResult<usize> {
138        self.tokio_stream.try_read_buf(buf)
139    }
140
141    /// Waits for the socket to become writable.
142    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.writable)
143    pub async fn writable(&self) -> IOResult<()> {
144        self.tokio_stream.writable().await
145    }
146
147    /// Polls for write readiness.
148    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_write_ready)
149    pub fn poll_write_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
150        self.tokio_stream.poll_write_ready(cx)
151    }
152
153    /// Tries to write several buffers to the stream, returning how many bytes were written.
154    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write)
155    pub fn try_write(&self, buf: &[u8]) -> IOResult<usize> {
156        self.tokio_stream.try_write(buf)
157    }
158
159    /// Tries to write several buffers to the stream, returning how many bytes were written.
160    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write_vectored)
161    pub fn try_write_vectored(&self, bufs: &[IoSlice<'_>]) -> IOResult<usize> {
162        self.tokio_stream.try_write_vectored(bufs)
163    }
164
165    /// Tries to read or write from the socket using a user-provided IO operation.
166    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_io)
167    pub fn try_io<R>(&self, interest: Interest, f: impl FnOnce() -> IOResult<R>) -> IOResult<R> {
168        self.tokio_stream.try_io(interest, f)
169    }
170
171    /// Reads or writes from the socket using a user-provided IO operation.
172    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.async_io)
173    pub async fn async_io<R>(&self, interest: Interest, f: impl FnMut() -> IOResult<R>) -> IOResult<R> {
174        self.tokio_stream.async_io(interest, f).await
175    }
176
177    /// Receives data on the socket from the remote address to which it is connected, without removing that data from the queue. On success, returns the number of bytes peeked.
178    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peek)
179    pub async fn peek(&self, buf: &mut [u8]) -> IOResult<usize> {
180        self.tokio_stream.peek(buf).await
181    }
182
183    /// Gets the value of the TCP_NODELAY option on this socket.
184    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.nodelay)
185    pub fn nodelay(&self) -> IOResult<bool> {
186        self.tokio_stream.nodelay()
187    }
188
189    /// Sets the value of the TCP_NODELAY option on this socket.
190    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay)
191    pub fn set_nodelay(&self, nodelay: bool) -> IOResult<()> {
192        self.tokio_stream.set_nodelay(nodelay)
193    }
194
195    /// Reads the linger duration for this socket by getting the SO_LINGER option.
196    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.linger)
197    pub fn linger(&self) -> IOResult<Option<Duration>> {
198        self.tokio_stream.linger()
199    }
200
201    /// Sets the linger duration of this socket by setting the SO_LINGER option.
202    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_linger)
203    pub fn set_linger(&self, dur: Option<Duration>) -> IOResult<()> {
204        self.tokio_stream.set_linger(dur)
205    }
206
207    /// Gets the value of the IP_TTL option for this socket.
208    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ttl)
209    pub fn ttl(&self) -> IOResult<u32> {
210        self.tokio_stream.ttl()
211    }
212
213    /// Sets the value for the IP_TTL option on this socket.
214    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_ttl)
215    pub fn set_ttl(&self, ttl: u32) -> IOResult<()> {
216        self.tokio_stream.set_ttl(ttl)
217    }
218
219    /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
220    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.split)
221    pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
222        self.tokio_stream.split()
223    }
224
225    /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
226    /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_split)
227    pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
228        self.tokio_stream.into_split()
229    }
230}
231
232impl TryFrom<StdStream> for RtmpStream {
233    type Error = IOError;
234
235    fn try_from(std_stream: StdStream) -> IOResult<Self> {
236        Self::from_std(std_stream)
237    }
238}
239
240impl From<TokioStream> for RtmpStream {
241    fn from(tokio_stream: TokioStream) -> Self {
242        Self::new(tokio_stream)
243    }
244}
245
246impl AsyncRead for RtmpStream {
247    fn poll_read(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<()>> {
248        let this = self.project();
249        this.tokio_stream.poll_read(cx, buf)
250    }
251}
252
253impl AsyncWrite for RtmpStream {
254    fn poll_write(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &[u8]) -> Poll<IOResult<usize>> {
255        let this = self.project();
256        this.tokio_stream.poll_write(cx, buf)
257    }
258
259    fn poll_flush(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
260        let this = self.project();
261        this.tokio_stream.poll_flush(cx)
262    }
263
264    fn poll_shutdown(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
265        let this = self.project();
266        this.tokio_stream.poll_shutdown(cx)
267    }
268}
269
270impl Debug for RtmpStream {
271    fn fmt(&self, f: &mut Formatter<'_>) -> FormatResult {
272        self.tokio_stream.fmt(f)
273    }
274}
275
276#[cfg(unix)]
277mod sys {
278    use std::os::unix::prelude::*;
279    use super::RtmpStream;
280
281    impl AsRawFd for RtmpStream {
282        fn as_raw_fd(&self) -> RawFd {
283            self.tokio_stream.as_raw_fd()
284        }
285    }
286
287    impl AsFd for RtmpStream {
288        fn as_fd(&self) -> BorrowedFd<'_> {
289            self.tokio_stream.as_fd()
290        }
291    }
292}
293
#[cfg(any(all(doc, docsrs), windows))]
#[cfg_attr(docsrs, doc(cfg(windows)))]
mod sys {
    // NOTE(review): these traits are taken from `std` rather than `tokio::os`,
    // which appears to be crate-private in tokio — confirm against the tokio version in use.
    // NOTE(review): `std::os::windows` is presumably unavailable when documenting on a
    // non-Windows host, so the `all(doc, docsrs)` arm may need revisiting — confirm.
    use std::os::windows::io::{
        AsRawSocket,
        AsSocket,
        BorrowedSocket,
        RawSocket
    };
    use super::RtmpStream;

    /// Exposes the raw socket of the inner Tokio stream.
    impl AsRawSocket for RtmpStream {
        fn as_raw_socket(&self) -> RawSocket {
            self.tokio_stream.as_raw_socket()
        }
    }

    /// Exposes a borrowed socket of the inner Tokio stream.
    impl AsSocket for RtmpStream {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            self.tokio_stream.as_socket()
        }
    }
}
317
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use std::os::wasi::io::{
        AsFd,
        AsRawFd,
        BorrowedFd,
        RawFd
    };
    use super::RtmpStream;

    /// Exposes the raw file descriptor of the inner Tokio stream.
    impl AsRawFd for RtmpStream {
        fn as_raw_fd(&self) -> RawFd {
            self.tokio_stream.as_raw_fd()
        }
    }

    /// Exposes a borrowed file descriptor of the inner Tokio stream.
    impl AsFd for RtmpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            self.tokio_stream.as_fd()
        }
    }
}