From 58c73f10c61f17f68dd928aaa6e28e2eabcd6650 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Thu, 7 Sep 2023 15:46:26 +0800 Subject: [PATCH] Nalu slices test pass --- .../remote_desk_client/remote_desk_client.cpp | 2 +- src/pc/peer_connection.cpp | 17 ++- src/pc/peer_connection.h | 3 + src/rtp/rtp_packet.cpp | 39 ++++--- src/rtp/rtp_packet.h | 4 +- src/rtp/rtp_session.cpp | 80 +++++++------- src/rtp/rtp_session.h | 6 +- src/transmission/ice_transmission.cpp | 102 ++++++++++-------- src/transmission/ice_transmission.h | 5 +- 9 files changed, 147 insertions(+), 111 deletions(-) diff --git a/application/remote_desk/remote_desk_client/remote_desk_client.cpp b/application/remote_desk/remote_desk_client/remote_desk_client.cpp index b9d6e9f..4ceb65f 100644 --- a/application/remote_desk/remote_desk_client/remote_desk_client.cpp +++ b/application/remote_desk/remote_desk_client/remote_desk_client.cpp @@ -130,7 +130,7 @@ std::string GetMac() { int main() { Params params; - params.cfg_path = "config.ini"; + params.cfg_path = "../../../../config/config.ini"; params.on_receive_buffer = GuestReceiveBuffer; std::string transmission_id = "000000"; diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index f0e01ee..ed2be0c 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -57,6 +57,14 @@ int PeerConnection::Init(PeerConnectionParams params, } }; + on_ice_status_change_ = [this](std::string ice_status) { + if ("JUICE_STATE_COMPLETED" == ice_status) { + ice_ready_ = true; + } else { + ice_ready_ = false; + } + }; + ws_transport_ = new WsTransmission(on_receive_ws_msg_); uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; if (ws_transport_) { @@ -142,9 +150,9 @@ void PeerConnection::ProcessSignal(const std::string &signal) { LOG_INFO("]"); for (auto &remote_user_id : user_id_list_) { - ice_transmission_list_[remote_user_id] = - new IceTransmission(true, transmission_id, user_id_, remote_user_id, - ws_transport_, on_receive_ice_msg_); + ice_transmission_list_[remote_user_id] = new IceTransmission( + true, transmission_id, user_id_, remote_user_id, ws_transport_, + on_receive_ice_msg_, on_ice_status_change_); ice_transmission_list_[remote_user_id]->InitIceTransmission( cfg_stun_server_ip_, stun_server_port_); ice_transmission_list_[remote_user_id]->JoinTransmission(); @@ -179,7 +187,7 @@ void PeerConnection::ProcessSignal(const std::string &signal) { ice_transmission_list_[remote_user_id] = new IceTransmission( false, transmission_id, user_id_, remote_user_id, ws_transport_, - on_receive_ice_msg_); + on_receive_ice_msg_, on_ice_status_change_); ice_transmission_list_[remote_user_id]->InitIceTransmission( cfg_stun_server_ip_, stun_server_port_); @@ -240,6 +248,7 @@ int PeerConnection::Destroy() { SignalStatus PeerConnection::GetSignalStatus() { return signal_status_; } int PeerConnection::SendVideoData(const char *data, size_t size) { + if (!ice_ready_) return -1; int ret = Encode((uint8_t *)data, size); if (0 != ret) { LOG_ERROR("Encode failed"); diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index 6e0f866..5808d75 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -68,6 +68,9 @@ class PeerConnection : public VideoEncoder, VideoDecoder { std::function on_receive_ws_msg_ = nullptr; std::function on_receive_ice_msg_ = nullptr; + std::function on_ice_status_change_ = nullptr; + bool ice_ready_ = false; + unsigned int ws_connection_id_ = 0; std::string user_id_ = ""; std::string transmission_id_ = ""; diff --git a/src/rtp/rtp_packet.cpp b/src/rtp/rtp_packet.cpp index a989300..0dd6ebe 100644 --- a/src/rtp/rtp_packet.cpp +++ b/src/rtp/rtp_packet.cpp @@ -2,9 +2,10 @@ #include -RtpPacket::RtpPacket() - : buffer_(new uint8_t[MAX_NALU_LEN]), size_(MAX_NALU_LEN) { - memset(buffer_, 0, MAX_NALU_LEN); +#include "log.h" + +RtpPacket::RtpPacket() : buffer_(new uint8_t[DEFAULT_MTU]), size_(DEFAULT_MTU) { + memset(buffer_, 0, DEFAULT_MTU); } RtpPacket::RtpPacket(const uint8_t *buffer, size_t size) { @@ -23,8 +24,19 @@ RtpPacket::RtpPacket(const RtpPacket &rtp_packet) { } } +RtpPacket::RtpPacket(RtpPacket &&rtp_packet) + : buffer_((uint8_t *)std::move(rtp_packet.buffer_)), + size_(rtp_packet.size_) { + rtp_packet.buffer_ = nullptr; + rtp_packet.size_ = 0; +} + RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) { - if (&rtp_packet != this && rtp_packet.size_ > 0) { + if (&rtp_packet != this) { + if (buffer_) { + delete buffer_; + buffer_ = nullptr; + } buffer_ = new uint8_t[rtp_packet.size_]; memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_); size_ = rtp_packet.size_; @@ -32,8 +44,15 @@ RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) { return *this; } -RtpPacket::RtpPacket(RtpPacket &&rtp_packet) - : buffer_((uint8_t *)rtp_packet.Buffer()), size_(rtp_packet.Size()) {} +RtpPacket &RtpPacket::operator=(RtpPacket &&rtp_packet) { + if (&rtp_packet != this) { + buffer_ = std::move(rtp_packet.buffer_); + rtp_packet.buffer_ = nullptr; + size_ = rtp_packet.size_; + rtp_packet.size_ = 0; + } + return *this; +} RtpPacket::~RtpPacket() { if (buffer_) { @@ -47,11 +66,6 @@ RtpPacket::~RtpPacket() { extension_data_ = nullptr; } extension_len_ = 0; - - if (payload_) { - delete payload_; - payload_ = nullptr; - } payload_size_ = 0; } @@ -227,10 +241,7 @@ size_t RtpPacket::DecodeH264Nalu(uint8_t *payload) { nalu_header_.nal_unit_type = (buffer_[12 + payload_offset]) & 0x31; payload_size_ = size_ - (13 + payload_offset); - // payload_ = new uint8_t[payload_size_]; - // memcpy(payload_, buffer_ + 12 + payload_offset, payload_size_); payload_ = buffer_ + 13 + payload_offset; - memcpy(payload, payload_, payload_size_); return payload_size_; } \ No newline at end of file diff --git a/src/rtp/rtp_packet.h b/src/rtp/rtp_packet.h index b4b1c3c..fb29f74 100644 --- a/src/rtp/rtp_packet.h +++ b/src/rtp/rtp_packet.h @@ -61,6 +61,7 @@ // |F|NRI| Type |S|E|R| Type | // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +#define DEFAULT_MTU 1500 #define MAX_NALU_LEN 1400 typedef enum { H264 = 96, OPUS = 97, USER_DEFINED = 127 } PAYLOAD_TYPE; @@ -69,8 +70,9 @@ class RtpPacket { RtpPacket(); RtpPacket(const uint8_t *buffer, size_t size); RtpPacket(const RtpPacket &rtp_packet); - RtpPacket &operator=(const RtpPacket &rtp_packet); RtpPacket(RtpPacket &&rtp_packet); + RtpPacket &operator=(const RtpPacket &rtp_packet); + RtpPacket &operator=(RtpPacket &&rtp_packet); ~RtpPacket(); diff --git a/src/rtp/rtp_session.cpp b/src/rtp/rtp_session.cpp index f78ef35..52532cb 100644 --- a/src/rtp/rtp_session.cpp +++ b/src/rtp/rtp_session.cpp @@ -19,49 +19,50 @@ RtpSession ::~RtpSession() { extension_data_ = nullptr; } - if (rtp_packet_) { - delete rtp_packet_; - rtp_packet_ = nullptr; - } + // if (rtp_packet_) { + // delete rtp_packet_; + // rtp_packet_ = nullptr; + // } } -std::vector RtpSession::Encode(uint8_t* buffer, size_t size) { - if (!rtp_packet_) { - rtp_packet_ = new RtpPacket(); - } +void RtpSession::Encode(uint8_t* buffer, size_t size, + std::vector& packets) { + // if (!rtp_packet_) { + // rtp_packet_ = new RtpPacket(); + // } - std::vector packets; + RtpPacket rtp_packet; if (size <= MAX_NALU_LEN) { - rtp_packet_->SetVerion(version_); - rtp_packet_->SetHasPadding(has_padding_); - rtp_packet_->SetHasExtension(has_extension_); - rtp_packet_->SetMarker(1); - rtp_packet_->SetPayloadType(PAYLOAD_TYPE(payload_type_)); - rtp_packet_->SetSequenceNumber(sequence_number_++); + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(1); + rtp_packet.SetPayloadType(PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); timestamp_ = std::chrono::high_resolution_clock::now().time_since_epoch().count(); - rtp_packet_->SetTimestamp(timestamp_); - rtp_packet_->SetSsrc(ssrc_); + rtp_packet.SetTimestamp(timestamp_); + rtp_packet.SetSsrc(ssrc_); if (!csrcs_.empty()) { - rtp_packet_->SetCsrcs(csrcs_); + rtp_packet.SetCsrcs(csrcs_); } if (has_extension_) { - rtp_packet_->SetExtensionProfile(extension_profile_); - rtp_packet_->SetExtensionData(extension_data_, extension_len_); + rtp_packet.SetExtensionProfile(extension_profile_); + rtp_packet.SetExtensionData(extension_data_, extension_len_); } RtpPacket::NALU_HEADER nalu_header; nalu_header.forbidden_bit = 0; nalu_header.nal_reference_idc = 1; nalu_header.nal_unit_type = 1; - rtp_packet_->SetNaluHeader(nalu_header); + rtp_packet.SetNaluHeader(nalu_header); - rtp_packet_->EncodeH264Nalu(buffer, size); - packets.push_back(*rtp_packet_); + rtp_packet.EncodeH264Nalu(buffer, size); + packets.emplace_back(rtp_packet); } else { size_t last_packet_size = size % MAX_NALU_LEN; @@ -69,36 +70,33 @@ std::vector RtpSession::Encode(uint8_t* buffer, size_t size) { for (size_t index = 0; index * MAX_NALU_LEN < size + MAX_NALU_LEN; index++) { - rtp_packet_->SetVerion(version_); - rtp_packet_->SetHasPadding(has_padding_); - rtp_packet_->SetHasExtension(has_extension_); - rtp_packet_->SetMarker(1); - rtp_packet_->SetPayloadType(PAYLOAD_TYPE(payload_type_)); - rtp_packet_->SetSequenceNumber(sequence_number_++); + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(1); + rtp_packet.SetPayloadType(PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); timestamp_ = std::chrono::high_resolution_clock::now().time_since_epoch().count(); - rtp_packet_->SetTimestamp(timestamp_); - rtp_packet_->SetSsrc(ssrc_); + rtp_packet.SetTimestamp(timestamp_); + rtp_packet.SetSsrc(ssrc_); if (!csrcs_.empty()) { - rtp_packet_->SetCsrcs(csrcs_); + rtp_packet.SetCsrcs(csrcs_); } if (has_extension_) { - rtp_packet_->SetExtensionProfile(extension_profile_); - rtp_packet_->SetExtensionData(extension_data_, extension_len_); + rtp_packet.SetExtensionProfile(extension_profile_); + rtp_packet.SetExtensionData(extension_data_, extension_len_); } - rtp_packet_->Encode(buffer, size); - packets.push_back(*rtp_packet_); + rtp_packet.Encode(buffer, size); + packets.emplace_back(rtp_packet); } } - - return std::move(packets); } -size_t RtpSession::Decode(uint8_t* buffer, size_t size, uint8_t* payload) { - *rtp_packet_ = std::move(RtpPacket(buffer, size)); - return rtp_packet_->DecodeH264Nalu(payload); +size_t RtpSession::Decode(RtpPacket& packet, uint8_t* payload) { + return packet.DecodeH264Nalu(payload); } \ No newline at end of file diff --git a/src/rtp/rtp_session.h b/src/rtp/rtp_session.h index c017b1b..4b57a34 100644 --- a/src/rtp/rtp_session.h +++ b/src/rtp/rtp_session.h @@ -15,8 +15,8 @@ class RtpSession ~RtpSession(); public: - std::vector Encode(uint8_t* buffer, size_t size); - size_t Decode(uint8_t* buffer, size_t size, uint8_t* payload); + void Encode(uint8_t* buffer, size_t size, std::vector& packets); + size_t Decode(RtpPacket& packet, uint8_t* payload); private: uint32_t version_ = 0; @@ -35,7 +35,7 @@ class RtpSession uint8_t* extension_data_ = nullptr; private: - RtpPacket* rtp_packet_ = nullptr; + // RtpPacket* rtp_packet_ = nullptr; }; #endif \ No newline at end of file diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 43d4222..6fa2e85 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -20,15 +20,23 @@ IceTransmission::IceTransmission( bool offer_peer, std::string &transmission_id, std::string &user_id, std::string &remote_user_id, WsTransmission *ice_ws_transmission, std::function - on_receive_ice_msg) + on_receive_ice_msg, + std::function on_ice_status_change) : offer_peer_(offer_peer), transmission_id_(transmission_id), user_id_(user_id), remote_user_id_(remote_user_id), ice_ws_transport_(ice_ws_transmission), - on_receive_ice_msg_cb_(on_receive_ice_msg) {} + on_receive_ice_msg_cb_(on_receive_ice_msg), + on_ice_status_change_(on_ice_status_change) {} IceTransmission::~IceTransmission() { + if (kcp_update_thread_ && kcp_update_thread_->joinable()) { + kcp_update_thread_->join(); + delete kcp_update_thread_; + kcp_update_thread_ = nullptr; + } + if (video_rtp_session_) { delete video_rtp_session_; video_rtp_session_ = nullptr; @@ -43,13 +51,6 @@ IceTransmission::~IceTransmission() { delete ice_agent_; ice_agent_ = nullptr; } - - if (kcp_update_thread_ && kcp_update_thread_->joinable()) { - kcp_update_thread_->join(); - } - - delete kcp_update_thread_; - kcp_update_thread_ = nullptr; } int IceTransmission::InitIceTransmission(std::string &ip, int port) { @@ -80,44 +81,43 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { RtpPacket buffer; if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) { send_ringbuffer_.pop(buffer); - // ret = ikcp_send(kcp, buffer.data(), buffer.size()); - ret = ikcp_send(kcp, (const char *)buffer.Buffer(), buffer.Size()); + + ice_agent_->Send((const char *)buffer.Buffer(), buffer.Size()); } } if (!recv_ringbuffer_.isEmpty()) { - // Data buffer; - RtpPacket buffer; - recv_ringbuffer_.pop(buffer); + // RtpPacket packet; + recv_ringbuffer_.pop(pop_packet_); if (!rtp_payload_) { rtp_payload_ = new uint8_t[1400]; } - size_t rtp_payload_size = video_rtp_session_->Decode( - (uint8_t *)buffer.Buffer(), buffer.Size(), rtp_payload_); - // ret = ikcp_input(kcp, buffer.data(), buffer.size()); - // ret = ikcp_input(kcp, (const char *)buffer.Buffer(), buffer.Size()); - ret = ikcp_input(kcp, (const char *)rtp_payload_, rtp_payload_size); + size_t rtp_payload_size = + video_rtp_session_->Decode(pop_packet_, rtp_payload_); + + on_receive_ice_msg_cb_((const char *)rtp_payload_, rtp_payload_size, + remote_user_id_.data(), remote_user_id_.size()); } - int len = 0; - int total_len = 0; - while (1) { - len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000); + // int len = 0; + // int total_len = 0; + // while (1) { + // len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000); - total_len += len; + // total_len += len; - if (len <= 0) { - if (on_receive_ice_msg_cb_ && total_len > 0) { - LOG_ERROR("Receive size: {}", total_len); - on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len, - remote_user_id_.data(), - remote_user_id_.size()); - } - break; - } - } + // if (len <= 0) { + // if (on_receive_ice_msg_cb_ && total_len > 0) { + // LOG_ERROR("Receive size: {}", total_len); + // on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len, + // remote_user_id_.data(), + // remote_user_id_.size()); + // } + // break; + // } + // } - // std::this_thread::sleep_for(std::chrono::milliseconds(1)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } ikcp_release(kcp); @@ -134,6 +134,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { LOG_INFO("[{}->{}] state_change: {}", ice_transmission_obj->user_id_, ice_transmission_obj->remote_user_id_, ice_status[state]); ice_transmission_obj->state_ = state; + ice_transmission_obj->on_ice_status_change_(ice_status[state]); } else { LOG_INFO("state_change: {}", ice_status[state]); } @@ -284,20 +285,29 @@ int IceTransmission::SendData(const char *data, size_t size) { if (JUICE_STATE_COMPLETED == state_) { // send_ringbuffer_.push(std::move(Data(data, size))); - for (int num = 0; num * 1400 < size + 1400; num++) { - std::vector packets = - video_rtp_session_->Encode((uint8_t *)(data + num * 1400), 1400); + std::vector packets; - for (auto &packet : packets) { - send_ringbuffer_.push(packet); + for (size_t num = 0, next_packet_size = 0; num * MAX_NALU_LEN < size; + num++) { + next_packet_size = size - num * MAX_NALU_LEN; + if (next_packet_size < MAX_NALU_LEN) { + video_rtp_session_->Encode((uint8_t *)(data + num * MAX_NALU_LEN), + next_packet_size, packets); + } else { + video_rtp_session_->Encode((uint8_t *)(data + num * MAX_NALU_LEN), + MAX_NALU_LEN, packets); } - - // std::vector packets = - // video_rtp_session_->Encode((uint8_t *)(data), size); - - // send_ringbuffer_.insert(send_ringbuffer_.end(), packets.begin(), - // packets.end()); } + + for (auto &packet : packets) { + send_ringbuffer_.push(packet); + } + + // std::vector packets = + // video_rtp_session_->Encode((uint8_t *)(data), size); + + // send_ringbuffer_.insert(send_ringbuffer_.end(), packets.begin(), + // packets.end()); } return 0; } \ No newline at end of file diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index 077fc84..22f094b 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -15,7 +15,8 @@ class IceTransmission { bool offer_peer, std::string &transmission_id, std::string &user_id, std::string &remote_user_id, WsTransmission *ice_ws_transmission, std::function - on_receive_ice_msg); + on_receive_ice_msg, + std::function on_ice_status_change); ~IceTransmission(); @@ -53,6 +54,7 @@ class IceTransmission { CongestionControl *congestion_control_ = nullptr; std::function on_receive_ice_msg_cb_ = nullptr; + std::function on_ice_status_change_ = nullptr; std::string local_sdp_; std::string remote_sdp_; std::string local_candidates_; @@ -77,6 +79,7 @@ class IceTransmission { private: RtpSession *video_rtp_session_ = nullptr; uint8_t *rtp_payload_ = nullptr; + RtpPacket pop_packet_; }; #endif \ No newline at end of file