From 79c838629ac9dc2da545c814a50923ccfdb110d2 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Fri, 8 Sep 2023 17:45:01 +0800 Subject: [PATCH] Implementation for jitter --- .gitignore | 1 + .../remote_desk_client/remote_desk_client.cpp | 30 +++++--- src/rtp/rtp_video_receiver.cpp | 74 +++++++++++++++---- src/rtp/rtp_video_receiver.h | 9 ++- src/rtp/rtp_video_sender.cpp | 38 ++++++++++ src/rtp/rtp_video_sender.h | 34 +++++++++ src/transmission/ice_transmission.cpp | 16 +++- src/transmission/ice_transmission.h | 5 +- xmake.lua | 4 +- 9 files changed, 177 insertions(+), 34 deletions(-) create mode 100644 src/rtp/rtp_video_sender.cpp create mode 100644 src/rtp/rtp_video_sender.h diff --git a/.gitignore b/.gitignore index 920c349..828c9fd 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ .xmake/ build/ thirdparty/ffmpeg/lib/* +.VSCodeCounter/ # MacOS Cache .DS_Store diff --git a/application/remote_desk/remote_desk_client/remote_desk_client.cpp b/application/remote_desk/remote_desk_client/remote_desk_client.cpp index 53e41e5..42b59d7 100644 --- a/application/remote_desk/remote_desk_client/remote_desk_client.cpp +++ b/application/remote_desk/remote_desk_client/remote_desk_client.cpp @@ -44,16 +44,16 @@ inline void FreshVideo() { SDL_RenderCopy(sdlRenderer, sdlTexture, NULL, &sdlRect); SDL_RenderPresent(sdlRenderer); - frame_count++; - end_time = SDL_GetTicks(); - elapsed_time = end_time - start_time; - if (elapsed_time >= 1000) { - fps = frame_count / (elapsed_time / 1000); - frame_count = 0; - window_title = "Remote Desk Client [FPS " + std::to_string(fps) + "]"; - SDL_SetWindowTitle(screen, window_title.data()); - start_time = end_time; - } + // frame_count++; + // end_time = SDL_GetTicks(); + // elapsed_time = end_time - start_time; + // if (elapsed_time >= 1000) { + // fps = frame_count / (elapsed_time / 1000); + // frame_count = 0; + // window_title = "Remote Desk Client [FPS " + std::to_string(fps) + "]"; + // SDL_SetWindowTitle(screen, window_title.data()); + // start_time = end_time; + // } } inline int ProcessMouseKeyEven(SDL_Event &ev) { @@ -106,6 +106,16 @@ void GuestReceiveBuffer(const char *data, size_t size, const char *user_id, SDL_Event event; event.type = REFRESH_EVENT; SDL_PushEvent(&event); + frame_count++; + end_time = SDL_GetTicks(); + elapsed_time = end_time - start_time; + if (elapsed_time >= 1000) { + fps = frame_count / (elapsed_time / 1000); + frame_count = 0; + window_title = "Remote Desk Client [FPS " + std::to_string(fps) + "]"; + SDL_SetWindowTitle(screen, window_title.data()); + start_time = end_time; + } } std::string GetMac() { diff --git a/src/rtp/rtp_video_receiver.cpp b/src/rtp/rtp_video_receiver.cpp index 34e5b7b..3a3ea04 100644 --- a/src/rtp/rtp_video_receiver.cpp +++ b/src/rtp/rtp_video_receiver.cpp @@ -6,16 +6,33 @@ RtpVideoReceiver::RtpVideoReceiver() {} -RtpVideoReceiver::~RtpVideoReceiver() {} +RtpVideoReceiver::~RtpVideoReceiver() { + if (jitter_thread_ && jitter_thread_->joinable()) { + jitter_thread_->join(); + delete jitter_thread_; + jitter_thread_ = nullptr; + } +} void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { + if (!jitter_thread_) { + jitter_thread_ = new std::thread(&RtpVideoReceiver::Process, this); + } + if (NAL_UNIT_TYPE::NALU == rtp_packet.NalUnitType()) { - // compelete_video_frame_queue_.push( - // VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); - if (on_receive_complete_frame_) { - on_receive_complete_frame_( - VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); - } + compelete_video_frame_queue_.push( + VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); + // if (on_receive_complete_frame_) { + // auto now_complete_frame_ts = + // std::chrono::high_resolution_clock::now().time_since_epoch().count() + // / 1000000; + // uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_; + // LOG_ERROR("Duration {}", 1000 / duration); + // last_complete_frame_ts_ = now_complete_frame_ts; + + // on_receive_complete_frame_( + // VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); + // } } else if (NAL_UNIT_TYPE::FU_A == rtp_packet.NalUnitType()) { incomplete_frame_list_[rtp_packet.SequenceNumber()] = rtp_packet; bool complete = CheckIsFrameCompleted(rtp_packet); @@ -53,16 +70,22 @@ bool RtpVideoReceiver::CheckIsFrameCompleted(RtpPacket& rtp_packet) { incomplete_frame_list_.erase(start); } - // compelete_video_frame_queue_.push( - // VideoFrame(nv12_data_, complete_frame_size)); + compelete_video_frame_queue_.push( + VideoFrame(nv12_data_, complete_frame_size)); - // LOG_ERROR("Size of compelete_video_frame_queue_ [{}]", - // compelete_video_frame_queue_.size()); + // if (on_receive_complete_frame_) { + // auto now_complete_frame_ts = + // std::chrono::high_resolution_clock::now() + // .time_since_epoch() + // .count() / + // 1000000; + // uint32_t duration = now_complete_frame_ts - + // last_complete_frame_ts_; LOG_ERROR("Duration {}", 1000 / duration); + // last_complete_frame_ts_ = now_complete_frame_ts; - if (on_receive_complete_frame_) { - on_receive_complete_frame_( - VideoFrame(nv12_data_, complete_frame_size)); - } + // on_receive_complete_frame_( + // VideoFrame(nv12_data_, complete_frame_size)); + // } return true; } else { LOG_WARN("What happened?") @@ -73,4 +96,25 @@ bool RtpVideoReceiver::CheckIsFrameCompleted(RtpPacket& rtp_packet) { return true; } return false; +} + +void RtpVideoReceiver::Process() { + while (1) { + if (!compelete_video_frame_queue_.isEmpty()) { + VideoFrame video_frame; + compelete_video_frame_queue_.pop(video_frame); + if (on_receive_complete_frame_) { + auto now_complete_frame_ts = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count() / + 1000000; + uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_; + LOG_ERROR("Duration {}", 1000 / duration); + last_complete_frame_ts_ = now_complete_frame_ts; + on_receive_complete_frame_(video_frame); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(13)); + } } \ No newline at end of file diff --git a/src/rtp/rtp_video_receiver.h b/src/rtp/rtp_video_receiver.h index 05490f7..2ba99fb 100644 --- a/src/rtp/rtp_video_receiver.h +++ b/src/rtp/rtp_video_receiver.h @@ -4,8 +4,10 @@ #include #include #include +#include #include "frame.h" +#include "ringbuffer.h" #include "rtp_video_session.h" class RtpVideoReceiver { @@ -23,15 +25,20 @@ class RtpVideoReceiver { private: bool CheckIsFrameCompleted(RtpPacket& rtp_packet); + void Process(); // private: // void OnReceiveFrame(uint8_t* payload) {} private: std::map incomplete_frame_list_; - std::queue compelete_video_frame_queue_; uint8_t* nv12_data_ = nullptr; std::function on_receive_complete_frame_ = nullptr; + uint32_t last_complete_frame_ts_ = 0; + + RingBuffer compelete_video_frame_queue_; + std::thread* jitter_thread_ = nullptr; + bool start_ = false; }; #endif diff --git a/src/rtp/rtp_video_sender.cpp b/src/rtp/rtp_video_sender.cpp new file mode 100644 index 0000000..f229cdb --- /dev/null +++ b/src/rtp/rtp_video_sender.cpp @@ -0,0 +1,38 @@ +#include "rtp_video_sender.h" + +#include + +RtpVideoSender::RtpVideoSender() {} + +RtpVideoSender::~RtpVideoSender() { + if (send_thread_ && send_thread_->joinable()) { + send_thread_->join(); + delete send_thread_; + send_thread_ = nullptr; + } +} + +void RtpVideoSender::Enqueue(std::vector& rtp_packets) { + if (!send_thread_) { + send_thread_ = new std::thread(&RtpVideoSender::Process, this); + } + + for (auto& rtp_packet : rtp_packets) { + start_ = true; + rtp_packe_queue_.push(rtp_packet); + } +} + +void RtpVideoSender::Process() { + while (1) { + if (!rtp_packe_queue_.isEmpty()) { + RtpPacket rtp_packet; + rtp_packe_queue_.pop(rtp_packet); + if (rtp_packet_send_func_) { + rtp_packet_send_func_(rtp_packet); + } + } + + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } +} \ No newline at end of file diff --git a/src/rtp/rtp_video_sender.h b/src/rtp/rtp_video_sender.h new file mode 100644 index 0000000..73e4ded --- /dev/null +++ b/src/rtp/rtp_video_sender.h @@ -0,0 +1,34 @@ +#ifndef _RTP_VIDEO_SENDER_H_ +#define _RTP_VIDEO_SENDER_H_ + +#include +#include + +#include "ringbuffer.h" +#include "rtp_packet.h" + +class RtpVideoSender { + public: + RtpVideoSender(); + ~RtpVideoSender(); + + public: + void Enqueue(std::vector &rtp_packets); + + public: + void SetRtpPacketSendFunc( + std::function rtp_packet_send_func) { + rtp_packet_send_func_ = rtp_packet_send_func; + } + + private: + void Process(); + + private: + std::function rtp_packet_send_func_ = nullptr; + RingBuffer rtp_packe_queue_; + std::thread *send_thread_ = nullptr; + bool start_ = false; +}; + +#endif \ No newline at end of file diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index b95c587..0f561fb 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -69,6 +69,15 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { remote_user_id_.size()); }); + rtp_video_sender_ = new RtpVideoSender(); + rtp_video_sender_->SetRtpPacketSendFunc([this]( + RtpPacket &rtp_packet) -> void { + if (ice_agent_) { + LOG_ERROR("Send rtp packet {}", rtp_packet.Size()); + ice_agent_->Send((const char *)rtp_packet.Buffer(), rtp_packet.Size()); + } + }); + ice_agent_ = new IceAgent(ip, port); ice_agent_->CreateIceAgent( @@ -213,9 +222,10 @@ int IceTransmission::SendData(const char *data, size_t size) { std::vector packets; rtp_video_session_->Encode((uint8_t *)data, size, packets); - for (auto &packet : packets) { - ice_agent_->Send((const char *)packet.Buffer(), packet.Size()); - } + rtp_video_sender_->Enqueue(packets); + // for (auto &packet : packets) { + // ice_agent_->Send((const char *)packet.Buffer(), packet.Size()); + // } // std::vector packets = // rtp_video_session_->Encode((uint8_t *)(data), size); diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index 4740767..e04724f 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -8,6 +8,7 @@ #include "ringbuffer.h" #include "rtp_packet.h" #include "rtp_video_receiver.h" +#include "rtp_video_sender.h" #include "rtp_video_session.h" #include "ws_transmission.h" @@ -72,15 +73,13 @@ class IceTransmission { private: // ikcpcb *kcp_ = nullptr; char kcp_complete_buffer_[2560 * 1440 * 4]; - std::mutex mtx_; - RingBuffer send_ringbuffer_; - RingBuffer recv_ringbuffer_; bool kcp_stop_ = false; std::thread *kcp_update_thread_ = nullptr; private: RtpVideoSession *rtp_video_session_ = nullptr; RtpVideoReceiver *rtp_video_receiver_ = nullptr; + RtpVideoSender *rtp_video_sender_ = nullptr; uint8_t *rtp_payload_ = nullptr; RtpPacket pop_packet_; }; diff --git a/xmake.lua b/xmake.lua index 63e272b..60b905d 100644 --- a/xmake.lua +++ b/xmake.lua @@ -55,7 +55,7 @@ target("frame") target("rtp") set_kind("static") - add_deps("log", "frame") + add_deps("log", "frame", "ringbuffer") add_files("src/rtp/*.cpp") add_includedirs("src/rtp", {public = true}) @@ -95,7 +95,7 @@ target("qos") target("transmission") set_kind("static") - add_deps("log", "ws", "ice", "qos", "ringbuffer", "rtp") + add_deps("log", "ws", "ice", "qos", "rtp") add_files("src/transmission/*.cpp") add_packages("asio", "nlohmann_json") add_includedirs("src/ws", "src/ice", "src/qos", {public = true})