[feat] add rtp packet history module

This commit is contained in:
dijunkun
2025-02-14 17:30:12 +08:00
parent 7b4bba4166
commit 1ef7c536f1
27 changed files with 365 additions and 1161 deletions

View File

@@ -47,8 +47,8 @@ void AudioChannelSend::Destroy() {
int AudioChannelSend::SendAudio(char *data, size_t size) {
if (rtp_audio_sender_ && rtp_packetizer_) {
std::vector<RtpPacket> rtp_packets =
rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size);
std::vector<std::shared_ptr<RtpPacket>> rtp_packets =
rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size, true);
rtp_audio_sender_->Enqueue(rtp_packets);
}

View File

@@ -47,8 +47,8 @@ void DataChannelSend::Destroy() {
int DataChannelSend::SendData(const char *data, size_t size) {
if (rtp_data_sender_ && rtp_packetizer_) {
std::vector<RtpPacket> rtp_packets =
rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size);
std::vector<std::shared_ptr<RtpPacket>> rtp_packets =
rtp_packetizer_->Build((uint8_t *)data, (uint32_t)size, true);
rtp_data_sender_->Enqueue(rtp_packets);
}

View File

@@ -22,7 +22,8 @@ RtpAudioSender::~RtpAudioSender() {
SSRCManager::Instance().DeleteSsrc(ssrc_);
}
void RtpAudioSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
void RtpAudioSender::Enqueue(
std::vector<std::shared_ptr<RtpPacket>> rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
@@ -38,20 +39,20 @@ void RtpAudioSender::SetSendDataFunc(
data_send_func_ = data_send_func;
}
int RtpAudioSender::SendRtpPacket(RtpPacket& rtp_packet) {
int RtpAudioSender::SendRtpPacket(std::shared_ptr<RtpPacket> rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (0 != data_send_func_((const char*)rtp_packet.Buffer().data(),
rtp_packet.Size())) {
if (0 != data_send_func_((const char*)rtp_packet->Buffer().data(),
rtp_packet->Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
last_send_bytes_ += (uint32_t)rtp_packet.Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet.PayloadSize();
last_send_bytes_ += (uint32_t)rtp_packet->Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet->PayloadSize();
total_rtp_packets_sent_++;
if (io_statistics_) {
@@ -136,7 +137,7 @@ bool RtpAudioSender::Process() {
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
std::shared_ptr<RtpPacket> rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
}

View File

@@ -23,12 +23,12 @@ class RtpAudioSender : public ThreadBase {
virtual ~RtpAudioSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void Enqueue(std::vector<std::shared_ptr<RtpPacket>> rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
uint32_t GetSsrc() { return ssrc_; }
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtpPacket(std::shared_ptr<RtpPacket> rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
@@ -38,7 +38,7 @@ class RtpAudioSender : public ThreadBase {
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
RingBuffer<std::shared_ptr<RtpPacket>> rtp_packe_queue_;
private:
uint32_t ssrc_ = 0;

View File

@@ -22,7 +22,8 @@ RtpDataSender::~RtpDataSender() {
SSRCManager::Instance().DeleteSsrc(ssrc_);
}
void RtpDataSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
void RtpDataSender::Enqueue(
std::vector<std::shared_ptr<RtpPacket>> rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
@@ -38,20 +39,20 @@ void RtpDataSender::SetSendDataFunc(
data_send_func_ = data_send_func;
}
int RtpDataSender::SendRtpPacket(RtpPacket& rtp_packet) {
int RtpDataSender::SendRtpPacket(std::shared_ptr<RtpPacket> rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (0 != data_send_func_((const char*)rtp_packet.Buffer().data(),
rtp_packet.Size())) {
if (0 != data_send_func_((const char*)rtp_packet->Buffer().data(),
rtp_packet->Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
last_send_bytes_ += (uint32_t)rtp_packet.Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet.PayloadSize();
last_send_bytes_ += (uint32_t)rtp_packet->Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet->PayloadSize();
total_rtp_packets_sent_++;
if (io_statistics_) {
@@ -136,7 +137,7 @@ bool RtpDataSender::Process() {
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
std::shared_ptr<RtpPacket> rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
}

View File

@@ -23,13 +23,13 @@ class RtpDataSender : public ThreadBase {
virtual ~RtpDataSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void Enqueue(std::vector<std::shared_ptr<RtpPacket>> rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
uint32_t GetSsrc() { return ssrc_; }
private:
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtpPacket(std::shared_ptr<RtpPacket> rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
@@ -39,7 +39,7 @@ class RtpDataSender : public ThreadBase {
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
RingBuffer<std::shared_ptr<RtpPacket>> rtp_packe_queue_;
private:
uint32_t ssrc_ = 0;

View File

@@ -38,14 +38,15 @@ RtpVideoSender::~RtpVideoSender() {
#endif
}
void RtpVideoSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
void RtpVideoSender::Enqueue(
std::vector<std::shared_ptr<RtpPacket>>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packe_queue_.push(rtp_packet);
rtp_packe_queue_.push(std::move(rtp_packet));
}
}
@@ -59,35 +60,33 @@ void RtpVideoSender::SetOnSentPacketFunc(
on_sent_packet_func_ = on_sent_packet_func;
}
int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) {
int RtpVideoSender::SendRtpPacket(std::shared_ptr<RtpPacket> rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (on_sent_packet_func_) {
webrtc::RtpPacketToSend rtp_packet_to_send;
rtp_packet_to_send.SetSequenceNumber(rtp_packet.SequenceNumber());
rtp_packet_to_send.SetSsrc(rtp_packet.Ssrc());
rtp_packet_to_send.SetSize(rtp_packet.Size());
rtp_packet_to_send.set_transport_sequence_number(transport_seq_++);
rtp_packet_to_send.set_packet_type(webrtc::RtpPacketMediaType::kVideo);
on_sent_packet_func_(rtp_packet_to_send);
webrtc::RtpPacketToSend* rtp_packet_to_send =
dynamic_cast<webrtc::RtpPacketToSend*>(rtp_packet.get());
rtp_packet_to_send->set_transport_sequence_number(transport_seq_++);
rtp_packet_to_send->set_packet_type(webrtc::RtpPacketMediaType::kVideo);
on_sent_packet_func_(*rtp_packet_to_send);
}
if (0 != data_send_func_((const char*)rtp_packet.Buffer().data(),
rtp_packet.Size())) {
if (0 != data_send_func_((const char*)rtp_packet->Buffer().data(),
rtp_packet->Size())) {
// LOG_ERROR("Send rtp packet failed");
return -1;
}
#ifdef SAVE_RTP_SENT_STREAM
fwrite((unsigned char*)rtp_packet.Payload(), 1, rtp_packet.PayloadSize(),
fwrite((unsigned char*)rtp_packet->Payload(), 1, rtp_packet->PayloadSize(),
file_rtp_sent_);
#endif
last_send_bytes_ += (uint32_t)rtp_packet.Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet.PayloadSize();
last_send_bytes_ += (uint32_t)rtp_packet->Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet->PayloadSize();
total_rtp_packets_sent_++;
if (io_statistics_) {
@@ -131,7 +130,7 @@ int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) {
rtcp_sr.Encode();
// SendRtcpSR(rtcp_sr);
SendRtcpSR(rtcp_sr);
}
return 0;
@@ -168,13 +167,16 @@ bool RtpVideoSender::CheckIsTimeSendSR() {
}
bool RtpVideoSender::Process() {
bool pop_success = false;
last_send_bytes_ = 0;
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
std::shared_ptr<RtpPacket> rtp_packet;
pop_success = rtp_packe_queue_.pop(rtp_packet);
if (pop_success) {
SendRtpPacket(rtp_packet);
}
}
if (rtp_statistics_) {

View File

@@ -18,14 +18,14 @@ class RtpVideoSender : public ThreadBase {
virtual ~RtpVideoSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void Enqueue(std::vector<std::shared_ptr<RtpPacket>> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
void SetOnSentPacketFunc(
std::function<void(const webrtc::RtpPacketToSend &)> on_sent_packet_func);
uint32_t GetSsrc() { return ssrc_; }
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtpPacket(std::shared_ptr<RtpPacket> rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
@@ -37,7 +37,7 @@ class RtpVideoSender : public ThreadBase {
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
std::function<void(const webrtc::RtpPacketToSend &)> on_sent_packet_func_ =
nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
RingBuffer<std::shared_ptr<RtpPacket>> rtp_packe_queue_;
private:
uint32_t ssrc_ = 0;

View File

@@ -58,8 +58,8 @@ void VideoChannelSend::Destroy() {
int VideoChannelSend::SendVideo(char* data, size_t size) {
if (rtp_video_sender_ && rtp_packetizer_) {
std::vector<RtpPacket> rtp_packets =
rtp_packetizer_->Build((uint8_t*)data, (uint32_t)size);
std::vector<std::shared_ptr<RtpPacket>> rtp_packets =
rtp_packetizer_->Build((uint8_t*)data, (uint32_t)size, true);
rtp_video_sender_->Enqueue(rtp_packets);
}

View File

@@ -15,7 +15,6 @@
#include "ice_agent.h"
#include "rtp_packetizer.h"
#include "rtp_video_sender.h"
#include "transport_feedback.h"
#include "transport_feedback_adapter.h"
class VideoChannelSend {
@@ -38,9 +37,6 @@ class VideoChannelSend {
Timestamp recv_ts,
const webrtc::rtcp::CongestionControlFeedback& feedback);
void HandleTransportPacketsFeedback(
const webrtc::TransportPacketsFeedback& feedback);
private:
void PostUpdates(webrtc::NetworkControlUpdate update);
void UpdateControlState();