sheave_core/net/rtmp.rs
1use std::{
2 fmt::{
3 Debug,
4 Formatter,
5 Result as FormatResult
6 },
7 io::{
8 Error as IOError,
9 IoSlice,
10 IoSliceMut,
11 Result as IOResult
12 },
13 net::{
14 SocketAddr,
15 TcpStream as StdStream
16 },
17 pin::Pin,
18 task::{
19 Context as FutureContext,
20 Poll
21 },
22 time::Duration
23};
24use bytes::buf::BufMut;
25use pin_project_lite::pin_project;
26use tokio::{
27 io::{
28 AsyncRead,
29 AsyncWrite,
30 Interest,
31 ReadBuf,
32 Ready
33 },
34 net::{
35 TcpStream as TokioStream,
36 ToSocketAddrs,
37 tcp::{
38 OwnedReadHalf,
39 OwnedWriteHalf,
40 ReadHalf,
41 WriteHalf
42 }
43 }
44};
45
46pin_project! {
47 /// A stream for RTMP that wrapped Tokio's `TcpStream`.
48 ///
49 /// If you constructs this struct from some address, use `RtmpStream::connect("aaa.bbb.ccc.ddd:1935")`.
50 /// If you do it from already created std's TCPStream. use `RtmpStream::from_std(std_stream)`
51 pub struct RtmpStream {
52 #[pin]
53 tokio_stream: TokioStream
54 }
55}
56
57impl RtmpStream {
58 fn new(tokio_stream: TokioStream) -> Self {
59 Self { tokio_stream }
60 }
61
62 /// Opens a RTMP connection to a remote host.
63 /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
64 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.connect)
65 pub async fn connect<A: ToSocketAddrs>(addr: A) -> IOResult<Self> {
66 TokioStream::connect(addr).await.map(Self::new)
67 }
68
69 /// Creates new RtmpStream from a `std::net::TcpStream`.
70 /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
71 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.from_std)
72 pub fn from_std(std_stream: StdStream) -> IOResult<Self> {
73 TokioStream::from_std(std_stream).map(Self::new)
74 }
75
76 /// Turns a `sheave_core::net::rtmp::RtmpStream into `std::net::TcpStream`.
77 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_std)
78 pub fn into_std(self) -> IOResult<StdStream> {
79 self.tokio_stream.into_std()
80 }
81
82 /// Returns the local address that this stream is bound to.
83 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.local_addr)
84 pub fn local_addr(&self) -> IOResult<SocketAddr> {
85 self.tokio_stream.local_addr()
86 }
87
88 /// Returns the value of the `SO_ERROR` option.
89 pub fn take_error(&self) -> IOResult<Option<IOError>> {
90 self.tokio_stream.take_error()
91 }
92
93 /// Returns the remote address that this stream is connected to.
94 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peer_addr)
95 pub fn peer_addr(&self) -> IOResult<SocketAddr> {
96 self.tokio_stream.peer_addr()
97 }
98
99 /// Attempts to receive data on the socket, without removing that data from the queue, registering the current task for wakeup if data is not yet available.
100 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_peek)
101 pub fn poll_peek(&self, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<usize>> {
102 self.tokio_stream.poll_peek(cx, buf)
103 }
104
105 /// Waits for any of the requested ready states.
106 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ready)
107 pub async fn ready(&self, interest: Interest) -> IOResult<Ready> {
108 self.tokio_stream.ready(interest).await
109 }
110
111 /// Waits for the socket to become readable.
112 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.readable)
113 pub async fn readable(&self) -> IOResult<()> {
114 self.tokio_stream.readable().await
115 }
116
117 /// Polls for read readiness.
118 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_read_ready)
119 pub fn poll_read_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
120 self.tokio_stream.poll_read_ready(cx)
121 }
122
123 /// Tries to read data from the stream into the provided buffer, returning how many bytes were read.
124 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read)
125 pub fn try_read(&self, buf: &mut [u8]) -> IOResult<usize> {
126 self.tokio_stream.try_read(buf)
127 }
128
129 /// Tries to read data from the stream into the provided buffers, returning how many bytes were read.
130 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_vectored)
131 pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> IOResult<usize> {
132 self.tokio_stream.try_read_vectored(bufs)
133 }
134
135 /// Tries to read data from the stream into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
136 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_buf)
137 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> IOResult<usize> {
138 self.tokio_stream.try_read_buf(buf)
139 }
140
141 /// Waits for the socket to become writable.
142 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.writable)
143 pub async fn writable(&self) -> IOResult<()> {
144 self.tokio_stream.writable().await
145 }
146
147 /// Polls for write readiness.
148 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_write_ready)
149 pub fn poll_write_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
150 self.tokio_stream.poll_write_ready(cx)
151 }
152
153 /// Tries to write several buffers to the stream, returning how many bytes were written.
154 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write)
155 pub fn try_write(&self, buf: &[u8]) -> IOResult<usize> {
156 self.tokio_stream.try_write(buf)
157 }
158
159 /// Tries to write several buffers to the stream, returning how many bytes were written.
160 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write_vectored)
161 pub fn try_write_vectored(&self, bufs: &[IoSlice<'_>]) -> IOResult<usize> {
162 self.tokio_stream.try_write_vectored(bufs)
163 }
164
165 /// Tries to read or write from the socket using a user-provided IO operation.
166 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_io)
167 pub fn try_io<R>(&self, interest: Interest, f: impl FnOnce() -> IOResult<R>) -> IOResult<R> {
168 self.tokio_stream.try_io(interest, f)
169 }
170
171 /// Reads or writes from the socket using a user-provided IO operation.
172 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.async_io)
173 pub async fn async_io<R>(&self, interest: Interest, f: impl FnMut() -> IOResult<R>) -> IOResult<R> {
174 self.tokio_stream.async_io(interest, f).await
175 }
176
177 /// Receives data on the socket from the remote address to which it is connected, without removing that data from the queue. On success, returns the number of bytes peeked.
178 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peek)
179 pub async fn peek(&self, buf: &mut [u8]) -> IOResult<usize> {
180 self.tokio_stream.peek(buf).await
181 }
182
183 /// Gets the value of the TCP_NODELAY option on this socket.
184 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.nodelay)
185 pub fn nodelay(&self) -> IOResult<bool> {
186 self.tokio_stream.nodelay()
187 }
188
189 /// Sets the value of the TCP_NODELAY option on this socket.
190 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay)
191 pub fn set_nodelay(&self, nodelay: bool) -> IOResult<()> {
192 self.tokio_stream.set_nodelay(nodelay)
193 }
194
195 /// Reads the linger duration for this socket by getting the SO_LINGER option.
196 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.linger)
197 pub fn linger(&self) -> IOResult<Option<Duration>> {
198 self.tokio_stream.linger()
199 }
200
201 /// Sets the linger duration of this socket by setting the SO_LINGER option.
202 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_linger)
203 pub fn set_linger(&self, dur: Option<Duration>) -> IOResult<()> {
204 self.tokio_stream.set_linger(dur)
205 }
206
207 /// Gets the value of the IP_TTL option for this socket.
208 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ttl)
209 pub fn ttl(&self) -> IOResult<u32> {
210 self.tokio_stream.ttl()
211 }
212
213 /// Sets the value for the IP_TTL option on this socket.
214 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_ttl)
215 pub fn set_ttl(&self, ttl: u32) -> IOResult<()> {
216 self.tokio_stream.set_ttl(ttl)
217 }
218
219 /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
220 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.split)
221 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
222 self.tokio_stream.split()
223 }
224
225 /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
226 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_split)
227 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
228 self.tokio_stream.into_split()
229 }
230}
231
232impl TryFrom<StdStream> for RtmpStream {
233 type Error = IOError;
234
235 fn try_from(std_stream: StdStream) -> IOResult<Self> {
236 Self::from_std(std_stream)
237 }
238}
239
240impl From<TokioStream> for RtmpStream {
241 fn from(tokio_stream: TokioStream) -> Self {
242 Self::new(tokio_stream)
243 }
244}
245
246impl AsyncRead for RtmpStream {
247 fn poll_read(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<()>> {
248 let this = self.project();
249 this.tokio_stream.poll_read(cx, buf)
250 }
251}
252
253impl AsyncWrite for RtmpStream {
254 fn poll_write(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &[u8]) -> Poll<IOResult<usize>> {
255 let this = self.project();
256 this.tokio_stream.poll_write(cx, buf)
257 }
258
259 fn poll_flush(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
260 let this = self.project();
261 this.tokio_stream.poll_flush(cx)
262 }
263
264 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
265 let this = self.project();
266 this.tokio_stream.poll_shutdown(cx)
267 }
268}
269
270impl Debug for RtmpStream {
271 fn fmt(&self, f: &mut Formatter<'_>) -> FormatResult {
272 self.tokio_stream.fmt(f)
273 }
274}
275
276#[cfg(unix)]
277mod sys {
278 use std::os::unix::prelude::*;
279 use super::RtmpStream;
280
281 impl AsRawFd for RtmpStream {
282 fn as_raw_fd(&self) -> RawFd {
283 self.tokio_stream.as_raw_fd()
284 }
285 }
286
287 impl AsFd for RtmpStream {
288 fn as_fd(&self) -> BorrowedFd<'_> {
289 self.tokio_stream.as_fd()
290 }
291 }
292}
293
// Windows-only: exposes the socket handle of the wrapped Tokio stream.
//
// This module is gated on `windows` alone. A docs-only build on another
// platform would otherwise define a second `mod sys` next to the Unix one and
// would reference `std::os::windows`, which does not exist there.
#[cfg(windows)]
#[cfg_attr(docsrs, doc(cfg(windows)))]
mod sys {
    // Imported from std rather than through tokio: these are the traits Tokio's
    // `TcpStream` implements, and tokio's `os` re-export appears to be
    // crate-private there. NOTE(review): confirm against the pinned tokio version.
    use std::os::windows::io::{
        AsRawSocket,
        AsSocket,
        BorrowedSocket,
        RawSocket
    };
    use super::RtmpStream;

    impl AsRawSocket for RtmpStream {
        fn as_raw_socket(&self) -> RawSocket {
            self.tokio_stream.as_raw_socket()
        }
    }

    impl AsSocket for RtmpStream {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            self.tokio_stream.as_socket()
        }
    }
}
317
// WASI-only (requires `tokio_unstable`): exposes the file descriptor of the wrapped Tokio stream.
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use std::os::wasi::prelude::*;

    use super::RtmpStream;

    impl AsFd for RtmpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            AsFd::as_fd(&self.tokio_stream)
        }
    }

    impl AsRawFd for RtmpStream {
        fn as_raw_fd(&self) -> RawFd {
            AsRawFd::as_raw_fd(&self.tokio_stream)
        }
    }
}