sheave_server/handlers/
rtmp.rs

1use std::{
2    future::Future,
3    io::{
4        Error as IOError,
5        ErrorKind,
6        Result as IOResult
7    },
8    pin::{
9        Pin,
10        pin
11    },
12    sync::Arc,
13    task::{
14        Context as FutureContext,
15        Poll
16    },
17    time::{
18        Duration,
19        Instant
20    }
21};
22use log::{
23    debug,
24    error,
25    info
26};
27use futures::ready;
28use tokio::io::{
29    AsyncRead,
30    AsyncWrite
31};
32use sheave_core::{
33    ByteBuffer,
34    Decoder,
35    Encoder,
36    U24_MAX,
37    flv::tags::*,
38    handlers::{
39        AsyncHandler,
40        AsyncHandlerExt,
41        ClientType,
42        ErrorHandler,
43        HandlerConstructor,
44        LastChunk,
45        PublisherStatus,
46        RtmpContext,
47        StreamWrapper,
48        SubscriberStatus,
49        inconsistent_sha,
50        stream_got_exhausted
51    },
52    handshake::{
53        Handshake,
54        Version
55    },
56    messages::{
57        /* Used in common */
58        Channel,
59        ChunkData,
60        CommandError,
61        Connect,
62        ConnectResult,
63        CreateStream,
64        CreateStreamResult,
65        EventType,
66        UserControl,
67        OnStatus,
68        Audio,
69        Video,
70        SetDataFrame,
71        Acknowledgement,
72        amf::v0::{
73            AmfString,
74            Number,
75            Object
76        },
77        headers::MessageType,
78
79        /* Publisher-side */
80        ReleaseStream,
81        ReleaseStreamResult,
82        FcPublish,
83        OnFcPublish,
84        StreamBegin,
85        Publish,
86        FcUnpublish,
87        DeleteStream,
88
89        /* Subscriber-side */
90        WindowAcknowledgementSize,
91        FcSubscribe,
92        GetStreamLength,
93        GetStreamLengthResult,
94        SetPlaylist,
95        PlaylistReady,
96        Play,
97        SetBufferLength,
98    },
99    net::RtmpReadExt,
100    object,
101    readers::*,
102    writers::*
103};
104use super::{
105    /* Used in common */
106    inconsistent_app_path,
107    undistinguishable_client,
108    empty_topic_path,
109    inconsistent_topic_path,
110    middlewares::write_acknowledgement,
111
112    /* Publisher-side */
113    publish_topic,
114    provide_message_id,
115    unpublish_topic,
116    return_message_id,
117
118    /* Subscriver-side */
119    subscribe_topic,
120    metadata_not_found,
121};
122
123#[doc(hidden)]
124#[derive(Debug)]
125struct HandshakeHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
126
127#[doc(hidden)]
128impl<RW: AsyncRead + AsyncWrite + Unpin> HandshakeHandler<'_, RW> {
129    async fn handle_first_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
130        let encryption_algorithm = read_encryption_algorithm(pin!(self.0.await_until_receiving())).await?;
131        let mut client_request = read_handshake(pin!(self.0.await_until_receiving())).await?;
132
133        if client_request.get_version() == Version::UNSIGNED {
134            let server_request = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
135            write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
136            write_handshake(self.0.as_mut(), &server_request).await?;
137            write_handshake(self.0.as_mut(), &client_request).await?;
138
139            rtmp_context.set_encryption_algorithm(encryption_algorithm);
140            rtmp_context.set_server_handshake(server_request);
141            rtmp_context.set_client_handshake(client_request);
142        } else {
143            if !client_request.did_digest_match(encryption_algorithm, Handshake::CLIENT_KEY) {
144                error!("Invalid SHA digest/signature: {:x?}", client_request.get_digest(encryption_algorithm));
145                return Err(inconsistent_sha(client_request.get_digest(encryption_algorithm).to_vec()))
146            } else {
147                let mut server_request = Handshake::new(Instant::now().elapsed(), Version::LATEST_SERVER);
148                server_request.imprint_digest(encryption_algorithm, Handshake::SERVER_KEY);
149                let mut server_response_key: Vec<u8> = Vec::new();
150                server_response_key.extend_from_slice(Handshake::SERVER_KEY);
151                server_response_key.extend_from_slice(Handshake::COMMON_KEY);
152                client_request.imprint_signature(encryption_algorithm, &server_response_key);
153                write_encryption_algorithm(self.0.as_mut(), encryption_algorithm).await?;
154                write_handshake(self.0.as_mut(), &server_request).await?;
155                write_handshake(self.0.as_mut(), &client_request).await?;
156
157                rtmp_context.set_signed(true);
158                rtmp_context.set_encryption_algorithm(encryption_algorithm);
159                rtmp_context.set_server_handshake(server_request);
160                rtmp_context.set_client_handshake(client_request);
161            }
162        }
163
164        info!("First handshake got handled.");
165        Ok(())
166    }
167
168    async fn handle_second_handshake(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
169        let client_response = read_handshake(pin!(self.0.await_until_receiving())).await?;
170
171        if !rtmp_context.is_signed() {
172            rtmp_context.set_server_handshake(client_response);
173        } else {
174            let encryption_algorithm = rtmp_context.get_encryption_algorithm().unwrap();
175            let mut client_response_key: Vec<u8> = Vec::new();
176            client_response_key.extend_from_slice(Handshake::CLIENT_KEY);
177            client_response_key.extend_from_slice(Handshake::COMMON_KEY);
178            let server_request = rtmp_context.get_server_handshake().unwrap();
179            // NOTE: FFmpeg acts the handshake but imprints no signature.
180            if !client_response.did_signature_match(encryption_algorithm, &client_response_key) && server_request.get_signature() != client_response.get_signature() {
181                error!("Invalid SHA digest/signature: {:x?}", client_response.get_signature());
182                return Err(inconsistent_sha(client_response.get_signature().to_vec()))
183            } else {
184                debug!("Handshake version: {:?}", client_response.get_version());
185                debug!("Signature: {:x?}", client_response.get_signature());
186                rtmp_context.set_server_handshake(client_response);
187            }
188        }
189
190        info!("Second handshake got handled.");
191        Ok(())
192    }
193}
194
195#[doc(hidden)]
196impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for HandshakeHandler<'_, RW> {
197    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
198        ready!(pin!(self.handle_first_handshake(rtmp_context)).poll(cx))?;
199        pin!(self.handle_second_handshake(rtmp_context)).poll(cx)
200    }
201}
202
203#[doc(hidden)]
204fn handle_handshake<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> HandshakeHandler<'a, RW> {
205    HandshakeHandler(stream)
206}
207
208#[doc(hidden)]
209#[derive(Debug)]
210struct MessageHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
211
212#[doc(hidden)]
213impl<RW: AsyncRead + AsyncWrite + Unpin> MessageHandler<'_, RW> {
214    async fn handle_acknowledgement(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
215        Decoder::<Acknowledgement>::decode(&mut buffer)?;
216
217        info!("Acknowledgement got handled.");
218        Ok(())
219    }
220
221    async fn handle_connect_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
222        let connect_request: Connect = buffer.decode()?;
223        rtmp_context.set_command_object(connect_request.into());
224
225        info!("connect got handled.");
226        Ok(())
227    }
228
229    async fn handle_window_acknowledgement_size(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
230        let window_acknowledgement_size: WindowAcknowledgementSize = buffer.decode()?;
231        rtmp_context.set_window_acknowledgement_size(window_acknowledgement_size);
232
233        /*
234         *  NOTE:
235         *      Makes status to update during request handling.
236         *      Because of response not required.
237         */
238        rtmp_context.set_subscriber_status(SubscriberStatus::WindowAcknowledgementSizeGotSent);
239
240        info!("Window Acknowledgement Size got handled.");
241        Ok(())
242    }
243
244    async fn handle_release_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
245        let release_stream_request: ReleaseStream = buffer.decode()?;
246        rtmp_context.set_topic_path(release_stream_request.into());
247
248        info!("releaseStream got handled.");
249        Ok(())
250    }
251
252    async fn handle_fc_publish_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
253        Decoder::<FcPublish>::decode(&mut buffer)?;
254
255        info!("FCPublish got handled.");
256        Ok(())
257    }
258
259    async fn handle_create_stream_request(&mut self, _: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
260        Decoder::<CreateStream>::decode(&mut buffer)?;
261
262        info!("createStream got handled.");
263        Ok(())
264    }
265
266    async fn handle_fc_subscribe_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
267        let topic_database_url = rtmp_context.get_topic_database_url().unwrap().clone();
268        let topic_storage_path = rtmp_context.get_topic_storage_path().unwrap().clone();
269        let client_addr = rtmp_context.get_client_addr().unwrap();
270        let app = rtmp_context.get_app().unwrap().clone();
271
272        let fc_subscribe_request: FcSubscribe = buffer.decode()?;
273        /*
274         *  NOTE:
275         *      Makes topic to subscribe during request handling.
276         *      Because onFCSubscribe command is undefined about its specification.
277         */
278        let topic = subscribe_topic(&topic_database_url, &topic_storage_path, &app, fc_subscribe_request.get_topic_path(), RtmpContext::DIRECTORY_SEPARATOR, client_addr).await?;
279        rtmp_context.set_topic(topic);
280        rtmp_context.set_topic_path(fc_subscribe_request.into());
281
282        rtmp_context.set_subscriber_status(SubscriberStatus::FcSubscribed);
283
284        info!("FCSubscribe got handled.");
285        Ok(())
286    }
287
288    async fn handle_stream_length_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
289        let get_stream_length_request: GetStreamLength = buffer.decode()?;
290        rtmp_context.set_topic_path(get_stream_length_request.into());
291
292        info!("getStreamLength got handled.");
293        Ok(())
294    }
295
296    async fn handle_playlist_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
297        let set_playlist_request: SetPlaylist = buffer.decode()?;
298        rtmp_context.set_playlist(set_playlist_request.into());
299
300        info!("set playlist got handled.");
301        Ok(())
302    }
303
304    async fn handle_publish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
305        let publish_request: Publish = buffer.decode()?;
306        let (publishing_name, publishing_type): (AmfString, AmfString) = publish_request.into();
307        rtmp_context.set_publishing_name(publishing_name);
308        rtmp_context.set_publishing_type(publishing_type);
309
310        info!("publish got handled.");
311        Ok(())
312    }
313
314    async fn handle_play_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
315        let play_request: Play = buffer.decode()?;
316        let (stream_name, start_time, play_mode) = play_request.into();
317        rtmp_context.set_stream_name(stream_name);
318        rtmp_context.set_start_time(start_time);
319        rtmp_context.set_play_mode(play_mode);
320
321        info!("play got handled.");
322        Ok(())
323    }
324
325    async fn handle_buffer_length(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
326        let buffer_length: SetBufferLength = buffer.decode()?;
327        rtmp_context.set_buffer_length(buffer_length.get_buffering_time());
328
329        /*
330         * NOTE:
331         *  OBS sends SetBufferLength event also before the play command.
332         *  However its step isn't necessarily also other tools have implemented.
333         *  Therefore The sheave doesn't count up status except received after the play command implemented as the common step.
334         */
335        if let Some(SubscriberStatus::Played) = rtmp_context.get_subscriber_status() {
336            rtmp_context.set_subscriber_status(SubscriberStatus::BufferLengthGotSent);
337        }
338
339        Ok(())
340    }
341
342    async fn handle_user_control(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
343        use EventType::*;
344
345        let event_type: EventType = buffer.get_u16_be()?.into();
346        match event_type {
347            SetBufferLength => self.handle_buffer_length(rtmp_context, buffer).await,
348            _ => unimplemented!("Undefined event type: {event_type:?}")
349        }
350    }
351
352    async fn handle_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
353        let topic_database_url = rtmp_context.get_topic_database_url().unwrap().clone();
354        let topic_storage_path = rtmp_context.get_topic_storage_path().unwrap().clone();
355        let client_addr = rtmp_context.get_client_addr().unwrap();
356        let app = rtmp_context.get_app().unwrap().clone();
357
358        let fc_unpublish_request: FcUnpublish = buffer.decode()?;
359        unpublish_topic(&topic_database_url, &topic_storage_path, &app, fc_unpublish_request.get_topic_path(), RtmpContext::DIRECTORY_SEPARATOR, client_addr).await?;
360        rtmp_context.reset_topic_path();
361
362        info!("FCUnpublish got handled.");
363        Ok(())
364    }
365
366    async fn handle_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
367        let delete_stream_request: DeleteStream = buffer.decode()?;
368        return_message_id(delete_stream_request.into());
369        rtmp_context.reset_message_id();
370
371        info!("deleteStream got handled.");
372        Ok(())
373    }
374
375    async fn handle_publisher_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
376        use PublisherStatus::*;
377
378        let command: AmfString = buffer.decode()?;
379        let transaction_id: Number = buffer.decode()?;
380        rtmp_context.set_command_name(command.clone());
381        rtmp_context.set_transaction_id(transaction_id);
382
383        if command == "FCUnpublish" {
384            return self.handle_fc_unpublish_request(rtmp_context, buffer).await
385        }
386        if command == "deleteStream" {
387            return self.handle_delete_stream_request(rtmp_context, buffer).await
388        }
389
390        match rtmp_context.get_publisher_status().unwrap() {
391            Connected => self.handle_release_stream_request(rtmp_context, buffer).await,
392            Released => self.handle_fc_publish_request(rtmp_context, buffer).await,
393            FcPublished => self.handle_create_stream_request(rtmp_context, buffer).await,
394            Created => self.handle_publish_request(rtmp_context, buffer).await,
395            _ => Ok(())
396        }
397    }
398
399    async fn handle_subscriber_request(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer) -> IOResult<()> {
400        use SubscriberStatus::*;
401
402        let subscriber_status = rtmp_context.get_subscriber_status().unwrap();
403
404        let command: AmfString = buffer.decode()?;
405        let transaction_id: Number = buffer.decode()?;
406        rtmp_context.set_command_name(command.clone());
407        rtmp_context.set_transaction_id(transaction_id);
408
409        if subscriber_status == FcSubscribed {
410            /* NOTE: FFmpeg will send this. */
411            if command == "getStreamLength" {
412                return self.handle_stream_length_request(rtmp_context, buffer).await
413            }
414            /* NOTE: OBS will send this. */
415            if command == "set_playlist" {
416                return self.handle_playlist_request(rtmp_context, buffer).await
417            }
418        }
419
420        match subscriber_status {
421            Connected => Ok(()),
422            /* Subscriber sends a Window Acknowledgement Size chunk just after the connect command. */
423            WindowAcknowledgementSizeGotSent => self.handle_create_stream_request(rtmp_context, buffer).await,
424            Created => self.handle_fc_subscribe_request(rtmp_context, buffer).await,
425            AdditionalCommandGotSent => self.handle_play_request(rtmp_context, buffer).await,
426            _ => Ok(())
427        }
428    }
429
430    async fn handle_command_request(&mut self, rtmp_context: &mut RtmpContext, buffer: ByteBuffer) -> IOResult<()> {
431        use ClientType::*;
432
433        if let Some(client_type) = rtmp_context.get_client_type() {
434            match client_type {
435                Publisher => self.handle_publisher_request(rtmp_context, buffer).await,
436                Subscriber => self.handle_subscriber_request(rtmp_context, buffer).await
437            }
438        } else {
439            self.handle_connect_request(rtmp_context, buffer).await
440        }
441    }
442
443    async fn handle_flv(&mut self, rtmp_context: &mut RtmpContext, mut buffer: ByteBuffer, message_type: MessageType, timestamp: Duration) -> IOResult<()> {
444        let topic = rtmp_context.get_topic().unwrap();
445
446        let tag_type = match message_type {
447            MessageType::Audio => TagType::Audio,
448            MessageType::Video => TagType::Video,
449            MessageType::Data => TagType::ScriptData,
450            _ => TagType::Other
451        };
452
453        if let TagType::ScriptData = tag_type {
454            // NOTE: Currently @setDataFrame command is used for nothing.
455            Decoder::<AmfString>::decode(&mut buffer)?;
456        }
457
458        let data: Vec<u8> = buffer.into();
459        let flv_tag = FlvTag::new(tag_type, timestamp, data);
460        topic.append_flv_tag(flv_tag)?;
461
462        info!("FLV chunk got handled.");
463        Ok(())
464    }
465
466    async fn write_error_response(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
467        let mut buffer = ByteBuffer::default();
468        buffer.encode(&AmfString::from("_error"));
469        buffer.encode(&rtmp_context.get_transaction_id());
470        buffer.encode(&CommandError::new(information.clone()));
471        write_chunk(self.0.as_mut(), rtmp_context, CommandError::CHANNEL.into(), Duration::default(), CommandError::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
472
473        rtmp_context.set_information(information);
474
475        error!("{error}");
476        return Err(error)
477    }
478
479    async fn write_connect_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
480        use ClientType::*;
481
482        let command_object = rtmp_context.get_command_object().unwrap().clone();
483
484        let client_type = if command_object.get_properties().get("type").is_some() {
485            Publisher
486        } else if command_object.get_properties().get("fpad").is_some() {
487            Subscriber
488        } else {
489            let information = object!(
490                "level" => AmfString::from("error"),
491                "code" => AmfString::from("NetConnection.Connect.UndistinguishableClient"),
492                "description" => AmfString::from("Server couldn't distinguish you are either publisher or subscriber.")
493            );
494            return self.write_error_response(rtmp_context, information, undistinguishable_client()).await
495        };
496
497        let app = rtmp_context.get_app().unwrap().clone();
498        let requested_app: &AmfString = (&command_object.get_properties()["app"]).into();
499        if *requested_app != app {
500            let information = object!(
501                "level" => AmfString::from("error"),
502                "code" => AmfString::from("NetConnection.Connect.InconsistentAppPath"),
503                "description" => AmfString::new(format!("Requested app path is inconsistent. expected: {}, actual: {}", app, requested_app))
504            );
505            return self.write_error_response(rtmp_context, information, inconsistent_app_path(app, requested_app.clone())).await
506        }
507
508        let properties = object!(
509            "fmsVer" => AmfString::from("FMS/5,0,17"),
510            "capabilities" => Number::from(31)
511        );
512        let information = object!(
513            "level" => AmfString::from("status"),
514            "code" => AmfString::from("NetConnection.Connect.Success"),
515            "description" => AmfString::from("Connection succeeded."),
516            "objectEncoding" => Number::from(0)
517        );
518        let mut buffer = ByteBuffer::default();
519        buffer.encode(&AmfString::from("_result"));
520        buffer.encode(&rtmp_context.get_transaction_id());
521        buffer.encode(&ConnectResult::new(properties.clone(), information.clone()));
522        write_chunk(self.0.as_mut(), rtmp_context, ConnectResult::CHANNEL.into(), Duration::default(), ConnectResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
523
524        rtmp_context.set_client_type(client_type);
525        rtmp_context.set_properties(properties);
526        rtmp_context.set_information(information);
527
528        match client_type {
529            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Connected),
530            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Connected)
531        }
532
533        info!("connect result got sent.");
534        Ok(())
535    }
536
537    async fn write_release_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
538        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
539
540        if topic_path.is_empty() {
541            let information = object!(
542                "level" => AmfString::from("error"),
543                "code" => AmfString::from("NetConnection.ReleaseStream.EmptyTopicPath"),
544                "description" => AmfString::from("The topic path must not be empty.")
545            );
546            return self.write_error_response(rtmp_context, information, empty_topic_path()).await
547        }
548
549        let topic_database_url = rtmp_context.get_topic_database_url().unwrap().clone();
550        let topic_storage_path = rtmp_context.get_topic_storage_path().unwrap().clone();
551        let client_addr = rtmp_context.get_client_addr().unwrap();
552        let app = rtmp_context.get_app().unwrap().clone();
553
554        let topic = match publish_topic(&topic_database_url, &topic_storage_path, &app, &topic_path, RtmpContext::DIRECTORY_SEPARATOR, client_addr).await {
555            Ok(topic) => topic,
556            Err(e) => {
557                let information = object!(
558                    "level" => AmfString::from("error"),
559                    "code" => AmfString::from("NetConnection.ReleaseStream.StreamIsUnpublished"),
560                    "description" => AmfString::new(format!("A stream of {topic_path} is unpublished."))
561                );
562                return self.write_error_response(rtmp_context, information, e).await
563            }
564        };
565
566        let mut buffer = ByteBuffer::default();
567        buffer.encode(&AmfString::from("_result"));
568        buffer.encode(&rtmp_context.get_transaction_id());
569        buffer.encode(&ReleaseStreamResult);
570        write_chunk(self.0.as_mut(), rtmp_context, ReleaseStreamResult::CHANNEL.into(), Duration::default(), ReleaseStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
571
572        rtmp_context.set_topic(topic);
573
574        rtmp_context.set_publisher_status(PublisherStatus::Released);
575
576        info!("releaseStream result got sent.");
577        Ok(())
578    }
579
580    async fn write_fc_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
581        let mut buffer = ByteBuffer::default();
582        buffer.encode(&AmfString::from("onFCPublish"));
583        buffer.encode(&OnFcPublish);
584        write_chunk(self.0.as_mut(), rtmp_context, OnFcPublish::CHANNEL.into(), Duration::default(), OnFcPublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
585
586        rtmp_context.set_publisher_status(PublisherStatus::FcPublished);
587
588        info!("onFCPublish got sent.");
589        Ok(())
590    }
591
592    async fn write_create_stream_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
593        use ClientType::*;
594
595        let client_type = rtmp_context.get_client_type().unwrap();
596
597        let message_id = provide_message_id();
598        let mut buffer = ByteBuffer::default();
599        buffer.encode(&AmfString::from("_result"));
600        buffer.encode(&rtmp_context.get_transaction_id());
601        buffer.encode(&CreateStreamResult::new(message_id.into()));
602        write_chunk(self.0.as_mut(), rtmp_context, CreateStreamResult::CHANNEL.into(), Duration::default(), CreateStreamResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
603
604        rtmp_context.set_message_id(message_id);
605
606        match client_type {
607            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Created),
608            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Created)
609        }
610
611        info!("createStream result got sent.");
612        Ok(())
613    }
614
615    async fn write_stream_length_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
616        let transaction_id = rtmp_context.get_transaction_id();
617        let topic = rtmp_context.get_topic_mut().unwrap();
618
619        for result in topic {
620            let flv_tag = result?;
621
622            if flv_tag.get_tag_type() == TagType::ScriptData {
623                let mut buffer = ByteBuffer::default();
624                buffer.put_bytes(flv_tag.get_data());
625                let script_data: ScriptDataTag = buffer.decode()?;
626
627                if *script_data.get_name() == "onMetaData" {
628                    let duration: &Number = (&script_data.get_value().get_properties()["duration"]).into();
629                    let mut buffer = ByteBuffer::default();
630                    buffer.encode(&AmfString::from("_result"));
631                    buffer.encode(&transaction_id);
632                    buffer.encode(&GetStreamLengthResult::new(*duration));
633                    write_chunk(self.0.as_mut(), rtmp_context, GetStreamLengthResult::CHANNEL.into(), Duration::default(), GetStreamLengthResult::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
634
635                    rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
636
637                    info!("getStreamLength result got sent.");
638                    return Ok(())
639                }
640            }
641        }
642
643        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
644        let information = object!(
645            "level" => AmfString::from("error"),
646            "code" => AmfString::from("NetConnection.GetStreamLength.MetadataNotFound"),
647            "description" => AmfString::new(format!("Metadata didn't find in specified topic path: {topic_path}"))
648        );
649        self.write_error_response(rtmp_context, information, metadata_not_found(topic_path)).await
650    }
651
652    async fn write_playlist_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
653        let mut buffer = ByteBuffer::default();
654        buffer.encode(&AmfString::from("playlist_ready"));
655        buffer.encode(&PlaylistReady);
656        write_chunk(self.0.as_mut(), rtmp_context, PlaylistReady::CHANNEL.into(), Duration::default(), PlaylistReady::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
657
658        rtmp_context.set_subscriber_status(SubscriberStatus::AdditionalCommandGotSent);
659
660        info!("playlist_ready got sent.");
661        Ok(())
662    }
663
664    async fn write_stream_begin(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
665        use ClientType::*;
666
667        let message_id = rtmp_context.get_message_id().unwrap();
668        let mut buffer = ByteBuffer::default();
669        buffer.put_u16_be(StreamBegin::EVENT_TYPE.into());
670        buffer.encode(&StreamBegin::new(message_id));
671        write_chunk(self.0.as_mut(), rtmp_context, StreamBegin::CHANNEL.into(), Duration::default(), StreamBegin::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
672
673        match rtmp_context.get_client_type().unwrap() {
674            Publisher => rtmp_context.set_publisher_status(PublisherStatus::Began),
675            Subscriber => rtmp_context.set_subscriber_status(SubscriberStatus::Began)
676        }
677
678        info!("Stream Begin got sent.");
679        Ok(())
680    }
681
682    async fn write_error_status(&mut self, rtmp_context: &mut RtmpContext, information: Object, error: IOError) -> IOResult<()> {
683        let message_id = rtmp_context.get_message_id().unwrap();
684
685        let mut buffer = ByteBuffer::default();
686        buffer.encode(&AmfString::from("onStatus"));
687        buffer.encode(&Number::from(0));
688        buffer.encode(&OnStatus::new(information.clone()));
689        write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
690
691        rtmp_context.set_information(information);
692
693        error!("{error}");
694        return Err(error)
695    }
696
697    async fn write_publish_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
698        let message_id = rtmp_context.get_message_id().unwrap();
699        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
700        let publishing_name = rtmp_context.get_publishing_name().unwrap().clone();
701
702        if topic_path != publishing_name {
703            let information = object!(
704                "level" => AmfString::from("error"),
705                "code" => AmfString::from("NetStream.Publish.InconsistentPlaypath"),
706                "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_path}, actual: {publishing_name}"))
707            );
708            return self.write_error_status(rtmp_context, information, inconsistent_topic_path(topic_path, publishing_name)).await
709        }
710
711        let information = object!(
712            "level" => AmfString::from("status"),
713            "code" => AmfString::from("NetStream.Publish.Start"),
714            "description" => AmfString::new(format!("{publishing_name} is now published")),
715            "details" => publishing_name.clone()
716        );
717        let mut buffer = ByteBuffer::default();
718        buffer.encode(&AmfString::from("onStatus"));
719        buffer.encode(&Number::from(0));
720        buffer.encode(&OnStatus::new(information.clone()));
721        write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
722
723        rtmp_context.set_information(information);
724
725        rtmp_context.set_publisher_status(PublisherStatus::Published);
726
727        info!("onStatus(publish) got sent.");
728        Ok(())
729    }
730
731    async fn write_play_response(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
732        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
733        let message_id = rtmp_context.get_message_id().unwrap();
734        let stream_name = rtmp_context.get_stream_name().unwrap().clone();
735
736        if topic_path != stream_name {
737            let information = object!(
738                "level" => AmfString::from("error"),
739                "code" => AmfString::from("NetStream.Play.InconsistentTopicPath"),
740                "description" => AmfString::new(format!("Requested name is inconsistent. expected: {topic_path}, actual: {stream_name}"))
741            );
742            return self.write_error_status(rtmp_context, information, inconsistent_topic_path(topic_path, stream_name)).await
743        }
744
745        let information = object!(
746            "level" => AmfString::from("status"),
747            "code" => AmfString::from("NetStream.Play.Start"),
748            "description" => AmfString::from("Playing stream.")
749        );
750        let mut buffer = ByteBuffer::default();
751        buffer.encode(&AmfString::from("onStatus"));
752        buffer.encode(&Number::from(0));
753        buffer.encode(&OnStatus::new(information.clone()));
754        write_chunk(self.0.as_mut(), rtmp_context, OnStatus::CHANNEL.into(), Duration::default(), OnStatus::MESSAGE_TYPE, message_id, &Vec::<u8>::from(buffer)).await?;
755
756        rtmp_context.set_information(information);
757
758        rtmp_context.set_subscriber_status(SubscriberStatus::Played);
759
760        info!("onStatus(play) got sent.");
761        Ok(())
762    }
763
764    async fn write_flv(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
765        for next in rtmp_context.get_topic_mut().unwrap() {
766            let flv_tag = next?;
767            let message_id = rtmp_context.get_message_id().unwrap();
768
769            let channel;
770            let message_type;
771            match flv_tag.get_tag_type() {
772                TagType::Audio => {
773                    channel = Audio::CHANNEL;
774                    message_type = Audio::MESSAGE_TYPE;
775                },
776                TagType::Video => {
777                    channel = Video::CHANNEL;
778                    message_type = Video::MESSAGE_TYPE;
779                },
780                TagType::ScriptData => {
781                    channel = SetDataFrame::CHANNEL;
782                    message_type = SetDataFrame::MESSAGE_TYPE;
783                },
784                TagType::Other => {
785                    channel = Channel::Other;
786                    message_type = MessageType::Other;
787                }
788            }
789            let timestamp = flv_tag.get_timestamp();
790            let data: Vec<u8> = if let MessageType::Data = message_type {
791                let mut buffer = ByteBuffer::default();
792                buffer.encode(&AmfString::from("@setDataFrame"));
793                buffer.put_bytes(flv_tag.get_data());
794                buffer.into()
795            } else {
796                flv_tag.get_data().to_vec()
797            };
798            write_chunk(self.0.as_mut(), rtmp_context, channel.into(), timestamp, message_type, message_id, &data).await?;
799
800            info!("FLV chunk got sent.");
801            return Ok(())
802        }
803
804        info!("FLV data became empty.");
805        Err(stream_got_exhausted())
806    }
807}
808
809#[doc(hidden)]
810impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for MessageHandler<'_, RW> {
811    fn poll_handle(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
812        use MessageType::*;
813
814        let basic_header = ready!(pin!(read_basic_header(pin!(self.0.await_until_receiving()))).poll(cx))?;
815        let message_header = ready!(pin!(read_message_header(pin!(self.0.await_until_receiving()), basic_header.get_message_format())).poll(cx))?;
816        let extended_timestamp = if let Some(timestamp) = message_header.get_timestamp() {
817            if timestamp.as_millis() == U24_MAX as u128 {
818                let extended_timestamp = ready!(pin!(read_extended_timestamp(pin!(self.0.await_until_receiving()))).poll(cx))?;
819                Some(extended_timestamp)
820            } else {
821                None
822            }
823        } else {
824            None
825        };
826
827        let chunk_id = basic_header.get_chunk_id();
828        if let Some(last_received_chunk) = rtmp_context.get_last_received_chunk_mut(&chunk_id) {
829            if let Some(extended_timestamp) = extended_timestamp {
830                last_received_chunk.set_timestamp(extended_timestamp);
831            } else {
832                message_header.get_timestamp().map(
833                    |timestamp| last_received_chunk.set_timestamp(timestamp)
834                );
835            }
836            message_header.get_message_length().map(
837                |message_length| last_received_chunk.set_message_length(message_length)
838            );
839            message_header.get_message_type().map(
840                |message_type| last_received_chunk.set_message_type(message_type)
841            );
842            message_header.get_message_id().map(
843                |message_id| last_received_chunk.set_message_id(message_id)
844            );
845        } else {
846            rtmp_context.insert_received_chunk(
847                chunk_id,
848                LastChunk::new(
849                    message_header.get_timestamp().unwrap(),
850                    message_header.get_message_length().unwrap(),
851                    message_header.get_message_type().unwrap(),
852                    message_header.get_message_id().unwrap()
853                )
854            );
855        } 
856        let data = ready!(
857            pin!(
858                read_chunk_data(
859                    pin!(self.0.await_until_receiving()),
860                    rtmp_context.get_receiving_chunk_size(),
861                    rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_length()
862                )
863            ).poll(cx)
864        )?;
865        let buffer: ByteBuffer = data.into();
866
867        let message_type = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_message_type();
868        match message_type {
869            Acknowledgement => ready!(pin!(self.handle_acknowledgement(rtmp_context, buffer)).poll(cx))?,
870            UserControl => ready!(pin!(self.handle_user_control(rtmp_context, buffer)).poll(cx))?,
871            WindowAcknowledgementSize => ready!(pin!(self.handle_window_acknowledgement_size(rtmp_context, buffer)).poll(cx))?,
872            Audio | Video | Data => {
873                let timestamp = rtmp_context.get_last_received_chunk(&chunk_id).unwrap().get_timestamp();
874                ready!(pin!(self.handle_flv(rtmp_context, buffer, message_type, timestamp)).poll(cx))?
875            },
876            Command => ready!(pin!(self.handle_command_request(rtmp_context, buffer)).poll(cx))?,
877            other => unimplemented!("Undefined Message: {other:?}")
878        }
879
880        if let Some(publisher_status) = rtmp_context.get_publisher_status() {
881            match publisher_status {
882                PublisherStatus::Connected => pin!(self.write_release_stream_response(rtmp_context)).poll(cx),
883                PublisherStatus::Released => pin!(self.write_fc_publish_response(rtmp_context)).poll(cx),
884                PublisherStatus::FcPublished => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
885                PublisherStatus::Created => {
886                    ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
887                    pin!(self.write_publish_response(rtmp_context)).poll(cx)
888                },
889                _ => {
890                    /* Just receiving flv after publishing. */
891                    Poll::Ready(Ok(()))
892                }
893            }
894        } else if let Some(mut subscriber_status) = rtmp_context.get_subscriber_status() {
895            if subscriber_status == SubscriberStatus::FcSubscribed {
896                let command = rtmp_context.get_command_name().unwrap().clone();
897
898                if command == "getStreamLength" {
899                    return pin!(self.write_stream_length_response(rtmp_context)).poll(cx)
900                } else if command == "set_playlist" {
901                    return pin!(self.write_playlist_response(rtmp_context)).poll(cx)
902                } else {
903                    subscriber_status = SubscriberStatus::AdditionalCommandGotSent;
904                }
905            }
906
907            match subscriber_status {
908                SubscriberStatus::WindowAcknowledgementSizeGotSent => pin!(self.write_create_stream_response(rtmp_context)).poll(cx),
909                SubscriberStatus::AdditionalCommandGotSent => {
910                    ready!(pin!(self.write_stream_begin(rtmp_context)).poll(cx))?;
911                    pin!(self.write_play_response(rtmp_context)).poll(cx)
912                },
913                SubscriberStatus::Played => pin!(self.write_flv(rtmp_context)).poll(cx),
914                _ => {
915                    /* NOTE: There are plural chunks just to receive. */
916                    Poll::Ready(Ok(()))
917                }
918            }
919        } else {
920            pin!(self.write_connect_response(rtmp_context)).poll(cx)
921        }
922    }
923}
924
925#[doc(hidden)]
926fn handle_message<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> MessageHandler<'a, RW> {
927    MessageHandler(stream)
928}
929
930#[doc(hidden)]
931#[derive(Debug)]
932struct CloseHandler<'a, RW: AsyncRead + AsyncWrite + Unpin>(Pin<&'a mut RW>);
933
934#[doc(hidden)]
935impl<RW: AsyncRead + AsyncWrite + Unpin> CloseHandler<'_, RW> {
936    async fn write_fc_unpublish_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
937        let topic_path = rtmp_context.get_topic_path().unwrap().clone();
938        rtmp_context.increase_transaction_id();
939
940        let mut buffer = ByteBuffer::default();
941        buffer.encode(&AmfString::from("FCUnpublish"));
942        buffer.encode(&rtmp_context.get_transaction_id());
943        buffer.encode(&FcUnpublish::new(topic_path));
944        write_chunk(self.0.as_mut(), rtmp_context, FcUnpublish::CHANNEL.into(), Duration::default(), FcUnpublish::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
945
946        info!("FCUnpublish got sent.");
947        Ok(())
948    }
949
950    async fn write_delete_stream_request(&mut self, rtmp_context: &mut RtmpContext) -> IOResult<()> {
951        let message_id = rtmp_context.get_message_id().unwrap();
952        rtmp_context.increase_transaction_id();
953
954        let mut buffer = ByteBuffer::default();
955        buffer.encode(&AmfString::from("deleteStream"));
956        buffer.encode(&rtmp_context.get_transaction_id());
957        buffer.encode(&DeleteStream::new(message_id.into()));
958        write_chunk(self.0.as_mut(), rtmp_context, DeleteStream::CHANNEL.into(), Duration::default(), DeleteStream::MESSAGE_TYPE, u32::default(), &Vec::<u8>::from(buffer)).await?;
959
960        info!("deleteStream got sent.");
961        Ok(())
962    }
963}
964
965#[doc(hidden)]
966fn handle_close<'a, RW: AsyncRead + AsyncWrite + Unpin>(stream: Pin<&'a mut RW>) -> CloseHandler<'a, RW> {
967    CloseHandler(stream)
968}
969
970#[doc(hidden)]
971impl<RW: AsyncRead + AsyncWrite + Unpin> ErrorHandler for CloseHandler<'_, RW> {
972    fn poll_handle_error(mut self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext, error: IOError) -> Poll<IOResult<()>> {
973        if error.kind() != ErrorKind::Other {
974            if let Some(publisher_status) = rtmp_context.get_publisher_status() {
975                if publisher_status >= PublisherStatus::FcPublished {
976                    ready!(pin!(self.write_fc_unpublish_request(rtmp_context)).poll(cx))?;
977                }
978
979                if publisher_status >= PublisherStatus::Created {
980                    ready!(pin!(self.write_delete_stream_request(rtmp_context)).poll(cx))?;
981                }
982            }
983        }
984
985        self.0.as_mut().poll_shutdown(cx)
986    }
987}
988
989/// The default RTMP handler.
990///
991/// This handles the raw RTMP by well-known communication steps. That is, this performs just following steps.
992///
993/// # With publishers
994///
995/// 1. Checks the application name from the [`Connect`] command.
996/// 2. Checks the topic path from the [`ReleaseStream`]/[`FcPublish`] command.
997/// 3. Provides a message ID when receives the [`CreateStream`] command.
998/// 4. Checks publication informations from the [`Publish`] command.
999/// 5. Then receives FLV media data.
1000///
1001/// If some error occurs in any step, sends commands which are [`FcUnpublish`] and [`DeleteStream`] to its client, then terminates its connection.
1002/// These perform to delete the topic path and a message ID from its context.
1003/// However also these can be sent from clients.
1004///
1005/// # With subscribers
1006///
1007/// 1. Checks the application name from the [`Connect`] command.
1008/// 2. Checks partner's bandwidsth from the [`WindowAcknowledgementSize`] message.
1009/// 3. Provides a message ID when receives the [`CreateStream`] command.
1010/// 4. Checks the topic path from the [`FcSubscribe`] command.
1011/// 5. Handles one of additional commands either [`GetStreamLength`] or [`SetPlaylist`].
1012/// 6. Checkes subscription informaitons from [`Play`] command/
1013/// 7. The sends FLV media data.
1014///
1015/// In Both sides, if receiving data size exceeds server's bandwidth, this reports its thing via the [`Acknowledgement`] message to its client.
1016///
1017/// # Examples
1018///
1019/// ```rust
1020/// use std::marker::PhantomData;
1021/// use sheave_core::handlers::{
1022///     RtmpContext,
1023///     VecStream
1024/// };
1025/// use sheave_server::{
1026///     Server,
1027///     handlers::RtmpHandler,
1028/// };
1029///
1030/// let stream = VecStream::default();
1031/// let rtmp_context = RtmpContext::default();
1032/// let server = Server::new(stream, rtmp_context, PhantomData::<RtmpHandler<VecStream>>);
1033/// ```
1034///
1035/// [`Connect`]: sheave_core::messages::Connect
1036/// [`ReleaseSream`]: sheave_core::messages::ReleaseStream
1037/// [`FcPublish`]: sheave_core::messages::FcPublish
1038/// [`CreateStream`]: sheave_core::messages::CreateStream
1039/// [`Publish`]: sheave_core::messages::Publish
1040/// [`Acknowledgement`]: sheave_core::messages::Acknowledgement
1041/// [`FcUnpublish`]: sheave_core::messages::FcUnpublish
1042/// [`DeleteStream`]: sheave_core::messages::DeleteStream
1043/// [`WindowAcknowledgementSize`]: sheave_core::messages::WindowAcknowledgementSize
1044/// [`FcSubscribe`]: sheave_core::messages::FcSubscribe
1045/// [`GetStreamLength`]: sheave_core::messages::GetStreamLength
1046/// [`SetPlaylist`]: sheave_core::messages::SetPlaylist
1047/// [`Play`]: sheave_core::messages::Play
1048#[derive(Debug)]
1049pub struct RtmpHandler<RW: AsyncRead + AsyncWrite + Unpin>(Arc<StreamWrapper<RW>>);
1050
1051impl<RW: AsyncRead + AsyncWrite + Unpin> AsyncHandler for RtmpHandler<RW> {
1052    fn poll_handle(self: Pin<&mut Self>, cx: &mut FutureContext<'_>, rtmp_context: &mut RtmpContext) -> Poll<IOResult<()>> {
1053        pin!(
1054            handle_handshake(self.0.make_weak_pin())
1055                .while_ok(handle_message(self.0.make_weak_pin()).wrap(write_acknowledgement(self.0.make_weak_pin())))
1056                .map_err(handle_close(self.0.make_weak_pin()))
1057        ).poll_handle(cx, rtmp_context)
1058    }
1059}
1060
1061impl<RW: AsyncRead + AsyncWrite + Unpin> HandlerConstructor<StreamWrapper<RW>> for RtmpHandler<RW> {
1062    fn new(stream: Arc<StreamWrapper<RW>>) -> Self {
1063        Self(stream)
1064    }
1065}
1066
1067#[cfg(test)]
1068mod tests {
1069    use std::{
1070        fs::{
1071            copy,
1072            create_dir_all
1073        },
1074        net::{
1075            IpAddr,
1076            Ipv4Addr,
1077            SocketAddr
1078        },
1079        path::Path
1080    };
1081    use rand::fill;
1082    use uuid::Uuid;
1083    use sqlx::{
1084        Connection,
1085        SqliteConnection,
1086        migrate::Migrator,
1087        query
1088    };
1089    use sheave_core::{
1090        ecma_array,
1091        flv::Flv,
1092        handlers::VecStream,
1093        handshake::EncryptionAlgorithm,
1094        messages::{
1095            ChunkSize,
1096            PlayMode,
1097            SetPlaylist,
1098            amf::v0::Boolean
1099        }
1100    };
1101    use super::*;
1102
1103
1104    const CLIENT_ADDR: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 1935);
1105    const STATEMENT: &str = "INSERT INTO topics (path, client_addr) VALUES (?1, ?2)";
1106
1107    #[tokio::test]
1108    async fn ok_unsigned_handshake_got_handled() {
1109        let mut stream = pin!(VecStream::default());
1110        let mut rtmp_context = RtmpContext::default();
1111
1112        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1113        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1114        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::UNSIGNED);
1115        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1116        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1117        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1118        assert!(result.is_ok());
1119
1120        let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1121        let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1122        let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1123        assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1124        assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1125
1126        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1127        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1128        assert!(result.is_ok());
1129        let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1130        assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1131    }
1132
1133    #[tokio::test]
1134    async fn err_digest_did_not_match() {
1135        let mut stream = pin!(VecStream::default());
1136        let mut rtmp_context = RtmpContext::default();
1137
1138        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1139        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1140        let sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1141        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1142        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1143        assert!(result.is_err())
1144    }
1145
1146    #[tokio::test]
1147    async fn err_signature_did_not_match() {
1148        let mut stream = pin!(VecStream::default());
1149        let mut rtmp_context = RtmpContext::default();
1150
1151        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1152        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1153        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1154        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1155        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1156        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1157        assert!(result.is_ok());
1158
1159        read_encryption_algorithm(stream.as_mut()).await.unwrap();
1160        let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1161        read_handshake(stream.as_mut()).await.unwrap();
1162        let mut invalid_signature_key: [u8; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()] = [0; Handshake::CLIENT_KEY.len() + Handshake::COMMON_KEY.len()];
1163        fill(&mut invalid_signature_key);
1164        received_server_handshake.imprint_signature(sent_encryption_algorithm, &invalid_signature_key);
1165        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1166        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1167        assert!(result.is_err())
1168    }
1169
1170    #[tokio::test]
1171    async fn ok_singed_handshake_got_handled() {
1172        let mut stream = pin!(VecStream::default());
1173        let mut rtmp_context = RtmpContext::default();
1174
1175        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1176        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1177        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1178        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1179        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1180        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1181        assert!(result.is_ok());
1182
1183        let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1184        let mut received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1185        let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1186        let mut server_signature_key: Vec<u8> = Vec::new();
1187        server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1188        server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1189        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1190        assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1191        assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1192
1193        let mut client_signature_key: Vec<u8> = Vec::new();
1194        client_signature_key.extend_from_slice(Handshake::CLIENT_KEY);
1195        client_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1196        received_server_handshake.imprint_signature(sent_encryption_algorithm, &client_signature_key);
1197        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1198        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1199        assert!(result.is_ok());
1200        let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1201        assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes())
1202    }
1203
1204    #[tokio::test]
1205    async fn ok_signed_handshake_as_ffmpeg_got_handled() {
1206        let mut stream = pin!(VecStream::default());
1207        let mut rtmp_context = RtmpContext::default();
1208
1209        let sent_encryption_algorithm = EncryptionAlgorithm::NotEncrypted;
1210        write_encryption_algorithm(stream.as_mut(), sent_encryption_algorithm).await.unwrap();
1211        let mut sent_client_handshake = Handshake::new(Instant::now().elapsed(), Version::LATEST_CLIENT);
1212        sent_client_handshake.imprint_digest(sent_encryption_algorithm, Handshake::CLIENT_KEY);
1213        write_handshake(stream.as_mut(), &sent_client_handshake).await.unwrap();
1214        let result = handle_handshake(stream.as_mut()).handle_first_handshake(&mut rtmp_context).await;
1215        assert!(result.is_ok());
1216
1217        let received_encryption_algorithm = read_encryption_algorithm(stream.as_mut()).await.unwrap();
1218        let received_server_handshake = read_handshake(stream.as_mut()).await.unwrap();
1219        let received_client_handshake = read_handshake(stream.as_mut()).await.unwrap();
1220        let mut server_signature_key: Vec<u8> = Vec::new();
1221        server_signature_key.extend_from_slice(Handshake::SERVER_KEY);
1222        server_signature_key.extend_from_slice(Handshake::COMMON_KEY);
1223        sent_client_handshake.imprint_signature(sent_encryption_algorithm, &server_signature_key);
1224        assert_eq!(sent_encryption_algorithm, received_encryption_algorithm);
1225        assert_eq!(sent_client_handshake.get_bytes(), received_client_handshake.get_bytes());
1226
1227        write_handshake(stream.as_mut(), &received_server_handshake).await.unwrap();
1228        let result = handle_handshake(stream.as_mut()).handle_second_handshake(&mut rtmp_context).await;
1229        assert!(result.is_ok());
1230        let sent_server_handshake = rtmp_context.get_server_handshake().unwrap();
1231        assert_eq!(received_server_handshake.get_bytes(), sent_server_handshake.get_bytes());
1232    }
1233
1234    #[tokio::test]
1235    async fn err_undistinguishable_client() {
1236        let mut stream = pin!(VecStream::default());
1237        let mut rtmp_context = RtmpContext::default();
1238
1239        let mut buffer = ByteBuffer::default();
1240        buffer.encode(&Connect::default());
1241        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1242        let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1243        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1244        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1245        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1246        let mut buffer: ByteBuffer = chunk.into();
1247        let command: AmfString = buffer.decode().unwrap();
1248        assert!(result.is_err());
1249        assert_eq!(command, "_error");
1250        assert!(rtmp_context.get_information().is_some())
1251    }
1252
1253    #[tokio::test]
1254    async fn err_inconsistent_app_path() {
1255        let mut stream = pin!(VecStream::default());
1256        let mut rtmp_context = RtmpContext::default();
1257        rtmp_context.set_app("ondemand");
1258
1259        let mut buffer = ByteBuffer::default();
1260        buffer.encode(&Connect::new(object!("app" => AmfString::default())));
1261        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1262        let result = handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await;
1263        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1264        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1265        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1266        let mut buffer: ByteBuffer = chunk.into();
1267        let command: AmfString = buffer.decode().unwrap();
1268        assert!(result.is_err());
1269        assert_eq!(command, "_error");
1270        assert!(rtmp_context.get_information().is_some())
1271    }
1272
1273    #[tokio::test]
1274    async fn err_empty_topic_path() {
1275        let mut stream = pin!(VecStream::default());
1276        let mut rtmp_context = RtmpContext::default();
1277        let mut buffer = ByteBuffer::default();
1278        buffer.encode(&ReleaseStream::new(AmfString::from("")));
1279        handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1280        let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1281        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1282        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1283        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1284        let mut buffer: ByteBuffer = chunk.into();
1285        let command: AmfString = buffer.decode().unwrap();
1286        assert!(result.is_err());
1287        assert_eq!(command, "_error");
1288        assert!(rtmp_context.get_information().is_some())
1289    }
1290
1291    #[tokio::test]
1292    async fn err_unpublished_stream() {
1293        let app = "ondemand";
1294
1295        #[cfg(windows)]
1296        let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1297        #[cfg(windows)]
1298        let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1299        #[cfg(windows)]
1300        let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1301        #[cfg(unix)]
1302        let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1303        #[cfg(unix)]
1304        let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1305        #[cfg(unix)]
1306        let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1307
1308        let mut stream = pin!(VecStream::default());
1309        let mut rtmp_context = RtmpContext::default();
1310        rtmp_context.set_topic_storage_path(&topic_storage_path);
1311        rtmp_context.set_topic_database_url(&topic_database_url);
1312        rtmp_context.set_app(app);
1313        rtmp_context.set_client_addr(CLIENT_ADDR);
1314
1315        let topic_path = Uuid::now_v7().to_string();
1316
1317        let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1318        let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1319        migrator.run(&mut connection).await.unwrap();
1320
1321        let mut buffer = ByteBuffer::default();
1322        buffer.encode(&ReleaseStream::new(AmfString::new(topic_path)));
1323        handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1324        let result = handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await;
1325        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1326        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1327        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1328        let mut buffer: ByteBuffer = chunk.into();
1329        let command: AmfString = buffer.decode().unwrap();
1330        assert!(result.is_err());
1331        assert_eq!(command, "_error");
1332        assert!(rtmp_context.get_information().is_some())
1333    }
1334
1335    #[tokio::test]
1336    async fn err_inconsistent_topic_path_in_publish() {
1337        let mut stream = pin!(VecStream::default());
1338        let mut rtmp_context = RtmpContext::default();
1339        rtmp_context.set_message_id(0);
1340        rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
1341
1342        let mut buffer = ByteBuffer::default();
1343        buffer.encode(&Publish::new(AmfString::default(), "live".into()));
1344        handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1345        let result = handle_message(stream.as_mut()).write_publish_response(&mut rtmp_context).await;
1346        assert!(result.is_err());
1347        assert!(rtmp_context.get_information().is_some())
1348    }
1349
1350    #[tokio::test]
1351    async fn err_metadata_not_found() {
1352        let topic_path = Uuid::now_v7().to_string();
1353
1354        #[cfg(windows)]
1355        let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1356        #[cfg(windows)]
1357        let topic = {
1358            create_dir_all(&topic_storage_path).unwrap();
1359            Flv::create(&format!("{}\\{}.flv", &topic_storage_path, &topic_path)).unwrap()
1360        };
1361        #[cfg(unix)]
1362        let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1363        #[cfg(unix)]
1364        let topic = {
1365            create_dir_all(&topic_storage_path).unwrap();
1366            Flv::create(&format!("{}/{}.flv", &topic_storage_path, &topic_path)).unwrap()
1367        };
1368
1369        let mut stream = pin!(VecStream::default());
1370        let mut rtmp_context = RtmpContext::default();
1371        rtmp_context.set_topic(topic);
1372
1373        let mut buffer = ByteBuffer::default();
1374        buffer.encode(&GetStreamLength::new(AmfString::new(topic_path)));
1375        handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1376        let result = handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await;
1377        let basic_header = read_basic_header(stream.as_mut()).await.unwrap();
1378        let message_header = read_message_header(stream.as_mut(), basic_header.get_message_format()).await.unwrap();
1379        let chunk = read_chunk_data(stream.as_mut(), ChunkSize::default(), message_header.get_message_length().unwrap()).await.unwrap();
1380        let mut buffer: ByteBuffer = chunk.into();
1381        let command: AmfString = buffer.decode().unwrap();
1382        assert!(result.is_err());
1383        assert_eq!(command, "_error");
1384        assert!(rtmp_context.get_information().is_some())
1385    }
1386
1387    #[tokio::test]
1388    async fn err_inconsistent_topic_path_in_play() {
1389        let mut stream = pin!(VecStream::default());
1390        let mut rtmp_context = RtmpContext::default();
1391        rtmp_context.set_topic_path(AmfString::new(Uuid::now_v7().to_string()));
1392        rtmp_context.set_message_id(0);
1393
1394        let mut buffer = ByteBuffer::default();
1395        buffer.encode(&Play::new(AmfString::new(Uuid::now_v7().to_string()), Duration::default(), PlayMode::default()));
1396        handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1397        let result = handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await;
1398        assert!(result.is_err());
1399        assert!(rtmp_context.get_information().is_some())
1400    }
1401
1402    #[tokio::test]
1403    async fn ok_valid_publisher_sequence() {
1404        let app = "ondemand";
1405
1406        #[cfg(windows)]
1407        let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1408        #[cfg(windows)]
1409        let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1410        #[cfg(windows)]
1411        let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1412        #[cfg(unix)]
1413        let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1414        let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1415        #[cfg(unix)]
1416        let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1417
1418        let mut rtmp_context = RtmpContext::default();
1419        rtmp_context.set_topic_storage_path(&topic_storage_path);
1420        rtmp_context.set_topic_database_url(&topic_database_url);
1421        rtmp_context.set_client_addr(CLIENT_ADDR);
1422        rtmp_context.set_app(app);
1423
1424        let topic_path = Uuid::now_v7().to_string();
1425
1426        let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1427        let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1428        migrator.run(&mut connection).await.unwrap();
1429        query(STATEMENT)
1430            .bind(&topic_path)
1431            .bind(&CLIENT_ADDR.to_string())
1432            .execute(&mut connection)
1433            .await
1434            .unwrap();
1435
1436        let mut stream = pin!(VecStream::default());
1437        let mut buffer = ByteBuffer::default();
1438        buffer.encode(
1439            &Connect::new(
1440                object!(
1441                    "app" => AmfString::from(app),
1442                    "type" => AmfString::from("nonprivate")
1443                )
1444            )
1445        );
1446        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1447        assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1448        assert_eq!(PublisherStatus::Connected, rtmp_context.get_publisher_status().unwrap());
1449
1450        let mut buffer = ByteBuffer::default();
1451        buffer.encode(&ReleaseStream::new(AmfString::new(topic_path.clone())));
1452        handle_message(stream.as_mut()).handle_release_stream_request(&mut rtmp_context, buffer).await.unwrap();
1453        let mut stream = pin!(VecStream::default());
1454        assert!(handle_message(stream.as_mut()).write_release_stream_response(&mut rtmp_context).await.is_ok());
1455        assert_eq!(PublisherStatus::Released, rtmp_context.get_publisher_status().unwrap());
1456
1457        let mut buffer = ByteBuffer::default();
1458        buffer.encode(&FcPublish::new(AmfString::new(topic_path.clone())));
1459        handle_message(stream.as_mut()).handle_fc_publish_request(&mut rtmp_context, buffer).await.unwrap();
1460        let mut stream = pin!(VecStream::default());
1461        assert!(handle_message(stream.as_mut()).write_fc_publish_response(&mut rtmp_context).await.is_ok());
1462        assert_eq!(PublisherStatus::FcPublished, rtmp_context.get_publisher_status().unwrap());
1463
1464        let mut buffer = ByteBuffer::default();
1465        buffer.encode(&CreateStream);
1466        handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1467        let mut stream = pin!(VecStream::default());
1468        assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1469        assert_eq!(PublisherStatus::Created, rtmp_context.get_publisher_status().unwrap());
1470
1471        let mut buffer = ByteBuffer::default();
1472        buffer.encode(&Publish::new(AmfString::new(topic_path), "live".into()));
1473        handle_message(stream.as_mut()).handle_publish_request(&mut rtmp_context, buffer).await.unwrap();
1474        let mut stream = pin!(VecStream::default());
1475        assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1476        assert_eq!(PublisherStatus::Began, rtmp_context.get_publisher_status().unwrap());
1477        assert!(handle_message(stream).write_publish_response(&mut rtmp_context).await.is_ok());
1478        assert_eq!(PublisherStatus::Published, rtmp_context.get_publisher_status().unwrap())
1479    }
1480
1481    #[tokio::test]
1482    async fn ok_valid_subscriber_sequence_in_ffmpeg() {
1483        let app = "ondemand";
1484        let topic_path = Uuid::now_v7().to_string();
1485
1486        #[cfg(windows)]
1487        let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1488        #[cfg(windows)]
1489        let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1490        #[cfg(windows)]
1491        let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1492        #[cfg(windows)]
1493        {
1494            let copy_to = format!("{topic_storage_path}\\{app}");
1495            create_dir_all(&copy_to).unwrap();
1496            copy(&format!("{}\\..\\resources\\test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}\\{}.flv", copy_to, topic_path)).unwrap();
1497        }
1498        #[cfg(unix)]
1499        let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1500        #[cfg(unix)]
1501        let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1502        #[cfg(unix)]
1503        let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1504        #[cfg(unix)]
1505        {
1506            let copy_to = format!("{topic_storage_path}/{app}");
1507            create_dir_all(&copy_to).unwrap();
1508            copy(&format!("{}/../resources/test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}/{}.flv", copy_to, topic_path)).unwrap();
1509        }
1510
1511        let mut rtmp_context = RtmpContext::default();
1512        rtmp_context.set_topic_storage_path(&topic_storage_path);
1513        rtmp_context.set_topic_database_url(&topic_database_url);
1514        rtmp_context.set_client_addr(CLIENT_ADDR);
1515        rtmp_context.set_app(app);
1516
1517        let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1518        let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1519        migrator.run(&mut connection).await.unwrap();
1520        query(STATEMENT)
1521            .bind(&topic_path)
1522            .bind(&CLIENT_ADDR.to_string())
1523            .execute(&mut connection)
1524            .await
1525            .unwrap();
1526
1527        let mut stream = pin!(VecStream::default());
1528        let mut buffer = ByteBuffer::default();
1529        buffer.encode(
1530            &Connect::new(
1531                object!(
1532                    "app" => AmfString::from(app),
1533                    "fpad" => Boolean::new(0)
1534                )
1535            )
1536        );
1537        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1538        assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1539        assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1540
1541        let mut buffer = ByteBuffer::default();
1542        buffer.encode(&WindowAcknowledgementSize::default());
1543        handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1544        assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1545
1546        let mut buffer = ByteBuffer::default();
1547        buffer.encode(&CreateStream);
1548        handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1549        let mut stream = pin!(VecStream::default());
1550        assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1551        assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1552
1553        let mut stream = pin!(VecStream::default());
1554        let mut buffer = ByteBuffer::default();
1555        buffer.encode(&FcSubscribe::new(AmfString::new(topic_path.clone())));
1556        handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1557        assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1558
1559        let mut buffer = ByteBuffer::default();
1560        buffer.encode(&GetStreamLength::new(AmfString::new(topic_path.clone())));
1561        handle_message(stream.as_mut()).handle_stream_length_request(&mut rtmp_context, buffer).await.unwrap();
1562        let mut stream = pin!(VecStream::default());
1563        assert!(handle_message(stream.as_mut()).write_stream_length_response(&mut rtmp_context).await.is_ok());
1564        assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1565
1566        let mut buffer = ByteBuffer::default();
1567        buffer.encode(
1568            &Play::new(
1569                AmfString::new(topic_path),
1570                Duration::default(),
1571                PlayMode::default()
1572            )
1573        );
1574        handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1575        let mut stream = pin!(VecStream::default());
1576        assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1577        assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1578        assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1579        assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1580
1581        let mut buffer = ByteBuffer::default();
1582        buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1583        handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1584        assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1585    }
1586
1587    #[tokio::test]
1588    async fn ok_valid_subscriber_sequence_in_obs() {
1589        let app = "ondemand";
1590        let topic_path = Uuid::now_v7().to_string();
1591
1592        #[cfg(windows)]
1593        let topic_storage_path = format!("{}\\sheave", env!("TEMP"));
1594        #[cfg(windows)]
1595        let topic_database_url = format!("sqlite:{topic_storage_path}\\sheave.db?mode=rwc");
1596        #[cfg(windows)]
1597        let migrations = format!("{}\\migrations", env!("CARGO_MANIFEST_DIR"));
1598        #[cfg(windows)]
1599        {
1600            let copy_to = format!("{topic_storage_path}\\{app}");
1601            create_dir_all(&copy_to).unwrap();
1602            copy(&format!("{}\\..\\resources\\test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}\\{}.flv", copy_to, topic_path)).unwrap();
1603        }
1604
1605        #[cfg(unix)]
1606        let topic_storage_path = format!("{}/sheave", env!("TMPDIR"));
1607        #[cfg(unix)]
1608        let topic_database_url = format!("sqlite:{topic_storage_path}/sheave.db?mode=rwc");
1609        #[cfg(unix)]
1610        let migrations = format!("{}/migrations", env!("CARGO_MANIFEST_DIR"));
1611        #[cfg(unix)]
1612        {
1613            let copy_to = format!("{topic_storage_path}/{app}");
1614            create_dir_all(&copy_to).unwrap();
1615            copy(&format!("{}/../resources/test.flv", env!("CARGO_MANIFEST_DIR")), &format!("{}/{}.flv", copy_to, topic_path)).unwrap();
1616        }
1617
1618        let mut rtmp_context = RtmpContext::default();
1619        rtmp_context.set_topic_storage_path(&topic_storage_path);
1620        rtmp_context.set_topic_database_url(&topic_database_url);
1621        rtmp_context.set_client_addr(CLIENT_ADDR);
1622        rtmp_context.set_app(app);
1623
1624        let mut connection = SqliteConnection::connect(&topic_database_url).await.unwrap();
1625        let migrator = Migrator::new(Path::new(&migrations)).await.unwrap();
1626        migrator.run(&mut connection).await.unwrap();
1627        query(STATEMENT)
1628            .bind(&topic_path)
1629            .bind(&CLIENT_ADDR.to_string())
1630            .execute(&mut connection)
1631            .await
1632            .unwrap();
1633
1634        let mut stream = pin!(VecStream::default());
1635        let mut buffer = ByteBuffer::default();
1636        buffer.encode(
1637            &Connect::new(
1638                object!(
1639                    "app" => AmfString::from(app),
1640                    "fpad" => Boolean::new(0)
1641                )
1642            )
1643        );
1644        handle_message(stream.as_mut()).handle_connect_request(&mut rtmp_context, buffer).await.unwrap();
1645        assert!(handle_message(stream.as_mut()).write_connect_response(&mut rtmp_context).await.is_ok());
1646        assert_eq!(SubscriberStatus::Connected, rtmp_context.get_subscriber_status().unwrap());
1647
1648        let mut buffer = ByteBuffer::default();
1649        buffer.encode(&WindowAcknowledgementSize::default());
1650        handle_message(stream.as_mut()).handle_window_acknowledgement_size(&mut rtmp_context, buffer).await.unwrap();
1651        assert_eq!(SubscriberStatus::WindowAcknowledgementSizeGotSent, rtmp_context.get_subscriber_status().unwrap());
1652
1653        let mut buffer = ByteBuffer::default();
1654        buffer.encode(&CreateStream);
1655        handle_message(stream.as_mut()).handle_create_stream_request(&mut rtmp_context, buffer).await.unwrap();
1656        let mut stream = pin!(VecStream::default());
1657        assert!(handle_message(stream.as_mut()).write_create_stream_response(&mut rtmp_context).await.is_ok());
1658        assert_eq!(SubscriberStatus::Created, rtmp_context.get_subscriber_status().unwrap());
1659
1660        let mut buffer = ByteBuffer::default();
1661        buffer.encode(&FcSubscribe::new(AmfString::new(topic_path.clone())));
1662        handle_message(stream.as_mut()).handle_fc_subscribe_request(&mut rtmp_context, buffer).await.unwrap();
1663        assert_eq!(SubscriberStatus::FcSubscribed, rtmp_context.get_subscriber_status().unwrap());
1664
1665        let mut buffer = ByteBuffer::default();
1666        buffer.encode(
1667            &SetPlaylist::new(
1668                ecma_array!(
1669                    "0" => AmfString::new(topic_path.clone())
1670                )
1671            )
1672        );
1673        handle_message(stream.as_mut()).handle_playlist_request(&mut rtmp_context, buffer).await.unwrap();
1674        let mut stream = pin!(VecStream::default());
1675        assert!(handle_message(stream.as_mut()).write_playlist_response(&mut rtmp_context).await.is_ok());
1676        assert_eq!(SubscriberStatus::AdditionalCommandGotSent, rtmp_context.get_subscriber_status().unwrap());
1677
1678        let mut buffer = ByteBuffer::default();
1679        buffer.encode(
1680            &Play::new(
1681                AmfString::new(topic_path),
1682                Duration::default(),
1683                PlayMode::default()
1684            )
1685        );
1686        handle_message(stream.as_mut()).handle_play_request(&mut rtmp_context, buffer).await.unwrap();
1687        let mut stream = pin!(VecStream::default());
1688        assert!(handle_message(stream.as_mut()).write_stream_begin(&mut rtmp_context).await.is_ok());
1689        assert_eq!(SubscriberStatus::Began, rtmp_context.get_subscriber_status().unwrap());
1690        assert!(handle_message(stream.as_mut()).write_play_response(&mut rtmp_context).await.is_ok());
1691        assert_eq!(SubscriberStatus::Played, rtmp_context.get_subscriber_status().unwrap());
1692
1693        let mut buffer = ByteBuffer::default();
1694        buffer.encode(&SetBufferLength::new(rtmp_context.get_message_id().unwrap(), 0));
1695        handle_message(stream.as_mut()).handle_buffer_length(&mut rtmp_context, buffer).await.unwrap();
1696        assert_eq!(SubscriberStatus::BufferLengthGotSent, rtmp_context.get_subscriber_status().unwrap())
1697    }
1698}