sheave_core/net/rtmp.rs
1use std::{
2 fmt::{
3 Debug,
4 Formatter,
5 Result as FormatResult
6 },
7 io::{
8 Error as IOError,
9 IoSlice,
10 IoSliceMut,
11 Result as IOResult
12 },
13 net::{
14 SocketAddr,
15 TcpStream as StdStream
16 },
17 pin::Pin,
18 task::{
19 Context as FutureContext,
20 Poll
21 },
22 time::Duration
23};
24use bytes::buf::BufMut;
25use pin_project_lite::pin_project;
26use tokio::{
27 io::{
28 AsyncRead,
29 AsyncWrite,
30 Interest,
31 ReadBuf,
32 Ready
33 },
34 net::{
35 TcpStream as TokioStream,
36 ToSocketAddrs,
37 tcp::{
38 OwnedReadHalf,
39 OwnedWriteHalf,
40 ReadHalf,
41 WriteHalf
42 }
43 }
44};
45
46pin_project! {
47 /// A stream for RTMP that wrapped Tokio's `TcpStream`.
48 ///
49 /// If you constructs this struct from some address, use `RtmpStream::connect("aaa.bbb.ccc.ddd:1935")`.
50 /// Or if you do it from already created std's TcpStream, use `RtmpStream::from_std(std_stream)`
51 pub struct RtmpStream {
52 #[pin]
53 tokio_stream: TokioStream
54 }
55}
56
57impl RtmpStream {
58 fn new(tokio_stream: TokioStream) -> Self {
59 Self { tokio_stream }
60 }
61
62 /// Opens a RTMP connection to a remote host.
63 ///
64 /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
65 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.connect)
66 pub async fn connect<A: ToSocketAddrs>(addr: A) -> IOResult<Self> {
67 TokioStream::connect(addr).await.map(Self::new)
68 }
69
70 /// Creates new RtmpStream from a `std::net::TcpStream`.
71 ///
72 /// When connection succeeded, this wraps tokio's TcpStream into RtmpStream.
73 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.from_std)
74 pub fn from_std(std_stream: StdStream) -> IOResult<Self> {
75 TokioStream::from_std(std_stream).map(Self::new)
76 }
77
78 /// Turns a `sheave_core::net::rtmp::RtmpStream into `std::net::TcpStream`.
79 ///
80 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_std)
81 pub fn into_std(self) -> IOResult<StdStream> {
82 self.tokio_stream.into_std()
83 }
84
85 /// Returns the local address that this stream is bound to.
86 ///
87 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.local_addr)
88 pub fn local_addr(&self) -> IOResult<SocketAddr> {
89 self.tokio_stream.local_addr()
90 }
91
92 /// Returns the value of the `SO_ERROR` option.
93 pub fn take_error(&self) -> IOResult<Option<IOError>> {
94 self.tokio_stream.take_error()
95 }
96
97 /// Returns the remote address that this stream is connected to.
98 ///
99 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peer_addr)
100 pub fn peer_addr(&self) -> IOResult<SocketAddr> {
101 self.tokio_stream.peer_addr()
102 }
103
104 /// Attempts to receive data on the socket, without removing that data from the queue, registering the current task for wakeup if data is not yet available.
105 ///
106 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_peek)
107 pub fn poll_peek(&self, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<usize>> {
108 self.tokio_stream.poll_peek(cx, buf)
109 }
110
111 /// Waits for any of the requested ready states.
112 ///
113 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ready)
114 pub async fn ready(&self, interest: Interest) -> IOResult<Ready> {
115 self.tokio_stream.ready(interest).await
116 }
117
118 /// Waits for the socket to become readable.
119 ///
120 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.readable)
121 pub async fn readable(&self) -> IOResult<()> {
122 self.tokio_stream.readable().await
123 }
124
125 /// Polls for read readiness.
126 ///
127 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_read_ready)
128 pub fn poll_read_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
129 self.tokio_stream.poll_read_ready(cx)
130 }
131
132 /// Tries to read data from the stream into the provided buffer, returning how many bytes were read.
133 ///
134 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read)
135 pub fn try_read(&self, buf: &mut [u8]) -> IOResult<usize> {
136 self.tokio_stream.try_read(buf)
137 }
138
139 /// Tries to read data from the stream into the provided buffers, returning how many bytes were read.
140 ///
141 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_vectored)
142 pub fn try_read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> IOResult<usize> {
143 self.tokio_stream.try_read_vectored(bufs)
144 }
145
146 /// Tries to read data from the stream into the provided buffer, advancing the buffer’s internal cursor, returning how many bytes were read.
147 ///
148 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_read_buf)
149 pub fn try_read_buf<B: BufMut>(&self, buf: &mut B) -> IOResult<usize> {
150 self.tokio_stream.try_read_buf(buf)
151 }
152
153 /// Waits for the socket to become writable.
154 ///
155 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.writable)
156 pub async fn writable(&self) -> IOResult<()> {
157 self.tokio_stream.writable().await
158 }
159
160 /// Polls for write readiness.
161 ///
162 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.poll_write_ready)
163 pub fn poll_write_ready(&self, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
164 self.tokio_stream.poll_write_ready(cx)
165 }
166
167 /// Tries to write several buffers to the stream, returning how many bytes were written.
168 ///
169 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write)
170 pub fn try_write(&self, buf: &[u8]) -> IOResult<usize> {
171 self.tokio_stream.try_write(buf)
172 }
173
174 /// Tries to write several buffers to the stream, returning how many bytes were written.
175 ///
176 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_write_vectored)
177 pub fn try_write_vectored(&self, bufs: &[IoSlice<'_>]) -> IOResult<usize> {
178 self.tokio_stream.try_write_vectored(bufs)
179 }
180
181 /// Tries to read or write from the socket using a user-provided IO operation.
182 ///
183 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.try_io)
184 pub fn try_io<R>(&self, interest: Interest, f: impl FnOnce() -> IOResult<R>) -> IOResult<R> {
185 self.tokio_stream.try_io(interest, f)
186 }
187
188 /// Reads or writes from the socket using a user-provided IO operation.
189 ///
190 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.async_io)
191 pub async fn async_io<R>(&self, interest: Interest, f: impl FnMut() -> IOResult<R>) -> IOResult<R> {
192 self.tokio_stream.async_io(interest, f).await
193 }
194
195 /// Receives data on the socket from the remote address to which it is connected, without removing that data from the queue.
196 /// On success, returns the number of bytes peeked.
197 ///
198 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.peek)
199 pub async fn peek(&self, buf: &mut [u8]) -> IOResult<usize> {
200 self.tokio_stream.peek(buf).await
201 }
202
203 /// Gets the value of the TCP_NODELAY option on this socket.
204 ///
205 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.nodelay)
206 pub fn nodelay(&self) -> IOResult<bool> {
207 self.tokio_stream.nodelay()
208 }
209
210 /// Sets the value of the TCP_NODELAY option on this socket.
211 ///
212 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_nodelay)
213 pub fn set_nodelay(&self, nodelay: bool) -> IOResult<()> {
214 self.tokio_stream.set_nodelay(nodelay)
215 }
216
217 /// Reads the linger duration for this socket by getting the SO_LINGER option.
218 ///
219 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.linger)
220 pub fn linger(&self) -> IOResult<Option<Duration>> {
221 self.tokio_stream.linger()
222 }
223
224 /// Sets the linger duration of this socket by setting the SO_LINGER option.
225 ///
226 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_linger)
227 pub fn set_linger(&self, dur: Option<Duration>) -> IOResult<()> {
228 self.tokio_stream.set_linger(dur)
229 }
230
231 /// Gets the value of the IP_TTL option for this socket.
232 ///
233 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.ttl)
234 pub fn ttl(&self) -> IOResult<u32> {
235 self.tokio_stream.ttl()
236 }
237
238 /// Sets the value for the IP_TTL option on this socket.
239 ///
240 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.set_ttl)
241 pub fn set_ttl(&self, ttl: u32) -> IOResult<()> {
242 self.tokio_stream.set_ttl(ttl)
243 }
244
245 /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
246 ///
247 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.split)
248 pub fn split<'a>(&'a mut self) -> (ReadHalf<'a>, WriteHalf<'a>) {
249 self.tokio_stream.split()
250 }
251
252 /// Splits a TcpStream into a read half and a write half, which can be used to read and write the stream concurrently.
253 ///
254 /// [Read more](https://docs.rs/tokio/latest/tokio/net/struct.TcpStream.html#method.into_split)
255 pub fn into_split(self) -> (OwnedReadHalf, OwnedWriteHalf) {
256 self.tokio_stream.into_split()
257 }
258}
259
260impl TryFrom<StdStream> for RtmpStream {
261 type Error = IOError;
262
263 fn try_from(std_stream: StdStream) -> IOResult<Self> {
264 Self::from_std(std_stream)
265 }
266}
267
268impl From<TokioStream> for RtmpStream {
269 fn from(tokio_stream: TokioStream) -> Self {
270 Self::new(tokio_stream)
271 }
272}
273
274impl AsyncRead for RtmpStream {
275 fn poll_read(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &mut ReadBuf<'_>) -> Poll<IOResult<()>> {
276 let this = self.project();
277 this.tokio_stream.poll_read(cx, buf)
278 }
279}
280
281impl AsyncWrite for RtmpStream {
282 fn poll_write(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, buf: &[u8]) -> Poll<IOResult<usize>> {
283 let this = self.project();
284 this.tokio_stream.poll_write(cx, buf)
285 }
286
287 fn poll_flush(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
288 let this = self.project();
289 this.tokio_stream.poll_flush(cx)
290 }
291
292 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut FutureContext<'_>) -> Poll<IOResult<()>> {
293 let this = self.project();
294 this.tokio_stream.poll_shutdown(cx)
295 }
296}
297
298impl Debug for RtmpStream {
299 fn fmt(&self, f: &mut Formatter<'_>) -> FormatResult {
300 self.tokio_stream.fmt(f)
301 }
302}
303
304#[cfg(unix)]
305mod sys {
306 use std::os::unix::prelude::*;
307 use super::RtmpStream;
308
309 impl AsRawFd for RtmpStream {
310 fn as_raw_fd(&self) -> RawFd {
311 self.tokio_stream.as_raw_fd()
312 }
313 }
314
315 impl AsFd for RtmpStream {
316 fn as_fd(&self) -> BorrowedFd<'_> {
317 self.tokio_stream.as_fd()
318 }
319 }
320}
321
/// Windows-only access to the socket handle of the wrapped stream.
// NOTE(review): when docs are built on a unix host with `--cfg docsrs`, this
// module and the `#[cfg(unix)] mod sys` above look like they would both be
// enabled and collide on the name `sys` — confirm against the docs.rs build.
#[cfg(any(all(doc, docsrs), windows))]
#[cfg_attr(docsrs, doc(cfg(windows)))]
mod sys {
    use tokio::os::windows::io::{
        AsRawSocket,
        AsSocket,
        BorrowedSocket,
        RawSocket
    };
    use super::RtmpStream;

    impl AsRawSocket for RtmpStream {
        fn as_raw_socket(&self) -> RawSocket {
            self.tokio_stream.as_raw_socket()
        }
    }

    impl AsSocket for RtmpStream {
        fn as_socket(&self) -> BorrowedSocket<'_> {
            self.tokio_stream.as_socket()
        }
    }
}
345
/// WASI-only access to the file descriptor of the wrapped stream
/// (compiled only when `tokio_unstable` is set).
#[cfg(all(tokio_unstable, target_os = "wasi"))]
mod sys {
    use std::os::wasi::prelude::*;
    use super::RtmpStream;

    impl AsRawFd for RtmpStream {
        fn as_raw_fd(&self) -> RawFd {
            AsRawFd::as_raw_fd(&self.tokio_stream)
        }
    }

    impl AsFd for RtmpStream {
        fn as_fd(&self) -> BorrowedFd<'_> {
            AsFd::as_fd(&self.tokio_stream)
        }
    }
}