Implementation for rtp module

This commit is contained in:
dijunkun
2023-09-05 17:38:10 +08:00
parent 3389dc5751
commit 213318bfa3
6 changed files with 242 additions and 90 deletions

View File

@@ -3,50 +3,50 @@
#include <string.h>
class Data {
public:
Data() = default;
Data(const char* data, size_t size) {
data_ = new char[size];
memcpy(data_, data, size);
size_ = size;
}
Data(const Data& obj) {
data_ = new char[obj.size_];
memcpy(data_, obj.data_, obj.size_);
size_ = obj.size_;
}
Data& operator=(const Data& obj) {
data_ = new char[obj.size_];
memcpy(data_, obj.data_, obj.size_);
size_ = obj.size_;
return *this;
}
~Data() {
if (data_) {
delete data_;
data_ = nullptr;
}
size_ = 0;
}
size_t size() const { return size_; }
char* data() const { return data_; }
public:
char* data_ = nullptr;
size_t size_ = 0;
};
template <typename T>
class RingBuffer {
public:
class Data {
public:
Data() = default;
Data(const char* data, size_t size) {
data_ = new char[size];
memcpy(data_, data, size);
size_ = size;
}
Data(const Data& obj) {
data_ = new char[obj.size_];
memcpy(data_, obj.data_, obj.size_);
size_ = obj.size_;
}
Data& operator=(const Data& obj) {
data_ = new char[obj.size_];
memcpy(data_, obj.data_, obj.size_);
size_ = obj.size_;
return *this;
}
~Data() {
if (data_) {
delete data_;
data_ = nullptr;
}
size_ = 0;
}
size_t size() const { return size_; }
char* data() const { return data_; }
public:
char* data_ = nullptr;
size_t size_ = 0;
};
public:
RingBuffer(unsigned size = 128) : m_size(size), m_front(0), m_rear(0) {
m_data = new Data[size];
m_data = new T[size];
}
~RingBuffer() {
@@ -58,7 +58,7 @@ class RingBuffer {
inline bool isFull() const { return m_front == (m_rear + 1) % m_size; }
bool push(const Data& value) {
bool push(const T& value) {
if (isFull()) {
return false;
}
@@ -67,7 +67,7 @@ class RingBuffer {
return true;
}
bool push(const Data* value) {
bool push(const T* value) {
if (isFull()) {
return false;
}
@@ -76,7 +76,7 @@ class RingBuffer {
return true;
}
inline bool pop(Data& value) {
inline bool pop(T& value) {
if (isEmpty()) {
return false;
}
@@ -95,7 +95,7 @@ class RingBuffer {
unsigned int m_size;
int m_front;
int m_rear;
Data* m_data;
T* m_data;
};
#endif

View File

@@ -1,6 +1,7 @@
#include "rtp_packet.h"
#include <string>
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
@@ -13,7 +14,7 @@
// | Contributing source (CSRC) identifiers |
// | .... |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// | header eXtension profile id | length in 32bits |
// | defined by profile | length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Extensions |
// | .... |
@@ -24,26 +25,141 @@
// | padding | Padding size |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
constexpr size_t kDefaultMtu = 1400;
RtpPacket::RtpPacket() : buffer_(new uint8_t[kDefaultMtu]), size_(kDefaultMtu) {
memset(buffer_, 0, kDefaultMtu);
}
RtpPacket::RtpPacket(const uint8_t *buffer, size_t size) {
if (size > 0) {
payload_ = new uint8_t[size];
memcpy(payload_, buffer, size);
payload_size_ = size;
buffer_ = new uint8_t[size];
memcpy(buffer_, buffer, size);
size_ = size;
}
}
RtpPacket::RtpPacket(const RtpPacket &rhs) {
if (rhs.payload_size_ > 0) {
payload_ = new uint8_t[rhs.payload_size_];
memcpy(payload_, rhs.payload_, rhs.payload_size_);
payload_size_ = rhs.payload_size_;
RtpPacket::RtpPacket(const RtpPacket &rtp_packet) {
if (rtp_packet.size_ > 0) {
buffer_ = new uint8_t[rtp_packet.size_];
memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_);
size_ = rtp_packet.size_;
}
}
RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) {
if (&rtp_packet != this && rtp_packet.size_ > 0) {
buffer_ = new uint8_t[rtp_packet.size_];
memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_);
size_ = rtp_packet.size_;
}
return *this;
}
RtpPacket::RtpPacket(RtpPacket &&rtp_packet)
: buffer_((uint8_t *)rtp_packet.Buffer()), size_(rtp_packet.Size()) {}
RtpPacket::~RtpPacket() {
if (buffer_) {
delete buffer_;
buffer_ = nullptr;
}
size_ = 0;
if (extension_data_) {
delete extension_data_;
extension_data_ = nullptr;
}
extension_len_ = 0;
if (payload_) {
delete payload_;
payload_ = nullptr;
}
payload_size_ = 0;
}
const uint8_t *RtpPacket::Encode(uint8_t *payload, size_t payload_size) {
buffer_[0] = (version_ << 6) | (has_padding_ << 5) | (has_extension_ << 4) |
total_csrc_number_;
buffer_[1] = (marker_ << 7) | payload_type_;
buffer_[2] = (sequence_number_ >> 8) & 0xFF;
buffer_[3] = sequence_number_ & 0xFF;
buffer_[4] = (timestamp_ >> 24) & 0xFF;
buffer_[5] = (timestamp_ >> 16) & 0xFF;
buffer_[6] = (timestamp_ >> 8) & 0xFF;
buffer_[7] = timestamp_ & 0xFF;
buffer_[8] = (ssrc_ >> 24) & 0xFF;
buffer_[9] = (ssrc_ >> 16) & 0xFF;
buffer_[10] = (ssrc_ >> 8) & 0xFF;
buffer_[11] = ssrc_ & 0xFF;
for (uint32_t index = 0; index < total_csrc_number_ && !csrcs_.empty();
index++) {
buffer_[12 + index] = (csrcs_[index] >> 24) & 0xFF;
buffer_[13 + index] = (csrcs_[index] >> 16) & 0xFF;
buffer_[14 + index] = (csrcs_[index] >> 8) & 0xFF;
buffer_[15 + index] = csrcs_[index] & 0xFF;
}
uint32_t extension_offset =
total_csrc_number_ && !csrcs_.empty() ? total_csrc_number_ * 4 : 0;
if (has_extension_ && extension_data_) {
buffer_[12 + extension_offset] = extension_profile_ >> 8;
buffer_[13 + extension_offset] = extension_profile_ & 0xff;
buffer_[14 + extension_offset] = (extension_len_ >> 8) & 0xFF;
buffer_[15 + extension_offset] = extension_len_ & 0xFF;
memcpy(buffer_ + 16 + extension_offset, extension_data_, extension_len_);
}
uint32_t payload_offset =
(has_extension_ && extension_data_ ? extension_len_ : 0) +
extension_offset;
memcpy(buffer_ + 12 + payload_offset, payload, payload_size);
size_ = payload_size + (12 + payload_offset);
return buffer_;
}
const uint8_t *RtpPacket::Decode() {
version_ = (buffer_[0] >> 6) & 0x03;
has_padding_ = (buffer_[0] >> 5) & 0x01;
has_extension_ = (buffer_[0] >> 4) & 0x01;
total_csrc_number_ = buffer_[0] & 0x0f;
marker_ = (buffer_[1] >> 7) & 0x01;
payload_type_ = buffer_[1] & 0x7f;
sequence_number_ = (buffer_[2] << 8) | buffer_[3];
timestamp_ =
(buffer_[4] << 24) | (buffer_[5] << 16) | (buffer_[6] << 8) | buffer_[7];
ssrc_ = (buffer_[8] << 24) | (buffer_[9] << 16) | (buffer_[10] << 8) |
buffer_[11];
for (uint32_t index = 0; index < total_csrc_number_; index++) {
uint32_t csrc = (buffer_[12 + index] << 24) | (buffer_[13 + index] << 16) |
(buffer_[14 + index] << 8) | buffer_[15 + index];
csrcs_.push_back(csrc);
}
uint32_t extension_offset = total_csrc_number_ * 4;
if (has_extension_) {
extension_profile_ =
(buffer_[12 + extension_offset] << 8) | buffer_[13 + extension_offset];
extension_len_ =
(buffer_[14 + extension_offset] << 8) | buffer_[15 + extension_offset];
// extension_data_ = new uint8_t[extension_len_];
// memcpy(extension_data_, buffer_ + 16 + extension_offset, extension_len_);
extension_data_ = buffer_ + 16 + extension_offset;
}
uint32_t payload_offset =
(has_extension_ ? extension_len_ : 0) + extension_offset;
payload_size_ = size_ - (12 + payload_offset);
// payload_ = new uint8_t[payload_size_];
// memcpy(payload_, buffer_ + 12 + payload_offset, payload_size_);
payload_ = buffer_ + 12 + payload_offset;
return payload_;
}

View File

@@ -16,7 +16,7 @@
// | Contributing source (CSRC) identifiers |
// | .... |
// +=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
// | header eXtension profile id | length in 32bits |
// | defined by profile | length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | Extensions |
// | .... |
@@ -29,15 +29,17 @@
class RtpPacket {
public:
RtpPacket();
RtpPacket(const uint8_t *buffer, size_t size);
RtpPacket(const RtpPacket &rhs);
RtpPacket() {}
RtpPacket(const RtpPacket &rtp_packet);
RtpPacket &operator=(const RtpPacket &rtp_packet);
RtpPacket(RtpPacket &&rtp_packet);
~RtpPacket();
public:
// Header
void SetMarker(bool marker) { marker_ = marker; }
// Set Header
void SetMarker(bool has_marker) { marker_ = has_marker; }
void SetPayloadType(uint8_t payload_type) { payload_type_ = payload_type; }
void SetSequenceNumber(uint16_t sequence_number) {
sequence_number_ = sequence_number;
@@ -45,56 +47,65 @@ class RtpPacket {
void SetTimestamp(uint32_t timestamp) { timestamp_ = timestamp; }
void SetSsrc(uint32_t ssrc) { ssrc_ = ssrc; }
void SetCsrcs(){};
void SetHeadersSize(size_t payload_offset) {
payload_offset_ = payload_offset;
}
void SetHasExtension(bool has_extension) { has_extension_ = has_extension; };
void SetExtensionID(uint16_t extension_id) { extension_id_ = extension_id; }
void SetExtensionData(uint8_t *extension_data) {
extension_data_ = extension_data;
void SetExtensionProfile(uint16_t extension_profile) {
extension_profile_ = extension_profile;
}
void SetExtensionData(uint8_t *extension_data, size_t extension_len) {
extension_len_ = extension_len;
extension_data_ = new uint8_t[extension_len_];
memcpy(extension_data_, extension_data, extension_len_);
}
// Payload
void SetPayload(uint8_t *payload) {
payload_ = new uint8_t[payload_size_];
memcpy(payload_, payload, payload_size_);
};
void SetPayloadSize(size_t payload_size) { payload_size_ = payload_size; };
void SetPaddingSize(size_t padding_size) { padding_size_ = padding_size; }
public:
const uint8_t *Encode(uint8_t *payload, size_t payload_size);
const uint8_t *Decode();
// Header
public:
// Get Header
bool Marker() const { return marker_; }
uint8_t PayloadType() const { return payload_type_; }
uint16_t SequenceNumber() const { return sequence_number_; }
uint32_t Timestamp() const { return timestamp_; }
uint32_t Ssrc() const { return ssrc_; }
std::vector<uint32_t> Csrcs() const;
size_t HeadersSize() const { return payload_offset_; }
bool HasExtension() const { return has_extension_; };
uint16_t ExtensionID() const { return extension_id_; }
uint16_t ExtensionProfile() const { return extension_profile_; }
uint8_t *ExtensionData() const { return extension_data_; }
// Payload
uint8_t *Payload() const { return payload_; };
size_t PayloadSize() const { return payload_size_; }
size_t PaddingSize() const { return padding_size_; }
public:
const uint8_t *Buffer() { return buffer_; }
const size_t Size() { return size_; }
private:
// Header
uint32_t version_ = 0;
bool has_padding_ = false;
bool has_extension_ = false;
uint32_t total_csrc_number_ = 0;
bool marker_ = false;
uint8_t payload_type_ = 0;
uint16_t sequence_number_ = 0;
uint32_t timestamp_ = 0;
uint32_t ssrc_ = 0;
size_t payload_offset_ = 0;
bool has_extension_ = false;
uint16_t extension_id_ = 0;
std::vector<uint32_t> csrcs_;
uint16_t profile_ = 0;
uint16_t extension_profile_ = 0;
uint16_t extension_len_ = 0;
uint8_t *extension_data_ = nullptr;
// Payload
uint8_t *payload_ = nullptr;
size_t payload_size_ = 0;
uint8_t padding_size_;
// Entire RTP buffer
uint8_t *buffer_ = nullptr;
size_t size_ = 0;
};
#endif

View File

@@ -66,17 +66,21 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
ikcp_update(kcp, clock);
if (!send_ringbuffer_.isEmpty()) {
RingBuffer::Data buffer;
// Data buffer;
RtpPacket buffer;
if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) {
send_ringbuffer_.pop(buffer);
ret = ikcp_send(kcp, buffer.data(), buffer.size());
// ret = ikcp_send(kcp, buffer.data(), buffer.size());
ret = ikcp_send(kcp, (const char *)buffer.Buffer(), buffer.Size());
}
}
if (!recv_ringbuffer_.isEmpty()) {
RingBuffer::Data buffer;
// Data buffer;
RtpPacket buffer;
recv_ringbuffer_.pop(buffer);
ret = ikcp_input(kcp, buffer.data(), buffer.size());
// ret = ikcp_input(kcp, buffer.data(), buffer.size());
ret = ikcp_input(kcp, (const char *)buffer.Buffer(), buffer.Size());
}
int len = 0;
@@ -145,8 +149,11 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr);
if (ice_transmission_obj->on_receive_ice_msg_cb_) {
ice_transmission_obj->recv_ringbuffer_.push(
std::move(RingBuffer::Data(data, size)));
// ice_transmission_obj->recv_ringbuffer_.push(
// std::move(Data(data, size)));
RtpPacket packet((uint8_t *)data, size);
ice_transmission_obj->recv_ringbuffer_.push(packet);
// int ret = ikcp_input(ice_transmission_obj->kcp_, data, size);
// ikcp_update(ice_transmission_obj->kcp_, iclock());
@@ -258,7 +265,18 @@ int IceTransmission::SendAnswer() {
int IceTransmission::SendData(const char *data, size_t size) {
if (JUICE_STATE_COMPLETED == state_) {
send_ringbuffer_.push(std::move(RingBuffer::Data(data, size)));
// send_ringbuffer_.push(std::move(Data(data, size)));
int num = 0;
for (num = 0; num * 1400 < size; num++) {
RtpPacket packet((uint8_t *)(data + num * 1400), 1400);
send_ringbuffer_.push(packet);
}
if (size - num * 1400 > 0) {
RtpPacket packet((uint8_t *)(data + num * 1400), size - num * 1400);
send_ringbuffer_.push(packet);
}
}
return 0;
}

View File

@@ -6,6 +6,7 @@
#include "congestion_control.h"
#include "ice_agent.h"
#include "ringbuffer.h"
#include "rtp_packet.h"
#include "ws_transmission.h"
class IceTransmission {
public:
@@ -67,8 +68,8 @@ class IceTransmission {
// ikcpcb *kcp_ = nullptr;
char kcp_complete_buffer_[2560 * 1440 * 4];
std::mutex mtx_;
RingBuffer send_ringbuffer_;
RingBuffer recv_ringbuffer_;
RingBuffer<RtpPacket> send_ringbuffer_;
RingBuffer<RtpPacket> recv_ringbuffer_;
bool kcp_stop_ = false;
std::thread *kcp_update_thread_ = nullptr;
};

View File

@@ -46,6 +46,12 @@ target("inih")
target("ringbuffer")
set_kind("headeronly")
add_includedirs("src/ringbuffer", {public = true})
target("rtp")
set_kind("static")
add_deps("log")
add_files("src/rtp/*.cpp")
add_includedirs("src/rtp", {public = true})
target("ice")
set_kind("static")
@@ -83,7 +89,7 @@ target("qos")
target("transmission")
set_kind("static")
add_deps("log", "ws", "ice", "qos", "ringbuffer")
add_deps("log", "ws", "ice", "qos", "ringbuffer", "rtp")
add_files("src/transmission/*.cpp")
add_packages("asio", "nlohmann_json")
add_includedirs("src/ws", "src/ice", "src/qos", {public = true})