Nalu slices test pass

This commit is contained in:
dijunkun
2023-09-07 15:46:26 +08:00
parent 952bb02df5
commit 58c73f10c6
9 changed files with 147 additions and 111 deletions

View File

@@ -130,7 +130,7 @@ std::string GetMac() {
int main() { int main() {
Params params; Params params;
params.cfg_path = "config.ini"; params.cfg_path = "../../../../config/config.ini";
params.on_receive_buffer = GuestReceiveBuffer; params.on_receive_buffer = GuestReceiveBuffer;
std::string transmission_id = "000000"; std::string transmission_id = "000000";

View File

@@ -57,6 +57,14 @@ int PeerConnection::Init(PeerConnectionParams params,
} }
}; };
on_ice_status_change_ = [this](std::string ice_status) {
if ("JUICE_STATE_COMPLETED" == ice_status) {
ice_ready_ = true;
} else {
ice_ready_ = false;
}
};
ws_transport_ = new WsTransmission(on_receive_ws_msg_); ws_transport_ = new WsTransmission(on_receive_ws_msg_);
uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_;
if (ws_transport_) { if (ws_transport_) {
@@ -142,9 +150,9 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
LOG_INFO("]"); LOG_INFO("]");
for (auto &remote_user_id : user_id_list_) { for (auto &remote_user_id : user_id_list_) {
ice_transmission_list_[remote_user_id] = ice_transmission_list_[remote_user_id] = new IceTransmission(
new IceTransmission(true, transmission_id, user_id_, remote_user_id, true, transmission_id, user_id_, remote_user_id, ws_transport_,
ws_transport_, on_receive_ice_msg_); on_receive_ice_msg_, on_ice_status_change_);
ice_transmission_list_[remote_user_id]->InitIceTransmission( ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_); cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_[remote_user_id]->JoinTransmission(); ice_transmission_list_[remote_user_id]->JoinTransmission();
@@ -179,7 +187,7 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
ice_transmission_list_[remote_user_id] = new IceTransmission( ice_transmission_list_[remote_user_id] = new IceTransmission(
false, transmission_id, user_id_, remote_user_id, ws_transport_, false, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_); on_receive_ice_msg_, on_ice_status_change_);
ice_transmission_list_[remote_user_id]->InitIceTransmission( ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_); cfg_stun_server_ip_, stun_server_port_);
@@ -240,6 +248,7 @@ int PeerConnection::Destroy() {
SignalStatus PeerConnection::GetSignalStatus() { return signal_status_; } SignalStatus PeerConnection::GetSignalStatus() { return signal_status_; }
int PeerConnection::SendVideoData(const char *data, size_t size) { int PeerConnection::SendVideoData(const char *data, size_t size) {
if (!ice_ready_) return -1;
int ret = Encode((uint8_t *)data, size); int ret = Encode((uint8_t *)data, size);
if (0 != ret) { if (0 != ret) {
LOG_ERROR("Encode failed"); LOG_ERROR("Encode failed");

View File

@@ -68,6 +68,9 @@ class PeerConnection : public VideoEncoder, VideoDecoder {
std::function<void(const std::string &)> on_receive_ws_msg_ = nullptr; std::function<void(const std::string &)> on_receive_ws_msg_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)> std::function<void(const char *, size_t, const char *, size_t)>
on_receive_ice_msg_ = nullptr; on_receive_ice_msg_ = nullptr;
std::function<void(std::string)> on_ice_status_change_ = nullptr;
bool ice_ready_ = false;
unsigned int ws_connection_id_ = 0; unsigned int ws_connection_id_ = 0;
std::string user_id_ = ""; std::string user_id_ = "";
std::string transmission_id_ = ""; std::string transmission_id_ = "";

View File

@@ -2,9 +2,10 @@
#include <string> #include <string>
RtpPacket::RtpPacket() #include "log.h"
: buffer_(new uint8_t[MAX_NALU_LEN]), size_(MAX_NALU_LEN) {
memset(buffer_, 0, MAX_NALU_LEN); RtpPacket::RtpPacket() : buffer_(new uint8_t[DEFAULT_MTU]), size_(DEFAULT_MTU) {
memset(buffer_, 0, DEFAULT_MTU);
} }
RtpPacket::RtpPacket(const uint8_t *buffer, size_t size) { RtpPacket::RtpPacket(const uint8_t *buffer, size_t size) {
@@ -23,8 +24,19 @@ RtpPacket::RtpPacket(const RtpPacket &rtp_packet) {
} }
} }
RtpPacket::RtpPacket(RtpPacket &&rtp_packet)
: buffer_((uint8_t *)std::move(rtp_packet.buffer_)),
size_(rtp_packet.size_) {
rtp_packet.buffer_ = nullptr;
rtp_packet.size_ = 0;
}
RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) { RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) {
if (&rtp_packet != this && rtp_packet.size_ > 0) { if (&rtp_packet != this) {
if (buffer_) {
delete buffer_;
buffer_ = nullptr;
}
buffer_ = new uint8_t[rtp_packet.size_]; buffer_ = new uint8_t[rtp_packet.size_];
memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_); memcpy(buffer_, rtp_packet.buffer_, rtp_packet.size_);
size_ = rtp_packet.size_; size_ = rtp_packet.size_;
@@ -32,8 +44,15 @@ RtpPacket &RtpPacket::operator=(const RtpPacket &rtp_packet) {
return *this; return *this;
} }
RtpPacket::RtpPacket(RtpPacket &&rtp_packet) RtpPacket &RtpPacket::operator=(RtpPacket &&rtp_packet) {
: buffer_((uint8_t *)rtp_packet.Buffer()), size_(rtp_packet.Size()) {} if (&rtp_packet != this) {
buffer_ = std::move(rtp_packet.buffer_);
rtp_packet.buffer_ = nullptr;
size_ = rtp_packet.size_;
rtp_packet.size_ = 0;
}
return *this;
}
RtpPacket::~RtpPacket() { RtpPacket::~RtpPacket() {
if (buffer_) { if (buffer_) {
@@ -47,11 +66,6 @@ RtpPacket::~RtpPacket() {
extension_data_ = nullptr; extension_data_ = nullptr;
} }
extension_len_ = 0; extension_len_ = 0;
if (payload_) {
delete payload_;
payload_ = nullptr;
}
payload_size_ = 0; payload_size_ = 0;
} }
@@ -227,10 +241,7 @@ size_t RtpPacket::DecodeH264Nalu(uint8_t *payload) {
nalu_header_.nal_unit_type = (buffer_[12 + payload_offset]) & 0x31; nalu_header_.nal_unit_type = (buffer_[12 + payload_offset]) & 0x31;
payload_size_ = size_ - (13 + payload_offset); payload_size_ = size_ - (13 + payload_offset);
// payload_ = new uint8_t[payload_size_];
// memcpy(payload_, buffer_ + 12 + payload_offset, payload_size_);
payload_ = buffer_ + 13 + payload_offset; payload_ = buffer_ + 13 + payload_offset;
memcpy(payload, payload_, payload_size_); memcpy(payload, payload_, payload_size_);
return payload_size_; return payload_size_;
} }

View File

@@ -61,6 +61,7 @@
// |F|NRI| Type |S|E|R| Type | // |F|NRI| Type |S|E|R| Type |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ // +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
#define DEFAULT_MTU 1500
#define MAX_NALU_LEN 1400 #define MAX_NALU_LEN 1400
typedef enum { H264 = 96, OPUS = 97, USER_DEFINED = 127 } PAYLOAD_TYPE; typedef enum { H264 = 96, OPUS = 97, USER_DEFINED = 127 } PAYLOAD_TYPE;
@@ -69,8 +70,9 @@ class RtpPacket {
RtpPacket(); RtpPacket();
RtpPacket(const uint8_t *buffer, size_t size); RtpPacket(const uint8_t *buffer, size_t size);
RtpPacket(const RtpPacket &rtp_packet); RtpPacket(const RtpPacket &rtp_packet);
RtpPacket &operator=(const RtpPacket &rtp_packet);
RtpPacket(RtpPacket &&rtp_packet); RtpPacket(RtpPacket &&rtp_packet);
RtpPacket &operator=(const RtpPacket &rtp_packet);
RtpPacket &operator=(RtpPacket &&rtp_packet);
~RtpPacket(); ~RtpPacket();

View File

@@ -19,49 +19,50 @@ RtpSession ::~RtpSession() {
extension_data_ = nullptr; extension_data_ = nullptr;
} }
if (rtp_packet_) { // if (rtp_packet_) {
delete rtp_packet_; // delete rtp_packet_;
rtp_packet_ = nullptr; // rtp_packet_ = nullptr;
} // }
} }
std::vector<RtpPacket> RtpSession::Encode(uint8_t* buffer, size_t size) { void RtpSession::Encode(uint8_t* buffer, size_t size,
if (!rtp_packet_) { std::vector<RtpPacket>& packets) {
rtp_packet_ = new RtpPacket(); // if (!rtp_packet_) {
} // rtp_packet_ = new RtpPacket();
// }
std::vector<RtpPacket> packets; RtpPacket rtp_packet;
if (size <= MAX_NALU_LEN) { if (size <= MAX_NALU_LEN) {
rtp_packet_->SetVerion(version_); rtp_packet.SetVerion(version_);
rtp_packet_->SetHasPadding(has_padding_); rtp_packet.SetHasPadding(has_padding_);
rtp_packet_->SetHasExtension(has_extension_); rtp_packet.SetHasExtension(has_extension_);
rtp_packet_->SetMarker(1); rtp_packet.SetMarker(1);
rtp_packet_->SetPayloadType(PAYLOAD_TYPE(payload_type_)); rtp_packet.SetPayloadType(PAYLOAD_TYPE(payload_type_));
rtp_packet_->SetSequenceNumber(sequence_number_++); rtp_packet.SetSequenceNumber(sequence_number_++);
timestamp_ = timestamp_ =
std::chrono::high_resolution_clock::now().time_since_epoch().count(); std::chrono::high_resolution_clock::now().time_since_epoch().count();
rtp_packet_->SetTimestamp(timestamp_); rtp_packet.SetTimestamp(timestamp_);
rtp_packet_->SetSsrc(ssrc_); rtp_packet.SetSsrc(ssrc_);
if (!csrcs_.empty()) { if (!csrcs_.empty()) {
rtp_packet_->SetCsrcs(csrcs_); rtp_packet.SetCsrcs(csrcs_);
} }
if (has_extension_) { if (has_extension_) {
rtp_packet_->SetExtensionProfile(extension_profile_); rtp_packet.SetExtensionProfile(extension_profile_);
rtp_packet_->SetExtensionData(extension_data_, extension_len_); rtp_packet.SetExtensionData(extension_data_, extension_len_);
} }
RtpPacket::NALU_HEADER nalu_header; RtpPacket::NALU_HEADER nalu_header;
nalu_header.forbidden_bit = 0; nalu_header.forbidden_bit = 0;
nalu_header.nal_reference_idc = 1; nalu_header.nal_reference_idc = 1;
nalu_header.nal_unit_type = 1; nalu_header.nal_unit_type = 1;
rtp_packet_->SetNaluHeader(nalu_header); rtp_packet.SetNaluHeader(nalu_header);
rtp_packet_->EncodeH264Nalu(buffer, size); rtp_packet.EncodeH264Nalu(buffer, size);
packets.push_back(*rtp_packet_); packets.emplace_back(rtp_packet);
} else { } else {
size_t last_packet_size = size % MAX_NALU_LEN; size_t last_packet_size = size % MAX_NALU_LEN;
@@ -69,36 +70,33 @@ std::vector<RtpPacket> RtpSession::Encode(uint8_t* buffer, size_t size) {
for (size_t index = 0; index * MAX_NALU_LEN < size + MAX_NALU_LEN; for (size_t index = 0; index * MAX_NALU_LEN < size + MAX_NALU_LEN;
index++) { index++) {
rtp_packet_->SetVerion(version_); rtp_packet.SetVerion(version_);
rtp_packet_->SetHasPadding(has_padding_); rtp_packet.SetHasPadding(has_padding_);
rtp_packet_->SetHasExtension(has_extension_); rtp_packet.SetHasExtension(has_extension_);
rtp_packet_->SetMarker(1); rtp_packet.SetMarker(1);
rtp_packet_->SetPayloadType(PAYLOAD_TYPE(payload_type_)); rtp_packet.SetPayloadType(PAYLOAD_TYPE(payload_type_));
rtp_packet_->SetSequenceNumber(sequence_number_++); rtp_packet.SetSequenceNumber(sequence_number_++);
timestamp_ = timestamp_ =
std::chrono::high_resolution_clock::now().time_since_epoch().count(); std::chrono::high_resolution_clock::now().time_since_epoch().count();
rtp_packet_->SetTimestamp(timestamp_); rtp_packet.SetTimestamp(timestamp_);
rtp_packet_->SetSsrc(ssrc_); rtp_packet.SetSsrc(ssrc_);
if (!csrcs_.empty()) { if (!csrcs_.empty()) {
rtp_packet_->SetCsrcs(csrcs_); rtp_packet.SetCsrcs(csrcs_);
} }
if (has_extension_) { if (has_extension_) {
rtp_packet_->SetExtensionProfile(extension_profile_); rtp_packet.SetExtensionProfile(extension_profile_);
rtp_packet_->SetExtensionData(extension_data_, extension_len_); rtp_packet.SetExtensionData(extension_data_, extension_len_);
} }
rtp_packet_->Encode(buffer, size); rtp_packet.Encode(buffer, size);
packets.push_back(*rtp_packet_); packets.emplace_back(rtp_packet);
} }
} }
return std::move(packets);
} }
size_t RtpSession::Decode(uint8_t* buffer, size_t size, uint8_t* payload) { size_t RtpSession::Decode(RtpPacket& packet, uint8_t* payload) {
*rtp_packet_ = std::move(RtpPacket(buffer, size)); return packet.DecodeH264Nalu(payload);
return rtp_packet_->DecodeH264Nalu(payload);
} }

View File

@@ -15,8 +15,8 @@ class RtpSession
~RtpSession(); ~RtpSession();
public: public:
std::vector<RtpPacket> Encode(uint8_t* buffer, size_t size); void Encode(uint8_t* buffer, size_t size, std::vector<RtpPacket>& packets);
size_t Decode(uint8_t* buffer, size_t size, uint8_t* payload); size_t Decode(RtpPacket& packet, uint8_t* payload);
private: private:
uint32_t version_ = 0; uint32_t version_ = 0;
@@ -35,7 +35,7 @@ class RtpSession
uint8_t* extension_data_ = nullptr; uint8_t* extension_data_ = nullptr;
private: private:
RtpPacket* rtp_packet_ = nullptr; // RtpPacket* rtp_packet_ = nullptr;
}; };
#endif #endif

View File

@@ -20,15 +20,23 @@ IceTransmission::IceTransmission(
bool offer_peer, std::string &transmission_id, std::string &user_id, bool offer_peer, std::string &transmission_id, std::string &user_id,
std::string &remote_user_id, WsTransmission *ice_ws_transmission, std::string &remote_user_id, WsTransmission *ice_ws_transmission,
std::function<void(const char *, size_t, const char *, size_t)> std::function<void(const char *, size_t, const char *, size_t)>
on_receive_ice_msg) on_receive_ice_msg,
std::function<void(std::string)> on_ice_status_change)
: offer_peer_(offer_peer), : offer_peer_(offer_peer),
transmission_id_(transmission_id), transmission_id_(transmission_id),
user_id_(user_id), user_id_(user_id),
remote_user_id_(remote_user_id), remote_user_id_(remote_user_id),
ice_ws_transport_(ice_ws_transmission), ice_ws_transport_(ice_ws_transmission),
on_receive_ice_msg_cb_(on_receive_ice_msg) {} on_receive_ice_msg_cb_(on_receive_ice_msg),
on_ice_status_change_(on_ice_status_change) {}
IceTransmission::~IceTransmission() { IceTransmission::~IceTransmission() {
if (kcp_update_thread_ && kcp_update_thread_->joinable()) {
kcp_update_thread_->join();
delete kcp_update_thread_;
kcp_update_thread_ = nullptr;
}
if (video_rtp_session_) { if (video_rtp_session_) {
delete video_rtp_session_; delete video_rtp_session_;
video_rtp_session_ = nullptr; video_rtp_session_ = nullptr;
@@ -43,13 +51,6 @@ IceTransmission::~IceTransmission() {
delete ice_agent_; delete ice_agent_;
ice_agent_ = nullptr; ice_agent_ = nullptr;
} }
if (kcp_update_thread_ && kcp_update_thread_->joinable()) {
kcp_update_thread_->join();
}
delete kcp_update_thread_;
kcp_update_thread_ = nullptr;
} }
int IceTransmission::InitIceTransmission(std::string &ip, int port) { int IceTransmission::InitIceTransmission(std::string &ip, int port) {
@@ -80,44 +81,43 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
RtpPacket buffer; RtpPacket buffer;
if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) { if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) {
send_ringbuffer_.pop(buffer); send_ringbuffer_.pop(buffer);
// ret = ikcp_send(kcp, buffer.data(), buffer.size());
ret = ikcp_send(kcp, (const char *)buffer.Buffer(), buffer.Size()); ice_agent_->Send((const char *)buffer.Buffer(), buffer.Size());
} }
} }
if (!recv_ringbuffer_.isEmpty()) { if (!recv_ringbuffer_.isEmpty()) {
// Data buffer; // RtpPacket packet;
RtpPacket buffer; recv_ringbuffer_.pop(pop_packet_);
recv_ringbuffer_.pop(buffer);
if (!rtp_payload_) { if (!rtp_payload_) {
rtp_payload_ = new uint8_t[1400]; rtp_payload_ = new uint8_t[1400];
} }
size_t rtp_payload_size = video_rtp_session_->Decode( size_t rtp_payload_size =
(uint8_t *)buffer.Buffer(), buffer.Size(), rtp_payload_); video_rtp_session_->Decode(pop_packet_, rtp_payload_);
// ret = ikcp_input(kcp, buffer.data(), buffer.size());
// ret = ikcp_input(kcp, (const char *)buffer.Buffer(), buffer.Size()); on_receive_ice_msg_cb_((const char *)rtp_payload_, rtp_payload_size,
ret = ikcp_input(kcp, (const char *)rtp_payload_, rtp_payload_size); remote_user_id_.data(), remote_user_id_.size());
} }
int len = 0; // int len = 0;
int total_len = 0; // int total_len = 0;
while (1) { // while (1) {
len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000); // len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000);
total_len += len; // total_len += len;
if (len <= 0) { // if (len <= 0) {
if (on_receive_ice_msg_cb_ && total_len > 0) { // if (on_receive_ice_msg_cb_ && total_len > 0) {
LOG_ERROR("Receive size: {}", total_len); // LOG_ERROR("Receive size: {}", total_len);
on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len, // on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len,
remote_user_id_.data(), // remote_user_id_.data(),
remote_user_id_.size()); // remote_user_id_.size());
} // }
break; // break;
} // }
} // }
// std::this_thread::sleep_for(std::chrono::milliseconds(1)); std::this_thread::sleep_for(std::chrono::milliseconds(1));
} }
ikcp_release(kcp); ikcp_release(kcp);
@@ -134,6 +134,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
LOG_INFO("[{}->{}] state_change: {}", ice_transmission_obj->user_id_, LOG_INFO("[{}->{}] state_change: {}", ice_transmission_obj->user_id_,
ice_transmission_obj->remote_user_id_, ice_status[state]); ice_transmission_obj->remote_user_id_, ice_status[state]);
ice_transmission_obj->state_ = state; ice_transmission_obj->state_ = state;
ice_transmission_obj->on_ice_status_change_(ice_status[state]);
} else { } else {
LOG_INFO("state_change: {}", ice_status[state]); LOG_INFO("state_change: {}", ice_status[state]);
} }
@@ -284,9 +285,19 @@ int IceTransmission::SendData(const char *data, size_t size) {
if (JUICE_STATE_COMPLETED == state_) { if (JUICE_STATE_COMPLETED == state_) {
// send_ringbuffer_.push(std::move(Data(data, size))); // send_ringbuffer_.push(std::move(Data(data, size)));
for (int num = 0; num * 1400 < size + 1400; num++) { std::vector<RtpPacket> packets;
std::vector<RtpPacket> packets =
video_rtp_session_->Encode((uint8_t *)(data + num * 1400), 1400); for (size_t num = 0, next_packet_size = 0; num * MAX_NALU_LEN < size;
num++) {
next_packet_size = size - num * MAX_NALU_LEN;
if (next_packet_size < MAX_NALU_LEN) {
video_rtp_session_->Encode((uint8_t *)(data + num * MAX_NALU_LEN),
next_packet_size, packets);
} else {
video_rtp_session_->Encode((uint8_t *)(data + num * MAX_NALU_LEN),
MAX_NALU_LEN, packets);
}
}
for (auto &packet : packets) { for (auto &packet : packets) {
send_ringbuffer_.push(packet); send_ringbuffer_.push(packet);
@@ -298,6 +309,5 @@ int IceTransmission::SendData(const char *data, size_t size) {
// send_ringbuffer_.insert(send_ringbuffer_.end(), packets.begin(), // send_ringbuffer_.insert(send_ringbuffer_.end(), packets.begin(),
// packets.end()); // packets.end());
} }
}
return 0; return 0;
} }

View File

@@ -15,7 +15,8 @@ class IceTransmission {
bool offer_peer, std::string &transmission_id, std::string &user_id, bool offer_peer, std::string &transmission_id, std::string &user_id,
std::string &remote_user_id, WsTransmission *ice_ws_transmission, std::string &remote_user_id, WsTransmission *ice_ws_transmission,
std::function<void(const char *, size_t, const char *, size_t)> std::function<void(const char *, size_t, const char *, size_t)>
on_receive_ice_msg); on_receive_ice_msg,
std::function<void(std::string)> on_ice_status_change);
~IceTransmission(); ~IceTransmission();
@@ -53,6 +54,7 @@ class IceTransmission {
CongestionControl *congestion_control_ = nullptr; CongestionControl *congestion_control_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)> std::function<void(const char *, size_t, const char *, size_t)>
on_receive_ice_msg_cb_ = nullptr; on_receive_ice_msg_cb_ = nullptr;
std::function<void(std::string)> on_ice_status_change_ = nullptr;
std::string local_sdp_; std::string local_sdp_;
std::string remote_sdp_; std::string remote_sdp_;
std::string local_candidates_; std::string local_candidates_;
@@ -77,6 +79,7 @@ class IceTransmission {
private: private:
RtpSession *video_rtp_session_ = nullptr; RtpSession *video_rtp_session_ = nullptr;
uint8_t *rtp_payload_ = nullptr; uint8_t *rtp_payload_ = nullptr;
RtpPacket pop_packet_;
}; };
#endif #endif