sheave_server/handlers/
rtmp.rs

1use std::{
2    future::Future,
3    io::{
4        Error as IOError,
5        ErrorKind,
6        Result as IOResult
7    },
8    pin::{
9        Pin,
10        pin
11    },
12    sync::Arc,
13    task::{
14        Context as FutureContext,
15        Poll
16    },
17    time::{
18        Duration,
19        Instant
20    }
21};
22use log::{
23    debug,
24    error,
25    info
26};
27use futures::ready;
28use tokio::io::{
29    AsyncRead,
30    AsyncWrite
31};
32use sheave_core::{
33    ByteBuffer,
34    Decoder,
35    Encoder,
36    U24_MAX,
37    flv::tags::*,
38    handlers::{
39        AsyncHandler,
40        AsyncHandlerExt,
41        ClientType,
42        ErrorHandler,
43        HandlerConstructor,
44        LastChunk,
45        PublisherStatus,
46        RtmpContext,
47        StreamWrapper,
48        SubscriberStatus,
49        inconsistent_sha,
50        stream_got_exhausted
51    },
52    handshake::{
53        Handshake,
54        Version
55    },
56    messages::{
57        /* Used in common */
58        Channel,
59        ChunkData,
60        CommandError,
61        Connect,
62        ConnectResult,
63        CreateStream,
64        CreateStreamResult,
65        EventType,
66        UserControl,
67        OnStatus,
68        Audio,
69        Video,
70        SetDataFrame,
71        Acknowledgement,
72        amf::v0::{
73            AmfString,
74            Number,
75            Object
76        },
77        headers::MessageType,
78
79        /* Publisher-side */
80        ReleaseStream,
81        ReleaseStreamResult,
82        FcPublish,
83        OnFcPublish,
84        StreamBegin,
85        Publish,
86        FcUnpublish,
87        DeleteStream,
88
89        /* Subscriber-side */
90        WindowAcknowledgementSize,
91        FcSubscribe,
92        GetStreamLength,
93        GetStreamLengthResult,
94        SetPlaylist,
95        PlaylistReady,
96        Play,
97        SetBufferLength,
98    },
99    net::RtmpReadExt,
100    object,
101    readers::*,
102    writers::*
103};
104use super::{
105    /* Used in common */
106    inconsistent_app_path,
107    undistinguishable_client,
108    empty_topic_id,
109    inconsistent_topic_id,
110    middlewares::write_acknowledgement,
111
112    /* Publisher-side */
113    publish_topic,
114    provide_message_id,
115    unpublish_topic,
116    return_message_id,
117
    /* Subscriber-side */
119    subscribe_topic,
120    metadata_not_found,
121};
122
123#[doc(hidden)]
124#[derive(Debug)]
125struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
126
127#[doc(hidden)]
128impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
129    async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
130        let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
131        let mut client_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
132
133        if client_request.get_version() == Version::UNSIGNED {
134            let server_request = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
135            write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
136            write_handshake(self.0.as_mut(), &server_request).await?;
137            write_handshake(self.0.as_mut(), &client_request).await?;
138
139            rtmp_context.set_encryption_algorithm(encryption_algorithm);
140            rtmp_context.set_server_handshake(server_request);
141            rtmp_context.set_client_handshake(client_request);
142        } else {
143            if !client_request.did_digest_match(encryption_algorithm, Handshake::CLIENT_KEY) {
144                error!("Invalid SHA digest/signature: {:x?}", client_request.get_digest(encryption_algorithm));
145                return Err(inconsistent_sha(client_request.get_digest(encryption_algorithm).to_vec()))
146            } else {
147                let mut server_request = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
148                server_request.imprint_digest(encryption_algorithm, Handshake::SERVER_KEY);
149                let mut server_response_key: Vec<u8> = Vec::new();
150                server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151                server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152                client_request.imprint_signature(encryption_algorithm, &server_response_key);
153                write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
154                write_handshake(self.0.as_mut(), &server_request).await?;
155                write_handshake(self.0.as_mut(), &client_request).await?;
156
157                rtmp_context.set_signed(true);
158                rtmp_context.set_encryption_algorithm(encryption_algorithm);
159                rtmp_context.set_server_handshake(server_request);
160                rtmp_context.set_client_handshake(client_request);
161            }
162        }
163
164        info!("First handshake got handled.");
165        Ok(())
166    }
167
168    async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
169        let client_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
170
171        if !rtmp_context.is_signed() {
172            rtmp_context.set_server_handshake(client_response);
173        } else {
174            let encryption_algorithm = rtmp_context.get_encryption_algorithm().unwrap();
175            let mut client_response_key: Vec<u8> = Vec::new();
176            client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
177            client_response_key.extend_from_slice(Handshake::COMMON_KEY);
178            let server_request = rtmp_context.get_server_handshake().unwrap();
179            // NOTE: FFmpeg acts the handshake but imprints no signature.
180            if !client_response.did_signature_match(encryption_algorithm, &client_response_key) && server_request.get_signature() != client_response.get_signature() {
181                error!("Invalid SHA digest/signature: {:x?}", client_response.get_signature());
182                return Err(inconsistent_sha(client_response.get_signature().to_vec()))
183            } else {
184                debug!("Handshake version: {:?}", client_response.get_version());
185                debug!("Signature: {:x?}", client_response.get_signature());
186                rtmp_context.set_server_handshake(client_response);
187            }
188        }
189
190        info!("Second handshake got handled.");
191        Ok(())
192    }
193}
194
195#[doc(hidden)]
196impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
197    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
198        ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
199        pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
200    }
201}
202
203#[doc(hidden)]
204fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
205    HandshakeHandler(stream)
206}
207
208#[doc(hidden)]
209#[derive(Debug)]
210struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
211
212#[doc(hidden)]
213impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
214    async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
215        Decoder::<Acknowledgement>::decode(&mut buffer)?;
216
217        info!("Acknowledgement got handled.");
218        Ok(())
219    }
220
221    async fn handle_connect_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
222        let connect_request: Connect = buffer.decode()?;
223        rtmp_context.set_command_object(connect_request.into());
224
225        info!("connect got handled.");
226        Ok(())
227    }
228
229    async fn handle_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
230        let window_acknowledgement_size: WindowAcknowledgementSize = buffer.decode()?;
231        rtmp_context.set_window_acknowledgement_size(window_acknowledgement_size);
232
233        /*
234         *  NOTE:
235         *      Makes status to update during request handling.
236         *      Because of response not required.
237         */
238        rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
239
240        info!("Window Acknowledgement Size got handled.");
241        Ok(())
242    }
243
244    async fn handle_release_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
245        let release_stream_request: ReleaseStream = buffer.decode()?;
246        rtmp_context.set_topic_id(release_stream_request.into());
247
248        info!("releaseStream got handled.");
249        Ok(())
250    }
251
252    async fn handle_fc_publish_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
253        Decoder::<FcPublish>::decode(&mut buffer)?;
254
255        info!("FCPublish got handled.");
256        Ok(())
257    }
258
259    async fn handle_create_stream_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
260        Decoder::<CreateStream>::decode(&mut buffer)?;
261
262        info!("createStream got handled.");
263        Ok(())
264    }
265
266    async fn handle_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
267        let database_url = rtmp_context.get_database_url().unwrap().clone();
268        let storage_path = rtmp_context.get_storage_path().unwrap().clone();
269        let client_addr = rtmp_context.get_client_addr().unwrap();
270        let app = rtmp_context.get_app().unwrap().clone();
271
272        let fc_subscribe_request: FcSubscribe = buffer.decode()?;
273        /*
274         *  NOTE:
275         *      Makes topic to subscribe during request handling.
276         *      Because onFCSubscribe command is undefined about its specification.
277         */
278        let topic = subscribe_topic(&database_url, &storage_path, &app, fc_subscribe_request.get_topic_id(), client_addr).await?;
279        rtmp_context.set_topic(topic);
280        rtmp_context.set_topic_id(fc_subscribe_request.into());
281
282        rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
283
284        info!("FCSubscribe got handled.");
285        Ok(())
286    }
287
288    async fn handle_stream_length_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
289        let get_stream_length_request: GetStreamLength = buffer.decode()?;
290        rtmp_context.set_topic_id(get_stream_length_request.into());
291
292        info!("getStreamLength got handled.");
293        Ok(())
294    }
295
296    async fn handle_playlist_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
297        let set_playlist_request: SetPlaylist = buffer.decode()?;
298        rtmp_context.set_playlist(set_playlist_request.into());
299
300        info!("set playlist got handled.");
301        Ok(())
302    }
303
304    async fn handle_publish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
305        let publish_request: Publish = buffer.decode()?;
306        let (publishing_name, publishing_type): (AmfString, AmfString) = publish_request.into();
307        rtmp_context.set_publishing_name(publishing_name);
308        rtmp_context.set_publishing_type(publishing_type);
309
310        info!("publish got handled.");
311        Ok(())
312    }
313
314    async fn handle_play_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
315        let play_request: Play = buffer.decode()?;
316        let (stream_name, start_time, play_mode) = play_request.into();
317        rtmp_context.set_stream_name(stream_name);
318        rtmp_context.set_start_time(start_time);
319        rtmp_context.set_play_mode(play_mode);
320
321        info!("play got handled.");
322        Ok(())
323    }
324
325    async fn handle_buffer_length(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
326        let buffer_length: SetBufferLength = buffer.decode()?;
327        rtmp_context.set_buffer_length(buffer_length.get_buffering_time());
328
329        /*
330         * NOTE:
331         *  OBS sends SetBufferLength event also before the play command.
332         *  However its step isn't necessarily also other tools have implemented.
333         *  Therefore The sheave doesn't count up status except received after the play command implemented as the common step.
334         */
335        if let Some(SubscriberStatus::Played) = rtmp_context.get_subscriber_status() {
336            rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
337        }
338
339        Ok(())
340    }
341
342    async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
343        use EventType::*;
344
345        let event_type: EventType = buffer.get_u16_be()?.into();
346        match event_type {
347            SetBufferLength => self.handle_buffer_length(rtmp_context, buffer).await,
348            _ => unimplemented!("Undefined event type: {event_type:?}")
349        }
350    }
351
352    async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
353        let database_url = rtmp_context.get_database_url().unwrap().clone();
354        let storage_path = rtmp_context.get_storage_path().unwrap().clone();
355        let client_addr = rtmp_context.get_client_addr().unwrap();
356        let app = rtmp_context.get_app().unwrap().clone();
357
358        let fc_unpublish_request: FcUnpublish = buffer.decode()?;
359        unpublish_topic(&database_url, &storage_path, &app, fc_unpublish_request.get_topic_id(), client_addr).await?;
360        rtmp_context.reset_topic_id();
361
362        info!("FCUnpublish got handled.");
363        Ok(())
364    }
365
366    async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
367        let delete_stream_request: DeleteStream = buffer.decode()?;
368        return_message_id(delete_stream_request.into());
369        rtmp_context.reset_message_id();
370
371        info!("deleteStream got handled.");
372        Ok(())
373    }
374
375    async fn handle_publisher_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
376        use PublisherStatus::*;
377
378        let command: AmfString = buffer.decode()?;
379        let transaction_id: Number = buffer.decode()?;
380        rtmp_context.set_command_name(command.clone());
381        rtmp_context.set_transaction_id(transaction_id);
382
383        if command == "FCUnpublish" {
384            return self.handle_fc_unpublish_request(rtmp_context, buffer).await
385        }
386        if command == "deleteStream" {
387            return self.handle_delete_stream_request(rtmp_context, buffer).await
388        }
389
390        match rtmp_context.get_publisher_status().unwrap() {
391            Connected => self.handle_release_stream_request(rtmp_context, buffer).await,
392            Released => self.handle_fc_publish_request(rtmp_context, buffer).await,
393            FcPublished => self.handle_create_stream_request(rtmp_context, buffer).await,
394            Created => self.handle_publish_request(rtmp_context, buffer).await,
395            _ => Ok(())
396        }
397    }
398
399    async fn handle_subscriber_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
400        use SubscriberStatus::*;
401
402        let subscriber_status = rtmp_context.get_subscriber_status().unwrap();
403
404        let command: AmfString = buffer.decode()?;
405        let transaction_id: Number = buffer.decode()?;
406        rtmp_context.set_command_name(command.clone());
407        rtmp_context.set_transaction_id(transaction_id);
408
409        if subscriber_status == FcSubscribed {
410            /* NOTE: FFmpeg will send this. */
411            if command == "getStreamLength" {
412                return self.handle_stream_length_request(rtmp_context, buffer).await
413            }
414            /* NOTE: OBS will send this. */
415            if command == "set_playlist" {
416                return self.handle_playlist_request(rtmp_context, buffer).await
417            }
418        }
419
420        match subscriber_status {
421            Connected => Ok(()),
422            /* Subscriber sends a Window Acknowledgement Size chunk just after the connect command. */
423            WindowAcknowledgementSizeGotSent => self.handle_create_stream_request(rtmp_context, buffer).await,
424            Created => self.handle_fc_subscribe_request(rtmp_context, buffer).await,
425            AdditionalCommandGotSent => self.handle_play_request(rtmp_context, buffer).await,
426            _ => Ok(())
427        }
428    }
429
430    async fn handle_command_request(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
431        use ClientType::*;
432
433        if let Some(client_type) = rtmp_context.get_client_type() {
434            match client_type {
435                Publisher => self.handle_publisher_request(rtmp_context, buffer).await,
436                Subscriber => self.handle_subscriber_request(rtmp_context, buffer).await
437            }
438        } else {
439            self.handle_connect_request(rtmp_context, buffer).await
440        }
441    }
442
443    async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
444        let topic = rtmp_context.get_topic().unwrap();
445
446        let tag_type = match message_type {
447            MessageType::Audio => TagType::Audio,
448            MessageType::Video => TagType::Video,
449            MessageType::Data => TagType::ScriptData,
450            _ => TagType::Other
451        };
452
453        if let TagType::ScriptData = tag_type {
454            // NOTE: Currently @setDataFrame command is used for nothing.
455            Decoder::<AmfString>::decode(&mut buffer)?;
456        }
457
458        let data: Vec<u8> = buffer.into();
459        let flv_tag = FlvTag::new(tag_type, timestamp, data);
460        topic.append_flv_tag(flv_tag)?;
461
462        info!("FLV chunk got handled.");
463        Ok(())
464    }
465
466    async fn write_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
467        let mut buffer = ByteBuffer::default();
468        buffer.encode(&AmfString::from("_error"));
469        buffer.encode(&rtmp_context.get_transaction_id());
470        buffer.encode(&CommandError::new(information.clone()));
471        write_chunk(self.0.as_mut(), rtmp_context, CommandError::CHANNEL.into(), Duration::default(), CommandError::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
472
473        rtmp_context.set_information(information);
474
475        error!("{error}");
476        return Err(error)
477    }
478
479    async fn write_connect_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
480        use ClientType::*;
481
482        let command_object = rtmp_context.get_command_object().unwrap().clone();
483
484        let client_type = if command_object.get_properties().get("type").is_some() {
485            Publisher
486        } else if command_object.get_properties().get("fpad").is_some() {
487            Subscriber
488        } else {
489            let information = object!(
490                "level" => AmfString::from("error"),
491                "code" => AmfString::from("NetConnection.Connect.UndistinguishableClient"),
492                "description" => AmfString::from("Server couldn't distinguish you are either publisher or subscriber.")
493            );
494            return self.write_error_response(rtmp_context, information, undistinguishable_client()).await
495        };
496
497        let app = rtmp_context.get_app().unwrap().clone();
498        let requested_app: &AmfString = (&command_object.get_properties()["app"]).into();
499        if *requested_app != app {
500            let information = object!(
501                "level" => AmfString::from("error"),
502                "code" => AmfString::from("NetConnection.Connect.InconsistentAppPath"),
503                "description" => AmfString::new(format!("Requested app path is inconsistent. expected: {}, actual: {}", app, requested_app))
504            );
505            return self.write_error_response(rtmp_context, information, inconsistent_app_path(app, requested_app.clone())).await
506        }
507
508        let properties = object!(
509            "fmsVer" => AmfString::from("FMS/5,0,17"),
510            "capabilities" => Number::from(31)
511        );
512        let information = object!(
513            "level" => AmfString::from("status"),
514            "code" => AmfString::from("NetConnection.Connect.Success"),
515            "description" => AmfString::from("Connection succeeded."),
516            "objectEncoding" => Number::from(0)
517        );
518        let mut buffer = ByteBuffer::default();
519        buffer.encode(&AmfString::from("_result"));
520        buffer.encode(&rtmp_context.get_transaction_id());
521        buffer.encode(&ConnectResult::new(properties.clone(), information.clone()));
522        write_chunk(self.0.as_mut(), rtmp_context, ConnectResult::CHANNEL.into(), Duration::default(), ConnectResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
523
524        rtmp_context.set_client_type(client_type);
525        rtmp_context.set_properties(properties);
526        rtmp_context.set_information(information);
527
528        match client_type {
529            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
530            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
531        }
532
533        info!("connect result got sent.");
534        Ok(())
535    }
536
537    async fn write_release_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
538        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
539
540        if topic_id.is_empty() {
541            let information = object!(
542                "level" => AmfString::from("error"),
543                "code" => AmfString::from("NetConnection.ReleaseStream.EmptyTopicPath"),
544                "description" => AmfString::from("The topic path must not be empty.")
545            );
546            return self.write_error_response(rtmp_context, information, empty_topic_id()).await
547        }
548
549        let database_url = rtmp_context.get_database_url().unwrap().clone();
550        let storage_path = rtmp_context.get_storage_path().unwrap().clone();
551        let client_addr = rtmp_context.get_client_addr().unwrap();
552        let app = rtmp_context.get_app().unwrap().clone();
553
554        let topic = match publish_topic(&database_url, &storage_path, &app, &topic_id, client_addr).await {
555            Ok(topic) => topic,
556            Err(e) => {
557                let information = object!(
558                    "level" => AmfString::from("error"),
559                    "code" => AmfString::from("NetConnection.ReleaseStream.StreamIsUnpublished"),
560                    "description" => AmfString::new(format!("A stream of {topic_id} is unpublished."))
561                );
562                return self.write_error_response(rtmp_context, information, e).await
563            }
564        };
565
566        let mut buffer = ByteBuffer::default();
567        buffer.encode(&AmfString::from("_result"));
568        buffer.encode(&rtmp_context.get_transaction_id());
569        buffer.encode(&ReleaseStreamResult);
570        write_chunk(self.0.as_mut(), rtmp_context, ReleaseStreamResult::CHANNEL.into(), Duration::default(), ReleaseStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
571
572        rtmp_context.set_topic(topic);
573
574        rtmp_context.set_publisher_status(PublisherStatus::Released);
575
576        info!("releaseStream result got sent.");
577        Ok(())
578    }
579
580    async fn write_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
581        let mut buffer = ByteBuffer::default();
582        buffer.encode(&AmfString::from("onFCPublish"));
583        buffer.encode(&OnFcPublish);
584        write_chunk(self.0.as_mut(), rtmp_context, OnFcPublish::CHANNEL.into(), Duration::default(), OnFcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
585
586        rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
587
588        info!("onFCPublish got sent.");
589        Ok(())
590    }
591
592    async fn write_create_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
593        use ClientType::*;
594
595        let client_type = rtmp_context.get_client_type().unwrap();
596
597        let message_id = provide_message_id();
598        let mut buffer = ByteBuffer::default();
599        buffer.encode(&AmfString::from("_result"));
600        buffer.encode(&rtmp_context.get_transaction_id());
601        buffer.encode(&CreateStreamResult::new(message_id.into()));
602        write_chunk(self.0.as_mut(), rtmp_context, CreateStreamResult::CHANNEL.into(), Duration::default(), CreateStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
603
604        rtmp_context.set_message_id(message_id);
605
606        match client_type {
607            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
608            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
609        }
610
611        info!("createStream result got sent.");
612        Ok(())
613    }
614
    /// Answers the getStreamLength command with the `duration` of the topic's `onMetaData` tag.
    ///
    /// The FLV tags yielded by the topic are scanned in order.
    /// For the first ScriptData tag named `onMetaData`, a `_result` chunk carrying its `duration` property is written,
    /// the subscriber status is set to `AdditionalCommandGotSent`, and the scan stops.
    ///
    /// # Errors
    ///
    /// * Returns the error of any tag the topic failed to yield, or of decoding/writing.
    /// * When no `onMetaData` tag is found, an `_error` response is written and `metadata_not_found` is returned.
    ///
    /// # Panics
    ///
    /// Panics when the context has no topic, or no topic ID on the not-found path.
    /// NOTE(review): indexing `["duration"]` presumably panics when the metadata has no such property — confirm.
    async fn write_stream_length_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
        // Read before the topic is borrowed mutably from the context.
        let transaction_id = rtmp_context.get_transaction_id();
        let topic = rtmp_context.get_topic_mut().unwrap();

        for result in topic {
            let flv_tag = result?;

            if flv_tag.get_tag_type() == TagType::ScriptData {
                let mut buffer = ByteBuffer::default();
                buffer.put_bytes(flv_tag.get_data());
                let script_data: ScriptDataTag = buffer.decode()?;

                if *script_data.get_name() == "onMetaData" {
                    let duration: &Number = (&script_data.get_value().get_properties()["duration"]).into();
                    let mut buffer = ByteBuffer::default();
                    buffer.encode(&AmfString::from("_result"));
                    buffer.encode(&transaction_id);
                    buffer.encode(&GetStreamLengthResult::new(*duration));
                    write_chunk(self.0.as_mut(), rtmp_context, GetStreamLengthResult::CHANNEL.into(), Duration::default(), GetStreamLengthResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;

                    rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);

                    info!("getStreamLength result got sent.");
                    return Ok(())
                }
            }
        }

        // No onMetaData tag was found in the topic.
        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
        let information = object!(
            "level" => AmfString::from("error"),
            "code" => AmfString::from("NetConnection.GetStreamLength.MetadataNotFound"),
            "description" => AmfString::new(format!("Metadata didn't find in specified topic ID: {topic_id}"))
        );
        self.write_error_response(rtmp_context, information, metadata_not_found(topic_id)).await
    }
651
652    async fn write_playlist_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
653        let mut buffer = ByteBuffer::default();
654        buffer.encode(&AmfString::from("playlist_ready"));
655        buffer.encode(&PlaylistReady);
656        write_chunk(self.0.as_mut(), rtmp_context, PlaylistReady::CHANNEL.into(), Duration::default(), PlaylistReady::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
657
658        rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
659
660        info!("playlist_ready got sent.");
661        Ok(())
662    }
663
664    async fn write_stream_begin(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
665        use ClientType::*;
666
667        let message_id = rtmp_context.get_message_id().unwrap();
668        let mut buffer = ByteBuffer::default();
669        buffer.put_u16_be(StreamBegin::EVENT_TYPE.into());
670        buffer.encode(&StreamBegin::new(message_id));
671        write_chunk(self.0.as_mut(), rtmp_context, StreamBegin::CHANNEL.into(), Duration::default(), StreamBegin::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
672
673        match rtmp_context.get_client_type().unwrap() {
674            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
675            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
676        }
677
678        info!("Stream Begin got sent.");
679        Ok(())
680    }
681
682    async fn write_error_status(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
683        let message_id = rtmp_context.get_message_id().unwrap();
684
685        let mut buffer = ByteBuffer::default();
686        buffer.encode(&AmfString::from("onStatus"));
687        buffer.encode(&Number::from(0));
688        buffer.encode(&OnStatus::new(information.clone()));
689        write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
690
691        rtmp_context.set_information(information);
692
693        error!("{error}");
694        return Err(error)
695    }
696
697    async fn write_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
698        let message_id = rtmp_context.get_message_id().unwrap();
699        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
700        let publishing_name = rtmp_context.get_publishing_name().unwrap().clone();
701
702        if topic_id != publishing_name {
703            let information = object!(
704                "level" => AmfString::from("error"),
705                "code" => AmfString::from("NetStream.Publish.InconsistentPlaypath"),
706                "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_id}, actual: {publishing_name}"))
707            );
708            return self.write_error_status(rtmp_context, information, inconsistent_topic_id(topic_id, publishing_name)).await
709        }
710
711        let information = object!(
712            "level" => AmfString::from("status"),
713            "code" => AmfString::from("NetStream.Publish.Start"),
714            "description" => AmfString::new(format!("{publishing_name} is now published")),
715            "details" => publishing_name.clone()
716        );
717        let mut buffer = ByteBuffer::default();
718        buffer.encode(&AmfString::from("onStatus"));
719        buffer.encode(&Number::from(0));
720        buffer.encode(&OnStatus::new(information.clone()));
721        write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
722
723        rtmp_context.set_information(information);
724
725        rtmp_context.set_publisher_status(PublisherStatus::Published);
726
727        info!("onStatus(publish) got sent.");
728        Ok(())
729    }
730
731    async fn write_play_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
732        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
733        let message_id = rtmp_context.get_message_id().unwrap();
734        let stream_name = rtmp_context.get_stream_name().unwrap().clone();
735
736        if topic_id != stream_name {
737            let information = object!(
738                "level" => AmfString::from("error"),
739                "code" => AmfString::from("NetStream.Play.InconsistentTopicPath"),
740                "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_id}, actual: {stream_name}"))
741            );
742            return self.write_error_status(rtmp_context, information, inconsistent_topic_id(topic_id, stream_name)).await
743        }
744
745        let information = object!(
746            "level" => AmfString::from("status"),
747            "code" => AmfString::from("NetStream.Play.Start"),
748            "description" => AmfString::from("Playing stream.")
749        );
750        let mut buffer = ByteBuffer::default();
751        buffer.encode(&AmfString::from("onStatus"));
752        buffer.encode(&Number::from(0));
753        buffer.encode(&OnStatus::new(information.clone()));
754        write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
755
756        rtmp_context.set_information(information);
757
758        rtmp_context.set_subscriber_status(SubscriberStatus::Played);
759
760        info!("onStatus(play) got sent.");
761        Ok(())
762    }
763
764    async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
765        for next in rtmp_context.get_topic_mut().unwrap() {
766            let flv_tag = next?;
767            let message_id = rtmp_context.get_message_id().unwrap();
768
769            let channel;
770            let message_type;
771            match flv_tag.get_tag_type() {
772                TagType::Audio => {
773                    channel = Audio::CHANNEL;
774                    message_type = Audio::MESSAGE_TYPE;
775                },
776                TagType::Video => {
777                    channel = Video::CHANNEL;
778                    message_type = Video::MESSAGE_TYPE;
779                },
780                TagType::ScriptData => {
781                    channel = SetDataFrame::CHANNEL;
782                    message_type = SetDataFrame::MESSAGE_TYPE;
783                },
784                TagType::Other => {
785                    channel = Channel::Other;
786                    message_type = MessageType::Other;
787                }
788            }
789            let timestamp = flv_tag.get_timestamp();
790            let data: Vec<u8> = if let MessageType::Data = message_type {
791                let mut buffer = ByteBuffer::default();
792                buffer.encode(&AmfString::from("@setDataFrame"));
793                buffer.put_bytes(flv_tag.get_data());
794                buffer.into()
795            } else {
796                flv_tag.get_data().to_vec()
797            };
798            write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
799
800            info!("FLV chunk got sent.");
801            return Ok(())
802        }
803
804        info!("FLV data became empty.");
805        Err(stream_got_exhausted())
806    }
807}
808
809#[doc(hidden)]
810impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
811    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
812        use MessageType::*;
813
814        let basic_header = ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?;
815        let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
816        let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
817            if timestamp.as_millis() == U24_MAX as u128 {
818                let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
819                Some(extended_timestamp)
820            } else {
821                None
822            }
823        } else {
824            None
825        };
826
827        let chunk_id = basic_header.get_chunk_id();
828        if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
829            if let Some(extended_timestamp) = extended_timestamp {
830                last_received_chunk.set_timestamp(extended_timestamp);
831            } else {
832                message_header.get_timestamp().map(
833                    |timestamp| last_received_chunk.set_timestamp(timestamp)
834                );
835            }
836            message_header.get_message_length().map(
837                |message_length| last_received_chunk.set_message_length(message_length)
838            );
839            message_header.get_message_type().map(
840                |message_type| last_received_chunk.set_message_type(message_type)
841            );
842            message_header.get_message_id().map(
843                |message_id| last_received_chunk.set_message_id(message_id)
844            );
845        } else {
846            rtmp_context.insert_received_chunk(
847                chunk_id,
848                LastChunk::new(
849                    message_header.get_timestamp().unwrap(),
850                    message_header.get_message_length().unwrap(),
851                    message_header.get_message_type().unwrap(),
852                    message_header.get_message_id().unwrap()
853                )
854            );
855        } 
856        let data = ready!(
857            pin!(
858                read_chunk_data(
859                    pin!(self.0.await_until_receiving()),
860                    rtmp_context.get_receiving_chunk_size(),
861                    rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length()
862                )
863            ).poll(cx)
864        )?;
865        let buffer: ByteBuffer = data.into();
866
867        let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
868        match message_type {
869            Acknowledgement => ready!(pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx))?,
870            UserControl => ready!(pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx))?,
871            WindowAcknowledgementSize => ready!(pin!(self.handle_window_acknowledgement_size(rtmp_context, buffer)).poll(cx))?,
872            Audio | Video | Data => {
873                let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
874                ready!(pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx))?
875            },
876            Command => ready!(pin!(self.handle_command_request(rtmp_context, buffer)).poll(cx))?,
877            other => unimplemented!("Undefined Message: {other:?}")
878        }
879
880        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
881            match publisher_status {
882                PublisherStatus::Connected => pin!(self.write_release_stream_response(rtmp_context)).poll(cx),
883                PublisherStatus::Released => pin!(self.write_fc_publish_response(rtmp_context)).poll(cx),
884                PublisherStatus::FcPublished => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
885                PublisherStatus::Created => {
886                    ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
887                    pin!(self.write_publish_response(rtmp_context)).poll(cx)
888                },
889                _ => {
890                    /* Just receiving flv after publishing. */
891                    Poll::Ready(Ok(()))
892                }
893            }
894        } else if let Some(mut subscriber_status) = rtmp_context.get_subscriber_status() {
895            if subscriber_status == SubscriberStatus::FcSubscribed {
896                let command = rtmp_context.get_command_name().unwrap().clone();
897
898                if command == "getStreamLength" {
899                    return pin!(self.write_stream_length_response(rtmp_context)).poll(cx)
900                } else if command == "set_playlist" {
901                    return pin!(self.write_playlist_response(rtmp_context)).poll(cx)
902                } else {
903                    subscriber_status = SubscriberStatus::AdditionalCommandGotSent;
904                }
905            }
906
907            match subscriber_status {
908                SubscriberStatus::WindowAcknowledgementSizeGotSent => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
909                SubscriberStatus::AdditionalCommandGotSent => {
910                    ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
911                    pin!(self.write_play_response(rtmp_context)).poll(cx)
912                },
913                SubscriberStatus::Played => pin!(self.write_flv(rtmp_context)).poll(cx),
914                _ => {
915                    /* NOTE: There are plural chunks just to receive. */
916                    Poll::Ready(Ok(()))
917                }
918            }
919        } else {
920            pin!(self.write_connect_response(rtmp_context)).poll(cx)
921        }
922    }
923}
924
925#[doc(hidden)]
926fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
927    MessageHandler(stream)
928}
929
930#[doc(hidden)]
931#[derive(Debug)]
932struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
933
934#[doc(hidden)]
935impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
936    async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
937        let topic_id = rtmp_context.get_topic_id().unwrap().clone();
938        rtmp_context.increase_transaction_id();
939
940        let mut buffer = ByteBuffer::default();
941        buffer.encode(&AmfString::from("FCUnpublish"));
942        buffer.encode(&rtmp_context.get_transaction_id());
943        buffer.encode(&FcUnpublish::new(topic_id));
944        write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
945
946        info!("FCUnpublish got sent.");
947        Ok(())
948    }
949
950    async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
951        let message_id = rtmp_context.get_message_id().unwrap();
952        rtmp_context.increase_transaction_id();
953
954        let mut buffer = ByteBuffer::default();
955        buffer.encode(&AmfString::from("deleteStream"));
956        buffer.encode(&rtmp_context.get_transaction_id());
957        buffer.encode(&DeleteStream::new(message_id.into()));
958        write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
959
960        info!("deleteStream got sent.");
961        Ok(())
962    }
963}
964
965#[doc(hidden)]
966fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
967    CloseHandler(stream)
968}
969
970#[doc(hidden)]
971impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
972    fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
973        if error.kind() != ErrorKind::Other {
974            if let Some(publisher_status) = rtmp_context.get_publisher_status() {
975                if publisher_status >= PublisherStatus::FcPublished {
976                    ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
977                }
978
979                if publisher_status >= PublisherStatus::Created {
980                    ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
981                }
982            }
983        }
984
985        self.0.as_mut().poll_shutdown(cx)
986    }
987}
988
/// The default RTMP handler.
///
/// This handles raw RTMP by the well-known communication steps. That is, this performs just the following steps.
///
/// # With publishers
///
/// 1. Checks the application name from the [`Connect`] command.
/// 2. Checks the topic path from the [`ReleaseStream`]/[`FcPublish`] command.
/// 3. Provides a message ID when it receives the [`CreateStream`] command.
/// 4. Checks publication information from the [`Publish`] command.
/// 5. Then receives FLV media data.
///
/// If some error occurs in any step, this sends the [`FcUnpublish`] and [`DeleteStream`] commands to its client, then terminates its connection.
/// These commands delete the topic path and the message ID from its context.
/// However, clients can also send them.
///
/// # With subscribers
///
/// 1. Checks the application name from the [`Connect`] command.
/// 2. Checks the partner's bandwidth from the [`WindowAcknowledgementSize`] message.
/// 3. Provides a message ID when it receives the [`CreateStream`] command.
/// 4. Checks the topic path from the [`FcSubscribe`] command.
/// 5. Handles one of the additional commands, either [`GetStreamLength`] or [`SetPlaylist`].
/// 6. Checks subscription information from the [`Play`] command.
/// 7. Then sends FLV media data.
///
/// On both sides, if the received data size exceeds the server's bandwidth, this reports that to its client via the [`Acknowledgement`] message.
///
/// # Examples
///
/// ```rust
/// use std::marker::PhantomData;
/// use sheave_core::handlers::{
///     RtmpContext,
///     VecStream
/// };
/// use sheave_server::{
///     Server,
///     handlers::RtmpHandler,
/// };
///
/// let stream = VecStream::default();
/// let rtmp_context = RtmpContext::default();
/// let server = Server::new(stream, rtmp_context, PhantomData::<RtmpHandler<VecStream>>);
/// ```
///
/// [`Connect`]: sheave_core::messages::Connect
/// [`ReleaseStream`]: sheave_core::messages::ReleaseStream
/// [`FcPublish`]: sheave_core::messages::FcPublish
/// [`CreateStream`]: sheave_core::messages::CreateStream
/// [`Publish`]: sheave_core::messages::Publish
/// [`Acknowledgement`]: sheave_core::messages::Acknowledgement
/// [`FcUnpublish`]: sheave_core::messages::FcUnpublish
/// [`DeleteStream`]: sheave_core::messages::DeleteStream
/// [`WindowAcknowledgementSize`]: sheave_core::messages::WindowAcknowledgementSize
/// [`FcSubscribe`]: sheave_core::messages::FcSubscribe
/// [`GetStreamLength`]: sheave_core::messages::GetStreamLength
/// [`SetPlaylist`]: sheave_core::messages::SetPlaylist
/// [`Play`]: sheave_core::messages::Play
#[derive(Debug)]
pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
1050
1051impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
1052    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
1053        pin!(
1054            handle_handshake(self.0.make_weak_pin())
1055                .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
1056                .map_err(handle_close(self.0.make_weak_pin()))
1057        ).poll_handle(cx, rtmp_context)
1058    }
1059}
1060
1061impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
1062    fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
1063        Self(stream)
1064    }
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069    use std::{
1070        env::temp_dir,
1071        fs::{
1072            copy,
1073            create_dir_all,
1074            exists,
1075        },
1076        net::{
1077            IpAddr,
1078            Ipv4Addr,
1079            SocketAddr,
1080        },
1081        path::{
1082            MAIN_SEPARATOR,
1083            PathBuf,
1084        },
1085        str::FromStr,
1086    };
1087    use dotenvy::{
1088        from_filename,
1089        var
1090    };
1091    use log::LevelFilter;
1092    use rand::fill;
1093    use sqlx::{
1094        Connection,
1095        MySqlConnection,
1096        migrate::Migrator,
1097        query
1098    };
1099    use tokio::sync::OnceCell;
1100    use uuid::Uuid;
1101    use sheave_core::{
1102        ecma_array,
1103        flv::Flv,
1104        handlers::VecStream,
1105        handshake::EncryptionAlgorithm,
1106        messages::{
1107            ChunkSize,
1108            SetPlaylist,
1109            amf::v0::Boolean
1110        }
1111    };
1112    use super::*;
1113
    // The client address which tests store into their contexts: 0.0.0.0 with the port 1935.
    const CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1935);
    // The insert statement for the `topics` table. Its placeholders are the ID and the client address, in this order.
    const STATEMENT: &str = "INSERT INTO topics (id, client_addr) VALUES (?, ?)";
    // Tests pass `migrate_once` to this cell through `get_or_init` — presumably so that the migration runs only once; TODO confirm.
    static MIGRATOR: OnceCell<()> = OnceCell::const_new();
1117
1118    async fn migrate_once() {
1119        let database_url = var("DATABASE_URL")
1120            .unwrap();
1121        let mut connection = MySqlConnection::connect(&database_url)
1122            .await
1123            .unwrap();
1124        let migrator = Migrator::new(format!("{}{MAIN_SEPARATOR}migrations", env!("CARGO_MANIFEST_DIR")).as_ref())
1125            .await
1126            .unwrap();
1127        migrator
1128            .run(&mut connection)
1129            .await
1130            .unwrap()
1131    }
1132
1133    #[tokio::test]
1134    async fn ok_unsigned_handshake_got_handled() {
1135        let mut stream = pin!(VecStream::default());
1136        let mut rtmp_context = RtmpContext::default();
1137
1138        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1139        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1140        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
1141        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1142        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1143        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1144        assert!(result.is_ok());
1145
1146        let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1147        let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1148        let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1149        assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1150        assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1151
1152        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1153        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1154        assert!(result.is_ok());
1155        let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1156        assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1157    }
1158
1159    #[tokio::test]
1160    async fn err_digest_did_not_match() {
1161        let mut stream = pin!(VecStream::default());
1162        let mut rtmp_context = RtmpContext::default();
1163
1164        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1165        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1166        let sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1167        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1168        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1169        assert!(result.is_err())
1170    }
1171
1172    #[tokio::test]
1173    async fn err_signature_did_not_match() {
1174        let mut stream = pin!(VecStream::default());
1175        let mut rtmp_context = RtmpContext::default();
1176
1177        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1178        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1179        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1180        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1181        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1182        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1183        assert!(result.is_ok());
1184
1185        read_encryption_algorithm(stream.as_mut()).await.unwrap();
1186        let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1187        read_handshake(stream.as_mut()).await.unwrap();
1188        let mut invalid_signature_key: [u8; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()] = [0; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()];
1189        fill(&mut invalid_signature_key);
1190        received_server_handshake.imprint_signature(sent_encryption_algorithm, &invalid_signature_key);
1191        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1192        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1193        assert!(result.is_err())
1194    }
1195
1196    #[tokio::test]
1197    async fn ok_singed_handshake_got_handled() {
1198        let mut stream = pin!(VecStream::default());
1199        let mut rtmp_context = RtmpContext::default();
1200
1201        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1202        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1203        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1204        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1205        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1206        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1207        assert!(result.is_ok());
1208
1209        let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1210        let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1211        let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1212        let mut server_signature_key: Vec<u8> = Vec::new();
1213        server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1214        server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1215        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1216        assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1217        assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1218
1219        let mut client_signature_key: Vec<u8> = Vec::new();
1220        client_signature_key.extend_from_slice(Handshake::CLIENT_KEY);
1221        client_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1222        received_server_handshake.imprint_signature(sent_encryption_algorithm, &client_signature_key);
1223        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1224        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1225        assert!(result.is_ok());
1226        let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1227        assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1228    }
1229
1230    #[tokio::test]
1231    async fn ok_signed_handshake_as_ffmpeg_got_handled() {
1232        let mut stream = pin!(VecStream::default());
1233        let mut rtmp_context = RtmpContext::default();
1234
1235        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1236        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1237        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1238        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1239        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1240        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1241        assert!(result.is_ok());
1242
1243        let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1244        let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1245        let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1246        let mut server_signature_key: Vec<u8> = Vec::new();
1247        server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1248        server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1249        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1250        assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1251        assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1252
1253        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1254        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1255        assert!(result.is_ok());
1256        let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1257        assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes());
1258    }
1259
1260    #[tokio::test]
1261    async fn err_undistinguishable_client() {
1262        let mut stream = pin!(VecStream::default());
1263        let mut rtmp_context = RtmpContext::default();
1264
1265        let mut buffer = ByteBuffer::default();
1266        buffer.encode(&Connect::default());
1267        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1268        let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1269        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1270        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1271        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1272        let mut buffer: ByteBuffer = chunk.into();
1273        let command: AmfString = buffer.decode().unwrap();
1274        assert!(result.is_err());
1275        assert_eq!(command, "_error");
1276        assert!(rtmp_context.get_information().is_some())
1277    }
1278
1279    #[tokio::test]
1280    async fn err_inconsistent_app_path() {
1281        let mut stream = pin!(VecStream::default());
1282        let mut rtmp_context = RtmpContext::default();
1283        rtmp_context.set_app("ondemand");
1284
1285        let mut buffer = ByteBuffer::default();
1286        buffer.encode(&Connect::new(object!("app" => AmfString::default())));
1287        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1288        let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1289        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1290        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1291        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1292        let mut buffer: ByteBuffer = chunk.into();
1293        let command: AmfString = buffer.decode().unwrap();
1294        assert!(result.is_err());
1295        assert_eq!(command, "_error");
1296        assert!(rtmp_context.get_information().is_some())
1297    }
1298
1299    #[tokio::test]
1300    async fn err_empty_topic_id() {
1301        let mut stream = pin!(VecStream::default());
1302        let mut rtmp_context = RtmpContext::default();
1303        let mut buffer = ByteBuffer::default();
1304        buffer.encode(&ReleaseStream::new(AmfString::from("")));
1305        handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1306        let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1307        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1308        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1309        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1310        let mut buffer: ByteBuffer = chunk.into();
1311        let command: AmfString = buffer.decode().unwrap();
1312        assert!(result.is_err());
1313        assert_eq!(command, "_error");
1314        assert!(rtmp_context.get_information().is_some())
1315    }
1316
1317    #[tokio::test]
1318    async fn err_unpublished_stream() {
1319        if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1320            from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1321        }
1322
1323        MIGRATOR.get_or_init(migrate_once).await;
1324
1325        let temp_dir = temp_dir();
1326        let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1327
1328        let database_url = var("DATABASE_URL").unwrap();
1329
1330        let app = "ondemand";
1331
1332        let mut stream = pin!(VecStream::default());
1333        let mut rtmp_context = RtmpContext::default();
1334        rtmp_context.set_storage_path(&storage_path);
1335        rtmp_context.set_database_url(&database_url);
1336        rtmp_context.set_app(app);
1337        rtmp_context.set_client_addr(CLIENT_ADDR);
1338
1339        let topic_id = Uuid::now_v7().to_string();
1340
1341        let mut buffer = ByteBuffer::default();
1342        buffer.encode(&ReleaseStream::new(AmfString::new(topic_id)));
1343        handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1344        let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1345        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1346        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1347        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1348        let mut buffer: ByteBuffer = chunk.into();
1349        let command: AmfString = buffer.decode().unwrap();
1350        assert!(result.is_err());
1351        assert_eq!(command, "_error");
1352        assert!(rtmp_context.get_information().is_some())
1353    }
1354
1355    #[tokio::test]
1356    async fn err_inconsistent_topic_id_in_publish() {
1357        let mut stream = pin!(VecStream::default());
1358        let mut rtmp_context = RtmpContext::default();
1359        rtmp_context.set_message_id(0);
1360        rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
1361
1362        let mut buffer = ByteBuffer::default();
1363        buffer.encode(&Publish::new(AmfString::default(), "live".into()));
1364        handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1365        let result = handle_message(stream.as_mut()).write_publish_response(&mut rtmp_context).await;
1366        assert!(result.is_err());
1367        assert!(rtmp_context.get_information().is_some())
1368    }
1369
1370    #[tokio::test]
1371    async fn err_metadata_not_found() {
1372        if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1373            from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1374        }
1375
1376        let temp_dir = temp_dir();
1377        let topic_storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1378        let topic_id = Uuid::now_v7().to_string();
1379        let topic = {
1380            create_dir_all(&topic_storage_path).unwrap();
1381            Flv::create(&format!("{topic_storage_path}{MAIN_SEPARATOR}{topic_id}.flv")).unwrap()
1382        };
1383
1384        let mut stream = pin!(VecStream::default());
1385        let mut rtmp_context = RtmpContext::default();
1386        rtmp_context.set_topic(topic);
1387
1388        let mut buffer = ByteBuffer::default();
1389        buffer.encode(&GetStreamLength::new(AmfString::new(topic_id)));
1390        handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1391        let result = handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await;
1392        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1393        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1394        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1395        let mut buffer: ByteBuffer = chunk.into();
1396        let command: AmfString = buffer.decode().unwrap();
1397        assert!(result.is_err());
1398        assert_eq!(command, "_error");
1399        assert!(rtmp_context.get_information().is_some())
1400    }
1401
1402    #[tokio::test]
1403    async fn err_inconsistent_topic_id_in_play() {
1404        let mut stream = pin!(VecStream::default());
1405        let mut rtmp_context = RtmpContext::default();
1406        rtmp_context.set_topic_id(AmfString::new(Uuid::now_v7().to_string()));
1407        rtmp_context.set_message_id(0);
1408
1409        let mut buffer = ByteBuffer::default();
1410        buffer.encode(&Play::new(AmfString::new(Uuid::now_v7().to_string()), Number::from(-2i8)));
1411        handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1412        let result = handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await;
1413        assert!(result.is_err());
1414        assert!(rtmp_context.get_information().is_some())
1415    }
1416
1417    #[tokio::test]
1418    async fn ok_valid_publisher_sequence() {
1419        if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1420            from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1421        }
1422
1423        env_logger::builder()
1424            .is_test(true)
1425            .filter_level(LevelFilter::from_str(&var("LOGLEVEL").unwrap_or("error".into())).unwrap())
1426            .init();
1427
1428        MIGRATOR.get_or_init(migrate_once).await;
1429
1430        let temp_dir = temp_dir();
1431        let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1432
1433        let database_url = var("DATABASE_URL").unwrap();
1434
1435        let app = "ondemand";
1436
1437        let mut rtmp_context = RtmpContext::default();
1438        rtmp_context.set_storage_path(&storage_path);
1439        rtmp_context.set_database_url(&database_url);
1440        rtmp_context.set_app(app);
1441        rtmp_context.set_client_addr(CLIENT_ADDR);
1442
1443        let topic_id = Uuid::now_v7().to_string();
1444
1445        query(STATEMENT)
1446            .bind(&topic_id)
1447            .bind(&CLIENT_ADDR.to_string())
1448            .execute(&mut MySqlConnection::connect(&database_url).await.unwrap())
1449            .await
1450            .unwrap();
1451
1452        let mut stream = pin!(VecStream::default());
1453        let mut buffer = ByteBuffer::default();
1454        buffer.encode(
1455            &Connect::new(
1456                object!(
1457                    "app" => AmfString::from(app),
1458                    "type" => AmfString::from("nonprivate")
1459                )
1460            )
1461        );
1462        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1463        assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1464        assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
1465
1466        let mut buffer = ByteBuffer::default();
1467        buffer.encode(&ReleaseStream::new(AmfString::new(topic_id.clone())));
1468        handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1469        let mut stream = pin!(VecStream::default());
1470        assert!(handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await.is_ok());
1471        assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
1472
1473        let mut buffer = ByteBuffer::default();
1474        buffer.encode(&FcPublish::new(AmfString::new(topic_id.clone())));
1475        handle_message(stream.as_mut()).handle_fc_publish_request(&mut rtmp_context, buffer).await.unwrap();
1476        let mut stream = pin!(VecStream::default());
1477        assert!(handle_message(stream.as_mut()).write_fc_publish_response(&mut rtmp_context).await.is_ok());
1478        assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
1479
1480        let mut buffer = ByteBuffer::default();
1481        buffer.encode(&CreateStream);
1482        handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1483        let mut stream = pin!(VecStream::default());
1484        assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1485        assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
1486
1487        let mut buffer = ByteBuffer::default();
1488        buffer.encode(&Publish::new(AmfString::new(topic_id), "live".into()));
1489        handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1490        let mut stream = pin!(VecStream::default());
1491        assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1492        assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
1493        assert!(handle_message(stream).write_publish_response(&mut rtmp_context).await.is_ok());
1494        assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
1495    }
1496
1497    #[tokio::test]
1498    async fn ok_valid_subscriber_sequence_in_ffmpeg() {
1499        if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1500            from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1501        }
1502
1503        MIGRATOR.get_or_init(migrate_once).await;
1504
1505        let temp_dir = temp_dir();
1506        let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1507        let app = "ondemand";
1508        let copy_to = format!("{storage_path}{MAIN_SEPARATOR}{app}");
1509        create_dir_all(&copy_to).unwrap();
1510
1511        let mut resources_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1512        resources_path.pop();
1513        resources_path.push("resources");
1514        resources_path.push("test.flv");
1515        let topic_id = Uuid::now_v7().to_string();
1516        copy(resources_path, format!("{copy_to}{MAIN_SEPARATOR}{topic_id}.flv")).unwrap();
1517
1518        let database_url = var("DATABASE_URL").unwrap();
1519
1520        let mut rtmp_context = RtmpContext::default();
1521        rtmp_context.set_storage_path(&storage_path);
1522        rtmp_context.set_database_url(&database_url);
1523        rtmp_context.set_app(app);
1524        rtmp_context.set_client_addr(CLIENT_ADDR);
1525
1526        query(STATEMENT)
1527            .bind(&topic_id)
1528            .bind(&CLIENT_ADDR.to_string())
1529            .execute(&mut MySqlConnection::connect(&database_url).await.unwrap())
1530            .await
1531            .unwrap();
1532
1533        let mut stream = pin!(VecStream::default());
1534        let mut buffer = ByteBuffer::default();
1535        buffer.encode(
1536            &Connect::new(
1537                object!(
1538                    "app" => AmfString::from(app),
1539                    "fpad" => Boolean::new(0)
1540                )
1541            )
1542        );
1543        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1544        assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1545        assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1546
1547        let mut buffer = ByteBuffer::default();
1548        buffer.encode(&WindowAcknowledgementSize::default());
1549        handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1550        assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1551
1552        let mut buffer = ByteBuffer::default();
1553        buffer.encode(&CreateStream);
1554        handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1555        let mut stream = pin!(VecStream::default());
1556        assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1557        assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1558
1559        let mut stream = pin!(VecStream::default());
1560        let mut buffer = ByteBuffer::default();
1561        buffer.encode(&FcSubscribe::new(AmfString::new(topic_id.clone())));
1562        handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1563        assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1564
1565        let mut buffer = ByteBuffer::default();
1566        buffer.encode(&GetStreamLength::new(AmfString::new(topic_id.clone())));
1567        handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1568        let mut stream = pin!(VecStream::default());
1569        assert!(handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await.is_ok());
1570        assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1571
1572        let mut buffer = ByteBuffer::default();
1573        buffer.encode(
1574            &Play::new(
1575                AmfString::new(topic_id),
1576                Number::from(-2i8)
1577            )
1578        );
1579        handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1580        let mut stream = pin!(VecStream::default());
1581        assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1582        assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1583        assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1584        assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1585
1586        let mut buffer = ByteBuffer::default();
1587        buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1588        handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1589        assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1590    }
1591
1592    #[tokio::test]
1593    async fn ok_valid_subscriber_sequence_in_obs() {
1594        if exists(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap() {
1595            from_filename(format!("{}{MAIN_SEPARATOR}.env.test", env!("CARGO_MANIFEST_DIR"))).unwrap();
1596        }
1597
1598        MIGRATOR.get_or_init(migrate_once).await;
1599
1600        let temp_dir = temp_dir();
1601        let storage_path = format!("{}{MAIN_SEPARATOR}sheave", temp_dir.display());
1602
1603        let app = "ondemand";
1604        let copy_to = format!("{storage_path}{MAIN_SEPARATOR}{app}");
1605        create_dir_all(&copy_to).unwrap();
1606
1607        let mut resources_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1608        resources_path.pop();
1609        resources_path.push("resources");
1610        resources_path.push("test.flv");
1611        let topic_id = Uuid::now_v7().to_string();
1612        copy(resources_path, &format!("{copy_to}{MAIN_SEPARATOR}{topic_id}.flv")).unwrap();
1613
1614        let database_url = var("DATABASE_URL").unwrap();
1615
1616        let mut rtmp_context = RtmpContext::default();
1617        rtmp_context.set_storage_path(&storage_path);
1618        rtmp_context.set_database_url(&database_url);
1619        rtmp_context.set_app(app);
1620        rtmp_context.set_client_addr(CLIENT_ADDR);
1621
1622        query(STATEMENT)
1623            .bind(&topic_id)
1624            .bind(&CLIENT_ADDR.to_string())
1625            .execute(&mut MySqlConnection::connect(&database_url).await.unwrap())
1626            .await
1627            .unwrap();
1628
1629        let mut stream = pin!(VecStream::default());
1630        let mut buffer = ByteBuffer::default();
1631        buffer.encode(
1632            &Connect::new(
1633                object!(
1634                    "app" => AmfString::from(app),
1635                    "fpad" => Boolean::new(0)
1636                )
1637            )
1638        );
1639        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1640        assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1641        assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1642
1643        let mut buffer = ByteBuffer::default();
1644        buffer.encode(&WindowAcknowledgementSize::default());
1645        handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1646        assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1647
1648        let mut buffer = ByteBuffer::default();
1649        buffer.encode(&CreateStream);
1650        handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1651        let mut stream = pin!(VecStream::default());
1652        assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1653        assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1654
1655        let mut buffer = ByteBuffer::default();
1656        buffer.encode(&FcSubscribe::new(AmfString::new(topic_id.clone())));
1657        handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1658        assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1659
1660        let mut buffer = ByteBuffer::default();
1661        buffer.encode(
1662            &SetPlaylist::new(
1663                ecma_array!(
1664                    "0" => AmfString::new(topic_id.clone())
1665                )
1666            )
1667        );
1668        handle_message(stream.as_mut()).handle_playlist_request(&mut rtmp_context, buffer).await.unwrap();
1669        let mut stream = pin!(VecStream::default());
1670        assert!(handle_message(stream.as_mut()).write_playlist_response(&mut rtmp_context).await.is_ok());
1671        assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1672
1673        let mut buffer = ByteBuffer::default();
1674        buffer.encode(
1675            &Play::new(
1676                AmfString::new(topic_id),
1677                Number::from(-2i8)
1678            )
1679        );
1680        handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1681        let mut stream = pin!(VecStream::default());
1682        assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1683        assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1684        assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1685        assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1686
1687        let mut buffer = ByteBuffer::default();
1688        buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1689        handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1690        assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1691    }
1692}