diff --git a/application/remote_desk/remote_desk_client/remote_desk_client.cpp b/application/remote_desk/remote_desk_client/remote_desk_client.cpp index 4ceb65f..53e41e5 100644 --- a/application/remote_desk/remote_desk_client/remote_desk_client.cpp +++ b/application/remote_desk/remote_desk_client/remote_desk_client.cpp @@ -100,7 +100,7 @@ inline int ProcessMouseKeyEven(SDL_Event &ev) { void GuestReceiveBuffer(const char *data, size_t size, const char *user_id, size_t user_id_size) { - std::cout << "Receive: [" << user_id << "] " << std::endl; + // std::cout << "Receive: [" << user_id << "] " << std::endl; memcpy(dst_buffer, data, size); SDL_Event event; diff --git a/src/frame/frame.cpp b/src/frame/frame.cpp new file mode 100644 index 0000000..43c7e1f --- /dev/null +++ b/src/frame/frame.cpp @@ -0,0 +1,58 @@ +#include "frame.h" + +#include + +VideoFrame::VideoFrame() {} + +VideoFrame::VideoFrame(const uint8_t *buffer, size_t size) { + buffer_ = new uint8_t[size]; + memcpy(buffer_, buffer, size); + size_ = size; +} + +VideoFrame::VideoFrame(const VideoFrame &video_frame) { + if (video_frame.size_ > 0) { + buffer_ = new uint8_t[video_frame.size_]; + memcpy(buffer_, video_frame.buffer_, video_frame.size_); + size_ = video_frame.size_; + } +} + +VideoFrame::VideoFrame(VideoFrame &&video_frame) + : buffer_((uint8_t *)std::move(video_frame.buffer_)), + size_(video_frame.size_) { + video_frame.buffer_ = nullptr; + video_frame.size_ = 0; +} + +VideoFrame &VideoFrame::operator=(const VideoFrame &video_frame) { + if (&video_frame != this) { + if (buffer_) { + delete buffer_; + buffer_ = nullptr; + } + buffer_ = new uint8_t[video_frame.size_]; + memcpy(buffer_, video_frame.buffer_, video_frame.size_); + size_ = video_frame.size_; + } + return *this; +} + +VideoFrame &VideoFrame::operator=(VideoFrame &&video_frame) { + if (&video_frame != this) { + buffer_ = std::move(video_frame.buffer_); + video_frame.buffer_ = nullptr; + size_ = video_frame.size_; + video_frame.size_ = 0; + } + return *this; +} + +VideoFrame::~VideoFrame() { + if (buffer_) { + delete buffer_; + buffer_ = nullptr; + } + + size_ = 0; +} \ No newline at end of file diff --git a/src/frame/frame.h b/src/frame/frame.h new file mode 100644 index 0000000..8404a56 --- /dev/null +++ b/src/frame/frame.h @@ -0,0 +1,28 @@ +#ifndef _FRAME_H_ +#define _FRAME_H_ + +#include + +class VideoFrame { + public: + VideoFrame(); + VideoFrame(const uint8_t *buffer, size_t size); + VideoFrame(const VideoFrame &video_frame); + VideoFrame(VideoFrame &&video_frame); + VideoFrame &operator=(const VideoFrame &video_frame); + VideoFrame &operator=(VideoFrame &&video_frame); + + ~VideoFrame(); + + public: + const uint8_t *Buffer() { return buffer_; } + const size_t Size() { return size_; } + + private: + size_t width_ = 0; + size_t height_ = 0; + uint8_t *buffer_ = nullptr; + size_t size_ = 0; +}; + +#endif \ No newline at end of file diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index ed2be0c..4444814 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -45,7 +45,6 @@ int PeerConnection::Init(PeerConnectionParams params, on_receive_ice_msg_ = [this](const char *data, size_t size, const char *user_id, size_t user_id_size) { int num_frame_returned = Decode((uint8_t *)data, size); - uint32_t width = 0; uint32_t height = 0; uint32_t frame_size = 0; diff --git a/src/rtp/rtp_packet.cpp b/src/rtp/rtp_packet.cpp index 3ac6feb..2506a4e 100644 --- a/src/rtp/rtp_packet.cpp +++ b/src/rtp/rtp_packet.cpp @@ -4,6 +4,17 @@ #include "log.h" +inline void RtpPacket::TryToDecodeH264RtpPacket(uint8_t *buffer) { + if (PAYLOAD_TYPE::H264 == NAL_UNIT_TYPE(buffer[1] & 0x7f)) { + nal_unit_type_ = NAL_UNIT_TYPE(buffer[12] & 0x1F); + if (NAL_UNIT_TYPE::NALU == nal_unit_type_) { + DecodeH264Nalu(); + } else if (NAL_UNIT_TYPE::FU_A == nal_unit_type_) { + DecodeH264Fua(); + } + } +} + RtpPacket::RtpPacket() : buffer_(new uint8_t[DEFAULT_MTU]), size_(DEFAULT_MTU) { memset(buffer_, 0, DEFAULT_MTU); } @@ -13,6 +24,8 @@ RtpPacket::RtpPacket(const uint8_t *buffer, size_t size) { buffer_ = new uint8_t[size]; memcpy(buffer_, buffer, size); size_ = size; + + TryToDecodeH264RtpPacket(buffer_); } } @@ -21,6 +34,8 @@ RtpPacket::RtpPacket(const RtpPacket &rtp_packet) { buffer_ = new uint8_t[rtp_packet.size_]; memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_); size_ = rtp_packet.size_; + + TryToDecodeH264RtpPacket(buffer_); } } @@ -29,6 +44,8 @@ RtpPacket::RtpPacket(RtpPacket &&rtp_packet) size_(rtp_packet.size_) { rtp_packet.buffer_ = nullptr; rtp_packet.size_ = 0; + + TryToDecodeH264RtpPacket(buffer_); } RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) { @@ -40,6 +57,8 @@ RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) { buffer_ = new uint8_t[rtp_packet.size_]; memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_); size_ = rtp_packet.size_; + + TryToDecodeH264RtpPacket(buffer_); } return *this; } @@ -50,6 +69,8 @@ RtpPacket &RtpPacket::operator=(RtpPacket &&rtp_packet) { rtp_packet.buffer_ = nullptr; size_ = rtp_packet.size_; rtp_packet.size_ = 0; + + TryToDecodeH264RtpPacket(buffer_); } return *this; } @@ -292,7 +313,9 @@ size_t RtpPacket::DecodeH264Nalu(uint8_t *payload) { payload_size_ = size_ - (13 + payload_offset); payload_ = buffer_ + 13 + payload_offset; - memcpy(payload, payload_, payload_size_); + if (payload) { + memcpy(payload, payload_, payload_size_); + } return payload_size_; } @@ -341,6 +364,8 @@ size_t RtpPacket::DecodeH264Fua(uint8_t *payload) { payload_size_ = size_ - (14 + payload_offset); payload_ = buffer_ + 14 + payload_offset; - memcpy(payload, payload_, payload_size_); + if (payload) { + memcpy(payload, payload_, payload_size_); + } return payload_size_; } \ No newline at end of file diff --git a/src/rtp/rtp_packet.h b/src/rtp/rtp_packet.h index 5563a5a..733cd79 100644 --- a/src/rtp/rtp_packet.h +++ b/src/rtp/rtp_packet.h @@ -64,6 +64,7 @@ #define DEFAULT_MTU 1500 #define MAX_NALU_LEN 1400 typedef enum { H264 = 96, OPUS = 97, USER_DEFINED = 127 } PAYLOAD_TYPE; +typedef enum { UNKNOWN = 0, NALU = 1, FU_A = 28, FU_B = 29 } NAL_UNIT_TYPE; class RtpPacket { public: @@ -133,8 +134,8 @@ class RtpPacket { const uint8_t *EncodeH264Nalu(uint8_t *payload, size_t payload_size); const uint8_t *EncodeH264Fua(uint8_t *payload, size_t payload_size); size_t Decode(uint8_t *payload); - size_t DecodeH264Nalu(uint8_t *payload); - size_t DecodeH264Fua(uint8_t *payload); + size_t DecodeH264Nalu(uint8_t *payload = nullptr); + size_t DecodeH264Fua(uint8_t *payload = nullptr); public: // Get Header @@ -154,10 +155,18 @@ class RtpPacket { const uint8_t *Payload() { return payload_; }; const size_t PayloadSize() { return payload_size_; } - public: + // Entire RTP buffer const uint8_t *Buffer() { return buffer_; } const size_t Size() { return size_; } + // NAL + const NAL_UNIT_TYPE NalUnitType() { return nal_unit_type_; } + const bool FuAStart() { return fu_header_.start; } + const bool FuAEnd() { return fu_header_.end; } + + private: + inline void TryToDecodeH264RtpPacket(uint8_t *buffer); + private: // Header uint32_t version_ = 0; @@ -184,6 +193,9 @@ class RtpPacket { // Entire RTP buffer uint8_t *buffer_ = nullptr; size_t size_ = 0; + + // NAL + NAL_UNIT_TYPE nal_unit_type_ = NAL_UNIT_TYPE::UNKNOWN; }; #endif \ No newline at end of file diff --git a/src/rtp/rtp_video_receiver.cpp b/src/rtp/rtp_video_receiver.cpp new file mode 100644 index 0000000..34e5b7b --- /dev/null +++ b/src/rtp/rtp_video_receiver.cpp @@ -0,0 +1,76 @@ +#include "rtp_video_receiver.h" + +#include "log.h" + +#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2) + +RtpVideoReceiver::RtpVideoReceiver() {} + +RtpVideoReceiver::~RtpVideoReceiver() {} + +void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { + if (NAL_UNIT_TYPE::NALU == rtp_packet.NalUnitType()) { + // compelete_video_frame_queue_.push( + // VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); + if (on_receive_complete_frame_) { + on_receive_complete_frame_( + VideoFrame(rtp_packet.Payload(), rtp_packet.Size())); + } + } else if (NAL_UNIT_TYPE::FU_A == rtp_packet.NalUnitType()) { + incomplete_frame_list_[rtp_packet.SequenceNumber()] = rtp_packet; + bool complete = CheckIsFrameCompleted(rtp_packet); + } +} + +bool RtpVideoReceiver::CheckIsFrameCompleted(RtpPacket& rtp_packet) { + if (rtp_packet.FuAEnd()) { + size_t complete_frame_size = 0; + uint16_t end_seq = rtp_packet.SequenceNumber(); + if (incomplete_frame_list_.size() == end_seq) { + return true; + } + + while (end_seq--) { + auto it = incomplete_frame_list_.find(end_seq); + complete_frame_size += it->second.PayloadSize(); + if (it == incomplete_frame_list_.end()) { + return false; + } else if (!it->second.FuAStart()) { + continue; + } else if (it->second.FuAStart()) { + if (!nv12_data_) { + nv12_data_ = new uint8_t[NV12_BUFFER_SIZE]; + } + + size_t complete_frame_size = 0; + for (size_t start = it->first; start <= rtp_packet.SequenceNumber(); + start++) { + memcpy(nv12_data_ + complete_frame_size, + incomplete_frame_list_[start].Payload(), + incomplete_frame_list_[start].PayloadSize()); + + complete_frame_size += incomplete_frame_list_[start].PayloadSize(); + incomplete_frame_list_.erase(start); + } + + // compelete_video_frame_queue_.push( + // VideoFrame(nv12_data_, complete_frame_size)); + + // LOG_ERROR("Size of compelete_video_frame_queue_ [{}]", + // compelete_video_frame_queue_.size()); + + if (on_receive_complete_frame_) { + on_receive_complete_frame_( + VideoFrame(nv12_data_, complete_frame_size)); + } + return true; + } else { + LOG_WARN("What happened?") + return false; + } + } + + return true; + } + return false; +} \ No newline at end of file diff --git a/src/rtp/rtp_video_receiver.h b/src/rtp/rtp_video_receiver.h new file mode 100644 index 0000000..05490f7 --- /dev/null +++ b/src/rtp/rtp_video_receiver.h @@ -0,0 +1,37 @@ +#ifndef _RTP_VIDEO_RECEIVER_H_ +#define _RTP_VIDEO_RECEIVER_H_ + +#include +#include +#include + +#include "frame.h" +#include "rtp_video_session.h" + +class RtpVideoReceiver { + public: + RtpVideoReceiver(); + ~RtpVideoReceiver(); + + public: + void InsertRtpPacket(RtpPacket& rtp_packet); + + void SetOnReceiveCompleteFrame( + std::function on_receive_complete_frame) { + on_receive_complete_frame_ = on_receive_complete_frame; + } + + private: + bool CheckIsFrameCompleted(RtpPacket& rtp_packet); + + // private: + // void OnReceiveFrame(uint8_t* payload) {} + + private: + std::map incomplete_frame_list_; + std::queue compelete_video_frame_queue_; + uint8_t* nv12_data_ = nullptr; + std::function on_receive_complete_frame_ = nullptr; +}; + +#endif diff --git a/src/rtp/rtp_session.cpp b/src/rtp/rtp_video_session.cpp similarity index 92% rename from src/rtp/rtp_session.cpp rename to src/rtp/rtp_video_session.cpp index 6af2cc6..ba0bdb4 100644 --- a/src/rtp/rtp_session.cpp +++ b/src/rtp/rtp_video_session.cpp @@ -1,4 +1,4 @@ -#include "rtp_session.h" +#include "rtp_video_session.h" #include @@ -9,14 +9,14 @@ #define FU_A 28 #define FU_B 29 -RtpSession ::RtpSession(PAYLOAD_TYPE payload_type) +RtpVideoSession ::RtpVideoSession(PAYLOAD_TYPE payload_type) : version_(RTP_VERSION), has_padding_(false), has_extension_(false), payload_type_(payload_type), sequence_number_(0) {} -RtpSession ::~RtpSession() { +RtpVideoSession ::~RtpVideoSession() { if (extension_data_) { delete extension_data_; extension_data_ = nullptr; @@ -28,8 +28,8 @@ RtpSession ::~RtpSession() { // } } -void RtpSession::Encode(uint8_t* buffer, size_t size, - std::vector& packets) { +void RtpVideoSession::Encode(uint8_t* buffer, size_t size, + std::vector& packets) { // if (!rtp_packet_) { // rtp_packet_ = new RtpPacket(); // } @@ -118,7 +118,7 @@ void RtpSession::Encode(uint8_t* buffer, size_t size, } } -size_t RtpSession::Decode(RtpPacket& packet, uint8_t* payload) { +size_t RtpVideoSession::Decode(RtpPacket& packet, uint8_t* payload) { // if ((packet.Buffer()[13] >> 6) & 0x01) { // LOG_ERROR("End bit!!!!!!!!!!!!!!!"); // } diff --git a/src/rtp/rtp_session.h b/src/rtp/rtp_video_session.h similarity index 76% rename from src/rtp/rtp_session.h rename to src/rtp/rtp_video_session.h index 6b7d973..fb318a2 100644 --- a/src/rtp/rtp_session.h +++ b/src/rtp/rtp_video_session.h @@ -1,5 +1,5 @@ -#ifndef _RTP_SESSION_H_ -#define _RTP_SESSION_H_ +#ifndef _RTP_VIDEO_SESSION_H_ +#define _RTP_VIDEO_SESSION_H_ #include @@ -7,17 +7,18 @@ #include "rtp_packet.h" -class RtpSession - -{ +class RtpVideoSession { public: - RtpSession(PAYLOAD_TYPE payload_type); - ~RtpSession(); + RtpVideoSession(PAYLOAD_TYPE payload_type); + ~RtpVideoSession(); public: void Encode(uint8_t* buffer, size_t size, std::vector& packets); size_t Decode(RtpPacket& packet, uint8_t* payload); + // protected: + // void OnReceiveFrame(uint8_t* payload) = 0; + private: uint32_t version_ = 0; bool has_padding_ = false; diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 9cfb2d1..b95c587 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -37,9 +37,14 @@ IceTransmission::~IceTransmission() { kcp_update_thread_ = nullptr; } - if (video_rtp_session_) { - delete video_rtp_session_; - video_rtp_session_ = nullptr; + if (rtp_video_session_) { + delete rtp_video_session_; + rtp_video_session_ = nullptr; + } + + if (rtp_video_receiver_) { + delete rtp_video_receiver_; + rtp_video_receiver_ = nullptr; } if (rtp_payload_) { @@ -54,76 +59,16 @@ IceTransmission::~IceTransmission() { } int IceTransmission::InitIceTransmission(std::string &ip, int port) { - kcp_update_thread_ = new std::thread([this]() { - int ret = 0; - ikcpcb *kcp = ikcp_create(0x11223344, (void *)this); - ikcp_setoutput( - kcp, [](const char *buf, int len, ikcpcb *kcp, void *user) -> int { - IceTransmission *ice_transmission_obj = - static_cast(user); - return ice_transmission_obj->ice_agent_->Send(buf, len); - }); - ikcp_wndsize(kcp, 2048, 2048); - ikcp_nodelay(kcp, 1, 20, 2, 1); - // ikcp_setmtu(kcp, 4000); - // kcp_->rx_minrto = 10; - // kcp_->fastresend = 1; + rtp_video_session_ = new RtpVideoSession(PAYLOAD_TYPE::H264); + rtp_video_receiver_ = new RtpVideoReceiver(); + rtp_video_receiver_->SetOnReceiveCompleteFrame( + [this](VideoFrame &video_frame) -> void { + LOG_ERROR("OnReceiveCompleteFrame {}", video_frame.Size()); + on_receive_ice_msg_cb_((const char *)video_frame.Buffer(), + video_frame.Size(), remote_user_id_.data(), + remote_user_id_.size()); + }); - while (!kcp_stop_) { - auto clock = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - ikcp_update(kcp, clock); - - if (!send_ringbuffer_.isEmpty()) { - // Data buffer; - RtpPacket buffer; - if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) { - send_ringbuffer_.pop(buffer); - - ice_agent_->Send((const char *)buffer.Buffer(), buffer.Size()); - } - } - - if (!recv_ringbuffer_.isEmpty()) { - // RtpPacket packet; - recv_ringbuffer_.pop(pop_packet_); - if (!rtp_payload_) { - rtp_payload_ = new uint8_t[1400]; - } - size_t rtp_payload_size = - video_rtp_session_->Decode(pop_packet_, rtp_payload_); - - on_receive_ice_msg_cb_((const char *)rtp_payload_, rtp_payload_size, - remote_user_id_.data(), remote_user_id_.size()); - } - - // int len = 0; - // int total_len = 0; - // while (1) { - // len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000); - - // total_len += len; - - // if (len <= 0) { - // if (on_receive_ice_msg_cb_ && total_len > 0) { - // LOG_ERROR("Receive size: {}", total_len); - // on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len, - // remote_user_id_.data(), - // remote_user_id_.size()); - // } - // break; - // } - // } - - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - } - - ikcp_release(kcp); - }); - - video_rtp_session_ = new RtpSession(PAYLOAD_TYPE::H264); ice_agent_ = new IceAgent(ip, port); ice_agent_->CreateIceAgent( @@ -167,30 +112,12 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { IceTransmission *ice_transmission_obj = static_cast(user_ptr); if (ice_transmission_obj) { - // ice_transmission_obj->recv_ringbuffer_.push( - // std::move(Data(data, size))); - RtpPacket packet((uint8_t *)data, size); - ice_transmission_obj->recv_ringbuffer_.push(packet); - - // int ret = ikcp_input(ice_transmission_obj->kcp_, data, size); - // ikcp_update(ice_transmission_obj->kcp_, iclock()); - // LOG_ERROR("ikcp_input {}", ret); - // auto clock = - // std::chrono::duration_cast( - // std::chrono::system_clock::now().time_since_epoch()) - // .count(); - - // ikcp_update(ice_transmission_obj->kcp_, clock); - + ice_transmission_obj->rtp_video_receiver_->InsertRtpPacket(packet); // ice_transmission_obj->on_receive_ice_msg_cb_( - // ice_transmission_obj->kcp_complete_buffer_, total_len, + // (const char *)packet.Payload(), packet.PayloadSize(), // ice_transmission_obj->remote_user_id_.data(), // ice_transmission_obj->remote_user_id_.size()); - - // ice_transmission_obj->on_receive_ice_msg_cb_( - // data, size, ice_transmission_obj->remote_user_id_.data(), - // ice_transmission_obj->remote_user_id_.size()); } } }, @@ -283,30 +210,15 @@ int IceTransmission::SendAnswer() { int IceTransmission::SendData(const char *data, size_t size) { if (JUICE_STATE_COMPLETED == state_) { - // send_ringbuffer_.push(std::move(Data(data, size))); - std::vector packets; - // for (size_t num = 0, next_packet_size = 0; num * MAX_NALU_LEN < size; - // num++) { - // next_packet_size = size - num * MAX_NALU_LEN; - // if (next_packet_size < MAX_NALU_LEN) { - // video_rtp_session_->Encode((uint8_t *)(data + num * MAX_NALU_LEN), - // next_packet_size, packets); - // } else { - // video_rtp_session_->Encode((uint8_t *)(data + num * MAX_NALU_LEN), - // MAX_NALU_LEN, packets); - // } - // } - - video_rtp_session_->Encode((uint8_t *)data, size, packets); - + rtp_video_session_->Encode((uint8_t *)data, size, packets); for (auto &packet : packets) { - send_ringbuffer_.push(packet); + ice_agent_->Send((const char *)packet.Buffer(), packet.Size()); } // std::vector packets = - // video_rtp_session_->Encode((uint8_t *)(data), size); + // rtp_video_session_->Encode((uint8_t *)(data), size); // send_ringbuffer_.insert(send_ringbuffer_.end(), packets.begin(), // packets.end()); diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index 22f094b..4740767 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -7,8 +7,10 @@ #include "ice_agent.h" #include "ringbuffer.h" #include "rtp_packet.h" -#include "rtp_session.h" +#include "rtp_video_receiver.h" +#include "rtp_video_session.h" #include "ws_transmission.h" + class IceTransmission { public: IceTransmission( @@ -77,7 +79,8 @@ class IceTransmission { std::thread *kcp_update_thread_ = nullptr; private: - RtpSession *video_rtp_session_ = nullptr; + RtpVideoSession *rtp_video_session_ = nullptr; + RtpVideoReceiver *rtp_video_receiver_ = nullptr; uint8_t *rtp_payload_ = nullptr; RtpPacket pop_packet_; }; diff --git a/xmake.lua b/xmake.lua index 27c85e5..63e272b 100644 --- a/xmake.lua +++ b/xmake.lua @@ -47,10 +47,15 @@ target("inih") target("ringbuffer") set_kind("headeronly") add_includedirs("src/ringbuffer", {public = true}) + +target("frame") + set_kind("static") + add_files("src/frame/*.cpp") + add_includedirs("src/frame", {public = true}) target("rtp") set_kind("static") - add_deps("log") + add_deps("log", "frame") add_files("src/rtp/*.cpp") add_includedirs("src/rtp", {public = true})