diff --git a/src/ice/ice_agent.cpp b/src/ice/ice_agent.cpp index 509a684..4289abe 100644 --- a/src/ice/ice_agent.cpp +++ b/src/ice/ice_agent.cpp @@ -305,6 +305,11 @@ int IceAgent::Send(const char *data, size_t size) { return -1; } + if (agent_closed_) { + LOG_ERROR("Nice agent is closed"); + return -1; + } + // if (NiceComponentState::NICE_COMPONENT_STATE_READY != // nice_agent_get_component_state(agent_, stream_id_, 1)) { // LOG_ERROR("Nice agent not ready"); diff --git a/src/interface/x.h b/src/interface/x.h index 3bff163..83ad59e 100644 --- a/src/interface/x.h +++ b/src/interface/x.h @@ -51,15 +51,17 @@ typedef struct Peer PeerPtr; typedef void (*OnReceiveBuffer)(const char*, size_t, const char*, const size_t, void*); -typedef void (*OnReceiveVideoFrame)(const XVideoFrame* video_frame, const char*, +typedef void (*OnReceiveVideoFrame)(const XVideoFrame*, const char*, const size_t, void*); typedef void (*OnSignalStatus)(SignalStatus, void*); -typedef void (*OnConnectionStatus)(ConnectionStatus, void*); +typedef void (*OnConnectionStatus)(ConnectionStatus, const char*, const size_t, + void*); -typedef void (*NetStatusReport)(int, TraversalMode, const unsigned short, - const unsigned short, void*); +typedef void (*NetStatusReport)(const char*, const size_t, TraversalMode, + const unsigned short, const unsigned short, + void*); typedef struct { bool use_cfg_file; diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 94a7e24..9d6cbee 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -143,63 +143,70 @@ int PeerConnection::Init(PeerConnectionParams params, } }; - on_receive_video_ = [this](const char *data, size_t size, const char *user_id, - size_t user_id_size) { + on_receive_video_ = [this](const char *data, size_t size, + const std::string &user_id) { int num_frame_returned = video_decoder_->Decode( - (uint8_t *)data, size, - [this, user_id, user_id_size](VideoFrame video_frame) { + (uint8_t *)data, size, [this, user_id](VideoFrame video_frame) { if (on_receive_video_frame_) { XVideoFrame x_video_frame; x_video_frame.data = (const char *)video_frame.Buffer(); x_video_frame.width = video_frame.Width(); x_video_frame.height = video_frame.Height(); x_video_frame.size = video_frame.Size(); - on_receive_video_frame_(&x_video_frame, user_id, user_id_size, - user_data_); + on_receive_video_frame_(&x_video_frame, user_id.data(), + user_id.size(), user_data_); } }); }; - on_receive_audio_ = [this](const char *data, size_t size, const char *user_id, - size_t user_id_size) { + on_receive_audio_ = [this](const char *data, size_t size, + const std::string &user_id) { int num_frame_returned = audio_decoder_->Decode( - (uint8_t *)data, size, - [this, user_id, user_id_size](uint8_t *data, int size) { + (uint8_t *)data, size, [this, user_id](uint8_t *data, int size) { if (on_receive_audio_buffer_) { - on_receive_audio_buffer_((const char *)data, size, user_id, - user_id_size, user_data_); + on_receive_audio_buffer_((const char *)data, size, user_id.data(), + user_id.size(), user_data_); } }); }; - on_receive_data_ = [this](const char *data, size_t size, const char *user_id, - size_t user_id_size) { + on_receive_data_ = [this](const char *data, size_t size, + const std::string &user_id) { if (on_receive_data_buffer_) { - on_receive_data_buffer_(data, size, user_id, user_id_size, user_data_); + on_receive_data_buffer_(data, size, user_id.data(), user_id.size(), + user_data_); } }; - on_ice_status_change_ = [this](std::string ice_status) { + on_ice_status_change_ = [this](std::string ice_status, + const std::string &user_id) { if ("connecting" == ice_status) { - on_connection_status_(ConnectionStatus::Connecting, user_data_); + on_connection_status_(ConnectionStatus::Connecting, user_id.data(), + user_id.size(), user_data_); } else if ("gathering" == ice_status) { - on_connection_status_(ConnectionStatus::Gathering, user_data_); + on_connection_status_(ConnectionStatus::Gathering, user_id.data(), + user_id.size(), user_data_); } else if ("disconnected" == ice_status) { - on_connection_status_(ConnectionStatus::Disconnected, user_data_); + on_connection_status_(ConnectionStatus::Disconnected, user_id.data(), + user_id.size(), user_data_); } else if ("connected" == ice_status) { - ice_ready_ = true; - on_connection_status_(ConnectionStatus::Connected, user_data_); + // std::string transmission_id = std::string(user_id, user_id_size); + is_ice_transmission_ready_[user_id] = true; + on_connection_status_(ConnectionStatus::Connected, user_id.data(), + user_id.size(), user_data_); b_force_i_frame_ = true; LOG_INFO("Ice connected"); } else if ("ready" == ice_status) { - ice_ready_ = true; - on_connection_status_(ConnectionStatus::Connected, user_data_); + is_ice_transmission_ready_[user_id] = true; + on_connection_status_(ConnectionStatus::Connected, user_id.data(), + user_id.size(), user_data_); } else if ("closed" == ice_status) { - ice_ready_ = false; + is_ice_transmission_ready_[user_id] = false; LOG_INFO("Ice closed"); - on_connection_status_(ConnectionStatus::Closed, user_data_); + on_connection_status_(ConnectionStatus::Closed, user_id.data(), + user_id.size(), user_data_); } else if ("failed" == ice_status) { - ice_ready_ = false; + is_ice_transmission_ready_[user_id] = false; if (offer_peer_ && try_rejoin_with_turn_) { enable_turn_ = true; reliable_ice_ = false; @@ -218,21 +225,22 @@ int PeerConnection::Init(PeerConnectionParams params, } } else { LOG_INFO("Ice failed"); - on_connection_status_(ConnectionStatus::Failed, user_data_); + on_connection_status_(ConnectionStatus::Failed, user_id.data(), + user_id.size(), user_data_); } } else { - ice_ready_ = false; + is_ice_transmission_ready_[user_id] = false; LOG_INFO("Unknown ice state [{}]", ice_status); } }; - on_net_status_report_ = [this](int transmission_id, + on_net_status_report_ = [this](const std::string &user_id, IceTransmission::TraversalType mode, const unsigned short send, const unsigned short receive, void *user_ptr) { if (net_status_report_) { - net_status_report_(transmission_id, TraversalMode(mode), send, receive, - user_data_); + net_status_report_(user_id.data(), user_id.size(), TraversalMode(mode), + send, receive, user_data_); } }; @@ -450,7 +458,7 @@ int PeerConnection::Leave(const std::string &transmission_id) { transmission_id); } - ice_ready_ = false; + is_ice_transmission_ready_[user_id_] = false; leave_ = true; IceWorkMsg msg; @@ -506,10 +514,6 @@ SignalStatus PeerConnection::GetSignalStatus() { } int PeerConnection::SendVideoData(const char *data, size_t size) { - if (!ice_ready_) { - return -1; - } - if (ice_transmission_list_.empty()) { return -1; } @@ -525,6 +529,9 @@ int PeerConnection::SendVideoData(const char *data, size_t size) { [this](char *encoded_frame, size_t size, VideoEncoder::VideoFrameType frame_type) -> int { for (auto &ice_trans : ice_transmission_list_) { + if (!is_ice_transmission_ready_[ice_trans.first]) { + continue; + } // LOG_ERROR("Send frame size: [{}]", size); ice_trans.second->SendVideoData( static_cast(frame_type), @@ -542,10 +549,6 @@ int PeerConnection::SendVideoData(const char *data, size_t size) { } int PeerConnection::SendAudioData(const char *data, size_t size) { - if (!ice_ready_) { - return -1; - } - if (ice_transmission_list_.empty()) { return -1; } @@ -554,6 +557,9 @@ int PeerConnection::SendAudioData(const char *data, size_t size) { (uint8_t *)data, size, [this](char *encoded_audio_buffer, size_t size) -> int { for (auto &ice_trans : ice_transmission_list_) { + if (!is_ice_transmission_ready_[ice_trans.first]) { + continue; + } // LOG_ERROR("opus frame size: [{}]", size); ice_trans.second->SendData(IceTransmission::DATA_TYPE::AUDIO, encoded_audio_buffer, size); @@ -566,16 +572,15 @@ int PeerConnection::SendAudioData(const char *data, size_t size) { int PeerConnection::SendUserData(const char *data, size_t size) { for (auto &ice_trans : ice_transmission_list_) { + if (!is_ice_transmission_ready_[ice_trans.first]) { + continue; + } ice_trans.second->SendData(IceTransmission::DATA_TYPE::DATA, data, size); } return 0; } int PeerConnection::SendVideoData(const XVideoFrame *video_frame) { - if (!ice_ready_) { - return -1; - } - if (ice_transmission_list_.empty()) { return -1; } @@ -591,6 +596,9 @@ int PeerConnection::SendVideoData(const XVideoFrame *video_frame) { [this](char *encoded_frame, size_t size, VideoEncoder::VideoFrameType frame_type) -> int { for (auto &ice_trans : ice_transmission_list_) { + if (!is_ice_transmission_ready_[ice_trans.first]) { + continue; + } // LOG_ERROR("Send frame size: [{}]", size); ice_trans.second->SendVideoData( static_cast(frame_type), @@ -615,8 +623,8 @@ void PeerConnection::ProcessSignal(const std::string &signal) { case "login"_H: { if (j["status"].get() == "success") { user_id_ = j["user_id"].get(); - net_status_report_(atoi(user_id_.c_str()), TraversalMode::UnknownMode, - 0, 0, user_data_); + net_status_report_(user_id_.data(), user_id_.size(), + TraversalMode::UnknownMode, 0, 0, user_data_); LOG_INFO("Login success with id [{}]", user_id_); signal_status_ = SignalStatus::SignalConnected; on_signal_status_(SignalStatus::SignalConnected, user_data_); @@ -648,9 +656,11 @@ void PeerConnection::ProcessSignal(const std::string &signal) { LOG_ERROR("{}", reason); if ("Incorrect password" == reason) { on_connection_status_(ConnectionStatus::IncorrectPassword, + transmission_id.data(), transmission_id.size(), user_data_); } else if ("No such transmission id" == reason) { on_connection_status_(ConnectionStatus::NoSuchTransmissionId, + transmission_id.data(), transmission_id.size(), user_data_); } } else { @@ -691,7 +701,9 @@ void PeerConnection::ProcessSignal(const std::string &signal) { msg.remote_user_id = remote_user_id; msg.remote_sdp = remote_sdp; PushIceWorkMsg(msg); - on_connection_status_(ConnectionStatus::Connecting, user_data_); + on_connection_status_(ConnectionStatus::Connecting, + remote_user_id.data(), remote_user_id.size(), + user_data_); } else { LOG_ERROR("Invalid offer msg"); } @@ -713,7 +725,9 @@ void PeerConnection::ProcessSignal(const std::string &signal) { msg.remote_user_id = remote_user_id; msg.remote_sdp = remote_sdp; PushIceWorkMsg(msg); - on_connection_status_(ConnectionStatus::Connecting, user_data_); + on_connection_status_(ConnectionStatus::Connecting, + remote_user_id.data(), remote_user_id.size(), + user_data_); } else { LOG_ERROR("Invalid answer msg"); } @@ -741,7 +755,6 @@ void PeerConnection::ProcessSignal(const std::string &signal) { } void PeerConnection::StartIceWorker() { - LOG_INFO("Start ice worker"); ice_worker_ = std::thread([this]() { while (true) { std::unique_lock lck(ice_work_mutex_); @@ -752,7 +765,6 @@ void PeerConnection::StartIceWorker() { } if (!ice_worker_running_) { - LOG_INFO("Exit ice worker"); break; } @@ -767,7 +779,6 @@ void PeerConnection::StartIceWorker() { } void PeerConnection::StopIceWorker() { - LOG_INFO("Stop ice worker"); ice_worker_running_ = false; ice_work_cv_.notify_one(); if (ice_worker_.joinable()) { @@ -840,7 +851,7 @@ void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) { if (user_id_it != ice_transmission_list_.end()) { user_id_it->second->DestroyIceTransmission(); ice_transmission_list_.erase(user_id_it); - ice_ready_ = false; + is_ice_transmission_ready_[user_id] = false; LOG_INFO("Terminate transmission to user [{}]", user_id); } break; @@ -960,6 +971,7 @@ void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) { user_id_it.second->DestroyIceTransmission(); } ice_transmission_list_.clear(); + is_ice_transmission_ready_.clear(); break; } default: { diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index 357e8c9..f822e32 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -21,10 +21,12 @@ typedef void (*OnReceiveVideoFrame)(const XVideoFrame *video_frame, typedef void (*OnSignalStatus)(SignalStatus, void *); -typedef void (*OnConnectionStatus)(ConnectionStatus, void *); +typedef void (*OnConnectionStatus)(ConnectionStatus, const char *, const size_t, + void *); -typedef void (*NetStatusReport)(int, TraversalMode, const unsigned short, - const unsigned short, void *); +typedef void (*NetStatusReport)(const char *, const size_t, TraversalMode, + const unsigned short, const unsigned short, + void *); typedef struct { bool use_cfg_file; @@ -166,15 +168,17 @@ class PeerConnection { private: std::map> ice_transmission_list_; - std::function + std::map is_ice_transmission_ready_; + std::function on_receive_video_ = nullptr; - std::function + std::function on_receive_audio_ = nullptr; - std::function + std::function on_receive_data_ = nullptr; - std::function on_ice_status_change_ = nullptr; - std::function + std::function on_ice_status_change_ = + nullptr; + std::function on_net_status_report_ = nullptr; bool ice_ready_ = false; diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 7426b06..3afc248 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -14,7 +14,7 @@ using nlohmann::json; IceTransmission::IceTransmission( bool offer_peer, std::string &transmission_id, std::string &user_id, std::string &remote_user_id, std::shared_ptr ice_ws_transmission, - std::function on_ice_status_change) + std::function on_ice_status_change) : offer_peer_(offer_peer), transmission_id_(transmission_id), user_id_(user_id), @@ -82,8 +82,7 @@ int IceTransmission::InitIceTransmission( // LOG_ERROR("OnReceiveCompleteFrame {}", video_frame.Size()); ice_io_statistics_->UpdateVideoInboundBytes(video_frame.Size()); on_receive_video_((const char *)video_frame.Buffer(), - video_frame.Size(), remote_user_id_.data(), - remote_user_id_.size()); + video_frame.Size(), remote_user_id_); }); rtp_video_receiver_->Start(); @@ -130,8 +129,7 @@ int IceTransmission::InitIceTransmission( rtp_audio_receiver_->SetOnReceiveData( [this](const char *data, size_t size) -> void { ice_io_statistics_->UpdateAudioInboundBytes(size); - on_receive_audio_(data, size, remote_user_id_.data(), - remote_user_id_.size()); + on_receive_audio_(data, size, remote_user_id_); }); rtp_audio_sender_ = std::make_unique(); @@ -176,8 +174,7 @@ int IceTransmission::InitIceTransmission( rtp_data_receiver_->SetOnReceiveData( [this](const char *data, size_t size) -> void { ice_io_statistics_->UpdateDataInboundBytes(size); - on_receive_data_(data, size, remote_user_id_.data(), - remote_user_id_.size()); + on_receive_data_(data, size, remote_user_id_); }); rtp_data_sender_ = std::make_unique(); @@ -212,20 +209,22 @@ int IceTransmission::InitIceTransmission( if (user_ptr) { IceTransmission *ice_transmission_obj = static_cast(user_ptr); - LOG_INFO("[{}->{}] state_change: {}", ice_transmission_obj->user_id_, - ice_transmission_obj->remote_user_id_, - nice_component_state_to_string(state)); - ice_transmission_obj->state_ = state; + if (!ice_transmission_obj->is_closed_) { + LOG_INFO("[{}->{}] state_change: {}", + ice_transmission_obj->user_id_, + ice_transmission_obj->remote_user_id_, + nice_component_state_to_string(state)); + ice_transmission_obj->state_ = state; - if (state == NICE_COMPONENT_STATE_READY || - state == NICE_COMPONENT_STATE_CONNECTED) { - ice_transmission_obj->ice_io_statistics_->Start(); + if (state == NICE_COMPONENT_STATE_READY || + state == NICE_COMPONENT_STATE_CONNECTED) { + ice_transmission_obj->ice_io_statistics_->Start(); + } + + ice_transmission_obj->on_ice_status_change_( + nice_component_state_to_string(state), + ice_transmission_obj->remote_user_id_); } - - ice_transmission_obj->on_ice_status_change_( - nice_component_state_to_string(state)); - } else { - LOG_INFO("state_change: {}", nice_component_state_to_string(state)); } }, [](NiceAgent *agent, guint stream_id, guint component_id, @@ -301,7 +300,7 @@ int IceTransmission::InitIceTransmission( ice_transmission_obj->traversal_type_ = TraversalType::TP2P; } ice_transmission_obj->on_receive_net_status_report_( - atoi(ice_transmission_obj->transmission_id_.c_str()), + ice_transmission_obj->transmission_id_, ice_transmission_obj->traversal_type_, 0, 0, nullptr); } }, @@ -310,7 +309,7 @@ int IceTransmission::InitIceTransmission( if (user_ptr) { IceTransmission *ice_transmission_obj = static_cast(user_ptr); - if (ice_transmission_obj) { + if (ice_transmission_obj && !ice_transmission_obj->is_closed_) { if (ice_transmission_obj->CheckIsVideoPacket(buffer, size)) { RtpPacket packet((uint8_t *)buffer, size); ice_transmission_obj->rtp_video_receiver_->InsertRtpPacket( @@ -336,8 +335,10 @@ int IceTransmission::InitIceTransmission( int IceTransmission::DestroyIceTransmission() { LOG_INFO("[{}->{}] Destroy ice transmission", user_id_, remote_user_id_); + is_closed_ = true; + if (on_ice_status_change_) { - on_ice_status_change_("closed"); + on_ice_status_change_("closed", remote_user_id_); } if (ice_io_statistics_) { @@ -360,8 +361,6 @@ int IceTransmission::DestroyIceTransmission() { rtp_data_sender_->Stop(); } - LOG_ERROR("threads stoped"); - return ice_agent_->DestroyIceAgent(); } diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index fefdda5..773baf2 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -38,7 +38,8 @@ class IceTransmission { IceTransmission(bool offer_peer, std::string &transmission_id, std::string &user_id, std::string &remote_user_id, std::shared_ptr ice_ws_transmission, - std::function on_ice_status_change); + std::function + on_ice_status_change); ~IceTransmission(); public: @@ -56,26 +57,26 @@ class IceTransmission { int DestroyIceTransmission(); void SetOnReceiveVideoFunc( - std::function + std::function on_receive_video) { on_receive_video_ = on_receive_video; } void SetOnReceiveAudioFunc( - std::function + std::function on_receive_audio) { on_receive_audio_ = on_receive_audio; } void SetOnReceiveDataFunc( - std::function + std::function on_receive_data) { on_receive_data_ = on_receive_data; } void SetOnReceiveNetStatusReportFunc( - std::function + std::function on_receive_net_status_report) { on_receive_net_status_report_ = on_receive_net_status_report; } @@ -147,16 +148,18 @@ class IceTransmission { private: std::unique_ptr ice_agent_ = nullptr; + bool is_closed_ = false; std::shared_ptr ice_ws_transport_ = nullptr; CongestionControl *congestion_control_ = nullptr; - std::function + std::function on_receive_video_ = nullptr; - std::function + std::function on_receive_audio_ = nullptr; - std::function + std::function on_receive_data_ = nullptr; - std::function on_ice_status_change_ = nullptr; - std::function on_ice_status_change_ = + nullptr; + std::function on_receive_net_status_report_ = nullptr;