sheave_client/handlers/
rtmp.rs

1use std::{
2    future::Future,
3    io::{
4        Error as IOError,
5        ErrorKind,
6        Result as IOResult
7    },
8    pin::{
9        Pin,
10        pin
11    },
12    sync::Arc,
13    task::{
14        Context as FutureContext,
15        Poll
16    },
17    time::{
18        Duration,
19        Instant
20    }
21};
22use log::{
23    error,
24    info
25};
26use futures::ready;
27use tokio::io::{
28    AsyncRead,
29    AsyncWrite
30};
31use sheave_core::{
32    ByteBuffer,
33    Decoder,
34    Encoder,
35    U24_MAX,
36    flv::tags::*,
37    handlers::{
38        AsyncHandler,
39        AsyncHandlerExt,
40        ClientType,
41        ErrorHandler,
42        HandlerConstructor,
43        LastChunk,
44        PublisherStatus,
45        RtmpContext,
46        StreamWrapper,
47        SubscriberStatus,
48        inconsistent_sha,
49        stream_got_exhausted
50    },
51    handshake::{
52        EncryptionAlgorithm,
53        Handshake,
54        Version
55    },
56    messages::{
57        /* Used in common */
58        Channel,
59        ChunkData,
60        Connect,
61        ConnectResult,
62        CreateStream,
63        CreateStreamResult,
64        UserControl,
65        EventType,
66        OnStatus,
67        Audio,
68        Video,
69        SetDataFrame,
70        Acknowledgement,
71        amf::v0::{
72            Number,
73            AmfString,
74            Object
75        },
76        headers::MessageType,
77
78        /* Publisher-side */
79        ReleaseStream,
80        ReleaseStreamResult,
81        FcPublish,
82        OnFcPublish,
83        StreamBegin,
84        Publish,
85        FcUnpublish,
86        DeleteStream,
87
88        /* Subscriber-side */
89        WindowAcknowledgementSize,
90        FcSubscribe,
91        SetBufferLength,
92        Play,
93        amf::v0::Boolean
94    },
95    net::RtmpReadExt,
96    object,
97    readers::*,
98    writers::*
99};
100use super::{
101    error_response,
102    middlewares::write_acknowledgement
103};
104
105#[doc(hidden)]
106#[derive(Debug)]
107struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
108
109#[doc(hidden)]
110impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
111    async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
112        let encryption_algorithm = EncryptionAlgorithm::default();
113
114        let version = if rtmp_context.is_signed() {
115            Version::LATEST_CLIENT
116        } else {
117            Version::UNSIGNED
118        };
119        let mut client_request = Handshake::new(Instant::now().elapsed(), version);
120        if rtmp_context.is_signed() {
121            client_request.imprint_digest(encryption_algorithm, Handshake::CLIENT_KEY);
122        }
123
124        write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
125        write_handshake(self.0.as_mut(), &client_request).await?;
126
127        rtmp_context.set_encryption_algorithm(encryption_algorithm);
128        rtmp_context.set_client_handshake(client_request);
129
130        info!("First handshake got handled.");
131        Ok(())
132    }
133
134    async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
135        let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
136        let mut server_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
137        let server_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
138
139        if !rtmp_context.is_signed() {
140            write_handshake(self.0.as_mut(), &server_request).await?;
141
142            rtmp_context.set_server_handshake(server_request);
143            rtmp_context.set_client_handshake(server_response);
144
145        } else if !server_request.did_digest_match(encryption_algorithm, Handshake::SERVER_KEY) {
146            error!("Invalid SHA digest/signature: {:x?}", server_request.get_digest(encryption_algorithm));
147            return Err(inconsistent_sha(server_response.get_digest(encryption_algorithm).to_vec()))
148        } else {
149            let mut server_response_key: Vec<u8> = Vec::new();
150            server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151            server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152
153            if !server_response.did_signature_match(encryption_algorithm, &server_response_key) {
154                error!("Invalid SHA digest/signature: {:x?}", server_response.get_signature());
155                return Err(inconsistent_sha(server_response.get_signature().to_vec()))
156            } else {
157                let mut client_response_key: Vec<u8> = Vec::new();
158                client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
159                client_response_key.extend_from_slice(Handshake::COMMON_KEY);
160                server_request.imprint_signature(encryption_algorithm, &client_response_key);
161                write_handshake(self.0.as_mut(), &server_request).await?;
162
163                rtmp_context.set_server_handshake(server_request);
164                rtmp_context.set_client_handshake(server_response);
165            }
166        }
167
168        info!("Second handshake got handled.");
169        Ok(())
170    }
171}
172
173#[doc(hidden)]
174impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
175    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
176        ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
177        pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
178    }
179}
180
181#[doc(hidden)]
182fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
183    HandshakeHandler(stream)
184}
185
186#[doc(hidden)]
187#[derive(Debug)]
188struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
189
190#[doc(hidden)]
191impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
192    async fn write_connect_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
193        use ClientType::*;
194
195        let client_type = rtmp_context.get_client_type().unwrap();
196
197        rtmp_context.increase_transaction_id();
198
199        let command_object = match client_type {
200            Publisher => object!(
201                "app" => rtmp_context.get_app().unwrap().clone(),
202                "type" => AmfString::from("nonprivate"),
203                "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
204                "tcUrl" => rtmp_context.get_tc_url().unwrap().clone()
205            ),
206            Subscriber => object!(
207                "app" => rtmp_context.get_app().unwrap().clone(),
208                "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
209                "tcUrl" => rtmp_context.get_tc_url().unwrap().clone(),
210                "fpad" => Boolean::new(0),
211                "capabilities" => Number::from(15u8),
212                "audioCodecs" => Number::from(4071u16),
213                "videoCodecs" => Number::from(252u8),
214                "videoFunction" => Number::from(1u8)
215            )
216        };
217        let connect = Connect::new(command_object);
218        let mut buffer = ByteBuffer::default();
219        buffer.encode(&AmfString::from("connect"));
220        buffer.encode(&rtmp_context.get_transaction_id());
221        buffer.encode(&connect);
222        write_chunk(self.0.as_mut(), rtmp_context, Connect::CHANNEL.into(), Duration::default(), Connect::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
223
224        rtmp_context.set_command_object(connect.into());
225
226        info!("connect got sent.");
227        Ok(())
228    }
229
230    async fn write_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
231        let mut buffer = ByteBuffer::default();
232        buffer.encode(&rtmp_context.get_window_acknowledgement_size());
233        write_chunk(self.0.as_mut(), rtmp_context, WindowAcknowledgementSize::CHANNEL.into(), Duration::default(), WindowAcknowledgementSize::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
234
235        rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
236
237        info!("Window Acknowledgement Size got sent.");
238        Ok(())
239    }
240
241    async fn write_release_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
242        rtmp_context.increase_transaction_id();
243
244        let mut buffer = ByteBuffer::default();
245        buffer.encode(&AmfString::from("releaseStream"));
246        buffer.encode(&rtmp_context.get_transaction_id());
247        buffer.encode(&ReleaseStream::new(rtmp_context.get_topic_path().unwrap().clone()));
248        write_chunk(self.0.as_mut(), rtmp_context, ReleaseStream::CHANNEL.into(), Duration::default(), ReleaseStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
249
250        info!("releaseStream got sent.");
251        Ok(())
252    }
253
254    async fn write_fc_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
255        rtmp_context.increase_transaction_id();
256
257        let mut buffer = ByteBuffer::default();
258        buffer.encode(&AmfString::from("FCPublish"));
259        buffer.encode(&rtmp_context.get_transaction_id());
260        buffer.encode(&FcPublish::new(rtmp_context.get_topic_path().unwrap().clone()));
261        write_chunk(self.0.as_mut(), rtmp_context, FcPublish::CHANNEL.into(), Duration::default(), FcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
262
263        info!("FCPublish got sent.");
264        Ok(())
265    }
266
267    async fn write_create_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
268        rtmp_context.increase_transaction_id();
269
270        let mut buffer = ByteBuffer::default();
271        buffer.encode(&AmfString::from("createStream"));
272        buffer.encode(&rtmp_context.get_transaction_id());
273        buffer.encode(&CreateStream);
274        write_chunk(self.0.as_mut(), rtmp_context, CreateStream::CHANNEL.into(), Duration::default(), CreateStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
275
276        info!("createStream got sent.");
277        Ok(())
278    }
279
280    async fn write_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
281        rtmp_context.increase_transaction_id();
282
283        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
284
285        let mut buffer = ByteBuffer::default();
286        buffer.encode(&FcSubscribe::new(topic_path));
287        write_chunk(self.0.as_mut(), rtmp_context, FcSubscribe::CHANNEL.into(), Duration::default(), FcSubscribe::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
288
289        rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
290
291        info!("FCSubscribe got sent.");
292        Ok(())
293    }
294
295    async fn write_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
296        rtmp_context.increase_transaction_id();
297
298        let publishing_name = rtmp_context.get_topic_path().unwrap().clone();
299        let publishing_type = "live";
300        let mut buffer = ByteBuffer::default();
301        buffer.encode(&AmfString::from("publish"));
302        buffer.encode(&rtmp_context.get_transaction_id());
303        buffer.encode(&Publish::new(publishing_name.clone(), publishing_type.into()));
304        let message_id = rtmp_context.get_message_id().unwrap();
305        write_chunk(self.0.as_mut(), rtmp_context, Publish::CHANNEL.into(), Duration::default(), Publish::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
306
307        info!("publish got sent.");
308        Ok(())
309    }
310
311    async fn write_play_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
312        rtmp_context.increase_transaction_id();
313
314        let stream_name = rtmp_context.get_topic_path().unwrap().clone();
315        let start_time = rtmp_context.get_start_time().unwrap();
316        let play_mode = rtmp_context.get_play_mode().unwrap();
317
318        let mut buffer = ByteBuffer::default();
319        buffer.encode(&AmfString::from("play"));
320        buffer.encode(&rtmp_context.get_transaction_id());
321        buffer.encode(&Play::new(stream_name.clone(), start_time, play_mode));
322        write_chunk(self.0.as_mut(), rtmp_context, Play::CHANNEL.into(), Duration::default(), Play::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
323
324        info!("play got sent.");
325        Ok(())
326    }
327
328    async fn write_buffer_length(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
329        let message_id = rtmp_context.get_message_id().unwrap();
330        let mut buffer = ByteBuffer::default();
331        buffer.put_u16_be(SetBufferLength::EVENT_TYPE.into());
332        buffer.encode(&SetBufferLength::new(message_id, rtmp_context.get_buffer_length()));
333        write_chunk(self.0.as_mut(), rtmp_context, SetBufferLength::CHANNEL.into(), Duration::default(), SetBufferLength::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
334
335        rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
336
337        info!("Buffer Length got sent.");
338        Ok(())
339    }
340
341    async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
342        for next in rtmp_context.get_topic_mut().unwrap() {
343            let flv_tag = next?;
344            let message_id = rtmp_context.get_message_id().unwrap();
345
346            let channel;
347            let message_type;
348            match flv_tag.get_tag_type() {
349                TagType::Audio => {
350                    channel = Audio::CHANNEL;
351                    message_type = Audio::MESSAGE_TYPE;
352                },
353                TagType::Video => {
354                    channel = Video::CHANNEL;
355                    message_type = Video::MESSAGE_TYPE;
356                },
357                TagType::ScriptData => {
358                    channel = SetDataFrame::CHANNEL;
359                    message_type = SetDataFrame::MESSAGE_TYPE;
360                },
361                TagType::Other => {
362                    channel = Channel::Other;
363                    message_type = MessageType::Other;
364                }
365            }
366            let timestamp = flv_tag.get_timestamp();
367            let data: Vec<u8> = if let MessageType::Data = message_type {
368                let mut buffer = ByteBuffer::default();
369                buffer.encode(&AmfString::from("@setDataFrame"));
370                buffer.put_bytes(flv_tag.get_data());
371                buffer.into()
372            } else {
373                flv_tag.get_data().to_vec()
374            };
375            write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
376
377            info!("FLV chunk got sent.");
378            return Ok(())
379        }
380
381        // NOTE: Default return value when no FLV tag exists.
382        info!("FLV data became empty.");
383        Err(stream_got_exhausted())
384    }
385
386    async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
387        Decoder::<Acknowledgement>::decode(&mut buffer)?;
388
389        info!("Acknowledgement got handled.");
390        Ok(())
391    }
392
393    async fn handle_stream_begin(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
394        use ClientType::*;
395
396        let client_type = rtmp_context.get_client_type().unwrap();
397
398        Decoder::<StreamBegin>::decode(&mut buffer)?;
399
400        match client_type {
401            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
402            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
403        }
404
405        info!("Stream Begin got handled.");
406        Ok(())
407    }
408
409    async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
410        use EventType::*;
411
412        let event_type: EventType = buffer.get_u16_be()?.into();
413        match event_type {
414            StreamBegin => self.handle_stream_begin(rtmp_context, buffer).await,
415            _ => unreachable!("Publisher gets just a Stream Begin event.")
416        }
417    }
418
419    async fn handle_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object) -> IOResult<()> {
420        let error = error_response(information.clone());
421        rtmp_context.set_information(information);
422
423        error!("{error}");
424        Err(error)
425    }
426
427    async fn handle_connect_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
428        use ClientType::*;
429
430        let response: ConnectResult = buffer.decode()?;
431        let (properties, information): (Object, Object) = response.into();
432
433        rtmp_context.set_properties(properties);
434        rtmp_context.set_information(information);
435
436        match rtmp_context.get_client_type().unwrap() {
437            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
438            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
439        }
440
441        info!("connect result got handled.");
442        Ok(())
443    }
444
445    async fn handle_release_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
446        Decoder::<ReleaseStreamResult>::decode(&mut buffer)?;
447
448        rtmp_context.set_publisher_status(PublisherStatus::Released);
449
450        info!("releaseStream result got handled.");
451        Ok(())
452    }
453
454    async fn handle_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
455        Decoder::<OnFcPublish>::decode(&mut buffer)?;
456
457        rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
458
459        info!("onFCPublish got handled.");
460        Ok(())
461    }
462
463    async fn handle_create_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
464        use ClientType::*;
465
466        let client_type = rtmp_context.get_client_type().unwrap();
467
468        let response: CreateStreamResult = buffer.decode()?;
469        let message_id: u32 = response.into();
470        rtmp_context.set_message_id(message_id);
471
472        match client_type {
473            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
474            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
475        }
476
477        info!("createStream result got handled.");
478        Ok(())
479    }
480
481    async fn handle_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
482        let response: OnStatus = buffer.decode()?;
483        let information: Object = response.into();
484
485        /*
486         *  NOTE:
487         *      Some error in publication step is checkable only by information the field.
488         *      Because the publish command doesn't have _error command.
489         */
490        if information.get_properties()["level"] == AmfString::from("error") {
491            return self.handle_error_response(rtmp_context, information).await
492        }
493
494        rtmp_context.set_information(information);
495
496        rtmp_context.set_publisher_status(PublisherStatus::Published);
497
498        info!("onStatus(publish) got handled.");
499        Ok(())
500    }
501
502    async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
503        Decoder::<FcUnpublish>::decode(&mut buffer)?;
504        rtmp_context.reset_topic_path();
505
506        info!("FCUnpublish got handled.");
507        Ok(())
508    }
509
510    async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
511        Decoder::<DeleteStream>::decode(&mut buffer)?;
512        rtmp_context.reset_message_id();
513
514        info!("deleteStream got handled.");
515        Ok(())
516    }
517
518    async fn handle_publisher_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
519        use PublisherStatus::*;
520
521        let command: AmfString = buffer.decode()?;
522
523        // NOTE: onFCPublish has no transaction ID.
524        if command != "onFCPublish" {
525            // NOTE: Otherwise, currently unused but exists.
526            Decoder::<Number>::decode(&mut buffer)?;
527        }
528
529        if command == "FCUnpublish" {
530            return self.handle_fc_unpublish_request(rtmp_context, buffer).await
531        } else if command == "deleteStream" {
532            return self.handle_delete_stream_request(rtmp_context, buffer).await
533        } else if command == "_error" {
534            let information: Object = buffer.decode()?;
535            return self.handle_error_response(rtmp_context, information).await
536        } else {
537            /* In this step, does nothing unless command is either "FCUnpublish" or "deleteStream". */
538        }
539
540        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
541            match publisher_status {
542                Connected => self.handle_release_stream_response(rtmp_context, buffer).await,
543                Released => self.handle_fc_publish_response(rtmp_context, buffer).await,
544                FcPublished => self.handle_create_stream_response(rtmp_context, buffer).await,
545                Began => self.handle_publish_response(rtmp_context, buffer).await,
546                _ => Ok(())
547            }
548        } else {
549            self.handle_connect_response(rtmp_context, buffer).await
550        }
551    }
552
553    async fn handle_play_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
554        let response: OnStatus = buffer.decode()?;
555        let information: Object = response.into();
556
557        /*
558         *  NOTE:
559         *      Some error in subscription step is checkable only by information the field.
560         *      Because the play command doesn't have _error command.
561         */
562        if information.get_properties()["level"] == AmfString::from("error") {
563            return self.handle_error_response(rtmp_context, information).await
564        }
565
566        rtmp_context.set_information(information);
567
568        rtmp_context.set_subscriber_status(SubscriberStatus::Played);
569
570        info!("onStatus(play) got handled.");
571        Ok(())
572    }
573
574    async fn handle_subscriber_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
575        use SubscriberStatus::*;
576
577        let command: AmfString = buffer.decode()?;
578        Decoder::<Number>::decode(&mut buffer)?;
579
580        if command == "_error" {
581            let information: Object = buffer.decode()?;
582            return self.handle_error_response(rtmp_context, information).await
583        }
584
585        if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
586            match subscriber_status {
587                WindowAcknowledgementSizeGotSent => self.handle_create_stream_response(rtmp_context, buffer).await,
588                Began => self.handle_play_response(rtmp_context, buffer).await,
589                _ => return Ok(())
590            }
591        } else {
592            self.handle_connect_response(rtmp_context, buffer).await
593        }
594    }
595
596    async fn handle_command_response(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
597        use ClientType::*;
598
599        match rtmp_context.get_client_type().unwrap() {
600            Publisher => self.handle_publisher_response(rtmp_context, buffer).await,
601            Subscriber => self.handle_subscriber_response(rtmp_context, buffer).await
602        }
603    }
604
605    async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
606        let topic = rtmp_context.get_topic().unwrap();
607
608        let tag_type = match message_type {
609            MessageType::Audio => TagType::Audio,
610            MessageType::Video => TagType::Video,
611            MessageType::Data => TagType::ScriptData,
612            _ => TagType::Other
613        };
614
615        if let TagType::ScriptData = tag_type {
616            Decoder::<AmfString>::decode(&mut buffer)?;
617        }
618
619        let data: Vec<u8> = buffer.into();
620        let flv_tag = FlvTag::new(tag_type, timestamp, data);
621        topic.append_flv_tag(flv_tag)?;
622
623        info!("FLV chunk got handled.");
624        Ok(())
625    }
626}
627
628#[doc(hidden)]
629impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
630    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
631        use MessageType::*;
632
633        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
634            match publisher_status {
635                PublisherStatus::Connected => ready!(pin!(self.write_release_stream_request(rtmp_context)).poll(cx))?,
636                PublisherStatus::Released => ready!(pin!(self.write_fc_publish_request(rtmp_context)).poll(cx))?,
637                PublisherStatus::FcPublished => ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?,
638                PublisherStatus::Created => ready!(pin!(self.write_publish_request(rtmp_context)).poll(cx))?,
639                PublisherStatus::Published => ready!(pin!(self.write_flv(rtmp_context)).poll(cx))?,
640                _ => {}
641            }
642        } else if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
643            match subscriber_status {
644                SubscriberStatus::Connected => {
645                    ready!(pin!(self.write_window_acknowledgement_size(rtmp_context)).poll(cx))?;
646                    ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?
647                },
648                SubscriberStatus::Created => {
649                    ready!(pin!(self.write_fc_subscribe_request(rtmp_context)).poll(cx))?;
650                    rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
651                },
652                SubscriberStatus::AdditionalCommandGotSent => {
653                    ready!(pin!(self.write_play_request(rtmp_context)).poll(cx))?;
654                    ready!(pin!(self.write_buffer_length(rtmp_context)).poll(cx))?
655                },
656                _ => {}
657            }
658        } else {
659            ready!(pin!(self.write_connect_request(rtmp_context)).poll(cx))?;
660        }
661
662        let basic_header = if let Some(PublisherStatus::Published) = rtmp_context.get_publisher_status() {
663            ready!(pin!(read_basic_header(pin!(self.0.try_read_after(rtmp_context.get_await_duration().unwrap())))).poll(cx))?
664        } else {
665            ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?
666        };
667        let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
668        let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
669            if timestamp.as_millis() == U24_MAX as u128 {
670                let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
671                Some(extended_timestamp)
672            } else {
673                None
674            }
675        } else {
676            None
677        };
678
679        let chunk_id = basic_header.get_chunk_id();
680        if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
681            if let Some(extended_timestamp) = extended_timestamp {
682                last_received_chunk.set_timestamp(extended_timestamp);
683            } else {
684                if let Some(timestamp) = message_header.get_timestamp() {
685                    last_received_chunk.set_timestamp(timestamp);
686                }
687            }
688
689            if let Some(message_length) = message_header.get_message_length() {
690                last_received_chunk.set_message_length(message_length);
691            }
692
693            if let Some(message_type) = message_header.get_message_type() {
694                last_received_chunk.set_message_type(message_type);
695            }
696
697            if let Some(message_id) = message_header.get_message_id() {
698                last_received_chunk.set_message_id(message_id);
699            }
700        } else {
701            rtmp_context.insert_received_chunk(
702                chunk_id,
703                LastChunk::new(
704                    message_header.get_timestamp().unwrap(),
705                    message_header.get_message_length().unwrap(),
706                    message_header.get_message_type().unwrap(),
707                    message_header.get_message_id().unwrap()
708                )
709            );
710        }
711
712        let message_length = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length();
713        let receiving_chunk_size = rtmp_context.get_receiving_chunk_size();
714        let data = ready!(pin!(read_chunk_data(pin!(self.0.await_until_receiving()), receiving_chunk_size, message_length)).poll(cx))?;
715        let buffer: ByteBuffer = data.into();
716
717        let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
718        match message_type {
719            Acknowledgement => pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx),
720            UserControl => pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx),
721            Command => pin!(self.handle_command_response(rtmp_context, buffer)).poll(cx),
722            Audio | Video | Data => {
723                let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
724                pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx)
725            },
726            other => unimplemented!("Undefined Message: {other:?}")
727        }
728    }
729}
730
731#[doc(hidden)]
732fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
733    MessageHandler(stream)
734}
735
736#[doc(hidden)]
737#[derive(Debug)]
738struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
739
740#[doc(hidden)]
741impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
742    async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
743        rtmp_context.increase_transaction_id();
744
745        let mut buffer = ByteBuffer::default();
746        buffer.encode(&AmfString::from("FCUnpublish"));
747        buffer.encode(&FcUnpublish::new(rtmp_context.get_topic_path().unwrap().clone()));
748        write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
749
750        info!("FCUnpublish got sent.");
751        Ok(())
752    }
753
754    async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
755        let message_id = rtmp_context.get_message_id().unwrap();
756
757        rtmp_context.increase_transaction_id();
758
759        let mut buffer = ByteBuffer::default();
760        buffer.encode(&AmfString::from("deleteStream"));
761        buffer.encode(&DeleteStream::new(message_id.into()));
762        write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
763
764        info!("deleteStream got sent.");
765        Ok(())
766    }
767}
768
769#[doc(hidden)]
770impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
771    fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
772        if error.kind() != ErrorKind::Other {
773            if let Some(publisher_status) = rtmp_context.get_publisher_status() {
774                if publisher_status >= PublisherStatus::FcPublished {
775                    ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
776                }
777
778                if publisher_status >= PublisherStatus::Created {
779                    ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
780                }
781            }
782        }
783
784        self.0.as_mut().poll_shutdown(cx)
785    }
786}
787
788#[doc(hidden)]
789fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
790    CloseHandler(stream)
791}
792
793/// The default RTMP handler.
794///
795/// This handles the raw RTMP by well-known communication steps. That is, this performs just following steps.
796///
797/// # As a publisher
798///
799/// 1. Specifies the application name via the [`Connect`] command.
800/// 2. Specifies the topic path via the [`ReleaseStream`]/[`FcPublish`] command.
801/// 3. Requests a message ID via the [`CreateStream`] command.
802/// 4. Specifies publication informations via the [`Publish`] command.
803/// 5. Then sends FLV media data.
804///
805/// If some error occurs in any step, sends commands which are [`FcUnpublish`] and [`DeleteStream`] to its server, then terminates its connection.
806/// These perform to delete the topic path and a message ID from its context.
807/// However also these can be sent from servers.
808///
809/// # As a subscriber
810///
811/// 1. Specifies the application name via the [`Connect`] command.
812/// 2. Tells the size of receiving bandwidth via the [`WindowAcknowledgementSize`] message.
813/// 3. Requests a message ID via the [`CreateStream`] command.
814/// 4. Specified the topic path via the [`FcSubscribe`] command.
815/// 5. Following additional command may be required.
816///    * Requests the duration of its topic via the [`GetStreamLength`] command. (in FFmpeg)
817///    * Requests a list of topics as a playlist via the [`SetPlaylist`] command. (in OBS)
818/// 6. Specifies subscription information via the [`Play`] command.
819/// 7. Specifies a time range to buffer its topic via the [`SetBufferLength`] event.
820/// 8. Then receives FLV media data.
821///
822/// If receiving data size exceeds client's bandwidth, this reports its thing via the [`Acknowledgement`] message to its server.
823///
824/// # Examples
825///
826/// ```rust
827/// use std::marker::PhantomData;
828/// use sheave_core::handlers::{
829///     RtmpContext,
830///     VecStream
831/// };
832/// use sheave_client::{
833///     Client,
834///     handlers::RtmpHandler,
835/// };
836///
837/// let stream = VecStream::default();
838/// let rtmp_context = RtmpContext::default();
839/// let client = Client::new(stream, rtmp_context, PhantomData::<RtmpHandler<VecStream>>);
840/// ```
841///
842/// [`Connect`]: sheave_core::messages::Connect
843/// [`ReleaseSream`]: sheave_core::messages::ReleaseStream
844/// [`FcPublish`]: sheave_core::messages::FcPublish
845/// [`CreateStream`]: sheave_core::messages::CreateStream
846/// [`Publish`]: sheave_core::messages::Publish
847/// [`Acknowledgement`]: sheave_core::messages::Acknowledgement
848/// [`WindowAcknowledgementSize`]: sheave_core::messages::WindowAcknowledgementSize
849/// [`FcSubscribe`]: sheave_core::messages::FcSubscribe
850/// [`GetStreamLength`]: sheave_core::messages::GetStreamLength
851/// [`SetPlaylist`]: sheave_core::messages::SetPlaylist
852/// [`Play`]: sheave_core::messages::Play
853/// [`SetBufferLength`]: sheave_core::messages::SetBufferLength
854#[derive(Debug)]
855pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
856
857impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
858    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
859        pin!(
860            handle_handshake(self.0.make_weak_pin())
861                .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
862                .map_err(handle_close(self.0.make_weak_pin()))
863        ).poll_handle(cx, rtmp_context)
864    }
865}
866
867impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
868    fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
869        Self(stream)
870    }
871}
872
// Tests which drive the handshake handler and the message handler step by step over `VecStream`s,
// checking the status stored in the `RtmpContext` after each step.
#[cfg(test)]
mod tests {
    use uuid::Uuid;
    use sheave_core::{
        handlers::VecStream,
        messages::PlayMode
    };
    use super::*;

    // Runs both handshake steps and checks the digest/signature of what the handler wrote.
    #[tokio::test]
    async fn ok_handshake_got_handled() {
        let mut stream = pin!(VecStream::default());
        let mut rtmp_context = RtmpContext::default();
        rtmp_context.set_signed(true);

        handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await.unwrap();

        // Reads back from the same stream what the first step wrote.
        let sent_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
        let mut sent_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
        assert_eq!(EncryptionAlgorithm::NotEncrypted, sent_encryption_algorithm);
        assert!(sent_client_handshake.did_digest_match(EncryptionAlgorithm::NotEncrypted, Handshake::CLIENT_KEY));

        // Fills a fresh stream with an encryption algorithm, a digest-imprinted server handshake,
        // and the client handshake signed with SERVER_KEY + COMMON_KEY, before running the second step.
        let mut stream = pin!(VecStream::default());
        let received_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
        let mut received_server_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
        received_server_handshake.imprint_digest(received_encryption_algorithm, Handshake::SERVER_KEY);
        let mut server_response_key: Vec<u8> = Vec::new();
        server_response_key.extend_from_slice(Handshake::SERVER_KEY);
        server_response_key.extend_from_slice(Handshake::COMMON_KEY);
        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_response_key);
        write_encryption_algorithm(stream.as_mut(), received_encryption_algorithm).await.unwrap();
        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
        assert!(handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await.is_ok());

        // The handshake read from the stream afterwards must be signed with CLIENT_KEY + COMMON_KEY.
        let sent_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
        let mut client_response_key: Vec<u8> = Vec::new();
        client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
        client_response_key.extend_from_slice(Handshake::COMMON_KEY);
        assert!(sent_server_handshake.did_signature_match(sent_encryption_algorithm, &client_response_key))
    }

    // Walks a publisher through every request/response pair up to `PublisherStatus::Published`.
    #[tokio::test]
    async fn ok_publisher_sequence() {
        let mut stream = pin!(VecStream::default());
        let mut rtmp_context = RtmpContext::default();
        rtmp_context.set_tc_url("");
        rtmp_context.set_app("");
        rtmp_context.set_client_type(ClientType::Publisher);

        // connect: writes the request, then hands a hand-built ConnectResult to the response handler.
        handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
        let mut stream = pin!(VecStream::default());
        let mut buffer = ByteBuffer::default();
        buffer.encode(
            &ConnectResult::new(
                object!(
                    "fmsVer" => AmfString::from("FMS/5,0,17"),
                    "capabilities" => Number::new(31f64)
                ),
                object!(
                    "level" => AmfString::from("status"),
                    "code" => AmfString::from("NetConnection.Connect.Success"),
                    "description" => AmfString::from("Connection succeeded."),
                    "objectEncoding" => Number::from(0)
                )
            )
        );
        assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());

        // releaseStream: the topic path must be set before the request is written.
        rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
        let mut stream = pin!(VecStream::default());
        handle_message(stream.as_mut()).write_release_stream_request(&mut rtmp_context).await.unwrap();
        let mut buffer = ByteBuffer::default();
        buffer.encode(&ReleaseStreamResult);
        assert!(handle_message(stream.as_mut()).handle_release_stream_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());

        // FCPublish
        handle_message(stream.as_mut()).write_fc_publish_request(&mut rtmp_context).await.unwrap();
        let mut stream = pin!(VecStream::default());
        let mut buffer = ByteBuffer::default();
        buffer.encode(&OnFcPublish);
        assert!(handle_message(stream.as_mut()).handle_fc_publish_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());

        // createStream
        handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
        let mut stream = pin!(VecStream::default());
        let mut buffer = ByteBuffer::default();
        buffer.encode(&CreateStreamResult::new(0.into()));
        assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());

        // publish: a StreamBegin carrying the context's message ID is handled first, then the onStatus response.
        handle_message(stream.as_mut()).write_publish_request(&mut rtmp_context).await.unwrap();
        let message_id = rtmp_context.get_message_id().unwrap();
        let mut stream = pin!(VecStream::default());
        let mut buffer = ByteBuffer::default();
        buffer.encode(&StreamBegin::new(message_id));
        assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());

        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
        let mut buffer = ByteBuffer::default();
        buffer.encode(
            &OnStatus::new(
                object!(
                    "level" => AmfString::from("status"),
                    "code" => AmfString::from("NetStream.Publish.Start"),
                    "description" => AmfString::new(format!("{topic_path} is now published")),
                    "details" => topic_path
                )
            )
        );
        assert!(handle_message(stream.as_mut()).handle_publish_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
    }

    // Walks a subscriber through every step up to `SubscriberStatus::BufferLengthGotSent`.
    #[tokio::test]
    async fn ok_subscriber_sequence() {
        let mut stream = pin!(VecStream::default());
        let mut rtmp_context = RtmpContext::default();
        rtmp_context.set_tc_url("");
        rtmp_context.set_app("");
        rtmp_context.set_client_type(ClientType::Subscriber);

        // connect: writes the request, then hands a hand-built ConnectResult to the response handler.
        handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
        let mut buffer = ByteBuffer::default();
        buffer.encode(
            &ConnectResult::new(
                object!(
                    "fmsVer" => AmfString::from("FMS/5,0,17"),
                    "capabilities" => Number::from(31)
                ),
                object!(
                    "level" => AmfString::from("status"),
                    "code" => AmfString::from("NetConnection.Connect.Success"),
                    "description" => AmfString::from("Connection succeeded."),
                    "objectEncoding" => Number::from(0)
                )
            )
        );
        assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());

        // Window acknowledgement size: the status is checked right after the write, without handling any response.
        let mut stream = pin!(VecStream::default());
        handle_message(stream.as_mut()).write_window_acknowledgement_size(&mut rtmp_context).await.unwrap();
        assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());

        // createStream
        let mut stream = pin!(VecStream::default());
        handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
        let mut buffer = ByteBuffer::default();
        buffer.encode(&CreateStreamResult::new(0.into()));
        assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());

        // FCSubscribe: the topic path is set first; the status is checked right after the write.
        rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
        let mut stream = pin!(VecStream::default());
        handle_message(stream.as_mut()).write_fc_subscribe_request(&mut rtmp_context).await.unwrap();
        assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());

        // play: the start time and the play mode are set first, then StreamBegin and the onStatus response are handled.
        rtmp_context.set_start_time(Duration::default());
        rtmp_context.set_play_mode(PlayMode::Both);
        let mut stream = pin!(VecStream::default());
        handle_message(stream.as_mut()).write_play_request(&mut rtmp_context).await.unwrap();
        let mut buffer = ByteBuffer::default();
        buffer.encode(&StreamBegin::new(rtmp_context.get_message_id().unwrap()));
        assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());

        let mut buffer = ByteBuffer::default();
        buffer.encode(
            &OnStatus::new(
                object!(
                    "level" => AmfString::from("status"),
                    "code" => AmfString::from("NetStream.Play.Start"),
                    "description" => AmfString::from("Playing stream")
                )
            )
        );
        assert!(handle_message(stream.as_mut()).handle_play_response(&mut rtmp_context, buffer).await.is_ok());
        assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());

        // Buffer length: set in the context first; the status is checked right after the write.
        rtmp_context.set_buffer_length(30000);
        let mut stream = pin!(VecStream::default());
        handle_message(stream.as_mut()).write_buffer_length(&mut rtmp_context).await.unwrap();
        assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
    }
}