sheave_client/handlers/
rtmp.rs

1use std::{
2    future::Future,
3    io::{
4        Error as IOError,
5        ErrorKind,
6        Result as IOResult
7    },
8    pin::{
9        Pin,
10        pin
11    },
12    sync::Arc,
13    task::{
14        Context as FutureContext,
15        Poll
16    },
17    time::{
18        Duration,
19        Instant
20    }
21};
22use log::{
23    error,
24    info
25};
26use futures::ready;
27use tokio::io::{
28    AsyncRead,
29    AsyncWrite
30};
31use sheave_core::{
32    ByteBuffer,
33    Decoder,
34    Encoder,
35    U24_MAX,
36    flv::tags::*,
37    handlers::{
38        AsyncHandler,
39        AsyncHandlerExt,
40        ClientType,
41        ErrorHandler,
42        HandlerConstructor,
43        LastChunk,
44        PublisherStatus,
45        RtmpContext,
46        StreamWrapper,
47        SubscriberStatus,
48        inconsistent_sha,
49        stream_got_exhausted
50    },
51    handshake::{
52        EncryptionAlgorithm,
53        Handshake,
54        Version
55    },
56    messages::{
57        /* Used in common */
58        Channel,
59        ChunkData,
60        Connect,
61        ConnectResult,
62        CreateStream,
63        CreateStreamResult,
64        UserControl,
65        EventType,
66        OnStatus,
67        Audio,
68        Video,
69        SetDataFrame,
70        Acknowledgement,
71        amf::v0::{
72            Number,
73            AmfString,
74            Object
75        },
76        headers::MessageType,
77
78        /* Publisher-side */
79        ReleaseStream,
80        ReleaseStreamResult,
81        FcPublish,
82        OnFcPublish,
83        StreamBegin,
84        Publish,
85        FcUnpublish,
86        DeleteStream,
87
88        /* Subscriber-side */
89        WindowAcknowledgementSize,
90        FcSubscribe,
91        SetBufferLength,
92        Play,
93        amf::v0::Boolean
94    },
95    net::RtmpReadExt,
96    object,
97    readers::*,
98    writers::*
99};
100use super::{
101    error_response,
102    middlewares::write_acknowledgement
103};
104
105#[doc(hidden)]
106#[derive(Debug)]
107struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
108
109#[doc(hidden)]
110impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
111    async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
112        let encryption_algorithm = EncryptionAlgorithm::default();
113
114        let version = if rtmp_context.is_signed() {
115            Version::LATEST_CLIENT
116        } else {
117            Version::UNSIGNED
118        };
119        let mut client_request = Handshake::new(Instant::now().elapsed(), version);
120        if rtmp_context.is_signed() {
121            client_request.imprint_digest(encryption_algorithm, Handshake::CLIENT_KEY);
122        }
123
124        write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
125        write_handshake(self.0.as_mut(), &client_request).await?;
126
127        rtmp_context.set_encryption_algorithm(encryption_algorithm);
128        rtmp_context.set_client_handshake(client_request);
129
130        info!("First handshake got handled.");
131        Ok(())
132    }
133
134    async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
135        let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
136        let mut server_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
137        let server_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
138
139        if !rtmp_context.is_signed() {
140            write_handshake(self.0.as_mut(), &server_request).await?;
141
142            rtmp_context.set_server_handshake(server_request);
143            rtmp_context.set_client_handshake(server_response);
144
145        } else if !server_request.did_digest_match(encryption_algorithm, Handshake::SERVER_KEY) {
146            error!("Invalid SHA digest/signature: {:x?}", server_request.get_digest(encryption_algorithm));
147            return Err(inconsistent_sha(server_response.get_digest(encryption_algorithm).to_vec()))
148        } else {
149            let mut server_response_key: Vec<u8> = Vec::new();
150            server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151            server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152
153            if !server_response.did_signature_match(encryption_algorithm, &server_response_key) {
154                error!("Invalid SHA digest/signature: {:x?}", server_response.get_signature());
155                return Err(inconsistent_sha(server_response.get_signature().to_vec()))
156            } else {
157                let mut client_response_key: Vec<u8> = Vec::new();
158                client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
159                client_response_key.extend_from_slice(Handshake::COMMON_KEY);
160                server_request.imprint_signature(encryption_algorithm, &client_response_key);
161                write_handshake(self.0.as_mut(), &server_request).await?;
162
163                rtmp_context.set_server_handshake(server_request);
164                rtmp_context.set_client_handshake(server_response);
165            }
166        }
167
168        info!("Second handshake got handled.");
169        Ok(())
170    }
171}
172
173#[doc(hidden)]
174impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
175    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
176        ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
177        pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
178    }
179}
180
181#[doc(hidden)]
182fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
183    HandshakeHandler(stream)
184}
185
186#[doc(hidden)]
187#[derive(Debug)]
188struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
189
190#[doc(hidden)]
191impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
192    async fn write_connect_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
193        use ClientType::*;
194
195        let client_type = rtmp_context.get_client_type().unwrap();
196
197        rtmp_context.increase_transaction_id();
198
199        let command_object = match client_type {
200            Publisher => object!(
201                "app" => rtmp_context.get_app().unwrap().clone(),
202                "type" => AmfString::from("nonprivate"),
203                "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
204                "tcUrl" => rtmp_context.get_tc_url().unwrap().clone()
205            ),
206            Subscriber => object!(
207                "app" => rtmp_context.get_app().unwrap().clone(),
208                "flashVer" => AmfString::from("FMLE/3.0 (compatible; Lavf 60.10.100)"),
209                "tcUrl" => rtmp_context.get_tc_url().unwrap().clone(),
210                "fpad" => Boolean::new(0),
211                "capabilities" => Number::from(15u8),
212                "audioCodecs" => Number::from(4071u16),
213                "videoCodecs" => Number::from(252u8),
214                "videoFunction" => Number::from(1u8)
215            )
216        };
217        let connect = Connect::new(command_object);
218        let mut buffer = ByteBuffer::default();
219        buffer.encode(&AmfString::from("connect"));
220        buffer.encode(&rtmp_context.get_transaction_id());
221        buffer.encode(&connect);
222        write_chunk(self.0.as_mut(), rtmp_context, Connect::CHANNEL.into(), Duration::default(), Connect::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
223
224        rtmp_context.set_command_object(connect.into());
225
226        info!("connect got sent.");
227        Ok(())
228    }
229
230    async fn write_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
231        let mut buffer = ByteBuffer::default();
232        buffer.encode(&rtmp_context.get_window_acknowledgement_size());
233        write_chunk(self.0.as_mut(), rtmp_context, WindowAcknowledgementSize::CHANNEL.into(), Duration::default(), WindowAcknowledgementSize::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
234
235        rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
236
237        info!("Window Acknowledgement Size got sent.");
238        Ok(())
239    }
240
241    async fn write_release_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
242        rtmp_context.increase_transaction_id();
243
244        let mut buffer = ByteBuffer::default();
245        buffer.encode(&AmfString::from("releaseStream"));
246        buffer.encode(&rtmp_context.get_transaction_id());
247        buffer.encode(&ReleaseStream::new(rtmp_context.get_topic_id().unwrap().clone()));
248        write_chunk(self.0.as_mut(), rtmp_context, ReleaseStream::CHANNEL.into(), Duration::default(), ReleaseStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
249
250        info!("releaseStream got sent.");
251        Ok(())
252    }
253
254    async fn write_fc_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
255        rtmp_context.increase_transaction_id();
256
257        let mut buffer = ByteBuffer::default();
258        buffer.encode(&AmfString::from("FCPublish"));
259        buffer.encode(&rtmp_context.get_transaction_id());
260        buffer.encode(&FcPublish::new(rtmp_context.get_topic_id().unwrap().clone()));
261        write_chunk(self.0.as_mut(), rtmp_context, FcPublish::CHANNEL.into(), Duration::default(), FcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
262
263        info!("FCPublish got sent.");
264        Ok(())
265    }
266
267    async fn write_create_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
268        rtmp_context.increase_transaction_id();
269
270        let mut buffer = ByteBuffer::default();
271        buffer.encode(&AmfString::from("createStream"));
272        buffer.encode(&rtmp_context.get_transaction_id());
273        buffer.encode(&CreateStream);
274        write_chunk(self.0.as_mut(), rtmp_context, CreateStream::CHANNEL.into(), Duration::default(), CreateStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
275
276        info!("createStream got sent.");
277        Ok(())
278    }
279
280    async fn write_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
281        rtmp_context.increase_transaction_id();
282
283        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
284
285        let mut buffer = ByteBuffer::default();
286        buffer.encode(&FcSubscribe::new(topic_id));
287        write_chunk(self.0.as_mut(), rtmp_context, FcSubscribe::CHANNEL.into(), Duration::default(), FcSubscribe::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
288
289        rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
290
291        info!("FCSubscribe got sent.");
292        Ok(())
293    }
294
295    async fn write_publish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
296        rtmp_context.increase_transaction_id();
297
298        let publishing_name = rtmp_context.get_topic_id().unwrap().clone();
299        let publishing_type = "live";
300        let mut buffer = ByteBuffer::default();
301        buffer.encode(&AmfString::from("publish"));
302        buffer.encode(&rtmp_context.get_transaction_id());
303        buffer.encode(&Publish::new(publishing_name.clone(), publishing_type.into()));
304        let message_id = rtmp_context.get_message_id().unwrap();
305        write_chunk(self.0.as_mut(), rtmp_context, Publish::CHANNEL.into(), Duration::default(), Publish::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
306
307        info!("publish got sent.");
308        Ok(())
309    }
310
311    async fn write_play_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
312        rtmp_context.increase_transaction_id();
313
314        let stream_name = rtmp_context.get_topic_id().unwrap().clone();
315        let play_mode = rtmp_context.get_play_mode().unwrap();
316        let start_time: Number = if let Some(start_time) = rtmp_context.get_start_time() {
317            Number::new(start_time.as_millis() as u64 as f64)
318        } else {
319            Number::new((1000 * play_mode as i64) as f64)
320        };
321
322        let mut buffer = ByteBuffer::default();
323        buffer.encode(&AmfString::from("play"));
324        buffer.encode(&rtmp_context.get_transaction_id());
325        buffer.encode(&Play::new(stream_name.clone(), start_time));
326        write_chunk(self.0.as_mut(), rtmp_context, Play::CHANNEL.into(), Duration::default(), Play::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
327
328        info!("play got sent.");
329        Ok(())
330    }
331
332    async fn write_buffer_length(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
333        let message_id = rtmp_context.get_message_id().unwrap();
334        let mut buffer = ByteBuffer::default();
335        buffer.put_u16_be(SetBufferLength::EVENT_TYPE.into());
336        buffer.encode(&SetBufferLength::new(message_id, rtmp_context.get_buffer_length()));
337        write_chunk(self.0.as_mut(), rtmp_context, SetBufferLength::CHANNEL.into(), Duration::default(), SetBufferLength::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
338
339        rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
340
341        info!("Buffer Length got sent.");
342        Ok(())
343    }
344
345    async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
346        for next in rtmp_context.get_topic_mut().unwrap() {
347            let flv_tag = next?;
348            let message_id = rtmp_context.get_message_id().unwrap();
349
350            let channel;
351            let message_type;
352            match flv_tag.get_tag_type() {
353                TagType::Audio => {
354                    channel = Audio::CHANNEL;
355                    message_type = Audio::MESSAGE_TYPE;
356                },
357                TagType::Video => {
358                    channel = Video::CHANNEL;
359                    message_type = Video::MESSAGE_TYPE;
360                },
361                TagType::ScriptData => {
362                    channel = SetDataFrame::CHANNEL;
363                    message_type = SetDataFrame::MESSAGE_TYPE;
364                },
365                TagType::Other => {
366                    channel = Channel::Other;
367                    message_type = MessageType::Other;
368                }
369            }
370            let timestamp = flv_tag.get_timestamp();
371            let data: Vec<u8> = if let MessageType::Data = message_type {
372                let mut buffer = ByteBuffer::default();
373                buffer.encode(&AmfString::from("@setDataFrame"));
374                buffer.put_bytes(flv_tag.get_data());
375                buffer.into()
376            } else {
377                flv_tag.get_data().to_vec()
378            };
379            write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
380
381            info!("FLV chunk got sent.");
382            return Ok(())
383        }
384
385        // NOTE: Default return value when no FLV tag exists.
386        info!("FLV data became empty.");
387        Err(stream_got_exhausted())
388    }
389
390    async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
391        Decoder::<Acknowledgement>::decode(&mut buffer)?;
392
393        info!("Acknowledgement got handled.");
394        Ok(())
395    }
396
397    async fn handle_stream_begin(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
398        use ClientType::*;
399
400        let client_type = rtmp_context.get_client_type().unwrap();
401
402        Decoder::<StreamBegin>::decode(&mut buffer)?;
403
404        match client_type {
405            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
406            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
407        }
408
409        info!("Stream Begin got handled.");
410        Ok(())
411    }
412
413    async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
414        use EventType::*;
415
416        let event_type: EventType = buffer.get_u16_be()?.into();
417        match event_type {
418            StreamBegin => self.handle_stream_begin(rtmp_context, buffer).await,
419            _ => unreachable!("Publisher gets just a Stream Begin event.")
420        }
421    }
422
423    async fn handle_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object) -> IOResult<()> {
424        let error = error_response(information.clone());
425        rtmp_context.set_information(information);
426
427        error!("{error}");
428        Err(error)
429    }
430
431    async fn handle_connect_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
432        use ClientType::*;
433
434        let response: ConnectResult = buffer.decode()?;
435        let (properties, information): (Object, Object) = response.into();
436
437        rtmp_context.set_properties(properties);
438        rtmp_context.set_information(information);
439
440        match rtmp_context.get_client_type().unwrap() {
441            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
442            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
443        }
444
445        info!("connect result got handled.");
446        Ok(())
447    }
448
449    async fn handle_release_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
450        Decoder::<ReleaseStreamResult>::decode(&mut buffer)?;
451
452        rtmp_context.set_publisher_status(PublisherStatus::Released);
453
454        info!("releaseStream result got handled.");
455        Ok(())
456    }
457
458    async fn handle_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
459        Decoder::<OnFcPublish>::decode(&mut buffer)?;
460
461        rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
462
463        info!("onFCPublish got handled.");
464        Ok(())
465    }
466
467    async fn handle_create_stream_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
468        use ClientType::*;
469
470        let client_type = rtmp_context.get_client_type().unwrap();
471
472        let response: CreateStreamResult = buffer.decode()?;
473        let message_id: u32 = response.into();
474        rtmp_context.set_message_id(message_id);
475
476        match client_type {
477            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
478            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
479        }
480
481        info!("createStream result got handled.");
482        Ok(())
483    }
484
485    async fn handle_publish_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
486        let response: OnStatus = buffer.decode()?;
487        let information: Object = response.into();
488
489        /*
490         *  NOTE:
491         *      Some error in publication step is checkable only by information the field.
492         *      Because the publish command doesn't have _error command.
493         */
494        if information.get_properties()["level"] == AmfString::from("error") {
495            return self.handle_error_response(rtmp_context, information).await
496        }
497
498        rtmp_context.set_information(information);
499
500        rtmp_context.set_publisher_status(PublisherStatus::Published);
501
502        info!("onStatus(publish) got handled.");
503        Ok(())
504    }
505
506    async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
507        Decoder::<FcUnpublish>::decode(&mut buffer)?;
508        rtmp_context.reset_topic_id();
509
510        info!("FCUnpublish got handled.");
511        Ok(())
512    }
513
514    async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
515        Decoder::<DeleteStream>::decode(&mut buffer)?;
516        rtmp_context.reset_message_id();
517
518        info!("deleteStream got handled.");
519        Ok(())
520    }
521
522    async fn handle_publisher_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
523        use PublisherStatus::*;
524
525        let command: AmfString = buffer.decode()?;
526
527        // NOTE: onFCPublish has no transaction ID.
528        if command != "onFCPublish" {
529            // NOTE: Otherwise, currently unused but exists.
530            Decoder::<Number>::decode(&mut buffer)?;
531        }
532
533        if command == "FCUnpublish" {
534            return self.handle_fc_unpublish_request(rtmp_context, buffer).await
535        } else if command == "deleteStream" {
536            return self.handle_delete_stream_request(rtmp_context, buffer).await
537        } else if command == "_error" {
538            let information: Object = buffer.decode()?;
539            return self.handle_error_response(rtmp_context, information).await
540        } else {
541            /* In this step, does nothing unless command is either "FCUnpublish" or "deleteStream". */
542        }
543
544        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
545            match publisher_status {
546                Connected => self.handle_release_stream_response(rtmp_context, buffer).await,
547                Released => self.handle_fc_publish_response(rtmp_context, buffer).await,
548                FcPublished => self.handle_create_stream_response(rtmp_context, buffer).await,
549                Began => self.handle_publish_response(rtmp_context, buffer).await,
550                _ => Ok(())
551            }
552        } else {
553            self.handle_connect_response(rtmp_context, buffer).await
554        }
555    }
556
557    async fn handle_play_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
558        let response: OnStatus = buffer.decode()?;
559        let information: Object = response.into();
560
561        /*
562         *  NOTE:
563         *      Some error in subscription step is checkable only by information the field.
564         *      Because the play command doesn't have _error command.
565         */
566        if information.get_properties()["level"] == AmfString::from("error") {
567            return self.handle_error_response(rtmp_context, information).await
568        }
569
570        rtmp_context.set_information(information);
571
572        rtmp_context.set_subscriber_status(SubscriberStatus::Played);
573
574        info!("onStatus(play) got handled.");
575        Ok(())
576    }
577
578    async fn handle_subscriber_response(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
579        use SubscriberStatus::*;
580
581        let command: AmfString = buffer.decode()?;
582        Decoder::<Number>::decode(&mut buffer)?;
583
584        if command == "_error" {
585            let information: Object = buffer.decode()?;
586            return self.handle_error_response(rtmp_context, information).await
587        }
588
589        if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
590            match subscriber_status {
591                WindowAcknowledgementSizeGotSent => self.handle_create_stream_response(rtmp_context, buffer).await,
592                Began => self.handle_play_response(rtmp_context, buffer).await,
593                _ => return Ok(())
594            }
595        } else {
596            self.handle_connect_response(rtmp_context, buffer).await
597        }
598    }
599
600    async fn handle_command_response(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
601        use ClientType::*;
602
603        match rtmp_context.get_client_type().unwrap() {
604            Publisher => self.handle_publisher_response(rtmp_context, buffer).await,
605            Subscriber => self.handle_subscriber_response(rtmp_context, buffer).await
606        }
607    }
608
609    async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
610        let topic = rtmp_context.get_topic().unwrap();
611
612        let tag_type = match message_type {
613            MessageType::Audio => TagType::Audio,
614            MessageType::Video => TagType::Video,
615            MessageType::Data => TagType::ScriptData,
616            _ => TagType::Other
617        };
618
619        if let TagType::ScriptData = tag_type {
620            Decoder::<AmfString>::decode(&mut buffer)?;
621        }
622
623        let data: Vec<u8> = buffer.into();
624        let flv_tag = FlvTag::new(tag_type, timestamp, data);
625        topic.append_flv_tag(flv_tag)?;
626
627        info!("FLV chunk got handled.");
628        Ok(())
629    }
630}
631
632#[doc(hidden)]
633impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
634    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
635        use MessageType::*;
636
637        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
638            match publisher_status {
639                PublisherStatus::Connected => ready!(pin!(self.write_release_stream_request(rtmp_context)).poll(cx))?,
640                PublisherStatus::Released => ready!(pin!(self.write_fc_publish_request(rtmp_context)).poll(cx))?,
641                PublisherStatus::FcPublished => ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?,
642                PublisherStatus::Created => ready!(pin!(self.write_publish_request(rtmp_context)).poll(cx))?,
643                PublisherStatus::Published => ready!(pin!(self.write_flv(rtmp_context)).poll(cx))?,
644                _ => {}
645            }
646        } else if let Some(subscriber_status) = rtmp_context.get_subscriber_status() {
647            match subscriber_status {
648                SubscriberStatus::Connected => {
649                    ready!(pin!(self.write_window_acknowledgement_size(rtmp_context)).poll(cx))?;
650                    ready!(pin!(self.write_create_stream_request(rtmp_context)).poll(cx))?
651                },
652                SubscriberStatus::Created => {
653                    ready!(pin!(self.write_fc_subscribe_request(rtmp_context)).poll(cx))?;
654                    rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
655                },
656                SubscriberStatus::AdditionalCommandGotSent => {
657                    ready!(pin!(self.write_play_request(rtmp_context)).poll(cx))?;
658                    ready!(pin!(self.write_buffer_length(rtmp_context)).poll(cx))?
659                },
660                _ => {}
661            }
662        } else {
663            ready!(pin!(self.write_connect_request(rtmp_context)).poll(cx))?;
664        }
665
666        let basic_header = if let Some(PublisherStatus::Published) = rtmp_context.get_publisher_status() {
667            ready!(pin!(read_basic_header(pin!(self.0.try_read_after(rtmp_context.get_await_duration().unwrap())))).poll(cx))?
668        } else {
669            ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?
670        };
671        let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
672        let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
673            if timestamp.as_millis() == U24_MAX as u128 {
674                let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
675                Some(extended_timestamp)
676            } else {
677                None
678            }
679        } else {
680            None
681        };
682
683        let chunk_id = basic_header.get_chunk_id();
684        if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
685            if let Some(extended_timestamp) = extended_timestamp {
686                last_received_chunk.set_timestamp(extended_timestamp);
687            } else {
688                if let Some(timestamp) = message_header.get_timestamp() {
689                    last_received_chunk.set_timestamp(timestamp);
690                }
691            }
692
693            if let Some(message_length) = message_header.get_message_length() {
694                last_received_chunk.set_message_length(message_length);
695            }
696
697            if let Some(message_type) = message_header.get_message_type() {
698                last_received_chunk.set_message_type(message_type);
699            }
700
701            if let Some(message_id) = message_header.get_message_id() {
702                last_received_chunk.set_message_id(message_id);
703            }
704        } else {
705            rtmp_context.insert_received_chunk(
706                chunk_id,
707                LastChunk::new(
708                    message_header.get_timestamp().unwrap(),
709                    message_header.get_message_length().unwrap(),
710                    message_header.get_message_type().unwrap(),
711                    message_header.get_message_id().unwrap()
712                )
713            );
714        }
715
716        let message_length = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length();
717        let receiving_chunk_size = rtmp_context.get_receiving_chunk_size();
718        let data = ready!(pin!(read_chunk_data(pin!(self.0.await_until_receiving()), receiving_chunk_size, message_length)).poll(cx))?;
719        let buffer: ByteBuffer = data.into();
720
721        let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
722        match message_type {
723            Acknowledgement => pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx),
724            UserControl => pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx),
725            Command => pin!(self.handle_command_response(rtmp_context, buffer)).poll(cx),
726            Audio | Video | Data => {
727                let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
728                pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx)
729            },
730            other => unimplemented!("Undefined Message: {other:?}")
731        }
732    }
733}
734
735#[doc(hidden)]
736fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
737    MessageHandler(stream)
738}
739
740#[doc(hidden)]
741#[derive(Debug)]
742struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
743
744#[doc(hidden)]
745impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
746    async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
747        rtmp_context.increase_transaction_id();
748
749        let mut buffer = ByteBuffer::default();
750        buffer.encode(&AmfString::from("FCUnpublish"));
751        buffer.encode(&FcUnpublish::new(rtmp_context.get_topic_id().unwrap().clone()));
752        write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
753
754        info!("FCUnpublish got sent.");
755        Ok(())
756    }
757
758    async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
759        let message_id = rtmp_context.get_message_id().unwrap();
760
761        rtmp_context.increase_transaction_id();
762
763        let mut buffer = ByteBuffer::default();
764        buffer.encode(&AmfString::from("deleteStream"));
765        buffer.encode(&DeleteStream::new(message_id.into()));
766        write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
767
768        info!("deleteStream got sent.");
769        Ok(())
770    }
771}
772
773#[doc(hidden)]
774impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
775    fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
776        if error.kind() != ErrorKind::Other {
777            if let Some(publisher_status) = rtmp_context.get_publisher_status() {
778                if publisher_status >= PublisherStatus::FcPublished {
779                    ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
780                }
781
782                if publisher_status >= PublisherStatus::Created {
783                    ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
784                }
785            }
786        }
787
788        self.0.as_mut().poll_shutdown(cx)
789    }
790}
791
792#[doc(hidden)]
793fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
794    CloseHandler(stream)
795}
796
797/// The default RTMP handler.
798///
799/// This handles the raw RTMP by well-known communication steps, that is, this performs just following steps.
800///
801/// # As a publisher
802///
803/// 1. Specifies the application name via the [`Connect`] command.
804/// 2. Specifies the topic path via the [`ReleaseStream`]/[`FcPublish`] command.
805/// 3. Requests a message ID via the [`CreateStream`] command.
806/// 4. Specifies publication informations via the [`Publish`] command.
807/// 5. Then sends FLV media data.
808///
809/// If some error occurs in any step, sends commands which are [`FcUnpublish`] and [`DeleteStream`] to its server, then terminates its connection.
810/// These perform to delete the topic path and a message ID from its context.
811/// However also these can be sent from servers.
812///
813/// # As a subscriber
814///
815/// 1. Specifies the application name via the [`Connect`] command.
816/// 2. Tells the size of receiving bandwidth via the [`WindowAcknowledgementSize`] message.
817/// 3. Requests a message ID via the [`CreateStream`] command.
818/// 4. Specified the topic path via the [`FcSubscribe`] command.
819/// 5. Following additional command may be required.
820///    * Requests the duration of its topic via the [`GetStreamLength`] command. (in FFmpeg)
821///    * Requests a list of topics as a playlist via the [`SetPlaylist`] command. (in OBS)
822/// 6. Specifies subscription information via the [`Play`] command.
823/// 7. Specifies a time range to buffer its topic via the [`SetBufferLength`] event.
824/// 8. Then receives FLV media data.
825///
826/// If receiving data size exceeds client's bandwidth, this reports its thing via the [`Acknowledgement`] message to its server.
827///
828/// # Examples
829///
830/// ```rust
831/// use std::marker::PhantomData;
832/// use sheave_core::handlers::{
833///     RtmpContext,
834///     VecStream
835/// };
836/// use sheave_client::{
837///     Client,
838///     handlers::RtmpHandler,
839/// };
840///
841/// let stream = VecStream::default();
842/// let rtmp_context = RtmpContext::default();
843/// let client = Client::new(stream, rtmp_context, PhantomData::<RtmpHandler<VecStream>>);
844/// ```
845///
846/// [`Connect`]: sheave_core::messages::Connect
847/// [`ReleaseSream`]: sheave_core::messages::ReleaseStream
848/// [`FcPublish`]: sheave_core::messages::FcPublish
849/// [`CreateStream`]: sheave_core::messages::CreateStream
850/// [`Publish`]: sheave_core::messages::Publish
851/// [`Acknowledgement`]: sheave_core::messages::Acknowledgement
852/// [`WindowAcknowledgementSize`]: sheave_core::messages::WindowAcknowledgementSize
853/// [`FcSubscribe`]: sheave_core::messages::FcSubscribe
854/// [`GetStreamLength`]: sheave_core::messages::GetStreamLength
855/// [`SetPlaylist`]: sheave_core::messages::SetPlaylist
856/// [`Play`]: sheave_core::messages::Play
857/// [`SetBufferLength`]: sheave_core::messages::SetBufferLength
858#[derive(Debug)]
859pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
860
861impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
862    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
863        pin!(
864            handle_handshake(self.0.make_weak_pin())
865                .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
866                .map_err(handle_close(self.0.make_weak_pin()))
867        ).poll_handle(cx, rtmp_context)
868    }
869}
870
871impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
872    fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
873        Self(stream)
874    }
875}
876
877#[cfg(test)]
878mod tests {
879    use uuid::Uuid;
880    use sheave_core::{
881        handlers::VecStream,
882        messages::PlayMode
883    };
884    use super::*;
885
886    #[tokio::test]
887    async fn ok_handshake_got_handled() {
888        let mut stream = pin!(VecStream::default());
889        let mut rtmp_context = RtmpContext::default();
890        rtmp_context.set_signed(true);
891
892        handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await.unwrap();
893
894        let sent_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
895        let mut sent_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
896        assert_eq!(EncryptionAlgorithm::NotEncrypted, sent_encryption_algorithm);
897        assert!(sent_client_handshake.did_digest_match(EncryptionAlgorithm::NotEncrypted, Handshake::CLIENT_KEY));
898
899        let mut stream = pin!(VecStream::default());
900        let received_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
901        let mut received_server_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
902        received_server_handshake.imprint_digest(received_encryption_algorithm, Handshake::SERVER_KEY);
903        let mut server_response_key: Vec<u8> = Vec::new();
904        server_response_key.extend_from_slice(Handshake::SERVER_KEY);
905        server_response_key.extend_from_slice(Handshake::COMMON_KEY);
906        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_response_key);
907        write_encryption_algorithm(stream.as_mut(), received_encryption_algorithm).await.unwrap();
908        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
909        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
910        assert!(handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await.is_ok());
911
912        let sent_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
913        let mut client_response_key: Vec<u8> = Vec::new();
914        client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
915        client_response_key.extend_from_slice(Handshake::COMMON_KEY);
916        assert!(sent_server_handshake.did_signature_match(sent_encryption_algorithm, &client_response_key))
917    }
918
919    #[tokio::test]
920    async fn ok_publisher_sequence() {
921        let mut stream = pin!(VecStream::default());
922        let mut rtmp_context = RtmpContext::default();
923        rtmp_context.set_tc_url("");
924        rtmp_context.set_app("");
925        rtmp_context.set_client_type(ClientType::Publisher);
926
927        handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
928        let mut stream = pin!(VecStream::default());
929        let mut buffer = ByteBuffer::default();
930        buffer.encode(
931            &ConnectResult::new(
932                object!(
933                    "fmsVer" => AmfString::from("FMS/5,0,17"),
934                    "capabilities" => Number::new(31f64)
935                ),
936                object!(
937                    "level" => AmfString::from("status"),
938                    "code" => AmfString::from("NetConnection.Connect.Success"),
939                    "description" => AmfString::from("Connection succeeded."),
940                    "objectEncoding" => Number::from(0)
941                )
942            )
943        );
944        assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
945        assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
946
947        rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
948        let mut stream = pin!(VecStream::default());
949        handle_message(stream.as_mut()).write_release_stream_request(&mut rtmp_context).await.unwrap();
950        let mut buffer = ByteBuffer::default();
951        buffer.encode(&ReleaseStreamResult);
952        assert!(handle_message(stream.as_mut()).handle_release_stream_response(&mut rtmp_context, buffer).await.is_ok());
953        assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
954
955        handle_message(stream.as_mut()).write_fc_publish_request(&mut rtmp_context).await.unwrap();
956        let mut stream = pin!(VecStream::default());
957        let mut buffer = ByteBuffer::default();
958        buffer.encode(&OnFcPublish);
959        assert!(handle_message(stream.as_mut()).handle_fc_publish_response(&mut rtmp_context, buffer).await.is_ok());
960        assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
961
962        handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
963        let mut stream = pin!(VecStream::default());
964        let mut buffer = ByteBuffer::default();
965        buffer.encode(&CreateStreamResult::new(0.into()));
966        assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
967        assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
968
969        handle_message(stream.as_mut()).write_publish_request(&mut rtmp_context).await.unwrap();
970        let message_id = rtmp_context.get_message_id().unwrap();
971        let mut stream = pin!(VecStream::default());
972        let mut buffer = ByteBuffer::default();
973        buffer.encode(&StreamBegin::new(message_id));
974        assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
975        assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
976
977        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
978        let mut buffer = ByteBuffer::default();
979        buffer.encode(
980            &OnStatus::new(
981                object!(
982                    "level" => AmfString::from("status"),
983                    "code" => AmfString::from("NetStream.Publish.Start"),
984                    "description" => AmfString::new(format!("{topic_id} is now published")),
985                    "details" => topic_id
986                )
987            )
988        );
989        assert!(handle_message(stream.as_mut()).handle_publish_response(&mut rtmp_context, buffer).await.is_ok());
990        assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
991    }
992
993    #[tokio::test]
994    async fn ok_subscriber_sequence() {
995        let mut stream = pin!(VecStream::default());
996        let mut rtmp_context = RtmpContext::default();
997        rtmp_context.set_tc_url("");
998        rtmp_context.set_app("");
999        rtmp_context.set_client_type(ClientType::Subscriber);
1000
1001        handle_message(stream.as_mut()).write_connect_request(&mut rtmp_context).await.unwrap();
1002        let mut buffer = ByteBuffer::default();
1003        buffer.encode(
1004            &ConnectResult::new(
1005                object!(
1006                    "fmsVer" => AmfString::from("FMS/5,0,17"),
1007                    "capabilities" => Number::from(31)
1008                ),
1009                object!(
1010                    "level" => AmfString::from("status"),
1011                    "code" => AmfString::from("NetConnection.Connect.Success"),
1012                    "description" => AmfString::from("Connection succeeded."),
1013                    "objectEncoding" => Number::from(0)
1014                )
1015            )
1016        );
1017        assert!(handle_message(stream.as_mut()).handle_connect_response(&mut rtmp_context, buffer).await.is_ok());
1018        assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1019
1020        let mut stream = pin!(VecStream::default());
1021        handle_message(stream.as_mut()).write_window_acknowledgement_size(&mut rtmp_context).await.unwrap();
1022        assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1023
1024        let mut stream = pin!(VecStream::default());
1025        handle_message(stream.as_mut()).write_create_stream_request(&mut rtmp_context).await.unwrap();
1026        let mut buffer = ByteBuffer::default();
1027        buffer.encode(&CreateStreamResult::new(0.into()));
1028        assert!(handle_message(stream.as_mut()).handle_create_stream_response(&mut rtmp_context, buffer).await.is_ok());
1029        assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1030
1031        rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
1032        let mut stream = pin!(VecStream::default());
1033        handle_message(stream.as_mut()).write_fc_subscribe_request(&mut rtmp_context).await.unwrap();
1034        assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1035
1036        rtmp_context.set_start_time(Some(Duration::default()));
1037        rtmp_context.set_play_mode(PlayMode::Both);
1038        let mut stream = pin!(VecStream::default());
1039        handle_message(stream.as_mut()).write_play_request(&mut rtmp_context).await.unwrap();
1040        let mut buffer = ByteBuffer::default();
1041        buffer.encode(&StreamBegin::new(rtmp_context.get_message_id().unwrap()));
1042        assert!(handle_message(stream.as_mut()).handle_stream_begin(&mut rtmp_context, buffer).await.is_ok());
1043        assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1044
1045        let mut buffer = ByteBuffer::default();
1046        buffer.encode(
1047            &OnStatus::new(
1048                object!(
1049                    "level" => AmfString::from("status"),
1050                    "code" => AmfString::from("NetStream.Play.Start"),
1051                    "description" => AmfString::from("Playing stream")
1052                )
1053            )
1054        );
1055        assert!(handle_message(stream.as_mut()).handle_play_response(&mut rtmp_context, buffer).await.is_ok());
1056        assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1057
1058        rtmp_context.set_buffer_length(30000);
1059        let mut stream = pin!(VecStream::default());
1060        handle_message(stream.as_mut()).write_buffer_length(&mut rtmp_context).await.unwrap();
1061        assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1062    }
1063}