[feat] update transport module and channel module

This commit is contained in:
dijunkun
2025-01-06 17:10:56 +08:00
parent eef35ff0d4
commit 0737eee95a
17 changed files with 927 additions and 605 deletions

View File

@@ -62,11 +62,49 @@ int IceTransport::InitIceTransmission(
std::string &stun_ip, int stun_port, std::string &turn_ip, int turn_port,
std::string &turn_username, std::string &turn_password,
RtpPacket::PAYLOAD_TYPE video_codec_payload_type) {
ice_agent_ = std::make_unique<IceAgent>(
offer_peer_, use_trickle_ice_, use_reliable_ice_, enable_turn_,
force_turn_, stun_ip, stun_port, turn_ip, turn_port, turn_username,
turn_password);
InitializeIOStatistics();
InitializeChannels(video_codec_payload_type);
ice_agent_->CreateIceAgent(
[](NiceAgent *agent, guint stream_id, guint component_id,
NiceComponentState state, gpointer user_ptr) {
static_cast<IceTransport *>(user_ptr)->OnIceStateChange(
agent, stream_id, component_id, state, user_ptr);
},
[](NiceAgent *agent, guint stream_id, guint component_id,
gchar *foundation, gpointer user_ptr) {
static_cast<IceTransport *>(user_ptr)->OnNewLocalCandidate(
agent, stream_id, component_id, foundation, user_ptr);
},
[](NiceAgent *agent, guint stream_id, gpointer user_ptr) {
static_cast<IceTransport *>(user_ptr)->OnGatheringDone(agent, stream_id,
user_ptr);
},
[](NiceAgent *agent, guint stream_id, guint component_id,
const char *lfoundation, const char *rfoundation, gpointer user_ptr) {
static_cast<IceTransport *>(user_ptr)->OnNewSelectedPair(
agent, stream_id, component_id, lfoundation, rfoundation, user_ptr);
},
[](NiceAgent *agent, guint stream_id, guint component_id, guint size,
gchar *buffer, gpointer user_ptr) {
static_cast<IceTransport *>(user_ptr)->OnReceiveBuffer(
agent, stream_id, component_id, size, buffer, user_ptr);
},
this);
return 0;
}
void IceTransport::InitializeIOStatistics() {
ice_io_statistics_ = std::make_unique<IOStatistics>(
[this](const IOStatistics::NetTrafficStats &net_traffic_stats) {
if (on_receive_net_status_report_) {
XNetTrafficStats xnet_traffic_stats;
memcpy(&xnet_traffic_stats, &net_traffic_stats,
sizeof(XNetTrafficStats));
on_receive_net_status_report_(
@@ -75,299 +113,180 @@ int IceTransport::InitIceTransmission(
remote_user_id_.size(), user_data_);
}
});
}
void IceTransport::InitializeChannels(
RtpPacket::PAYLOAD_TYPE video_codec_payload_type) {
video_codec_payload_type_ = video_codec_payload_type;
rtp_video_receiver_ = std::make_unique<RtpVideoReceiver>(ice_io_statistics_);
// rr sender
rtp_video_receiver_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {
LOG_ERROR("ice_agent_ is nullptr");
return -1;
}
video_channel_send_ =
std::make_unique<VideoChannelSend>(ice_agent_, ice_io_statistics_);
audio_channel_send_ =
std::make_unique<AudioChannelSend>(ice_agent_, ice_io_statistics_);
data_channel_send_ =
std::make_unique<DataChannelSend>(ice_agent_, ice_io_statistics_);
if (state_ != NICE_COMPONENT_STATE_CONNECTED &&
state_ != NICE_COMPONENT_STATE_READY) {
LOG_ERROR("Ice is not connected, state = [{}]",
nice_component_state_to_string(state_));
return -2;
}
video_channel_send_->Initialize(video_codec_payload_type_);
audio_channel_send_->Initialize(RtpPacket::PAYLOAD_TYPE::OPUS);
data_channel_send_->Initialize(RtpPacket::PAYLOAD_TYPE::DATA);
return ice_agent_->Send(data, size);
});
rtp_video_receiver_->SetOnReceiveCompleteFrame(
[this](VideoFrame &video_frame) -> void {
ice_io_statistics_->UpdateVideoInboundBytes(
(uint32_t)video_frame.Size());
[[maybe_unused]] int num_frame_returned = video_decoder_->Decode(
(uint8_t *)video_frame.Buffer(), video_frame.Size(),
[this](VideoFrame video_frame) {
if (on_receive_video_) {
XVideoFrame x_video_frame;
x_video_frame.data = (const char *)video_frame.Buffer();
x_video_frame.width = video_frame.Width();
x_video_frame.height = video_frame.Height();
x_video_frame.size = video_frame.Size();
on_receive_video_(&x_video_frame, remote_user_id_.data(),
remote_user_id_.size(), user_data_);
}
});
video_channel_receive_ = std::make_unique<VideoChannelReceive>(
ice_agent_, ice_io_statistics_,
[this](VideoFrame &video_frame) { OnReceiveCompleteFrame(video_frame); });
audio_channel_receive_ = std::make_unique<AudioChannelReceive>(
ice_agent_, ice_io_statistics_, [this](const char *data, size_t size) {
OnReceiveCompleteAudio(data, size);
});
rtp_video_receiver_->Start();
rtp_video_sender_ = std::make_unique<RtpVideoSender>(ice_io_statistics_);
rtp_video_sender_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {
LOG_ERROR("ice_agent_ is nullptr");
return -1;
}
if (state_ != NICE_COMPONENT_STATE_CONNECTED &&
state_ != NICE_COMPONENT_STATE_READY) {
LOG_ERROR("Ice is not connected, state = [{}]",
nice_component_state_to_string(state_));
return -2;
}
ice_io_statistics_->UpdateVideoOutboundBytes((uint32_t)size);
return ice_agent_->Send(data, size);
data_channel_receive_ = std::make_unique<DataChannelReceive>(
ice_agent_, ice_io_statistics_, [this](const char *data, size_t size) {
OnReceiveCompleteData(data, size);
});
rtp_video_sender_->Start();
video_channel_receive_->Initialize(video_codec_payload_type_);
audio_channel_receive_->Initialize(RtpPacket::PAYLOAD_TYPE::OPUS);
data_channel_receive_->Initialize(RtpPacket::PAYLOAD_TYPE::DATA);
}
rtp_audio_receiver_ = std::make_unique<RtpAudioReceiver>(ice_io_statistics_);
// rr sender
rtp_audio_receiver_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {
LOG_ERROR("ice_agent_ is nullptr");
return -1;
void IceTransport::OnIceStateChange(NiceAgent *agent, guint stream_id,
guint component_id,
NiceComponentState state,
gpointer user_ptr) {
if (!is_closed_) {
LOG_INFO("[{}->{}] state_change: {}", user_id_, remote_user_id_,
nice_component_state_to_string(state));
state_ = state;
if (state == NICE_COMPONENT_STATE_READY ||
state == NICE_COMPONENT_STATE_CONNECTED) {
ice_io_statistics_->Start();
}
on_ice_status_change_(nice_component_state_to_string(state),
remote_user_id_);
}
}
void IceTransport::OnNewLocalCandidate(NiceAgent *agent, guint stream_id,
guint component_id, gchar *foundation,
gpointer user_ptr) {
if (use_trickle_ice_) {
GSList *cands =
nice_agent_get_local_candidates(agent, stream_id, component_id);
NiceCandidate *cand;
for (GSList *i = cands; i; i = i->next) {
cand = (NiceCandidate *)i->data;
if (g_strcmp0(cand->foundation, foundation) == 0) {
new_local_candidate_ =
nice_agent_generate_local_candidate_sdp(agent, cand);
json message = {{"type", "new_candidate"},
{"transmission_id", transmission_id_},
{"user_id", user_id_},
{"remote_user_id", remote_user_id_},
{"sdp", new_local_candidate_}};
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
}
}
}
if (state_ != NICE_COMPONENT_STATE_CONNECTED &&
state_ != NICE_COMPONENT_STATE_READY) {
LOG_ERROR("Ice is not connected, state = [{}]",
nice_component_state_to_string(state_));
return -2;
}
g_slist_free_full(cands, (GDestroyNotify)nice_candidate_free);
}
}
return ice_agent_->Send(data, size);
});
rtp_audio_receiver_->SetOnReceiveData([this](const char *data,
size_t size) -> void {
ice_io_statistics_->UpdateAudioInboundBytes((uint32_t)size);
void IceTransport::OnGatheringDone(NiceAgent *agent, guint stream_id,
gpointer user_ptr) {
LOG_INFO("[{}->{}] gather_done", user_id_, remote_user_id_);
[[maybe_unused]] int num_frame_returned = audio_decoder_->Decode(
(uint8_t *)data, size, [this](uint8_t *data, int size) {
if (on_receive_audio_) {
on_receive_audio_((const char *)data, size, remote_user_id_.data(),
remote_user_id_.size(), user_data_);
}
});
});
if (!use_trickle_ice_) {
if (offer_peer_) {
SendOffer();
} else {
SendAnswer();
}
}
}
rtp_audio_sender_ = std::make_unique<RtpAudioSender>(ice_io_statistics_);
rtp_audio_sender_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {
LOG_ERROR("ice_agent_ is nullptr");
return -1;
}
void IceTransport::OnNewSelectedPair(NiceAgent *agent, guint stream_id,
guint component_id,
const char *lfoundation,
const char *rfoundation,
gpointer user_ptr) {
LOG_INFO("new selected pair: [{}] [{}]", lfoundation, rfoundation);
NiceCandidate *local = nullptr;
NiceCandidate *remote = nullptr;
nice_agent_get_selected_pair(agent, stream_id, component_id, &local, &remote);
if (local->type == NICE_CANDIDATE_TYPE_RELAYED &&
remote->type == NICE_CANDIDATE_TYPE_RELAYED) {
LOG_INFO("Traversal using relay server");
traversal_type_ = TraversalType::TRelay;
} else {
LOG_INFO("Traversal using p2p");
traversal_type_ = TraversalType::TP2P;
}
XNetTrafficStats net_traffic_stats;
memset(&net_traffic_stats, 0, sizeof(net_traffic_stats));
if (state_ != NICE_COMPONENT_STATE_CONNECTED &&
state_ != NICE_COMPONENT_STATE_READY) {
LOG_ERROR("Ice is not connected, state = [{}]",
nice_component_state_to_string(state_));
return -2;
}
on_receive_net_status_report_(user_id_.data(), user_id_.size(),
TraversalMode(traversal_type_),
&net_traffic_stats, remote_user_id_.data(),
remote_user_id_.size(), user_data_);
}
ice_io_statistics_->UpdateAudioOutboundBytes((uint32_t)size);
return ice_agent_->Send(data, size);
});
void IceTransport::OnReceiveBuffer(NiceAgent *agent, guint stream_id,
guint component_id, guint size,
gchar *buffer, gpointer user_ptr) {
if (!is_closed_) {
if (CheckIsRtpPacket(buffer, size)) {
if (CheckIsVideoPacket(buffer, size)) {
video_channel_receive_->OnReceiveRtpPacket(buffer, size);
} else if (CheckIsAudioPacket(buffer, size)) {
audio_channel_receive_->OnReceiveRtpPacket(buffer, size);
} else if (CheckIsDataPacket(buffer, size)) {
data_channel_receive_->OnReceiveRtpPacket(buffer, size);
}
} else if (CheckIsRtcpPacket(buffer, size)) {
LOG_ERROR("Rtcp packet [{}]", (uint8_t)(buffer[1]));
} else {
LOG_ERROR("Unknown packet");
}
}
}
rtp_audio_sender_->Start();
rtp_data_receiver_ = std::make_unique<RtpDataReceiver>(ice_io_statistics_);
// rr sender
rtp_data_receiver_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {
LOG_ERROR("ice_agent_ is nullptr");
return -1;
}
if (state_ != NICE_COMPONENT_STATE_CONNECTED &&
state_ != NICE_COMPONENT_STATE_READY) {
LOG_ERROR("Ice is not connected, state = [{}]",
nice_component_state_to_string(state_));
return -2;
}
return ice_agent_->Send(data, size);
});
rtp_data_receiver_->SetOnReceiveData(
[this](const char *data, size_t size) -> void {
ice_io_statistics_->UpdateDataInboundBytes((uint32_t)size);
if (on_receive_data_) {
on_receive_data_(data, size, remote_user_id_.data(),
remote_user_id_.size(), user_data_);
void IceTransport::OnReceiveCompleteFrame(VideoFrame &video_frame) {
int num_frame_returned = video_decoder_->Decode(
(uint8_t *)video_frame.Buffer(), video_frame.Size(),
[this](VideoFrame video_frame) {
if (on_receive_video_) {
XVideoFrame x_video_frame;
x_video_frame.data = (const char *)video_frame.Buffer();
x_video_frame.width = video_frame.Width();
x_video_frame.height = video_frame.Height();
x_video_frame.size = video_frame.Size();
on_receive_video_(&x_video_frame, remote_user_id_.data(),
remote_user_id_.size(), user_data_);
}
});
}
rtp_data_sender_ = std::make_unique<RtpDataSender>(ice_io_statistics_);
rtp_data_sender_->SetSendDataFunc(
[this](const char *data, size_t size) -> int {
if (!ice_agent_) {
LOG_ERROR("ice_agent_ is nullptr");
return -1;
void IceTransport::OnReceiveCompleteAudio(const char *data, size_t size) {
int num_frame_returned = audio_decoder_->Decode(
(uint8_t *)data, size, [this](uint8_t *data, int size) {
if (on_receive_audio_) {
on_receive_audio_((const char *)data, size, remote_user_id_.data(),
remote_user_id_.size(), user_data_);
}
if (state_ != NICE_COMPONENT_STATE_CONNECTED &&
state_ != NICE_COMPONENT_STATE_READY) {
LOG_ERROR("Ice is not connected, state = [{}]",
nice_component_state_to_string(state_));
return -2;
}
ice_io_statistics_->UpdateDataOutboundBytes((uint32_t)size);
return ice_agent_->Send(data, size);
});
}
rtp_data_sender_->Start();
ice_agent_ = std::make_unique<IceAgent>(
offer_peer_, use_trickle_ice_, use_reliable_ice_, enable_turn_,
force_turn_, stun_ip, stun_port, turn_ip, turn_port, turn_username,
turn_password);
ice_agent_->CreateIceAgent(
[]([[maybe_unused]] NiceAgent *agent, [[maybe_unused]] guint stream_id,
[[maybe_unused]] guint component_id, NiceComponentState state,
gpointer user_ptr) {
if (auto ice_transport = static_cast<IceTransport *>(user_ptr)) {
if (!ice_transport->is_closed_) {
LOG_INFO("[{}->{}] state_change: {}", ice_transport->user_id_,
ice_transport->remote_user_id_,
nice_component_state_to_string(state));
ice_transport->state_ = state;
if (state == NICE_COMPONENT_STATE_READY ||
state == NICE_COMPONENT_STATE_CONNECTED) {
ice_transport->ice_io_statistics_->Start();
}
ice_transport->on_ice_status_change_(
nice_component_state_to_string(state),
ice_transport->remote_user_id_);
}
}
},
[](NiceAgent *agent, guint stream_id, guint component_id,
gchar *foundation, gpointer user_ptr) {
if (auto ice_transport = static_cast<IceTransport *>(user_ptr)) {
if (ice_transport->use_trickle_ice_) {
GSList *cands =
nice_agent_get_local_candidates(agent, stream_id, component_id);
NiceCandidate *cand;
for (GSList *i = cands; i; i = i->next) {
cand = (NiceCandidate *)i->data;
if (g_strcmp0(cand->foundation, foundation) == 0) {
ice_transport->new_local_candidate_ =
nice_agent_generate_local_candidate_sdp(agent, cand);
json message = {
{"type", "new_candidate"},
{"transmission_id", ice_transport->transmission_id_},
{"user_id", ice_transport->user_id_},
{"remote_user_id", ice_transport->remote_user_id_},
{"sdp", ice_transport->new_local_candidate_}};
// LOG_INFO("[{}] Send new candidate to [{}]]:[{}]",
// ice_transport->user_id_,
// ice_transport->remote_user_id_,
// ice_transport->new_local_candidate_);
if (ice_transport->ice_ws_transport_) {
ice_transport->ice_ws_transport_->Send(message.dump());
}
}
}
g_slist_free_full(cands, (GDestroyNotify)nice_candidate_free);
}
}
},
[]([[maybe_unused]] NiceAgent *agent, [[maybe_unused]] guint stream_id,
gpointer user_ptr) {
// non-trickle
if (auto ice_transport = static_cast<IceTransport *>(user_ptr)) {
LOG_INFO("[{}->{}] gather_done", ice_transport->user_id_,
ice_transport->remote_user_id_);
if (!ice_transport->use_trickle_ice_) {
if (ice_transport->offer_peer_) {
ice_transport->SendOffer();
} else {
ice_transport->SendAnswer();
}
}
}
},
[](NiceAgent *agent, guint stream_id, guint component_id,
const char *lfoundation, const char *rfoundation, gpointer user_ptr) {
LOG_INFO("new selected pair: [{}] [{}]", lfoundation, rfoundation);
NiceCandidate *local = nullptr;
NiceCandidate *remote = nullptr;
nice_agent_get_selected_pair(agent, stream_id, component_id, &local,
&remote);
if (auto ice_transport = static_cast<IceTransport *>(user_ptr)) {
if (local->type == NICE_CANDIDATE_TYPE_RELAYED &&
remote->type == NICE_CANDIDATE_TYPE_RELAYED) {
LOG_INFO("Traversal using relay server");
ice_transport->traversal_type_ = TraversalType::TRelay;
} else {
LOG_INFO("Traversal using p2p");
ice_transport->traversal_type_ = TraversalType::TP2P;
}
XNetTrafficStats net_traffic_stats;
memset(&net_traffic_stats, 0, sizeof(net_traffic_stats));
ice_transport->on_receive_net_status_report_(
ice_transport->user_id_.data(), ice_transport->user_id_.size(),
TraversalMode(ice_transport->traversal_type_), &net_traffic_stats,
ice_transport->remote_user_id_.data(),
ice_transport->remote_user_id_.size(), ice_transport->user_data_);
}
},
[]([[maybe_unused]] NiceAgent *agent, [[maybe_unused]] guint stream_id,
[[maybe_unused]] guint component_id, guint size, gchar *buffer,
gpointer user_ptr) {
if (auto ice_transport = static_cast<IceTransport *>(user_ptr)) {
if (ice_transport && !ice_transport->is_closed_) {
if (ice_transport->CheckIsRtpPacket(buffer, size)) {
if (ice_transport->CheckIsVideoPacket(buffer, size)) {
RtpPacket packet((uint8_t *)buffer, size);
ice_transport->rtp_video_receiver_->InsertRtpPacket(packet);
} else if (ice_transport->CheckIsAudioPacket(buffer, size)) {
RtpPacket packet((uint8_t *)buffer, size);
ice_transport->rtp_audio_receiver_->InsertRtpPacket(packet);
} else if (ice_transport->CheckIsDataPacket(buffer, size)) {
RtpPacket packet((uint8_t *)buffer, size);
ice_transport->rtp_data_receiver_->InsertRtpPacket(packet);
}
} else if (ice_transport->CheckIsRtcpPacket(buffer, size)) {
LOG_ERROR("Rtcp packet [{}]", (uint8_t)(buffer[1]));
} else {
LOG_ERROR("Unknown packet");
}
}
}
},
this);
return 0;
void IceTransport::OnReceiveCompleteData(const char *data, size_t size) {
if (on_receive_data_) {
on_receive_data_(data, size, remote_user_id_.data(), remote_user_id_.size(),
user_data_);
}
}
int IceTransport::DestroyIceTransmission() {
@@ -972,14 +891,10 @@ int IceTransport::SendAudioFrame(const char *data, size_t size) {
int ret = audio_encoder_->Encode(
(uint8_t *)data, size,
[this](char *encoded_audio_buffer, size_t size) -> int {
if (rtp_audio_sender_) {
if (audio_rtp_codec_) {
std::vector<RtpPacket> packets;
audio_rtp_codec_->Encode((uint8_t *)encoded_audio_buffer,
(uint32_t)size, packets);
rtp_audio_sender_->Enqueue(packets);
}
if (audio_channel_send_) {
audio_channel_send_->SendAudio(encoded_audio_buffer, size);
}
return 0;
});
@@ -994,13 +909,8 @@ int IceTransport::SendDataFrame(const char *data, size_t size) {
return -2;
}
std::vector<RtpPacket> packets;
if (rtp_data_sender_) {
if (data_rtp_codec_) {
data_rtp_codec_->Encode((uint8_t *)data, (uint32_t)size, packets);
rtp_data_sender_->Enqueue(packets);
}
if (data_channel_send_) {
data_channel_send_->SendData(data, size);
}
return 0;

View File

@@ -143,6 +143,33 @@ class IceTransport {
uint8_t CheckIsAudioPacket(const char *buffer, size_t size);
uint8_t CheckIsDataPacket(const char *buffer, size_t size);
private:
void InitializeIOStatistics();
void InitializeChannels(RtpPacket::PAYLOAD_TYPE video_codec_payload_type);
void OnIceStateChange(NiceAgent *agent, guint stream_id, guint component_id,
NiceComponentState state, gpointer user_ptr);
void OnNewLocalCandidate(NiceAgent *agent, guint stream_id,
guint component_id, gchar *foundation,
gpointer user_ptr);
void OnGatheringDone(NiceAgent *agent, guint stream_id, gpointer user_ptr);
void OnNewSelectedPair(NiceAgent *agent, guint stream_id, guint component_id,
const char *lfoundation, const char *rfoundation,
gpointer user_ptr);
void OnReceiveBuffer(NiceAgent *agent, guint stream_id, guint component_id,
guint size, gchar *buffer, gpointer user_ptr);
void OnReceiveCompleteFrame(VideoFrame &video_frame);
void OnReceiveCompleteAudio(const char *data, size_t size);
void OnReceiveCompleteData(const char *data, size_t size);
private:
bool use_trickle_ice_ = true;
bool enable_turn_ = false;
@@ -168,7 +195,7 @@ class IceTransport {
void *user_data_ = nullptr;
private:
std::unique_ptr<IceAgent> ice_agent_ = nullptr;
std::shared_ptr<IceAgent> ice_agent_ = nullptr;
bool is_closed_ = false;
std::shared_ptr<WsClient> ice_ws_transport_ = nullptr;
// CongestionControl *congestion_control_ = nullptr;