[fix] fix ice status error during closing ice connection

This commit is contained in:
dijunkun
2024-10-30 17:11:35 +08:00
parent 35d4f522c5
commit 9d2e6f0c2a
6 changed files with 125 additions and 100 deletions

View File

@@ -305,6 +305,11 @@ int IceAgent::Send(const char *data, size_t size) {
return -1;
}
if (agent_closed_) {
LOG_ERROR("Nice agent is closed");
return -1;
}
// if (NiceComponentState::NICE_COMPONENT_STATE_READY !=
// nice_agent_get_component_state(agent_, stream_id_, 1)) {
// LOG_ERROR("Nice agent not ready");

View File

@@ -51,15 +51,17 @@ typedef struct Peer PeerPtr;
typedef void (*OnReceiveBuffer)(const char*, size_t, const char*, const size_t,
void*);
typedef void (*OnReceiveVideoFrame)(const XVideoFrame* video_frame, const char*,
typedef void (*OnReceiveVideoFrame)(const XVideoFrame*, const char*,
const size_t, void*);
typedef void (*OnSignalStatus)(SignalStatus, void*);
typedef void (*OnConnectionStatus)(ConnectionStatus, void*);
typedef void (*OnConnectionStatus)(ConnectionStatus, const char*, const size_t,
void*);
typedef void (*NetStatusReport)(int, TraversalMode, const unsigned short,
const unsigned short, void*);
typedef void (*NetStatusReport)(const char*, const size_t, TraversalMode,
const unsigned short, const unsigned short,
void*);
typedef struct {
bool use_cfg_file;

View File

@@ -143,63 +143,70 @@ int PeerConnection::Init(PeerConnectionParams params,
}
};
on_receive_video_ = [this](const char *data, size_t size, const char *user_id,
size_t user_id_size) {
on_receive_video_ = [this](const char *data, size_t size,
const std::string &user_id) {
int num_frame_returned = video_decoder_->Decode(
(uint8_t *)data, size,
[this, user_id, user_id_size](VideoFrame video_frame) {
(uint8_t *)data, size, [this, user_id](VideoFrame video_frame) {
if (on_receive_video_frame_) {
XVideoFrame x_video_frame;
x_video_frame.data = (const char *)video_frame.Buffer();
x_video_frame.width = video_frame.Width();
x_video_frame.height = video_frame.Height();
x_video_frame.size = video_frame.Size();
on_receive_video_frame_(&x_video_frame, user_id, user_id_size,
on_receive_video_frame_(&x_video_frame, user_id.data(),
user_id.size(), user_data_);
}
});
};
on_receive_audio_ = [this](const char *data, size_t size,
const std::string &user_id) {
int num_frame_returned = audio_decoder_->Decode(
(uint8_t *)data, size, [this, user_id](uint8_t *data, int size) {
if (on_receive_audio_buffer_) {
on_receive_audio_buffer_((const char *)data, size, user_id.data(),
user_id.size(), user_data_);
}
});
};
on_receive_data_ = [this](const char *data, size_t size,
const std::string &user_id) {
if (on_receive_data_buffer_) {
on_receive_data_buffer_(data, size, user_id.data(), user_id.size(),
user_data_);
}
});
};
on_receive_audio_ = [this](const char *data, size_t size, const char *user_id,
size_t user_id_size) {
int num_frame_returned = audio_decoder_->Decode(
(uint8_t *)data, size,
[this, user_id, user_id_size](uint8_t *data, int size) {
if (on_receive_audio_buffer_) {
on_receive_audio_buffer_((const char *)data, size, user_id,
user_id_size, user_data_);
}
});
};
on_receive_data_ = [this](const char *data, size_t size, const char *user_id,
size_t user_id_size) {
if (on_receive_data_buffer_) {
on_receive_data_buffer_(data, size, user_id, user_id_size, user_data_);
}
};
on_ice_status_change_ = [this](std::string ice_status) {
on_ice_status_change_ = [this](std::string ice_status,
const std::string &user_id) {
if ("connecting" == ice_status) {
on_connection_status_(ConnectionStatus::Connecting, user_data_);
on_connection_status_(ConnectionStatus::Connecting, user_id.data(),
user_id.size(), user_data_);
} else if ("gathering" == ice_status) {
on_connection_status_(ConnectionStatus::Gathering, user_data_);
on_connection_status_(ConnectionStatus::Gathering, user_id.data(),
user_id.size(), user_data_);
} else if ("disconnected" == ice_status) {
on_connection_status_(ConnectionStatus::Disconnected, user_data_);
on_connection_status_(ConnectionStatus::Disconnected, user_id.data(),
user_id.size(), user_data_);
} else if ("connected" == ice_status) {
ice_ready_ = true;
on_connection_status_(ConnectionStatus::Connected, user_data_);
// std::string transmission_id = std::string(user_id, user_id_size);
is_ice_transmission_ready_[user_id] = true;
on_connection_status_(ConnectionStatus::Connected, user_id.data(),
user_id.size(), user_data_);
b_force_i_frame_ = true;
LOG_INFO("Ice connected");
} else if ("ready" == ice_status) {
ice_ready_ = true;
on_connection_status_(ConnectionStatus::Connected, user_data_);
is_ice_transmission_ready_[user_id] = true;
on_connection_status_(ConnectionStatus::Connected, user_id.data(),
user_id.size(), user_data_);
} else if ("closed" == ice_status) {
ice_ready_ = false;
is_ice_transmission_ready_[user_id] = false;
LOG_INFO("Ice closed");
on_connection_status_(ConnectionStatus::Closed, user_data_);
on_connection_status_(ConnectionStatus::Closed, user_id.data(),
user_id.size(), user_data_);
} else if ("failed" == ice_status) {
ice_ready_ = false;
is_ice_transmission_ready_[user_id] = false;
if (offer_peer_ && try_rejoin_with_turn_) {
enable_turn_ = true;
reliable_ice_ = false;
@@ -218,21 +225,22 @@ int PeerConnection::Init(PeerConnectionParams params,
}
} else {
LOG_INFO("Ice failed");
on_connection_status_(ConnectionStatus::Failed, user_data_);
on_connection_status_(ConnectionStatus::Failed, user_id.data(),
user_id.size(), user_data_);
}
} else {
ice_ready_ = false;
is_ice_transmission_ready_[user_id] = false;
LOG_INFO("Unknown ice state [{}]", ice_status);
}
};
on_net_status_report_ = [this](int transmission_id,
on_net_status_report_ = [this](const std::string &user_id,
IceTransmission::TraversalType mode,
const unsigned short send,
const unsigned short receive, void *user_ptr) {
if (net_status_report_) {
net_status_report_(transmission_id, TraversalMode(mode), send, receive,
user_data_);
net_status_report_(user_id.data(), user_id.size(), TraversalMode(mode),
send, receive, user_data_);
}
};
@@ -450,7 +458,7 @@ int PeerConnection::Leave(const std::string &transmission_id) {
transmission_id);
}
ice_ready_ = false;
is_ice_transmission_ready_[user_id_] = false;
leave_ = true;
IceWorkMsg msg;
@@ -506,10 +514,6 @@ SignalStatus PeerConnection::GetSignalStatus() {
}
int PeerConnection::SendVideoData(const char *data, size_t size) {
if (!ice_ready_) {
return -1;
}
if (ice_transmission_list_.empty()) {
return -1;
}
@@ -525,6 +529,9 @@ int PeerConnection::SendVideoData(const char *data, size_t size) {
[this](char *encoded_frame, size_t size,
VideoEncoder::VideoFrameType frame_type) -> int {
for (auto &ice_trans : ice_transmission_list_) {
if (!is_ice_transmission_ready_[ice_trans.first]) {
continue;
}
// LOG_ERROR("Send frame size: [{}]", size);
ice_trans.second->SendVideoData(
static_cast<IceTransmission::VideoFrameType>(frame_type),
@@ -542,10 +549,6 @@ int PeerConnection::SendVideoData(const char *data, size_t size) {
}
int PeerConnection::SendAudioData(const char *data, size_t size) {
if (!ice_ready_) {
return -1;
}
if (ice_transmission_list_.empty()) {
return -1;
}
@@ -554,6 +557,9 @@ int PeerConnection::SendAudioData(const char *data, size_t size) {
(uint8_t *)data, size,
[this](char *encoded_audio_buffer, size_t size) -> int {
for (auto &ice_trans : ice_transmission_list_) {
if (!is_ice_transmission_ready_[ice_trans.first]) {
continue;
}
// LOG_ERROR("opus frame size: [{}]", size);
ice_trans.second->SendData(IceTransmission::DATA_TYPE::AUDIO,
encoded_audio_buffer, size);
@@ -566,16 +572,15 @@ int PeerConnection::SendAudioData(const char *data, size_t size) {
int PeerConnection::SendUserData(const char *data, size_t size) {
for (auto &ice_trans : ice_transmission_list_) {
if (!is_ice_transmission_ready_[ice_trans.first]) {
continue;
}
ice_trans.second->SendData(IceTransmission::DATA_TYPE::DATA, data, size);
}
return 0;
}
int PeerConnection::SendVideoData(const XVideoFrame *video_frame) {
if (!ice_ready_) {
return -1;
}
if (ice_transmission_list_.empty()) {
return -1;
}
@@ -591,6 +596,9 @@ int PeerConnection::SendVideoData(const XVideoFrame *video_frame) {
[this](char *encoded_frame, size_t size,
VideoEncoder::VideoFrameType frame_type) -> int {
for (auto &ice_trans : ice_transmission_list_) {
if (!is_ice_transmission_ready_[ice_trans.first]) {
continue;
}
// LOG_ERROR("Send frame size: [{}]", size);
ice_trans.second->SendVideoData(
static_cast<IceTransmission::VideoFrameType>(frame_type),
@@ -615,8 +623,8 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
case "login"_H: {
if (j["status"].get<std::string>() == "success") {
user_id_ = j["user_id"].get<std::string>();
net_status_report_(atoi(user_id_.c_str()), TraversalMode::UnknownMode,
0, 0, user_data_);
net_status_report_(user_id_.data(), user_id_.size(),
TraversalMode::UnknownMode, 0, 0, user_data_);
LOG_INFO("Login success with id [{}]", user_id_);
signal_status_ = SignalStatus::SignalConnected;
on_signal_status_(SignalStatus::SignalConnected, user_data_);
@@ -648,9 +656,11 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
LOG_ERROR("{}", reason);
if ("Incorrect password" == reason) {
on_connection_status_(ConnectionStatus::IncorrectPassword,
transmission_id.data(), transmission_id.size(),
user_data_);
} else if ("No such transmission id" == reason) {
on_connection_status_(ConnectionStatus::NoSuchTransmissionId,
transmission_id.data(), transmission_id.size(),
user_data_);
}
} else {
@@ -691,7 +701,9 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
msg.remote_user_id = remote_user_id;
msg.remote_sdp = remote_sdp;
PushIceWorkMsg(msg);
on_connection_status_(ConnectionStatus::Connecting, user_data_);
on_connection_status_(ConnectionStatus::Connecting,
remote_user_id.data(), remote_user_id.size(),
user_data_);
} else {
LOG_ERROR("Invalid offer msg");
}
@@ -713,7 +725,9 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
msg.remote_user_id = remote_user_id;
msg.remote_sdp = remote_sdp;
PushIceWorkMsg(msg);
on_connection_status_(ConnectionStatus::Connecting, user_data_);
on_connection_status_(ConnectionStatus::Connecting,
remote_user_id.data(), remote_user_id.size(),
user_data_);
} else {
LOG_ERROR("Invalid answer msg");
}
@@ -741,7 +755,6 @@ void PeerConnection::ProcessSignal(const std::string &signal) {
}
void PeerConnection::StartIceWorker() {
LOG_INFO("Start ice worker");
ice_worker_ = std::thread([this]() {
while (true) {
std::unique_lock<std::mutex> lck(ice_work_mutex_);
@@ -752,7 +765,6 @@ void PeerConnection::StartIceWorker() {
}
if (!ice_worker_running_) {
LOG_INFO("Exit ice worker");
break;
}
@@ -767,7 +779,6 @@ void PeerConnection::StartIceWorker() {
}
void PeerConnection::StopIceWorker() {
LOG_INFO("Stop ice worker");
ice_worker_running_ = false;
ice_work_cv_.notify_one();
if (ice_worker_.joinable()) {
@@ -840,7 +851,7 @@ void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) {
if (user_id_it != ice_transmission_list_.end()) {
user_id_it->second->DestroyIceTransmission();
ice_transmission_list_.erase(user_id_it);
ice_ready_ = false;
is_ice_transmission_ready_[user_id] = false;
LOG_INFO("Terminate transmission to user [{}]", user_id);
}
break;
@@ -960,6 +971,7 @@ void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) {
user_id_it.second->DestroyIceTransmission();
}
ice_transmission_list_.clear();
is_ice_transmission_ready_.clear();
break;
}
default: {

View File

@@ -21,10 +21,12 @@ typedef void (*OnReceiveVideoFrame)(const XVideoFrame *video_frame,
typedef void (*OnSignalStatus)(SignalStatus, void *);
typedef void (*OnConnectionStatus)(ConnectionStatus, void *);
typedef void (*OnConnectionStatus)(ConnectionStatus, const char *, const size_t,
void *);
typedef void (*NetStatusReport)(int, TraversalMode, const unsigned short,
const unsigned short, void *);
typedef void (*NetStatusReport)(const char *, const size_t, TraversalMode,
const unsigned short, const unsigned short,
void *);
typedef struct {
bool use_cfg_file;
@@ -166,15 +168,17 @@ class PeerConnection {
private:
std::map<std::string, std::unique_ptr<IceTransmission>>
ice_transmission_list_;
std::function<void(const char *, size_t, const char *, size_t)>
std::map<std::string, bool> is_ice_transmission_ready_;
std::function<void(const char *, size_t, const std::string &)>
on_receive_video_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_audio_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_data_ = nullptr;
std::function<void(std::string)> on_ice_status_change_ = nullptr;
std::function<void(int, IceTransmission::TraversalType, const unsigned short,
const unsigned short, void *)>
std::function<void(std::string, const std::string &)> on_ice_status_change_ =
nullptr;
std::function<void(const std::string &, IceTransmission::TraversalType,
const unsigned short, const unsigned short, void *)>
on_net_status_report_ = nullptr;
bool ice_ready_ = false;

View File

@@ -14,7 +14,7 @@ using nlohmann::json;
IceTransmission::IceTransmission(
bool offer_peer, std::string &transmission_id, std::string &user_id,
std::string &remote_user_id, std::shared_ptr<WsClient> ice_ws_transmission,
std::function<void(std::string)> on_ice_status_change)
std::function<void(std::string, const std::string &)> on_ice_status_change)
: offer_peer_(offer_peer),
transmission_id_(transmission_id),
user_id_(user_id),
@@ -82,8 +82,7 @@ int IceTransmission::InitIceTransmission(
// LOG_ERROR("OnReceiveCompleteFrame {}", video_frame.Size());
ice_io_statistics_->UpdateVideoInboundBytes(video_frame.Size());
on_receive_video_((const char *)video_frame.Buffer(),
video_frame.Size(), remote_user_id_.data(),
remote_user_id_.size());
video_frame.Size(), remote_user_id_);
});
rtp_video_receiver_->Start();
@@ -130,8 +129,7 @@ int IceTransmission::InitIceTransmission(
rtp_audio_receiver_->SetOnReceiveData(
[this](const char *data, size_t size) -> void {
ice_io_statistics_->UpdateAudioInboundBytes(size);
on_receive_audio_(data, size, remote_user_id_.data(),
remote_user_id_.size());
on_receive_audio_(data, size, remote_user_id_);
});
rtp_audio_sender_ = std::make_unique<RtpAudioSender>();
@@ -176,8 +174,7 @@ int IceTransmission::InitIceTransmission(
rtp_data_receiver_->SetOnReceiveData(
[this](const char *data, size_t size) -> void {
ice_io_statistics_->UpdateDataInboundBytes(size);
on_receive_data_(data, size, remote_user_id_.data(),
remote_user_id_.size());
on_receive_data_(data, size, remote_user_id_);
});
rtp_data_sender_ = std::make_unique<RtpDataSender>();
@@ -212,7 +209,9 @@ int IceTransmission::InitIceTransmission(
if (user_ptr) {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr);
LOG_INFO("[{}->{}] state_change: {}", ice_transmission_obj->user_id_,
if (!ice_transmission_obj->is_closed_) {
LOG_INFO("[{}->{}] state_change: {}",
ice_transmission_obj->user_id_,
ice_transmission_obj->remote_user_id_,
nice_component_state_to_string(state));
ice_transmission_obj->state_ = state;
@@ -223,9 +222,9 @@ int IceTransmission::InitIceTransmission(
}
ice_transmission_obj->on_ice_status_change_(
nice_component_state_to_string(state));
} else {
LOG_INFO("state_change: {}", nice_component_state_to_string(state));
nice_component_state_to_string(state),
ice_transmission_obj->remote_user_id_);
}
}
},
[](NiceAgent *agent, guint stream_id, guint component_id,
@@ -301,7 +300,7 @@ int IceTransmission::InitIceTransmission(
ice_transmission_obj->traversal_type_ = TraversalType::TP2P;
}
ice_transmission_obj->on_receive_net_status_report_(
atoi(ice_transmission_obj->transmission_id_.c_str()),
ice_transmission_obj->transmission_id_,
ice_transmission_obj->traversal_type_, 0, 0, nullptr);
}
},
@@ -310,7 +309,7 @@ int IceTransmission::InitIceTransmission(
if (user_ptr) {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr);
if (ice_transmission_obj) {
if (ice_transmission_obj && !ice_transmission_obj->is_closed_) {
if (ice_transmission_obj->CheckIsVideoPacket(buffer, size)) {
RtpPacket packet((uint8_t *)buffer, size);
ice_transmission_obj->rtp_video_receiver_->InsertRtpPacket(
@@ -336,8 +335,10 @@ int IceTransmission::InitIceTransmission(
int IceTransmission::DestroyIceTransmission() {
LOG_INFO("[{}->{}] Destroy ice transmission", user_id_, remote_user_id_);
is_closed_ = true;
if (on_ice_status_change_) {
on_ice_status_change_("closed");
on_ice_status_change_("closed", remote_user_id_);
}
if (ice_io_statistics_) {
@@ -360,8 +361,6 @@ int IceTransmission::DestroyIceTransmission() {
rtp_data_sender_->Stop();
}
LOG_ERROR("threads stoped");
return ice_agent_->DestroyIceAgent();
}

View File

@@ -38,7 +38,8 @@ class IceTransmission {
IceTransmission(bool offer_peer, std::string &transmission_id,
std::string &user_id, std::string &remote_user_id,
std::shared_ptr<WsClient> ice_ws_transmission,
std::function<void(std::string)> on_ice_status_change);
std::function<void(std::string, const std::string &)>
on_ice_status_change);
~IceTransmission();
public:
@@ -56,26 +57,26 @@ class IceTransmission {
int DestroyIceTransmission();
void SetOnReceiveVideoFunc(
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_video) {
on_receive_video_ = on_receive_video;
}
void SetOnReceiveAudioFunc(
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_audio) {
on_receive_audio_ = on_receive_audio;
}
void SetOnReceiveDataFunc(
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_data) {
on_receive_data_ = on_receive_data;
}
void SetOnReceiveNetStatusReportFunc(
std::function<void(int, TraversalType, const unsigned short,
const unsigned short, void *)>
std::function<void(const std::string &, TraversalType,
const unsigned short, const unsigned short, void *)>
on_receive_net_status_report) {
on_receive_net_status_report_ = on_receive_net_status_report;
}
@@ -147,16 +148,18 @@ class IceTransmission {
private:
std::unique_ptr<IceAgent> ice_agent_ = nullptr;
bool is_closed_ = false;
std::shared_ptr<WsClient> ice_ws_transport_ = nullptr;
CongestionControl *congestion_control_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_video_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_audio_ = nullptr;
std::function<void(const char *, size_t, const char *, size_t)>
std::function<void(const char *, size_t, const std::string &)>
on_receive_data_ = nullptr;
std::function<void(std::string)> on_ice_status_change_ = nullptr;
std::function<void(int, TraversalType, const unsigned short,
std::function<void(std::string, const std::string &)> on_ice_status_change_ =
nullptr;
std::function<void(const std::string &, TraversalType, const unsigned short,
const unsigned short, void *)>
on_receive_net_status_report_ = nullptr;