[feat] implementation for qos module

This commit is contained in:
dijunkun
2025-01-08 17:30:13 +08:00
parent 7a84b25b5c
commit de212a8e75
32 changed files with 482 additions and 249 deletions

View File

@@ -1,105 +0,0 @@
#include "rtp_audio_receiver.h"
#define RTCP_RR_INTERVAL 1000
RtpAudioReceiver::RtpAudioReceiver() {}
RtpAudioReceiver::RtpAudioReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpAudioReceiver::~RtpAudioReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpAudioReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
last_recv_bytes_ = (uint32_t)rtp_packet.Size();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(last_recv_bytes_);
}
if (io_statistics_) {
io_statistics_->UpdateAudioInboundBytes(last_recv_bytes_);
io_statistics_->IncrementAudioInboundRtpPacketCount();
io_statistics_->UpdateAudioPacketLossCount(rtp_packet.SequenceNumber());
}
if (CheckIsTimeSendRR()) {
RtcpReceiverReport rtcp_rr;
RtcpReportBlock report;
// auto duration = std::chrono::system_clock::now().time_since_epoch();
// auto seconds =
// std::chrono::duration_cast<std::chrono::seconds>(duration); uint32_t
// seconds_u32 = static_cast<uint32_t>(
// std::chrono::duration_cast<std::chrono::seconds>(duration).count());
// uint32_t fraction_u32 = static_cast<uint32_t>(
// std::chrono::duration_cast<std::chrono::nanoseconds>(duration -
// seconds)
// .count());
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_rr.SetReportBlock(report);
rtcp_rr.Encode();
// SendRtcpRR(rtcp_rr);
}
if (on_receive_data_) {
on_receive_data_((const char*)rtp_packet.Payload(),
rtp_packet.PayloadSize());
}
}
void RtpAudioReceiver::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpAudioReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) {
LOG_ERROR("Send RR failed");
return -1;
}
// LOG_ERROR("Send RR");
return 0;
}
bool RtpAudioReceiver::CheckIsTimeSendRR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) {
last_send_rtcp_rr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}

View File

@@ -1,51 +0,0 @@
/*
* @Author: DI JUNKUN
* @Date: 2023-11-24
* Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _RTP_AUDIO_RECEIVER_H_
#define _RTP_AUDIO_RECEIVER_H_
#include <functional>
#include "io_statistics.h"
#include "rtcp_receiver_report.h"
#include "rtp_codec.h"
#include "rtp_statistics.h"
class RtpAudioReceiver {
public:
RtpAudioReceiver();
RtpAudioReceiver(std::shared_ptr<IOStatistics> io_statistics);
~RtpAudioReceiver();
public:
void InsertRtpPacket(RtpPacket& rtp_packet);
void SetSendDataFunc(std::function<int(const char*, size_t)> data_send_func);
void SetOnReceiveData(
std::function<void(const char*, size_t)> on_receive_data) {
on_receive_data_ = on_receive_data;
}
private:
bool CheckIsTimeSendRR();
int SendRtcpRR(RtcpReceiverReport& rtcp_rr);
private:
std::function<void(const char*, size_t)> on_receive_data_ = nullptr;
uint32_t last_complete_frame_ts_ = 0;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_recv_bytes_ = 0;
uint32_t total_rtp_payload_recv_ = 0;
uint32_t total_rtp_packets_recv_ = 0;
uint32_t last_send_rtcp_rr_packet_ts_ = 0;
std::function<int(const char*, size_t)> data_send_func_ = nullptr;
};
#endif

View File

@@ -1,145 +0,0 @@
#include "rtp_audio_sender.h"
#include <chrono>
#include "log.h"
#define RTCP_SR_INTERVAL 1000
RtpAudioSender::RtpAudioSender() {}
RtpAudioSender::RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpAudioSender::~RtpAudioSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpAudioSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packe_queue_.push(rtp_packet);
}
}
void RtpAudioSender::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpAudioSender::SendRtpPacket(RtpPacket& rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (0 !=
data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
last_send_bytes_ += (uint32_t)rtp_packet.Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_sent_++;
if (io_statistics_) {
io_statistics_->UpdateAudioOutboundBytes(last_send_bytes_);
io_statistics_->IncrementAudioOutboundRtpPacketCount();
}
if (CheckIsTimeSendSR()) {
RtcpSenderReport rtcp_sr;
SenderInfo sender_info;
RtcpReportBlock report;
auto duration = std::chrono::system_clock::now().time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
uint32_t seconds_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(duration).count());
uint32_t fraction_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(duration - seconds)
.count());
sender_info.sender_ssrc = 0x00;
sender_info.ntp_ts_msw = (uint32_t)seconds_u32;
sender_info.ntp_ts_lsw = (uint32_t)fraction_u32;
sender_info.rtp_ts =
std::chrono::system_clock::now().time_since_epoch().count() * 1000000;
sender_info.sender_packet_count = total_rtp_packets_sent_;
sender_info.sender_octet_count = total_rtp_payload_sent_;
rtcp_sr.SetSenderInfo(sender_info);
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_sr.SetReportBlock(report);
rtcp_sr.Encode();
SendRtcpSR(rtcp_sr);
}
return 0;
}
int RtpAudioSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
LOG_ERROR("Send SR failed");
return -1;
}
// LOG_ERROR("Send SR");
return 0;
}
bool RtpAudioSender::CheckIsTimeSendSR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_sr_packet_ts_ >= RTCP_SR_INTERVAL) {
last_send_rtcp_sr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}
bool RtpAudioSender::Process() {
last_send_bytes_ = 0;
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

View File

@@ -1,50 +0,0 @@
/*
* @Author: DI JUNKUN
* @Date: 2023-11-24
* Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _RTP_AUDIO_SENDER_H_
#define _RTP_AUDIO_SENDER_H_
#include <functional>
#include "io_statistics.h"
#include "ringbuffer.h"
#include "rtcp_sender_report.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "thread_base.h"
class RtpAudioSender : public ThreadBase {
public:
RtpAudioSender();
RtpAudioSender(std::shared_ptr<IOStatistics> io_statistics);
virtual ~RtpAudioSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
private:
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
private:
bool Process() override;
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t total_rtp_payload_sent_ = 0;
uint32_t total_rtp_packets_sent_ = 0;
uint32_t last_send_rtcp_sr_packet_ts_ = 0;
};
#endif

View File

@@ -1,105 +0,0 @@
#include "rtp_data_receiver.h"
#define RTCP_RR_INTERVAL 1000
RtpDataReceiver::RtpDataReceiver() {}
RtpDataReceiver::RtpDataReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpDataReceiver::~RtpDataReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpDataReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
last_recv_bytes_ = (uint32_t)rtp_packet.Size();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(last_recv_bytes_);
}
if (io_statistics_) {
io_statistics_->UpdateDataInboundBytes(last_recv_bytes_);
io_statistics_->IncrementDataInboundRtpPacketCount();
io_statistics_->UpdateDataPacketLossCount(rtp_packet.SequenceNumber());
}
if (CheckIsTimeSendRR()) {
RtcpReceiverReport rtcp_rr;
RtcpReportBlock report;
// auto duration = std::chrono::system_clock::now().time_since_epoch();
// auto seconds =
// std::chrono::duration_cast<std::chrono::seconds>(duration); uint32_t
// seconds_u32 = static_cast<uint32_t>(
// std::chrono::duration_cast<std::chrono::seconds>(duration).count());
// uint32_t fraction_u32 = static_cast<uint32_t>(
// std::chrono::duration_cast<std::chrono::nanoseconds>(duration -
// seconds)
// .count());
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_rr.SetReportBlock(report);
rtcp_rr.Encode();
SendRtcpRR(rtcp_rr);
}
if (on_receive_data_) {
on_receive_data_((const char*)rtp_packet.Payload(),
rtp_packet.PayloadSize());
}
}
void RtpDataReceiver::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpDataReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) {
LOG_ERROR("Send RR failed");
return -1;
}
// LOG_ERROR("Send RR");
return 0;
}
bool RtpDataReceiver::CheckIsTimeSendRR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) {
last_send_rtcp_rr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}

View File

@@ -1,46 +0,0 @@
#ifndef _RTP_DATA_RECEIVER_H_
#define _RTP_DATA_RECEIVER_H_
#include <functional>
#include "io_statistics.h"
#include "rtcp_receiver_report.h"
#include "rtp_codec.h"
#include "rtp_statistics.h"
class RtpDataReceiver {
public:
RtpDataReceiver();
RtpDataReceiver(std::shared_ptr<IOStatistics> io_statistics);
~RtpDataReceiver();
public:
void InsertRtpPacket(RtpPacket& rtp_packet);
void SetSendDataFunc(std::function<int(const char*, size_t)> data_send_func);
void SetOnReceiveData(
std::function<void(const char*, size_t)> on_receive_data) {
on_receive_data_ = on_receive_data;
}
private:
bool CheckIsTimeSendRR();
int SendRtcpRR(RtcpReceiverReport& rtcp_rr);
private:
std::function<void(const char*, size_t)> on_receive_data_ = nullptr;
uint32_t last_complete_frame_ts_ = 0;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_recv_bytes_ = 0;
uint32_t total_rtp_payload_recv_ = 0;
uint32_t total_rtp_packets_recv_ = 0;
uint32_t last_send_rtcp_rr_packet_ts_ = 0;
std::function<int(const char*, size_t)> data_send_func_ = nullptr;
};
#endif

View File

@@ -1,145 +0,0 @@
#include "rtp_data_sender.h"
#include <chrono>
#include "log.h"
#define RTCP_SR_INTERVAL 1000
RtpDataSender::RtpDataSender() {}
RtpDataSender::RtpDataSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpDataSender::~RtpDataSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpDataSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packe_queue_.push(rtp_packet);
}
}
void RtpDataSender::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpDataSender::SendRtpPacket(RtpPacket& rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (0 !=
data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
last_send_bytes_ += (uint32_t)rtp_packet.Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_sent_++;
if (io_statistics_) {
io_statistics_->UpdateDataOutboundBytes(last_send_bytes_);
io_statistics_->IncrementDataOutboundRtpPacketCount();
}
if (CheckIsTimeSendSR()) {
RtcpSenderReport rtcp_sr;
SenderInfo sender_info;
RtcpReportBlock report;
auto duration = std::chrono::system_clock::now().time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
uint32_t seconds_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(duration).count());
uint32_t fraction_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(duration - seconds)
.count());
sender_info.sender_ssrc = 0x00;
sender_info.ntp_ts_msw = (uint32_t)seconds_u32;
sender_info.ntp_ts_lsw = (uint32_t)fraction_u32;
sender_info.rtp_ts =
std::chrono::system_clock::now().time_since_epoch().count() * 1000000;
sender_info.sender_packet_count = total_rtp_packets_sent_;
sender_info.sender_octet_count = total_rtp_payload_sent_;
rtcp_sr.SetSenderInfo(sender_info);
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_sr.SetReportBlock(report);
rtcp_sr.Encode();
SendRtcpSR(rtcp_sr);
}
return 0;
}
int RtpDataSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
LOG_ERROR("Send SR failed");
return -1;
}
// LOG_ERROR("Send SR");
return 0;
}
bool RtpDataSender::CheckIsTimeSendSR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_sr_packet_ts_ >= RTCP_SR_INTERVAL) {
last_send_rtcp_sr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}
bool RtpDataSender::Process() {
last_send_bytes_ = 0;
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

View File

@@ -1,52 +0,0 @@
/*
* @Author: DI JUNKUN
* @Date: 2023-11-24
* Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _RTP_DATA_SENDER_H_
#define _RTP_DATA_SENDER_H_
#include <functional>
#include "io_statistics.h"
#include "ringbuffer.h"
#include "rtcp_sender_report.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "thread_base.h"
class RtpDataSender : public ThreadBase {
public:
RtpDataSender();
RtpDataSender(std::shared_ptr<IOStatistics> io_statistics);
virtual ~RtpDataSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
private:
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
private:
bool Process() override;
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t total_rtp_payload_sent_ = 0;
uint32_t total_rtp_packets_sent_ = 0;
uint32_t last_send_rtcp_sr_packet_ts_ = 0;
};
#endif

View File

@@ -1,385 +0,0 @@
#include "rtp_video_receiver.h"
#include "log.h"
#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2)
#define RTCP_RR_INTERVAL 1000
RtpVideoReceiver::RtpVideoReceiver() {}
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {
rtcp_thread_ = std::thread(&RtpVideoReceiver::RtcpThread, this);
}
RtpVideoReceiver::~RtpVideoReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
rtcp_stop_.store(true);
rtcp_cv_.notify_one();
if (rtcp_thread_.joinable()) {
rtcp_thread_.join();
}
}
void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
last_recv_bytes_ = (uint32_t)rtp_packet.PayloadSize();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_recv_++;
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(last_recv_bytes_);
}
if (io_statistics_) {
io_statistics_->UpdateVideoInboundBytes(last_recv_bytes_);
io_statistics_->IncrementVideoInboundRtpPacketCount();
io_statistics_->UpdateVideoPacketLossCount(rtp_packet.SequenceNumber());
}
if (CheckIsTimeSendRR()) {
RtcpReceiverReport rtcp_rr;
RtcpReportBlock report;
// auto duration = std::chrono::system_clock::now().time_since_epoch();
// auto seconds =
// std::chrono::duration_cast<std::chrono::seconds>(duration); uint32_t
// seconds_u32 = static_cast<uint32_t>(
// std::chrono::duration_cast<std::chrono::seconds>(duration).count());
// uint32_t fraction_u32 = static_cast<uint32_t>(
// std::chrono::duration_cast<std::chrono::nanoseconds>(duration -
// seconds)
// .count());
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_rr.SetReportBlock(report);
rtcp_rr.Encode();
SendRtcpRR(rtcp_rr);
}
if (rtp_packet.PayloadType() == RtpPacket::PAYLOAD_TYPE::AV1) {
ProcessAv1RtpPacket(rtp_packet);
} else {
ProcessH264RtpPacket(rtp_packet);
}
}
void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacket& rtp_packet) {
if (!fec_enable_) {
if (RtpPacket::PAYLOAD_TYPE::H264 == rtp_packet.PayloadType()) {
if (RtpPacket::NAL_UNIT_TYPE::NALU == rtp_packet.NalUnitType()) {
compelete_video_frame_queue_.push(
VideoFrame(rtp_packet.Payload(), rtp_packet.PayloadSize()));
} else if (RtpPacket::NAL_UNIT_TYPE::FU_A == rtp_packet.NalUnitType()) {
incomplete_frame_list_[rtp_packet.SequenceNumber()] = rtp_packet;
bool complete = CheckIsH264FrameCompleted(rtp_packet);
if (!complete) {
}
}
}
} else {
if (RtpPacket::PAYLOAD_TYPE::H264 == rtp_packet.PayloadType()) {
if (RtpPacket::NAL_UNIT_TYPE::NALU == rtp_packet.NalUnitType()) {
compelete_video_frame_queue_.push(
VideoFrame(rtp_packet.Payload(), rtp_packet.PayloadSize()));
} else if (RtpPacket::NAL_UNIT_TYPE::FU_A == rtp_packet.NalUnitType()) {
incomplete_frame_list_[rtp_packet.SequenceNumber()] = rtp_packet;
bool complete = CheckIsH264FrameCompleted(rtp_packet);
if (!complete) {
}
}
} else if (RtpPacket::PAYLOAD_TYPE::H264_FEC_SOURCE ==
rtp_packet.PayloadType()) {
if (last_packet_ts_ != rtp_packet.Timestamp()) {
fec_decoder_.Init();
fec_decoder_.ResetParams(rtp_packet.FecSourceSymbolNum());
last_packet_ts_ = rtp_packet.Timestamp();
}
incomplete_fec_packet_list_[rtp_packet.Timestamp()]
[rtp_packet.SequenceNumber()] = rtp_packet;
uint8_t** complete_frame = fec_decoder_.DecodeWithNewSymbol(
(const char*)incomplete_fec_packet_list_[rtp_packet.Timestamp()]
[rtp_packet.SequenceNumber()]
.Payload(),
rtp_packet.FecSymbolId());
if (nullptr != complete_frame) {
if (!nv12_data_) {
nv12_data_ = new uint8_t[NV12_BUFFER_SIZE];
}
size_t complete_frame_size = 0;
for (int index = 0; index < rtp_packet.FecSourceSymbolNum(); index++) {
if (nullptr == complete_frame[index]) {
LOG_ERROR("Invalid complete_frame[{}]", index);
}
memcpy(nv12_data_ + complete_frame_size, complete_frame[index], 1400);
complete_frame_size += 1400;
}
fec_decoder_.ReleaseSourcePackets(complete_frame);
fec_decoder_.Release();
LOG_ERROR("Release incomplete_fec_packet_list_");
incomplete_fec_packet_list_.erase(rtp_packet.Timestamp());
if (incomplete_fec_frame_list_.end() !=
incomplete_fec_frame_list_.find(rtp_packet.Timestamp())) {
incomplete_fec_frame_list_.erase(rtp_packet.Timestamp());
}
compelete_video_frame_queue_.push(
VideoFrame(nv12_data_, complete_frame_size));
} else {
incomplete_fec_frame_list_.insert(rtp_packet.Timestamp());
}
} else if (RtpPacket::PAYLOAD_TYPE::H264_FEC_REPAIR ==
rtp_packet.PayloadType()) {
if (incomplete_fec_frame_list_.end() ==
incomplete_fec_frame_list_.find(rtp_packet.Timestamp())) {
return;
}
if (last_packet_ts_ != rtp_packet.Timestamp()) {
fec_decoder_.Init();
fec_decoder_.ResetParams(rtp_packet.FecSourceSymbolNum());
last_packet_ts_ = rtp_packet.Timestamp();
}
incomplete_fec_packet_list_[rtp_packet.Timestamp()]
[rtp_packet.SequenceNumber()] = rtp_packet;
uint8_t** complete_frame = fec_decoder_.DecodeWithNewSymbol(
(const char*)incomplete_fec_packet_list_[rtp_packet.Timestamp()]
[rtp_packet.SequenceNumber()]
.Payload(),
rtp_packet.FecSymbolId());
if (nullptr != complete_frame) {
if (!nv12_data_) {
nv12_data_ = new uint8_t[NV12_BUFFER_SIZE];
}
size_t complete_frame_size = 0;
for (int index = 0; index < rtp_packet.FecSourceSymbolNum(); index++) {
if (nullptr == complete_frame[index]) {
LOG_ERROR("Invalid complete_frame[{}]", index);
}
memcpy(nv12_data_ + complete_frame_size, complete_frame[index], 1400);
complete_frame_size += 1400;
}
fec_decoder_.ReleaseSourcePackets(complete_frame);
fec_decoder_.Release();
incomplete_fec_packet_list_.erase(rtp_packet.Timestamp());
compelete_video_frame_queue_.push(
VideoFrame(nv12_data_, complete_frame_size));
}
}
}
}
void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacket& rtp_packet) {
// LOG_ERROR("recv payload size = {}, sequence_number_ = {}",
// rtp_packet.PayloadSize(), rtp_packet.SequenceNumber());
if (RtpPacket::PAYLOAD_TYPE::AV1 == rtp_packet.PayloadType()) {
incomplete_frame_list_[rtp_packet.SequenceNumber()] = rtp_packet;
bool complete = CheckIsAv1FrameCompleted(rtp_packet);
if (!complete) {
}
}
// std::vector<Obu> obus =
// ParseObus((uint8_t*)rtp_packet.Payload(), rtp_packet.PayloadSize());
// for (int i = 0; i < obus.size(); i++) {
// LOG_ERROR("2 [{}|{}] Obu size = [{}], Obu type [{}]", i, obus.size(),
// obus[i].size_,
// ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_)));
// }
}
bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacket& rtp_packet) {
if (rtp_packet.FuAEnd()) {
uint16_t end_seq = rtp_packet.SequenceNumber();
while (end_seq--) {
auto it = incomplete_frame_list_.find(end_seq);
if (it == incomplete_frame_list_.end()) {
// The last fragment has already received. If all fragments are in
// order, then some fragments lost in tranmission and need to be
// repaired using FEC
return false;
} else if (!it->second.FuAStart()) {
continue;
} else if (it->second.FuAStart()) {
if (!nv12_data_) {
nv12_data_ = new uint8_t[NV12_BUFFER_SIZE];
}
size_t complete_frame_size = 0;
for (uint16_t start = it->first; start <= rtp_packet.SequenceNumber();
start++) {
memcpy(nv12_data_ + complete_frame_size,
incomplete_frame_list_[start].Payload(),
incomplete_frame_list_[start].PayloadSize());
complete_frame_size += incomplete_frame_list_[start].PayloadSize();
incomplete_frame_list_.erase(start);
}
compelete_video_frame_queue_.push(
VideoFrame(nv12_data_, complete_frame_size));
return true;
} else {
LOG_WARN("What happened?")
return false;
}
}
return true;
}
return false;
}
bool RtpVideoReceiver::CheckIsAv1FrameCompleted(RtpPacket& rtp_packet) {
if (rtp_packet.Av1FrameEnd()) {
uint16_t end_seq = rtp_packet.SequenceNumber();
uint16_t start = end_seq;
while (end_seq--) {
auto it = incomplete_frame_list_.find(end_seq);
if (it == incomplete_frame_list_.end()) {
// The last fragment has already received. If all fragments are in
// order, then some fragments lost in tranmission and need to be
// repaired using FEC
// return false;
} else if (!it->second.Av1FrameStart()) {
continue;
} else if (it->second.Av1FrameStart()) {
start = it->second.SequenceNumber();
break;
} else {
LOG_WARN("What happened?")
return false;
}
}
if (start <= rtp_packet.SequenceNumber()) {
if (!nv12_data_) {
nv12_data_ = new uint8_t[NV12_BUFFER_SIZE];
}
size_t complete_frame_size = 0;
for (; start <= rtp_packet.SequenceNumber(); start++) {
const uint8_t* obu_frame = incomplete_frame_list_[start].Payload();
size_t obu_frame_size = incomplete_frame_list_[start].PayloadSize();
memcpy(nv12_data_ + complete_frame_size, obu_frame, obu_frame_size);
complete_frame_size += obu_frame_size;
incomplete_frame_list_.erase(start);
}
compelete_video_frame_queue_.push(
VideoFrame(nv12_data_, complete_frame_size));
return true;
}
}
return false;
}
void RtpVideoReceiver::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) {
LOG_ERROR("Send RR failed");
return -1;
}
return 0;
}
bool RtpVideoReceiver::CheckIsTimeSendRR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) {
last_send_rtcp_rr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}
bool RtpVideoReceiver::Process() {
if (!compelete_video_frame_queue_.isEmpty()) {
VideoFrame video_frame;
compelete_video_frame_queue_.pop(video_frame);
if (on_receive_complete_frame_) {
// auto now_complete_frame_ts =
// std::chrono::duration_cast<std::chrono::milliseconds>(
// std::chrono::system_clock::now().time_since_epoch())
// .count();
// uint32_t duration = now_complete_frame_ts - last_complete_frame_ts_;
// LOG_ERROR("Duration {}", duration);
// last_complete_frame_ts_ = now_complete_frame_ts;
on_receive_complete_frame_(video_frame);
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}
void RtpVideoReceiver::RtcpThread() {
while (!rtcp_stop_) {
std::unique_lock<std::mutex> lock(rtcp_mtx_);
if (rtcp_cv_.wait_for(
lock, std::chrono::milliseconds(rtcp_tcc_interval_ms_),
[&]() { return send_rtcp_rr_triggered_ || rtcp_stop_; })) {
if (rtcp_stop_) break;
send_rtcp_rr_triggered_ = false;
} else {
// LOG_ERROR("Send video tcc");
auto now = std::chrono::steady_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_send_rtcp_rr_ts_)
.count();
if (elapsed >= rtcp_rr_interval_ms_) {
LOG_ERROR("Send video rr [{}]", (void*)this);
last_send_rtcp_rr_ts_ = now;
}
}
}
}

View File

@@ -1,92 +0,0 @@
#ifndef _RTP_VIDEO_RECEIVER_H_
#define _RTP_VIDEO_RECEIVER_H_
#include <functional>
#include <map>
#include <queue>
#include <set>
#include "fec_decoder.h"
#include "io_statistics.h"
#include "receive_side_congestion_controller.h"
#include "ringbuffer.h"
#include "rtcp_receiver_report.h"
#include "rtp_codec.h"
#include "rtp_statistics.h"
#include "thread_base.h"
#include "video_frame.h"
class RtpVideoReceiver : public ThreadBase {
public:
RtpVideoReceiver();
RtpVideoReceiver(std::shared_ptr<IOStatistics> io_statistics);
virtual ~RtpVideoReceiver();
public:
void InsertRtpPacket(RtpPacket& rtp_packet);
void SetSendDataFunc(std::function<int(const char*, size_t)> data_send_func);
void SetOnReceiveCompleteFrame(
std::function<void(VideoFrame&)> on_receive_complete_frame) {
on_receive_complete_frame_ = on_receive_complete_frame;
}
private:
void ProcessAv1RtpPacket(RtpPacket& rtp_packet);
bool CheckIsAv1FrameCompleted(RtpPacket& rtp_packet);
private:
void ProcessH264RtpPacket(RtpPacket& rtp_packet);
bool CheckIsH264FrameCompleted(RtpPacket& rtp_packet);
private:
bool CheckIsTimeSendRR();
int SendRtcpRR(RtcpReceiverReport& rtcp_rr);
private:
bool Process() override;
void RtcpThread();
private:
std::map<uint16_t, RtpPacket> incomplete_frame_list_;
uint8_t* nv12_data_ = nullptr;
std::function<void(VideoFrame&)> on_receive_complete_frame_ = nullptr;
uint32_t last_complete_frame_ts_ = 0;
RingBuffer<VideoFrame> compelete_video_frame_queue_;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_recv_bytes_ = 0;
uint32_t total_rtp_packets_recv_ = 0;
uint32_t total_rtp_payload_recv_ = 0;
uint32_t last_send_rtcp_rr_packet_ts_ = 0;
std::function<int(const char*, size_t)> data_send_func_ = nullptr;
private:
bool fec_enable_ = false;
FecDecoder fec_decoder_;
uint64_t last_packet_ts_ = 0;
// std::map<uint16_t, RtpPacket> incomplete_fec_frame_list_;
// std::map<uint32_t, std::map<uint16_t, RtpPacket>> fec_source_symbol_list_;
// std::map<uint32_t, std::map<uint16_t, RtpPacket>> fec_repair_symbol_list_;
std::set<uint64_t> incomplete_fec_frame_list_;
std::map<uint64_t, std::map<uint16_t, RtpPacket>> incomplete_fec_packet_list_;
private:
std::thread rtcp_thread_;
std::mutex rtcp_mtx_;
std::condition_variable rtcp_cv_;
std::chrono::steady_clock::time_point last_send_rtcp_rr_ts_;
std::atomic<bool> send_rtcp_rr_triggered_ = false;
std::atomic<bool> rtcp_stop_ = false;
int rtcp_rr_interval_ms_ = 5000;
int rtcp_tcc_interval_ms_ = 200;
private:
ReceiveSideCongestionController congestion_controller_;
};
#endif

View File

@@ -1,145 +0,0 @@
#include "rtp_video_sender.h"
#include <chrono>
#include "log.h"
#define RTCP_SR_INTERVAL 1000
RtpVideoSender::RtpVideoSender() {}
RtpVideoSender::RtpVideoSender(std::shared_ptr<IOStatistics> io_statistics)
: io_statistics_(io_statistics) {}
RtpVideoSender::~RtpVideoSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpVideoSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packe_queue_.push(rtp_packet);
}
}
void RtpVideoSender::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (0 !=
data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
last_send_bytes_ += (uint32_t)rtp_packet.Size();
total_rtp_payload_sent_ += (uint32_t)rtp_packet.PayloadSize();
total_rtp_packets_sent_++;
if (io_statistics_) {
io_statistics_->UpdateVideoOutboundBytes(last_send_bytes_);
io_statistics_->IncrementVideoOutboundRtpPacketCount();
}
if (CheckIsTimeSendSR()) {
RtcpSenderReport rtcp_sr;
SenderInfo sender_info;
RtcpReportBlock report;
auto duration = std::chrono::system_clock::now().time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
uint32_t seconds_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(duration).count());
uint32_t fraction_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(duration - seconds)
.count());
sender_info.sender_ssrc = 0x00;
sender_info.ntp_ts_msw = (uint32_t)seconds_u32;
sender_info.ntp_ts_lsw = (uint32_t)fraction_u32;
sender_info.rtp_ts =
std::chrono::system_clock::now().time_since_epoch().count() * 1000000;
sender_info.sender_packet_count = total_rtp_packets_sent_;
sender_info.sender_octet_count = total_rtp_payload_sent_;
rtcp_sr.SetSenderInfo(sender_info);
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_sr.SetReportBlock(report);
rtcp_sr.Encode();
// SendRtcpSR(rtcp_sr);
}
return 0;
}
int RtpVideoSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
LOG_ERROR("Send SR failed");
return -1;
}
// LOG_ERROR("Send SR");
return 0;
}
bool RtpVideoSender::CheckIsTimeSendSR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_sr_packet_ts_ >= RTCP_SR_INTERVAL) {
last_send_rtcp_sr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}
bool RtpVideoSender::Process() {
last_send_bytes_ = 0;
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

View File

@@ -1,45 +0,0 @@
#ifndef _RTP_VIDEO_SENDER_H_
#define _RTP_VIDEO_SENDER_H_
#include <functional>
#include "io_statistics.h"
#include "ringbuffer.h"
#include "rtcp_sender_report.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "thread_base.h"
class RtpVideoSender : public ThreadBase {
public:
RtpVideoSender();
RtpVideoSender(std::shared_ptr<IOStatistics> io_statistics);
virtual ~RtpVideoSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
private:
bool Process() override;
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
private:
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t last_send_rtcp_sr_packet_ts_ = 0;
uint32_t total_rtp_payload_sent_ = 0;
uint32_t total_rtp_packets_sent_ = 0;
};
#endif

View File

@@ -190,7 +190,7 @@ class RtpPacket {
RtpPacket &operator=(const RtpPacket &rtp_packet);
RtpPacket &operator=(RtpPacket &&rtp_packet);
~RtpPacket();
virtual ~RtpPacket();
public:
// Set Header

View File

@@ -1,8 +1,6 @@
#include "rtp_packet_received.h"
RtpPacketReceived::RtpPacketReceived() = default;
RtpPacketReceived::RtpPacketReceived(int64_t arrival_time)
: arrival_time_(arrival_time) {}
RtpPacketReceived::RtpPacketReceived(const RtpPacketReceived& packet) = default;
RtpPacketReceived::RtpPacketReceived(RtpPacketReceived&& packet) = default;

View File

@@ -9,13 +9,12 @@
#include <limits>
#include "enc_mark.h"
#include "rtp_packet.h"
class RtpPacketReceived : public RtpPacket {
public:
RtpPacketReceived();
explicit RtpPacketReceived(
int64_t arrival_time = (std::numeric_limits<int64_t>::min)());
RtpPacketReceived(const RtpPacketReceived& packet);
RtpPacketReceived(RtpPacketReceived&& packet);
@@ -24,8 +23,29 @@ class RtpPacketReceived : public RtpPacket {
~RtpPacketReceived();
public:
int64_t arrival_time() const { return arrival_time_; }
void set_arrival_time(int64_t time) { arrival_time_ = time; }
// Explicit Congestion Notification (ECN), RFC-3168, Section 5.
// Used by L4S: https://www.rfc-editor.org/rfc/rfc9331.html
EcnMarking ecn() const { return ecn_; }
void set_ecn(EcnMarking ecn) { ecn_ = ecn; }
// Flag if packet was recovered via RTX or FEC.
bool recovered() const { return recovered_; }
void set_recovered(bool value) { recovered_ = value; }
int payload_type_frequency() const { return payload_type_frequency_; }
void set_payload_type_frequency(int value) {
payload_type_frequency_ = value;
}
private:
int64_t arrival_time_ = (std::numeric_limits<int64_t>::min)();
int64_t arrival_time_ = std::numeric_limits<int64_t>::min();
EcnMarking ecn_ = EcnMarking::kNotEct;
int payload_type_frequency_ = 0;
bool recovered_ = false;
};
#endif