From a0abb7455ca8b10d13bc19cc0066cdd5b961c67d Mon Sep 17 00:00:00 2001 From: dijunkun Date: Wed, 13 Sep 2023 17:31:02 +0800 Subject: [PATCH] Implementation for user data sending --- .../remote_desk_client/remote_desk_client.cpp | 56 ++++--- src/interface/x.h | 2 +- src/media/video/encode/nvcodec/nv_encoder.cpp | 10 +- src/media/video/encode/nvcodec/nv_encoder.h | 8 +- src/pc/peer_connection.cpp | 26 ++-- src/pc/peer_connection.h | 3 - src/rtc/x_inner.cpp | 2 +- src/rtp/rtp_codec.cpp | 124 +++++++++------- src/rtp/rtp_data_receiver.cpp | 5 + src/rtp/rtp_data_receiver.h | 13 ++ src/rtp/rtp_data_sender.cpp | 140 ++++++++++++++++++ src/rtp/rtp_data_sender.h | 41 +++++ src/rtp/rtp_packet.h | 2 +- src/rtp/rtp_statistics.cpp | 31 ++++ src/rtp/rtp_statistics.h | 23 +++ src/rtp/rtp_transceiver.cpp | 5 + src/rtp/rtp_transceiver.h | 18 +++ src/rtp/rtp_video_receive_statistics.cpp | 18 --- src/rtp/rtp_video_receive_statistics.h | 21 --- src/rtp/rtp_video_receiver.cpp | 29 ++-- src/rtp/rtp_video_receiver.h | 10 +- src/rtp/rtp_video_send_statistics.cpp | 18 --- src/rtp/rtp_video_send_statistics.h | 21 --- src/rtp/rtp_video_sender.cpp | 35 +++-- src/rtp/rtp_video_sender.h | 11 +- src/transmission/ice_transmission.cpp | 60 ++++++-- src/transmission/ice_transmission.h | 14 +- 27 files changed, 513 insertions(+), 233 deletions(-) create mode 100644 src/rtp/rtp_data_receiver.cpp create mode 100644 src/rtp/rtp_data_receiver.h create mode 100644 src/rtp/rtp_data_sender.cpp create mode 100644 src/rtp/rtp_data_sender.h create mode 100644 src/rtp/rtp_statistics.cpp create mode 100644 src/rtp/rtp_statistics.h create mode 100644 src/rtp/rtp_transceiver.cpp create mode 100644 src/rtp/rtp_transceiver.h delete mode 100644 src/rtp/rtp_video_receive_statistics.cpp delete mode 100644 src/rtp/rtp_video_receive_statistics.h delete mode 100644 src/rtp/rtp_video_send_statistics.cpp delete mode 100644 src/rtp/rtp_video_send_statistics.h diff --git a/application/remote_desk/remote_desk_client/remote_desk_client.cpp b/application/remote_desk/remote_desk_client/remote_desk_client.cpp index 42b59d7..3c1bf9c 100644 --- a/application/remote_desk/remote_desk_client/remote_desk_client.cpp +++ b/application/remote_desk/remote_desk_client/remote_desk_client.cpp @@ -32,6 +32,14 @@ std::string window_title = "Remote Desk Client"; #define QUIT_EVENT (SDL_USEREVENT + 2) int thread_exit = 0; +PeerPtr *peer = nullptr; + +typedef struct { + SDL_KeyCode key; + SDL_EventType action; + int px; + int py; +} RemoteAction; inline void FreshVideo() { sdlRect.x = 0; @@ -43,50 +51,60 @@ inline void FreshVideo() { SDL_RenderClear(sdlRenderer); SDL_RenderCopy(sdlRenderer, sdlTexture, NULL, &sdlRect); SDL_RenderPresent(sdlRenderer); - - // frame_count++; - // end_time = SDL_GetTicks(); - // elapsed_time = end_time - start_time; - // if (elapsed_time >= 1000) { - // fps = frame_count / (elapsed_time / 1000); - // frame_count = 0; - // window_title = "Remote Desk Client [FPS " + std::to_string(fps) + "]"; - // SDL_SetWindowTitle(screen, window_title.data()); - // start_time = end_time; - // } } inline int ProcessMouseKeyEven(SDL_Event &ev) { + RemoteAction remote_action; + remote_action.key = SDL_KeyCode(ev.key.keysym.sym); + remote_action.action = SDL_EventType(ev.type); + remote_action.px = ev.button.x; + remote_action.py = ev.button.y; + + SendData(peer, DATA_TYPE::DATA, (const char *)&remote_action, + sizeof(remote_action)); + if (SDL_KEYDOWN == ev.type) // SDL_KEYUP { + printf("SDLK_DOWN: %d\n", SDL_KeyCode(ev.key.keysym.sym)); if (SDLK_DOWN == ev.key.keysym.sym) { - printf("SDLK_DOWN ...............\n"); + printf("SDLK_DOWN \n"); } else if (SDLK_UP == ev.key.keysym.sym) { - printf("SDLK_UP ...............\n"); + printf("SDLK_UP \n"); } else if (SDLK_LEFT == ev.key.keysym.sym) { - printf("SDLK_LEFT ...............\n"); + printf("SDLK_LEFT \n"); } else if (SDLK_RIGHT == ev.key.keysym.sym) { - printf("SDLK_RIGHT ...............\n"); + printf("SDLK_RIGHT \n"); } } else if (SDL_MOUSEBUTTONDOWN == ev.type) { if (SDL_BUTTON_LEFT == ev.button.button) { int px = ev.button.x; int py = ev.button.y; - printf("SDL_MOUSEBUTTONDOWN x, y %d %d ...............\n", px, py); + printf("SDL_MOUSEBUTTONDOWN x, y %d %d \n", px, py); } else if (SDL_BUTTON_RIGHT == ev.button.button) { int px = ev.button.x; int py = ev.button.y; - printf("SDL_BUTTON_RIGHT x, y %d %d ...............\n", px, py); + printf("SDL_BUTTON_RIGHT x, y %d %d \n", px, py); + } + } else if (SDL_MOUSEBUTTONUP == ev.type) { + if (SDL_BUTTON_LEFT == ev.button.button) { + int px = ev.button.x; + int py = ev.button.y; + printf("SDL_MOUSEBUTTONUP x, y %d %d \n", px, py); + + } else if (SDL_BUTTON_RIGHT == ev.button.button) { + int px = ev.button.x; + int py = ev.button.y; + printf("SDL_MOUSEBUTTONUP x, y %d %d \n", px, py); } } else if (SDL_MOUSEMOTION == ev.type) { int px = ev.motion.x; int py = ev.motion.y; - printf("SDL_MOUSEMOTION x, y %d %d ...............\n", px, py); + printf("SDL_MOUSEMOTION x, y %d %d \n", px, py); } else if (SDL_QUIT == ev.type) { SDL_Event event; event.type = SDL_QUIT; @@ -146,7 +164,7 @@ int main() { std::string transmission_id = "000000"; std::string user_id = GetMac(); - PeerPtr *peer = CreatePeer(¶ms); + peer = CreatePeer(¶ms); JoinConnection(peer, transmission_id.c_str(), user_id.c_str()); if (SDL_Init(SDL_INIT_VIDEO)) { diff --git a/src/interface/x.h b/src/interface/x.h index d4209e8..a2dbaa2 100644 --- a/src/interface/x.h +++ b/src/interface/x.h @@ -5,7 +5,7 @@ #include enum ws_status { WS_CONNECTING = 0, WS_OPEN, WS_FAILED, WS_CLOSED, WS_UNKNOWN }; -enum DATA_TYPE { VIDEO = 0, AUDIO, USER }; +enum DATA_TYPE { VIDEO = 0, AUDIO, DATA }; #ifdef __cplusplus extern "C" { diff --git a/src/media/video/encode/nvcodec/nv_encoder.cpp b/src/media/video/encode/nvcodec/nv_encoder.cpp index 7879416..257310f 100644 --- a/src/media/video/encode/nvcodec/nv_encoder.cpp +++ b/src/media/video/encode/nvcodec/nv_encoder.cpp @@ -73,7 +73,9 @@ int VideoEncoder::Init() { return 0; } -int VideoEncoder::Encode(const uint8_t *pData, int nSize) { +int VideoEncoder::Encode( + const uint8_t *pData, int nSize, + std::function on_encoded_image) { if (!encoder_) { LOG_ERROR("Invalid encoder"); return -1; @@ -104,7 +106,11 @@ int VideoEncoder::Encode(const uint8_t *pData, int nSize) { } for (const auto &packet : encoded_packets_) { - OnEncodedImage((char *)packet.data(), packet.size()); + if (on_encoded_image) { + on_encoded_image((char *)packet.data(), packet.size()); + } else { + OnEncodedImage((char *)packet.data(), packet.size()); + } if (SAVE_ENCODER_STREAM) { fwrite((unsigned char *)packet.data(), 1, packet.size(), file_); diff --git a/src/media/video/encode/nvcodec/nv_encoder.h b/src/media/video/encode/nvcodec/nv_encoder.h index 3767220..499fbf4 100644 --- a/src/media/video/encode/nvcodec/nv_encoder.h +++ b/src/media/video/encode/nvcodec/nv_encoder.h @@ -1,6 +1,8 @@ #ifndef _NV_ENCODER_H_ #define _NV_ENCODER_H_ +#include + #include "NvEncoderCuda.h" class VideoEncoder { @@ -9,8 +11,12 @@ class VideoEncoder { ~VideoEncoder(); int Init(); - int Encode(const uint8_t* pData, int nSize); + int Encode( + const uint8_t* pData, int nSize, + std::function on_encoded_image); + virtual int OnEncodedImage(char* encoded_packets, size_t size); + void ForceIdr(); private: diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 87ef6b3..58033fb 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -252,37 +252,33 @@ int PeerConnection::SendVideoData(const char *data, size_t size) { return -1; } - int ret = Encode((uint8_t *)data, size); + int ret = Encode( + (uint8_t *)data, size, [this](char *encoded_frame, size_t size) -> int { + for (auto &ice_trans : ice_transmission_list_) { + // LOG_ERROR("H264 frame size: [{}]", size); + ice_trans.second->SendData(IceTransmission::DATA_TYPE::VIDEO, + encoded_frame, size); + } + return 0; + }); if (0 != ret) { LOG_ERROR("Encode failed"); return -1; } - // for (auto ice_trans : ice_transmission_list_) { - // ice_trans.second->SendData(data, size); - // } - return 0; -} - -int PeerConnection::OnEncodedImage(char *encoded_packets, size_t size) { - for (auto &ice_trans : ice_transmission_list_) { - // LOG_ERROR("H264 frame size: [{}]", size); - ice_trans.second->SendData(encoded_packets, size); - } - return 0; } int PeerConnection::SendAudioData(const char *data, size_t size) { for (auto &ice_trans : ice_transmission_list_) { - ice_trans.second->SendData(data, size); + ice_trans.second->SendData(IceTransmission::DATA_TYPE::AUDIO, data, size); } return 0; } int PeerConnection::SendUserData(const char *data, size_t size) { for (auto &ice_trans : ice_transmission_list_) { - ice_trans.second->SendData(data, size); + ice_trans.second->SendData(IceTransmission::DATA_TYPE::DATA, data, size); } return 0; } \ No newline at end of file diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index 41e0dc5..f130462 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -51,9 +51,6 @@ class PeerConnection : public VideoEncoder, VideoDecoder { int RequestTransmissionMemberList(const std::string &transmission_id); - private: - int OnEncodedImage(char *encoded_packets, size_t size) override; - private: std::string uri_ = ""; std::string cfg_signal_server_ip_; diff --git a/src/rtc/x_inner.cpp b/src/rtc/x_inner.cpp index fa0cc61..9e6f7dc 100644 --- a/src/rtc/x_inner.cpp +++ b/src/rtc/x_inner.cpp @@ -47,7 +47,7 @@ int SendData(PeerPtr *peer_ptr, DATA_TYPE data_type, const char *data, peer_ptr->peer_connection->SendVideoData(data, size); } else if (DATA_TYPE::AUDIO == data_type) { peer_ptr->peer_connection->SendAudioData(data, size); - } else if (DATA_TYPE::USER == data_type) { + } else if (DATA_TYPE::DATA == data_type) { peer_ptr->peer_connection->SendUserData(data, size); } return 0; diff --git a/src/rtp/rtp_codec.cpp b/src/rtp/rtp_codec.cpp index a89ee0c..4bf5acf 100644 --- a/src/rtp/rtp_codec.cpp +++ b/src/rtp/rtp_codec.cpp @@ -36,46 +36,12 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, RtpPacket rtp_packet; - if (size <= MAX_NALU_LEN) { - rtp_packet.SetVerion(version_); - rtp_packet.SetHasPadding(has_padding_); - rtp_packet.SetHasExtension(has_extension_); - rtp_packet.SetMarker(1); - rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); - rtp_packet.SetSequenceNumber(sequence_number_++); - - timestamp_ = - std::chrono::high_resolution_clock::now().time_since_epoch().count(); - rtp_packet.SetTimestamp(timestamp_); - rtp_packet.SetSsrc(ssrc_); - - if (!csrcs_.empty()) { - rtp_packet.SetCsrcs(csrcs_); - } - - if (has_extension_) { - rtp_packet.SetExtensionProfile(extension_profile_); - rtp_packet.SetExtensionData(extension_data_, extension_len_); - } - - RtpPacket::FU_INDICATOR fu_indicator; - fu_indicator.forbidden_bit = 0; - fu_indicator.nal_reference_idc = 1; - fu_indicator.nal_unit_type = NALU; - rtp_packet.SetFuIndicator(fu_indicator); - - rtp_packet.EncodeH264Nalu(buffer, size); - packets.emplace_back(rtp_packet); - - } else { - size_t last_packet_size = size % MAX_NALU_LEN; - size_t packet_num = size / MAX_NALU_LEN + (last_packet_size ? 1 : 0); - - for (size_t index = 0; index < packet_num; index++) { + if (RtpPacket::PAYLOAD_TYPE::H264 == payload_type_) { + if (size <= MAX_NALU_LEN) { rtp_packet.SetVerion(version_); rtp_packet.SetHasPadding(has_padding_); rtp_packet.SetHasExtension(has_extension_); - rtp_packet.SetMarker(index == packet_num ? 1 : 0); + rtp_packet.SetMarker(1); rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); rtp_packet.SetSequenceNumber(sequence_number_++); @@ -95,26 +61,78 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, RtpPacket::FU_INDICATOR fu_indicator; fu_indicator.forbidden_bit = 0; - fu_indicator.nal_reference_idc = 0; - fu_indicator.nal_unit_type = FU_A; - - RtpPacket::FU_HEADER fu_header; - fu_header.start = index == 0 ? 1 : 0; - fu_header.end = index == packet_num - 1 ? 1 : 0; - fu_header.remain_bit = 0; - fu_header.nal_unit_type = FU_A; - + fu_indicator.nal_reference_idc = 1; + fu_indicator.nal_unit_type = NALU; rtp_packet.SetFuIndicator(fu_indicator); - rtp_packet.SetFuHeader(fu_header); - if (index == packet_num - 1 && last_packet_size > 0) { - rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN, - last_packet_size); - } else { - rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN, MAX_NALU_LEN); - } + rtp_packet.EncodeH264Nalu(buffer, size); packets.emplace_back(rtp_packet); + + } else { + size_t last_packet_size = size % MAX_NALU_LEN; + size_t packet_num = size / MAX_NALU_LEN + (last_packet_size ? 1 : 0); + + for (size_t index = 0; index < packet_num; index++) { + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(index == packet_num ? 1 : 0); + rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); + + timestamp_ = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + rtp_packet.SetTimestamp(timestamp_); + rtp_packet.SetSsrc(ssrc_); + + if (!csrcs_.empty()) { + rtp_packet.SetCsrcs(csrcs_); + } + + if (has_extension_) { + rtp_packet.SetExtensionProfile(extension_profile_); + rtp_packet.SetExtensionData(extension_data_, extension_len_); + } + + RtpPacket::FU_INDICATOR fu_indicator; + fu_indicator.forbidden_bit = 0; + fu_indicator.nal_reference_idc = 0; + fu_indicator.nal_unit_type = FU_A; + + RtpPacket::FU_HEADER fu_header; + fu_header.start = index == 0 ? 1 : 0; + fu_header.end = index == packet_num - 1 ? 1 : 0; + fu_header.remain_bit = 0; + fu_header.nal_unit_type = FU_A; + + rtp_packet.SetFuIndicator(fu_indicator); + rtp_packet.SetFuHeader(fu_header); + + if (index == packet_num - 1 && last_packet_size > 0) { + rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN, + last_packet_size); + } else { + rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN, MAX_NALU_LEN); + } + packets.emplace_back(rtp_packet); + } } + } else if (RtpPacket::PAYLOAD_TYPE::DATA == payload_type_) { + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(1); + rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); + + timestamp_ = + std::chrono::high_resolution_clock::now().time_since_epoch().count(); + rtp_packet.SetTimestamp(timestamp_); + rtp_packet.SetSsrc(ssrc_); + + rtp_packet.Encode(buffer, size); + packets.emplace_back(rtp_packet); } } diff --git a/src/rtp/rtp_data_receiver.cpp b/src/rtp/rtp_data_receiver.cpp new file mode 100644 index 0000000..c4fd976 --- /dev/null +++ b/src/rtp/rtp_data_receiver.cpp @@ -0,0 +1,5 @@ +#include "rtp_data_receiver.h" + +RtpDataReceiver::RtpDataReceiver() {} + +RtpDataReceiver::~RtpDataReceiver() {} \ No newline at end of file diff --git a/src/rtp/rtp_data_receiver.h b/src/rtp/rtp_data_receiver.h new file mode 100644 index 0000000..22197e4 --- /dev/null +++ b/src/rtp/rtp_data_receiver.h @@ -0,0 +1,13 @@ +#ifndef _RTP_DATA_RECEIVER_H_ +#define _RTP_DATA_RECEIVER_H_ + +class RtpDataReceiver { + public: + RtpDataReceiver(); + ~RtpDataReceiver(); + + private: + /* data */ +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_data_sender.cpp b/src/rtp/rtp_data_sender.cpp new file mode 100644 index 0000000..2ff6923 --- /dev/null +++ b/src/rtp/rtp_data_sender.cpp @@ -0,0 +1,140 @@ +#include "rtp_data_sender.h" + +#include + +#include "log.h" + +#define RTCP_SR_INTERVAL 1000 + +RtpDataSender::RtpDataSender() {} + +RtpDataSender::~RtpDataSender() { + if (rtp_statistics_) { + rtp_statistics_->Stop(); + } +} + +void RtpDataSender::Enqueue(std::vector& rtp_packets) { + if (!rtp_statistics_) { + rtp_statistics_ = std::make_unique(); + rtp_statistics_->Start(); + } + + for (auto& rtp_packet : rtp_packets) { + rtp_packe_queue_.push(rtp_packet); + } +} + +void RtpDataSender::SetSendDataFunc( + std::function data_send_func) { + data_send_func_ = data_send_func; +} + +int RtpDataSender::SendRtpPacket(RtpPacket& rtp_packet) { + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); + return -1; + } + + int ret = 0; + + if (0 != + data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) { + LOG_ERROR("Send rtp packet failed"); + return -1; + } + + last_send_bytes_ += rtp_packet.Size(); + total_rtp_packets_sent_++; + total_rtp_payload_sent_ += rtp_packet.PayloadSize(); + + if (CheckIsTimeSendSR()) { + RtcpSenderReport rtcp_sr; + SenderInfo sender_info; + RtcpReportBlock report; + + auto duration = std::chrono::system_clock::now().time_since_epoch(); + auto seconds = std::chrono::duration_cast(duration); + uint32_t seconds_u32 = static_cast( + std::chrono::duration_cast(duration).count()); + + uint32_t fraction_u32 = static_cast( + std::chrono::duration_cast(duration - seconds) + .count()); + + sender_info.sender_ssrc = 0x00; + sender_info.ntp_ts_msw = (uint32_t)seconds_u32; + sender_info.ntp_ts_lsw = (uint32_t)fraction_u32; + sender_info.rtp_ts = + std::chrono::high_resolution_clock::now().time_since_epoch().count() * + 1000000; + sender_info.sender_packet_count = total_rtp_packets_sent_; + sender_info.sender_octet_count = total_rtp_payload_sent_; + + rtcp_sr.SetSenderInfo(sender_info); + + report.source_ssrc = 0x00; + report.fraction_lost = 0; + report.cumulative_lost = 0; + report.extended_high_seq_num = 0; + report.jitter = 0; + report.lsr = 0; + report.dlsr = 0; + + rtcp_sr.SetReportBlock(report); + + rtcp_sr.Encode(); + + SendRtcpSR(rtcp_sr); + } + + return 0; +} + +int RtpDataSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) { + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); + return -1; + } + + if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) { + LOG_ERROR("Send SR failed"); + return -1; + } + + LOG_ERROR("Send SR"); + + return 0; +} + +bool RtpDataSender::CheckIsTimeSendSR() { + uint32_t now_ts = static_cast( + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + + if (now_ts - last_send_rtcp_sr_packet_ts_ >= RTCP_SR_INTERVAL) { + last_send_rtcp_sr_packet_ts_ = now_ts; + return true; + } else { + return false; + } +} + +bool RtpDataSender::Process() { + last_send_bytes_ = 0; + + for (size_t i = 0; i < 10; i++) + if (!rtp_packe_queue_.isEmpty()) { + RtpPacket rtp_packet; + rtp_packe_queue_.pop(rtp_packet); + SendRtpPacket(rtp_packet); + } + + if (rtp_statistics_) { + rtp_statistics_->UpdateSentBytes(last_send_bytes_); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + return true; +} \ No newline at end of file diff --git a/src/rtp/rtp_data_sender.h b/src/rtp/rtp_data_sender.h new file mode 100644 index 0000000..5ab4d7e --- /dev/null +++ b/src/rtp/rtp_data_sender.h @@ -0,0 +1,41 @@ +#ifndef _RTP_DATA_SENDER_H_ +#define _RTP_DATA_SENDER_H_ + +#include + +#include "ringbuffer.h" +#include "rtcp_sender_report.h" +#include "rtp_packet.h" +#include "rtp_statistics.h" +#include "thread_base.h" + +class RtpDataSender : public ThreadBase { + public: + RtpDataSender(); + ~RtpDataSender(); + + public: + void Enqueue(std::vector &rtp_packets); + void SetSendDataFunc(std::function data_send_func); + + private: + private: + int SendRtpPacket(RtpPacket &rtp_packet); + int SendRtcpSR(RtcpSenderReport &rtcp_sr); + + bool CheckIsTimeSendSR(); + + private: + bool Process() override; + + private: + std::function data_send_func_ = nullptr; + RingBuffer rtp_packe_queue_; + std::unique_ptr rtp_statistics_ = nullptr; + uint32_t last_send_bytes_ = 0; + uint32_t last_send_rtcp_sr_packet_ts_ = 0; + uint32_t total_rtp_packets_sent_ = 0; + uint32_t total_rtp_payload_sent_ = 0; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_packet.h b/src/rtp/rtp_packet.h index ade1319..60041ff 100644 --- a/src/rtp/rtp_packet.h +++ b/src/rtp/rtp_packet.h @@ -66,7 +66,7 @@ class RtpPacket { public: - typedef enum { H264 = 96, OPUS = 97, USER_DEFINED = 127 } PAYLOAD_TYPE; + typedef enum { H264 = 96, OPUS = 97, DATA = 127 } PAYLOAD_TYPE; typedef enum { UNKNOWN = 0, NALU = 1, FU_A = 28, FU_B = 29 } NAL_UNIT_TYPE; public: diff --git a/src/rtp/rtp_statistics.cpp b/src/rtp/rtp_statistics.cpp new file mode 100644 index 0000000..85b2d0e --- /dev/null +++ b/src/rtp/rtp_statistics.cpp @@ -0,0 +1,31 @@ +#include "rtp_statistics.h" + +#include "log.h" + +RtpStatistics::RtpStatistics() {} + +RtpStatistics::~RtpStatistics() {} + +void RtpStatistics::UpdateSentBytes(uint32_t sent_bytes) { + sent_bytes_ += sent_bytes; +} + +void RtpStatistics::UpdateReceiveBytes(uint32_t received_bytes) { + received_bytes_ += received_bytes; +} + +bool RtpStatistics::Process() { + if (!sent_bytes_) { + LOG_INFO("rtp statistics: Send [{} bps]", sent_bytes_); + } + + if (!received_bytes_) { + LOG_INFO("rtp statistics: Receive [{} bps]", received_bytes_); + } + + sent_bytes_ = 0; + received_bytes_ = 0; + + std::this_thread::sleep_for(std::chrono::seconds(1)); + return true; +} \ No newline at end of file diff --git a/src/rtp/rtp_statistics.h b/src/rtp/rtp_statistics.h new file mode 100644 index 0000000..b44ffad --- /dev/null +++ b/src/rtp/rtp_statistics.h @@ -0,0 +1,23 @@ +#ifndef _RTP_STATISTICS_H_ +#define _RTP_STATISTICS_H_ + +#include "thread_base.h" + +class RtpStatistics : public ThreadBase { + public: + RtpStatistics(); + ~RtpStatistics(); + + public: + void UpdateSentBytes(uint32_t sent_bytes); + void UpdateReceiveBytes(uint32_t received_bytes); + + private: + bool Process(); + + private: + uint32_t sent_bytes_ = 0; + uint32_t received_bytes_ = 0; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_transceiver.cpp b/src/rtp/rtp_transceiver.cpp new file mode 100644 index 0000000..923eac6 --- /dev/null +++ b/src/rtp/rtp_transceiver.cpp @@ -0,0 +1,5 @@ +#include "rtp_transceiver.h" + +RtpTransceiver::RtpTransceiver() {} + +RtpTransceiver::~RtpTransceiver() {} \ No newline at end of file diff --git a/src/rtp/rtp_transceiver.h b/src/rtp/rtp_transceiver.h new file mode 100644 index 0000000..ce007a9 --- /dev/null +++ b/src/rtp/rtp_transceiver.h @@ -0,0 +1,18 @@ +#ifndef _RTP_TRANSCEIVER_H_ +#define _RTP_TRANSCEIVER_H_ + +#include + +class RtpTransceiver { + public: + RtpTransceiver(); + ~RtpTransceiver(); + + public: + virtual void SetSendDataFunc( + std::function data_send_func) = 0; + + virtual void OnReceiveData(const char *data, size_t size) = 0; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_video_receive_statistics.cpp b/src/rtp/rtp_video_receive_statistics.cpp deleted file mode 100644 index 03ebb80..0000000 --- a/src/rtp/rtp_video_receive_statistics.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "rtp_video_receive_statistics.h" - -#include "log.h" - -RtpVideoReceiveStatistics::RtpVideoReceiveStatistics() {} - -RtpVideoReceiveStatistics::~RtpVideoReceiveStatistics() {} - -void RtpVideoReceiveStatistics::UpdateReceiveBytes(uint32_t received_bytes) { - received_bytes_ += received_bytes; -} - -bool RtpVideoReceiveStatistics::Process() { - LOG_INFO("rtp statistics: Receive [{} bps]", received_bytes_); - received_bytes_ = 0; - std::this_thread::sleep_for(std::chrono::seconds(1)); - return true; -} \ No newline at end of file diff --git a/src/rtp/rtp_video_receive_statistics.h b/src/rtp/rtp_video_receive_statistics.h deleted file mode 100644 index 78ce9e6..0000000 --- a/src/rtp/rtp_video_receive_statistics.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef _RTP_VIDEO_RECEIVE_STATISTICS_H_ -#define _RTP_VIDEO_RECEIVE_STATISTICS_H_ - -#include "thread_base.h" - -class RtpVideoReceiveStatistics : public ThreadBase { - public: - RtpVideoReceiveStatistics(); - ~RtpVideoReceiveStatistics(); - - public: - void UpdateReceiveBytes(uint32_t received_bytes); - - private: - bool Process(); - - private: - uint32_t received_bytes_ = 0; -}; - -#endif \ No newline at end of file diff --git a/src/rtp/rtp_video_receiver.cpp b/src/rtp/rtp_video_receiver.cpp index 3b8e9d7..36ef803 100644 --- a/src/rtp/rtp_video_receiver.cpp +++ b/src/rtp/rtp_video_receiver.cpp @@ -7,17 +7,20 @@ RtpVideoReceiver::RtpVideoReceiver() {} -RtpVideoReceiver::~RtpVideoReceiver() {} +RtpVideoReceiver::~RtpVideoReceiver() { + if (rtp_statistics_) { + rtp_statistics_->Stop(); + } +} void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { - if (!rtp_video_receive_statistics_) { - rtp_video_receive_statistics_ = - std::make_unique(); - rtp_video_receive_statistics_->Start(); + if (!rtp_statistics_) { + rtp_statistics_ = std::make_unique(); + rtp_statistics_->Start(); } - if (rtp_video_receive_statistics_) { - rtp_video_receive_statistics_->UpdateReceiveBytes(rtp_packet.Size()); + if (rtp_statistics_) { + rtp_statistics_->UpdateReceiveBytes(rtp_packet.Size()); } if (CheckIsTimeSendRR()) { @@ -147,18 +150,18 @@ bool RtpVideoReceiver::Process() { return true; } -void RtpVideoReceiver::SetUdpSender( - std::function udp_sender) { - udp_sender_ = udp_sender; +void RtpVideoReceiver::SetSendDataFunc( + std::function data_send_func) { + data_send_func_ = data_send_func; } int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) { - if (!udp_sender_) { - LOG_ERROR("udp_sender_ is nullptr"); + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); return -1; } - if (udp_sender_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) { + if (data_send_func_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) { LOG_ERROR("Send RR failed"); return -1; } diff --git a/src/rtp/rtp_video_receiver.h b/src/rtp/rtp_video_receiver.h index 142f3b6..0332dd8 100644 --- a/src/rtp/rtp_video_receiver.h +++ b/src/rtp/rtp_video_receiver.h @@ -9,7 +9,7 @@ #include "ringbuffer.h" #include "rtcp_receiver_report.h" #include "rtp_codec.h" -#include "rtp_video_receive_statistics.h" +#include "rtp_statistics.h" #include "thread_base.h" class RtpVideoReceiver : public ThreadBase { @@ -20,8 +20,7 @@ class RtpVideoReceiver : public ThreadBase { public: void InsertRtpPacket(RtpPacket& rtp_packet); - void SetUdpSender( - std::function rtp_packet_send_func); + void SetSendDataFunc(std::function data_send_func); void SetOnReceiveCompleteFrame( std::function on_receive_complete_frame) { @@ -44,10 +43,9 @@ class RtpVideoReceiver : public ThreadBase { RingBuffer compelete_video_frame_queue_; private: - std::unique_ptr rtp_video_receive_statistics_ = - nullptr; + std::unique_ptr rtp_statistics_ = nullptr; uint32_t last_send_rtcp_rr_packet_ts_ = 0; - std::function udp_sender_ = nullptr; + std::function data_send_func_ = nullptr; }; #endif diff --git a/src/rtp/rtp_video_send_statistics.cpp b/src/rtp/rtp_video_send_statistics.cpp deleted file mode 100644 index c5b4f9f..0000000 --- a/src/rtp/rtp_video_send_statistics.cpp +++ /dev/null @@ -1,18 +0,0 @@ -#include "rtp_video_send_statistics.h" - -#include "log.h" - -RtpVideoSendStatistics::RtpVideoSendStatistics() {} - -RtpVideoSendStatistics::~RtpVideoSendStatistics() {} - -void RtpVideoSendStatistics::UpdateSentBytes(uint32_t sent_bytes) { - sent_bytes_ += sent_bytes; -} - -bool RtpVideoSendStatistics::Process() { - LOG_INFO("rtp statistics: Send [{} bps]", sent_bytes_); - sent_bytes_ = 0; - std::this_thread::sleep_for(std::chrono::seconds(1)); - return true; -} \ No newline at end of file diff --git a/src/rtp/rtp_video_send_statistics.h b/src/rtp/rtp_video_send_statistics.h deleted file mode 100644 index 811764d..0000000 --- a/src/rtp/rtp_video_send_statistics.h +++ /dev/null @@ -1,21 +0,0 @@ -#ifndef _RTP_VIDEO_SEND_STATISTICS_H_ -#define _RTP_VIDEO_SEND_STATISTICS_H_ - -#include "thread_base.h" - -class RtpVideoSendStatistics : public ThreadBase { - public: - RtpVideoSendStatistics(); - ~RtpVideoSendStatistics(); - - public: - void UpdateSentBytes(uint32_t sent_bytes); - - private: - bool Process(); - - private: - uint32_t sent_bytes_ = 0; -}; - -#endif \ No newline at end of file diff --git a/src/rtp/rtp_video_sender.cpp b/src/rtp/rtp_video_sender.cpp index f7a1e5d..34fc62b 100644 --- a/src/rtp/rtp_video_sender.cpp +++ b/src/rtp/rtp_video_sender.cpp @@ -8,12 +8,16 @@ RtpVideoSender::RtpVideoSender() {} -RtpVideoSender::~RtpVideoSender() { rtp_video_send_statistics_->Stop(); } +RtpVideoSender::~RtpVideoSender() { + if (rtp_statistics_) { + rtp_statistics_->Stop(); + } +} void RtpVideoSender::Enqueue(std::vector& rtp_packets) { - if (!rtp_video_send_statistics_) { - rtp_video_send_statistics_ = std::make_unique(); - rtp_video_send_statistics_->Start(); + if (!rtp_statistics_) { + rtp_statistics_ = std::make_unique(); + rtp_statistics_->Start(); } for (auto& rtp_packet : rtp_packets) { @@ -21,20 +25,21 @@ void RtpVideoSender::Enqueue(std::vector& rtp_packets) { } } -void RtpVideoSender::SetUdpSender( - std::function udp_sender) { - udp_sender_ = udp_sender; +void RtpVideoSender::SetSendDataFunc( + std::function data_send_func) { + data_send_func_ = data_send_func; } int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) { - if (!udp_sender_) { - LOG_ERROR("udp_sender_ is nullptr"); + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); return -1; } int ret = 0; - if (0 != udp_sender_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) { + if (0 != + data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) { LOG_ERROR("Send rtp packet failed"); return -1; } @@ -87,12 +92,12 @@ int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) { } int RtpVideoSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) { - if (!udp_sender_) { - LOG_ERROR("udp_sender_ is nullptr"); + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); return -1; } - if (udp_sender_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) { + if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) { LOG_ERROR("Send SR failed"); return -1; } @@ -126,8 +131,8 @@ bool RtpVideoSender::Process() { SendRtpPacket(rtp_packet); } - if (rtp_video_send_statistics_) { - rtp_video_send_statistics_->UpdateSentBytes(last_send_bytes_); + if (rtp_statistics_) { + rtp_statistics_->UpdateSentBytes(last_send_bytes_); } std::this_thread::sleep_for(std::chrono::milliseconds(5)); diff --git a/src/rtp/rtp_video_sender.h b/src/rtp/rtp_video_sender.h index 8dbb696..ca9cdcb 100644 --- a/src/rtp/rtp_video_sender.h +++ b/src/rtp/rtp_video_sender.h @@ -6,7 +6,7 @@ #include "ringbuffer.h" #include "rtcp_sender_report.h" #include "rtp_packet.h" -#include "rtp_video_send_statistics.h" +#include "rtp_statistics.h" #include "thread_base.h" class RtpVideoSender : public ThreadBase { @@ -16,10 +16,7 @@ class RtpVideoSender : public ThreadBase { public: void Enqueue(std::vector &rtp_packets); - - public: - void SetUdpSender( - std::function rtp_packet_send_func); + void SetSendDataFunc(std::function data_send_func); private: int SendRtpPacket(RtpPacket &rtp_packet); @@ -31,11 +28,11 @@ class RtpVideoSender : public ThreadBase { bool Process() override; private: - std::function udp_sender_ = nullptr; + std::function data_send_func_ = nullptr; RingBuffer rtp_packe_queue_; private: - std::unique_ptr rtp_video_send_statistics_ = nullptr; + std::unique_ptr rtp_statistics_ = nullptr; uint32_t last_send_bytes_ = 0; uint32_t last_send_rtcp_sr_packet_ts_ = 0; uint32_t total_rtp_packets_sent_ = 0; diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index ee84a85..38c79c8 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -40,6 +40,10 @@ IceTransmission::~IceTransmission() { rtp_video_receiver_->Stop(); } + if (rtp_data_sender_) { + rtp_data_sender_->Stop(); + } + if (rtp_payload_) { delete rtp_payload_; rtp_payload_ = nullptr; @@ -47,9 +51,11 @@ IceTransmission::~IceTransmission() { } int IceTransmission::InitIceTransmission(std::string &ip, int port) { - rtp_codec_ = std::make_unique(RtpPacket::PAYLOAD_TYPE::H264); + video_rtp_codec_ = std::make_unique(RtpPacket::PAYLOAD_TYPE::H264); + data_rtp_codec_ = std::make_unique(RtpPacket::PAYLOAD_TYPE::DATA); + rtp_video_receiver_ = std::make_unique(); - rtp_video_receiver_->SetUdpSender( + rtp_video_receiver_->SetSendDataFunc( [this](const char *data, size_t size) -> int { if (!ice_agent_) { LOG_ERROR("ice_agent_ is nullptr"); @@ -69,17 +75,31 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { rtp_video_receiver_->Start(); rtp_video_sender_ = std::make_unique(); - rtp_video_sender_->SetUdpSender([this](const char *data, size_t size) -> int { - if (!ice_agent_) { - LOG_ERROR("ice_agent_ is nullptr"); - return -1; - } + rtp_video_sender_->SetSendDataFunc( + [this](const char *data, size_t size) -> int { + if (!ice_agent_) { + LOG_ERROR("ice_agent_ is nullptr"); + return -1; + } - return ice_agent_->Send(data, size); - }); + return ice_agent_->Send(data, size); + }); rtp_video_sender_->Start(); + rtp_data_sender_ = std::make_unique(); + rtp_data_sender_->SetSendDataFunc( + [this](const char *data, size_t size) -> int { + if (!ice_agent_) { + LOG_ERROR("ice_agent_ is nullptr"); + return -1; + } + + return ice_agent_->Send(data, size); + }); + + rtp_data_sender_->Start(); + ice_agent_ = std::make_unique(ip, port); ice_agent_->CreateIceAgent( @@ -220,15 +240,25 @@ int IceTransmission::SendAnswer() { return 0; } -int IceTransmission::SendData(const char *data, size_t size) { +int IceTransmission::SendData(DATA_TYPE type, const char *data, size_t size) { if (JUICE_STATE_COMPLETED == state_) { std::vector packets; - if (rtp_codec_) { - rtp_codec_->Encode((uint8_t *)data, size, packets); - } - if (rtp_video_sender_) { - rtp_video_sender_->Enqueue(packets); + if (DATA_TYPE::VIDEO == type) { + if (rtp_video_sender_) { + if (video_rtp_codec_) { + video_rtp_codec_->Encode((uint8_t *)data, size, packets); + } + rtp_video_sender_->Enqueue(packets); + } + } else if (DATA_TYPE::AUDIO == type) { + } else if (DATA_TYPE::DATA == type) { + if (rtp_data_sender_) { + if (data_rtp_codec_) { + data_rtp_codec_->Encode((uint8_t *)data, size, packets); + } + rtp_data_sender_->Enqueue(packets); + } } } return 0; diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index 8277540..2cb1b22 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -7,12 +7,17 @@ #include "ice_agent.h" #include "ringbuffer.h" #include "rtp_codec.h" +#include "rtp_data_receiver.h" +#include "rtp_data_sender.h" #include "rtp_packet.h" #include "rtp_video_receiver.h" #include "rtp_video_sender.h" #include "ws_transmission.h" class IceTransmission { + public: + typedef enum { VIDEO = 96, AUDIO = 97, DATA = 127 } DATA_TYPE; + public: IceTransmission( bool offer_peer, std::string &transmission_id, std::string &user_id, @@ -21,7 +26,6 @@ class IceTransmission { std::function on_receive_ice_msg, std::function on_ice_status_change); - ~IceTransmission(); public: @@ -33,7 +37,7 @@ class IceTransmission { int SetTransmissionId(const std::string &transmission_id); - int SendData(const char *data, size_t size); + int SendData(DATA_TYPE type, const char *data, size_t size); public: int GatherCandidates(); @@ -77,9 +81,13 @@ class IceTransmission { juice_state_t state_ = JUICE_STATE_DISCONNECTED; private: - std::unique_ptr rtp_codec_ = nullptr; + std::unique_ptr video_rtp_codec_ = nullptr; + std::unique_ptr audio_rtp_codec_ = nullptr; + std::unique_ptr data_rtp_codec_ = nullptr; std::unique_ptr rtp_video_receiver_ = nullptr; std::unique_ptr rtp_video_sender_ = nullptr; + std::unique_ptr rtp_data_receiver_ = nullptr; + std::unique_ptr rtp_data_sender_ = nullptr; uint8_t *rtp_payload_ = nullptr; RtpPacket pop_packet_; bool start_send_packet_ = false;