Implementation for user data sending

This commit is contained in:
dijunkun
2023-09-13 17:31:02 +08:00
parent e2533d18e4
commit a0abb7455c
27 changed files with 513 additions and 233 deletions

View File

@@ -36,46 +36,12 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size,
RtpPacket rtp_packet;
if (size <= MAX_NALU_LEN) {
rtp_packet.SetVerion(version_);
rtp_packet.SetHasPadding(has_padding_);
rtp_packet.SetHasExtension(has_extension_);
rtp_packet.SetMarker(1);
rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
rtp_packet.SetSequenceNumber(sequence_number_++);
timestamp_ =
std::chrono::high_resolution_clock::now().time_since_epoch().count();
rtp_packet.SetTimestamp(timestamp_);
rtp_packet.SetSsrc(ssrc_);
if (!csrcs_.empty()) {
rtp_packet.SetCsrcs(csrcs_);
}
if (has_extension_) {
rtp_packet.SetExtensionProfile(extension_profile_);
rtp_packet.SetExtensionData(extension_data_, extension_len_);
}
RtpPacket::FU_INDICATOR fu_indicator;
fu_indicator.forbidden_bit = 0;
fu_indicator.nal_reference_idc = 1;
fu_indicator.nal_unit_type = NALU;
rtp_packet.SetFuIndicator(fu_indicator);
rtp_packet.EncodeH264Nalu(buffer, size);
packets.emplace_back(rtp_packet);
} else {
size_t last_packet_size = size % MAX_NALU_LEN;
size_t packet_num = size / MAX_NALU_LEN + (last_packet_size ? 1 : 0);
for (size_t index = 0; index < packet_num; index++) {
if (RtpPacket::PAYLOAD_TYPE::H264 == payload_type_) {
if (size <= MAX_NALU_LEN) {
rtp_packet.SetVerion(version_);
rtp_packet.SetHasPadding(has_padding_);
rtp_packet.SetHasExtension(has_extension_);
rtp_packet.SetMarker(index == packet_num ? 1 : 0);
rtp_packet.SetMarker(1);
rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
rtp_packet.SetSequenceNumber(sequence_number_++);
@@ -95,26 +61,78 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size,
RtpPacket::FU_INDICATOR fu_indicator;
fu_indicator.forbidden_bit = 0;
fu_indicator.nal_reference_idc = 0;
fu_indicator.nal_unit_type = FU_A;
RtpPacket::FU_HEADER fu_header;
fu_header.start = index == 0 ? 1 : 0;
fu_header.end = index == packet_num - 1 ? 1 : 0;
fu_header.remain_bit = 0;
fu_header.nal_unit_type = FU_A;
fu_indicator.nal_reference_idc = 1;
fu_indicator.nal_unit_type = NALU;
rtp_packet.SetFuIndicator(fu_indicator);
rtp_packet.SetFuHeader(fu_header);
if (index == packet_num - 1 && last_packet_size > 0) {
rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN,
last_packet_size);
} else {
rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN, MAX_NALU_LEN);
}
rtp_packet.EncodeH264Nalu(buffer, size);
packets.emplace_back(rtp_packet);
} else {
size_t last_packet_size = size % MAX_NALU_LEN;
size_t packet_num = size / MAX_NALU_LEN + (last_packet_size ? 1 : 0);
for (size_t index = 0; index < packet_num; index++) {
rtp_packet.SetVerion(version_);
rtp_packet.SetHasPadding(has_padding_);
rtp_packet.SetHasExtension(has_extension_);
rtp_packet.SetMarker(index == packet_num ? 1 : 0);
rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
rtp_packet.SetSequenceNumber(sequence_number_++);
timestamp_ = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
rtp_packet.SetTimestamp(timestamp_);
rtp_packet.SetSsrc(ssrc_);
if (!csrcs_.empty()) {
rtp_packet.SetCsrcs(csrcs_);
}
if (has_extension_) {
rtp_packet.SetExtensionProfile(extension_profile_);
rtp_packet.SetExtensionData(extension_data_, extension_len_);
}
RtpPacket::FU_INDICATOR fu_indicator;
fu_indicator.forbidden_bit = 0;
fu_indicator.nal_reference_idc = 0;
fu_indicator.nal_unit_type = FU_A;
RtpPacket::FU_HEADER fu_header;
fu_header.start = index == 0 ? 1 : 0;
fu_header.end = index == packet_num - 1 ? 1 : 0;
fu_header.remain_bit = 0;
fu_header.nal_unit_type = FU_A;
rtp_packet.SetFuIndicator(fu_indicator);
rtp_packet.SetFuHeader(fu_header);
if (index == packet_num - 1 && last_packet_size > 0) {
rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN,
last_packet_size);
} else {
rtp_packet.EncodeH264Fua(buffer + index * MAX_NALU_LEN, MAX_NALU_LEN);
}
packets.emplace_back(rtp_packet);
}
}
} else if (RtpPacket::PAYLOAD_TYPE::DATA == payload_type_) {
rtp_packet.SetVerion(version_);
rtp_packet.SetHasPadding(has_padding_);
rtp_packet.SetHasExtension(has_extension_);
rtp_packet.SetMarker(1);
rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
rtp_packet.SetSequenceNumber(sequence_number_++);
timestamp_ =
std::chrono::high_resolution_clock::now().time_since_epoch().count();
rtp_packet.SetTimestamp(timestamp_);
rtp_packet.SetSsrc(ssrc_);
rtp_packet.Encode(buffer, size);
packets.emplace_back(rtp_packet);
}
}

View File

@@ -0,0 +1,5 @@
#include "rtp_data_receiver.h"
RtpDataReceiver::RtpDataReceiver() {}
RtpDataReceiver::~RtpDataReceiver() {}

View File

@@ -0,0 +1,13 @@
#ifndef _RTP_DATA_RECEIVER_H_
#define _RTP_DATA_RECEIVER_H_
class RtpDataReceiver {
public:
RtpDataReceiver();
~RtpDataReceiver();
private:
/* data */
};
#endif

140
src/rtp/rtp_data_sender.cpp Normal file
View File

@@ -0,0 +1,140 @@
#include "rtp_data_sender.h"
#include <chrono>
#include "log.h"
#define RTCP_SR_INTERVAL 1000
RtpDataSender::RtpDataSender() {}
RtpDataSender::~RtpDataSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpDataSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
rtp_packe_queue_.push(rtp_packet);
}
}
void RtpDataSender::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpDataSender::SendRtpPacket(RtpPacket& rtp_packet) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
int ret = 0;
if (0 !=
data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
last_send_bytes_ += rtp_packet.Size();
total_rtp_packets_sent_++;
total_rtp_payload_sent_ += rtp_packet.PayloadSize();
if (CheckIsTimeSendSR()) {
RtcpSenderReport rtcp_sr;
SenderInfo sender_info;
RtcpReportBlock report;
auto duration = std::chrono::system_clock::now().time_since_epoch();
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(duration);
uint32_t seconds_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::seconds>(duration).count());
uint32_t fraction_u32 = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::nanoseconds>(duration - seconds)
.count());
sender_info.sender_ssrc = 0x00;
sender_info.ntp_ts_msw = (uint32_t)seconds_u32;
sender_info.ntp_ts_lsw = (uint32_t)fraction_u32;
sender_info.rtp_ts =
std::chrono::high_resolution_clock::now().time_since_epoch().count() *
1000000;
sender_info.sender_packet_count = total_rtp_packets_sent_;
sender_info.sender_octet_count = total_rtp_payload_sent_;
rtcp_sr.SetSenderInfo(sender_info);
report.source_ssrc = 0x00;
report.fraction_lost = 0;
report.cumulative_lost = 0;
report.extended_high_seq_num = 0;
report.jitter = 0;
report.lsr = 0;
report.dlsr = 0;
rtcp_sr.SetReportBlock(report);
rtcp_sr.Encode();
SendRtcpSR(rtcp_sr);
}
return 0;
}
int RtpDataSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
LOG_ERROR("Send SR failed");
return -1;
}
LOG_ERROR("Send SR");
return 0;
}
bool RtpDataSender::CheckIsTimeSendSR() {
uint32_t now_ts = static_cast<uint32_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now().time_since_epoch())
.count());
if (now_ts - last_send_rtcp_sr_packet_ts_ >= RTCP_SR_INTERVAL) {
last_send_rtcp_sr_packet_ts_ = now_ts;
return true;
} else {
return false;
}
}
bool RtpDataSender::Process() {
last_send_bytes_ = 0;
for (size_t i = 0; i < 10; i++)
if (!rtp_packe_queue_.isEmpty()) {
RtpPacket rtp_packet;
rtp_packe_queue_.pop(rtp_packet);
SendRtpPacket(rtp_packet);
}
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
return true;
}

41
src/rtp/rtp_data_sender.h Normal file
View File

@@ -0,0 +1,41 @@
#ifndef _RTP_DATA_SENDER_H_
#define _RTP_DATA_SENDER_H_
#include <functional>
#include "ringbuffer.h"
#include "rtcp_sender_report.h"
#include "rtp_packet.h"
#include "rtp_statistics.h"
#include "thread_base.h"
class RtpDataSender : public ThreadBase {
public:
RtpDataSender();
~RtpDataSender();
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
private:
private:
int SendRtpPacket(RtpPacket &rtp_packet);
int SendRtcpSR(RtcpSenderReport &rtcp_sr);
bool CheckIsTimeSendSR();
private:
bool Process() override;
private:
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t last_send_rtcp_sr_packet_ts_ = 0;
uint32_t total_rtp_packets_sent_ = 0;
uint32_t total_rtp_payload_sent_ = 0;
};
#endif

View File

@@ -66,7 +66,7 @@
class RtpPacket {
public:
typedef enum { H264 = 96, OPUS = 97, USER_DEFINED = 127 } PAYLOAD_TYPE;
typedef enum { H264 = 96, OPUS = 97, DATA = 127 } PAYLOAD_TYPE;
typedef enum { UNKNOWN = 0, NALU = 1, FU_A = 28, FU_B = 29 } NAL_UNIT_TYPE;
public:

View File

@@ -0,0 +1,31 @@
#include "rtp_statistics.h"
#include "log.h"
RtpStatistics::RtpStatistics() {}
RtpStatistics::~RtpStatistics() {}
void RtpStatistics::UpdateSentBytes(uint32_t sent_bytes) {
sent_bytes_ += sent_bytes;
}
void RtpStatistics::UpdateReceiveBytes(uint32_t received_bytes) {
received_bytes_ += received_bytes;
}
bool RtpStatistics::Process() {
if (!sent_bytes_) {
LOG_INFO("rtp statistics: Send [{} bps]", sent_bytes_);
}
if (!received_bytes_) {
LOG_INFO("rtp statistics: Receive [{} bps]", received_bytes_);
}
sent_bytes_ = 0;
received_bytes_ = 0;
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}

23
src/rtp/rtp_statistics.h Normal file
View File

@@ -0,0 +1,23 @@
#ifndef _RTP_STATISTICS_H_
#define _RTP_STATISTICS_H_
#include "thread_base.h"
class RtpStatistics : public ThreadBase {
public:
RtpStatistics();
~RtpStatistics();
public:
void UpdateSentBytes(uint32_t sent_bytes);
void UpdateReceiveBytes(uint32_t received_bytes);
private:
bool Process();
private:
uint32_t sent_bytes_ = 0;
uint32_t received_bytes_ = 0;
};
#endif

View File

@@ -0,0 +1,5 @@
#include "rtp_transceiver.h"
RtpTransceiver::RtpTransceiver() {}
RtpTransceiver::~RtpTransceiver() {}

18
src/rtp/rtp_transceiver.h Normal file
View File

@@ -0,0 +1,18 @@
#ifndef _RTP_TRANSCEIVER_H_
#define _RTP_TRANSCEIVER_H_
#include <functional>
class RtpTransceiver {
public:
RtpTransceiver();
~RtpTransceiver();
public:
virtual void SetSendDataFunc(
std::function<int(const char *, size_t)> data_send_func) = 0;
virtual void OnReceiveData(const char *data, size_t size) = 0;
};
#endif

View File

@@ -1,18 +0,0 @@
#include "rtp_video_receive_statistics.h"
#include "log.h"
RtpVideoReceiveStatistics::RtpVideoReceiveStatistics() {}
RtpVideoReceiveStatistics::~RtpVideoReceiveStatistics() {}
void RtpVideoReceiveStatistics::UpdateReceiveBytes(uint32_t received_bytes) {
received_bytes_ += received_bytes;
}
bool RtpVideoReceiveStatistics::Process() {
LOG_INFO("rtp statistics: Receive [{} bps]", received_bytes_);
received_bytes_ = 0;
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}

View File

@@ -1,21 +0,0 @@
#ifndef _RTP_VIDEO_RECEIVE_STATISTICS_H_
#define _RTP_VIDEO_RECEIVE_STATISTICS_H_
#include "thread_base.h"
class RtpVideoReceiveStatistics : public ThreadBase {
public:
RtpVideoReceiveStatistics();
~RtpVideoReceiveStatistics();
public:
void UpdateReceiveBytes(uint32_t received_bytes);
private:
bool Process();
private:
uint32_t received_bytes_ = 0;
};
#endif

View File

@@ -7,17 +7,20 @@
RtpVideoReceiver::RtpVideoReceiver() {}
RtpVideoReceiver::~RtpVideoReceiver() {}
RtpVideoReceiver::~RtpVideoReceiver() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
if (!rtp_video_receive_statistics_) {
rtp_video_receive_statistics_ =
std::make_unique<RtpVideoReceiveStatistics>();
rtp_video_receive_statistics_->Start();
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
if (rtp_video_receive_statistics_) {
rtp_video_receive_statistics_->UpdateReceiveBytes(rtp_packet.Size());
if (rtp_statistics_) {
rtp_statistics_->UpdateReceiveBytes(rtp_packet.Size());
}
if (CheckIsTimeSendRR()) {
@@ -147,18 +150,18 @@ bool RtpVideoReceiver::Process() {
return true;
}
void RtpVideoReceiver::SetUdpSender(
std::function<int(const char*, size_t)> udp_sender) {
udp_sender_ = udp_sender;
void RtpVideoReceiver::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
if (!udp_sender_) {
LOG_ERROR("udp_sender_ is nullptr");
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (udp_sender_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) {
if (data_send_func_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) {
LOG_ERROR("Send RR failed");
return -1;
}

View File

@@ -9,7 +9,7 @@
#include "ringbuffer.h"
#include "rtcp_receiver_report.h"
#include "rtp_codec.h"
#include "rtp_video_receive_statistics.h"
#include "rtp_statistics.h"
#include "thread_base.h"
class RtpVideoReceiver : public ThreadBase {
@@ -20,8 +20,7 @@ class RtpVideoReceiver : public ThreadBase {
public:
void InsertRtpPacket(RtpPacket& rtp_packet);
void SetUdpSender(
std::function<int(const char*, size_t)> rtp_packet_send_func);
void SetSendDataFunc(std::function<int(const char*, size_t)> data_send_func);
void SetOnReceiveCompleteFrame(
std::function<void(VideoFrame&)> on_receive_complete_frame) {
@@ -44,10 +43,9 @@ class RtpVideoReceiver : public ThreadBase {
RingBuffer<VideoFrame> compelete_video_frame_queue_;
private:
std::unique_ptr<RtpVideoReceiveStatistics> rtp_video_receive_statistics_ =
nullptr;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
uint32_t last_send_rtcp_rr_packet_ts_ = 0;
std::function<int(const char*, size_t)> udp_sender_ = nullptr;
std::function<int(const char*, size_t)> data_send_func_ = nullptr;
};
#endif

View File

@@ -1,18 +0,0 @@
#include "rtp_video_send_statistics.h"
#include "log.h"
RtpVideoSendStatistics::RtpVideoSendStatistics() {}
RtpVideoSendStatistics::~RtpVideoSendStatistics() {}
void RtpVideoSendStatistics::UpdateSentBytes(uint32_t sent_bytes) {
sent_bytes_ += sent_bytes;
}
bool RtpVideoSendStatistics::Process() {
LOG_INFO("rtp statistics: Send [{} bps]", sent_bytes_);
sent_bytes_ = 0;
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}

View File

@@ -1,21 +0,0 @@
#ifndef _RTP_VIDEO_SEND_STATISTICS_H_
#define _RTP_VIDEO_SEND_STATISTICS_H_
#include "thread_base.h"
class RtpVideoSendStatistics : public ThreadBase {
public:
RtpVideoSendStatistics();
~RtpVideoSendStatistics();
public:
void UpdateSentBytes(uint32_t sent_bytes);
private:
bool Process();
private:
uint32_t sent_bytes_ = 0;
};
#endif

View File

@@ -8,12 +8,16 @@
RtpVideoSender::RtpVideoSender() {}
RtpVideoSender::~RtpVideoSender() { rtp_video_send_statistics_->Stop(); }
RtpVideoSender::~RtpVideoSender() {
if (rtp_statistics_) {
rtp_statistics_->Stop();
}
}
void RtpVideoSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
if (!rtp_video_send_statistics_) {
rtp_video_send_statistics_ = std::make_unique<RtpVideoSendStatistics>();
rtp_video_send_statistics_->Start();
if (!rtp_statistics_) {
rtp_statistics_ = std::make_unique<RtpStatistics>();
rtp_statistics_->Start();
}
for (auto& rtp_packet : rtp_packets) {
@@ -21,20 +25,21 @@ void RtpVideoSender::Enqueue(std::vector<RtpPacket>& rtp_packets) {
}
}
void RtpVideoSender::SetUdpSender(
std::function<int(const char*, size_t)> udp_sender) {
udp_sender_ = udp_sender;
void RtpVideoSender::SetSendDataFunc(
std::function<int(const char*, size_t)> data_send_func) {
data_send_func_ = data_send_func;
}
int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) {
if (!udp_sender_) {
LOG_ERROR("udp_sender_ is nullptr");
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
int ret = 0;
if (0 != udp_sender_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
if (0 !=
data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
LOG_ERROR("Send rtp packet failed");
return -1;
}
@@ -87,12 +92,12 @@ int RtpVideoSender::SendRtpPacket(RtpPacket& rtp_packet) {
}
int RtpVideoSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) {
if (!udp_sender_) {
LOG_ERROR("udp_sender_ is nullptr");
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
return -1;
}
if (udp_sender_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
LOG_ERROR("Send SR failed");
return -1;
}
@@ -126,8 +131,8 @@ bool RtpVideoSender::Process() {
SendRtpPacket(rtp_packet);
}
if (rtp_video_send_statistics_) {
rtp_video_send_statistics_->UpdateSentBytes(last_send_bytes_);
if (rtp_statistics_) {
rtp_statistics_->UpdateSentBytes(last_send_bytes_);
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));

View File

@@ -6,7 +6,7 @@
#include "ringbuffer.h"
#include "rtcp_sender_report.h"
#include "rtp_packet.h"
#include "rtp_video_send_statistics.h"
#include "rtp_statistics.h"
#include "thread_base.h"
class RtpVideoSender : public ThreadBase {
@@ -16,10 +16,7 @@ class RtpVideoSender : public ThreadBase {
public:
void Enqueue(std::vector<RtpPacket> &rtp_packets);
public:
void SetUdpSender(
std::function<int(const char *, size_t)> rtp_packet_send_func);
void SetSendDataFunc(std::function<int(const char *, size_t)> data_send_func);
private:
int SendRtpPacket(RtpPacket &rtp_packet);
@@ -31,11 +28,11 @@ class RtpVideoSender : public ThreadBase {
bool Process() override;
private:
std::function<int(const char *, size_t)> udp_sender_ = nullptr;
std::function<int(const char *, size_t)> data_send_func_ = nullptr;
RingBuffer<RtpPacket> rtp_packe_queue_;
private:
std::unique_ptr<RtpVideoSendStatistics> rtp_video_send_statistics_ = nullptr;
std::unique_ptr<RtpStatistics> rtp_statistics_ = nullptr;
uint32_t last_send_bytes_ = 0;
uint32_t last_send_rtcp_sr_packet_ts_ = 0;
uint32_t total_rtp_packets_sent_ = 0;