From 515f0c06bd9eea13113405a1c986afcbddc2ba4f Mon Sep 17 00:00:00 2001 From: dijunkun Date: Fri, 18 Aug 2023 17:35:53 +0800 Subject: [PATCH] Dev version: Support multiple ice connections --- src/ice/ice_agent.cpp | 7 +- src/ice/ice_transmission.cpp | 26 ++-- src/pc/peer_connection.cpp | 143 ++++++++++++++----- src/ws/ws_transmission.cpp | 2 +- tests/signal_server/main.cpp | 7 +- tests/signal_server/signal_server.cpp | 51 +++++-- tests/signal_server/transmission_manager.cpp | 60 +++++++- tests/signal_server/transmission_manager.h | 14 +- 8 files changed, 242 insertions(+), 68 deletions(-) diff --git a/src/ice/ice_agent.cpp b/src/ice/ice_agent.cpp index a079065..eace4ee 100644 --- a/src/ice/ice_agent.cpp +++ b/src/ice/ice_agent.cpp @@ -49,7 +49,8 @@ char *IceAgent::GenerateLocalSdp() { } juice_get_local_description(agent_, local_sdp_, JUICE_MAX_SDP_STRING_LEN); - LOG_INFO("Generate local sdp:[\n{}]", local_sdp_); + // LOG_INFO("Generate local sdp:[\n{}]", local_sdp_); + LOG_INFO("Generate local sdp"); return local_sdp_; } @@ -57,13 +58,13 @@ char *IceAgent::GenerateLocalSdp() { int IceAgent::SetRemoteSdp(const char *remote_sdp) { LOG_INFO("Set remote sdp"); juice_set_remote_description(agent_, remote_sdp); - LOG_INFO("Remote description:[\n{}]", remote_sdp); + // LOG_INFO("Remote description:[\n{}]", remote_sdp); return 0; } int IceAgent::GatherCandidates() { - LOG_INFO("Gather candidates"); + LOG_INFO("[{}] Gather candidates", (void *)this); juice_gather_candidates(agent_); return 0; diff --git a/src/ice/ice_transmission.cpp b/src/ice/ice_transmission.cpp index 4305d68..ea6c333 100644 --- a/src/ice/ice_transmission.cpp +++ b/src/ice/ice_transmission.cpp @@ -31,7 +31,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { LOG_INFO("state_change: {}", ice_status[state]); }, [](juice_agent_t *agent, const char *sdp, void *user_ptr) { - LOG_INFO("candadite: {}", sdp); + // LOG_INFO("candadite: {}", sdp); // trickle // static_cast(user_ptr)->SendOfferLocalCandidate(sdp); @@ -136,7 +136,8 @@ int IceTransmission::SendOffer() { json message = {{"type", "offer"}, {"transmission_id", transmission_id_}, {"sdp", local_sdp_}}; - LOG_INFO("Send offer:\n{}", message.dump().c_str()); + // LOG_INFO("Send offer:\n{}", message.dump().c_str()); + LOG_INFO("Send offer"); if (ice_ws_transport_) { ice_ws_transport_->Send(message.dump()); @@ -165,7 +166,9 @@ int IceTransmission::SendAnswer() { {"transmission_id", transmission_id_}, {"sdp", local_sdp_}, {"guest", remote_ice_username_}}; - LOG_INFO("Send answer:\n{}", message.dump().c_str()); + // LOG_INFO("Send answer:\n{}", message.dump().c_str()); + LOG_INFO("[{}] Send answer to [{}]", GetIceUsername(local_sdp_), + remote_ice_username_); if (ice_ws_transport_) { ice_ws_transport_->Send(message.dump()); @@ -178,7 +181,8 @@ int IceTransmission::SendOfferLocalCandidate( json message = {{"type", "offer_candidate"}, {"transmission_id", transmission_id_}, {"sdp", remote_candidate}}; - LOG_INFO("Send candidate:\n{}", message.dump().c_str()); + // LOG_INFO("Send candidate:\n{}", message.dump().c_str()); + LOG_INFO("Send candidate"); if (ice_ws_transport_) { ice_ws_transport_->Send(message.dump()); @@ -191,7 +195,8 @@ int IceTransmission::SendAnswerLocalCandidate( json message = {{"type", "answer_candidate"}, {"transmission_id", transmission_id_}, {"sdp", remote_candidate}}; - LOG_INFO("Send candidate:\n{}", message.dump().c_str()); + // LOG_INFO("Send candidate:\n{}", message.dump().c_str()); + LOG_INFO("Send candidate"); if (ice_ws_transport_) { ice_ws_transport_->Send(message.dump()); @@ -206,7 +211,7 @@ int IceTransmission::SendData(const char *data, size_t size) { void IceTransmission::OnReceiveMessage(const std::string &msg) { auto j = json::parse(msg); - LOG_INFO("msg: {}", msg.c_str()); + // LOG_INFO("msg: {}", msg.c_str()); std::string type = j["type"]; @@ -217,7 +222,8 @@ void IceTransmission::OnReceiveMessage(const std::string &msg) { if (remote_sdp_.empty()) { LOG_INFO("Invalid remote sdp"); } else { - LOG_INFO("Receive remote sdp [{}]", remote_sdp_); + // LOG_INFO("Receive remote sdp [{}]", remote_sdp_); + LOG_INFO("Receive remote sdp"); SetRemoteSdp(remote_sdp_); GatherCandidates(); @@ -243,7 +249,8 @@ void IceTransmission::OnReceiveMessage(const std::string &msg) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); QueryRemoteSdp(transmission_id_); } else { - LOG_INFO("Receive remote sdp [{}]", remote_sdp_); + // LOG_INFO("Receive remote sdp [{}]", remote_sdp_); + LOG_INFO("Receive remote sdp"); SetRemoteSdp(remote_sdp_); if (!offer_peer_) { @@ -254,7 +261,8 @@ void IceTransmission::OnReceiveMessage(const std::string &msg) { } case "candidate"_H: { std::string candidate = j["sdp"].get(); - LOG_INFO("Receive candidate [{}]", candidate); + // LOG_INFO("Receive candidate [{}]", candidate); + LOG_INFO("Receive candidate"); AddRemoteCandidate(candidate); break; } diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index cfa52fc..ac17b21 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -64,24 +64,25 @@ int PeerConnection::Create(PeerConnectionParams params, if (remote_sdp.empty()) { LOG_INFO("Invalid remote sdp"); } else { - LOG_INFO("Receive remote sdp [{}]", remote_sdp); - - ice_transmission_ = - new IceTransmission(false, ws_transport_, on_receive_ice_msg_); - std::string ice_username = GetIceUsername(remote_sdp); - ice_transmission_list_[ice_username] = ice_transmission_; - ice_transmission_->InitIceTransmission(cfg_stun_server_ip_, - stun_server_port_); + LOG_INFO("Receive remote sdp from [{}]", ice_username); - ice_transmission_->SetRemoteSdp(remote_sdp); + // IceTransmission *ice_transmission = + // new IceTransmission(false, ws_transport_, on_receive_ice_msg_); - ice_transmission_->GatherCandidates(); + ice_transmission_list_[ice_username] = + new IceTransmission(false, ws_transport_, on_receive_ice_msg_); + ice_transmission_list_[ice_username]->InitIceTransmission( + cfg_stun_server_ip_, stun_server_port_); + + ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp); + + ice_transmission_list_[ice_username]->GatherCandidates(); } break; } default: { - ice_transmission_->OnReceiveMessage(msg); + // ice_transmission_->OnReceiveMessage(msg); break; } } @@ -116,36 +117,94 @@ int PeerConnection::Create(PeerConnectionParams params, int PeerConnection::Join(PeerConnectionParams params, const std::string &transmission_id) { INIReader reader(params.cfg_path); - std::string cfg_signal_server_ip = reader.Get("signal server", "ip", "-1"); - std::string cfg_signal_server_port = - reader.Get("signal server", "port", "-1"); - std::string cfg_stun_server_ip = reader.Get("stun server", "ip", "-1"); - std::string cfg_stun_server_port = reader.Get("stun server", "port", "-1"); + cfg_signal_server_ip_ = reader.Get("signal server", "ip", "-1"); + cfg_signal_server_port_ = reader.Get("signal server", "port", "-1"); + cfg_stun_server_ip_ = reader.Get("stun server", "ip", "-1"); + cfg_stun_server_port_ = reader.Get("stun server", "port", "-1"); std::regex regex("\n"); - int signal_server_port = stoi(cfg_signal_server_port); - int stun_server_port = stoi(cfg_stun_server_port); + LOG_INFO("Read config success"); + + signal_server_port_ = stoi(cfg_signal_server_port_); + stun_server_port_ = stoi(cfg_stun_server_port_); on_receive_ws_msg_ = [this](const std::string &msg) { do { - } while (!ice_transmission_); + } while (ice_transmission_list_.empty()); auto j = json::parse(msg); std::string type = j["type"]; - auto itr = siganl_types.find(type); - if (itr != siganl_types.end()) { - LOG_INFO("msg type :{}", itr->first); - switch (itr->second) { - case 1: { - ws_connection_id_ = j["ws_connection_id"].get(); - LOG_INFO("Receive local peer websocket connection id [{}]", - ws_connection_id_); - signal_status_ = SignalStatus::Connected; - break; + LOG_INFO("msg type :{}", type.c_str()); + switch (HASH_STRING_PIECE(type.c_str())) { + case "ws_connection_id"_H: { + ws_connection_id_ = j["ws_connection_id"].get(); + LOG_INFO("Receive local peer websocket connection id [{}]", + ws_connection_id_); + signal_status_ = SignalStatus::Connected; + break; + } + case "offer"_H: { + std::string remote_sdp = j["sdp"].get(); + + if (remote_sdp.empty()) { + LOG_INFO("Invalid remote sdp"); + } else { + std::string ice_username = GetIceUsername(remote_sdp); + LOG_INFO("Receive remote sdp from [{}]", ice_username); + + // IceTransmission *ice_transmission = + // new IceTransmission(false, ws_transport_, on_receive_ice_msg_); + + ice_transmission_list_[ice_username] = + new IceTransmission(false, ws_transport_, on_receive_ice_msg_); + ice_transmission_list_[ice_username]->InitIceTransmission( + cfg_stun_server_ip_, stun_server_port_); + + ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp); + + ice_transmission_list_[ice_username]->GatherCandidates(); } - default: { - ice_transmission_->OnReceiveMessage(msg); - break; + break; + } + case "remote_sdp"_H: { + std::string remote_sdp = j["sdp"].get(); + if (remote_sdp.empty()) { + LOG_INFO("remote_sdp is empty"); + } else { + std::string ice_username = GetIceUsername(remote_sdp); + LOG_INFO("Receive remote sdp from [{}]", ice_username); + // LOG_INFO("Receive remote sdp [{}]", remote_sdp); + + if (ice_transmission_list_.size() == 1 && + ice_transmission_list_.begin()->first == "self") { + ice_transmission_list_["self"]->SetRemoteSdp(remote_sdp); + } else if (ice_transmission_list_.find(ice_username) == + ice_transmission_list_.end()) { + ice_transmission_list_[ice_username] = + new IceTransmission(false, ws_transport_, on_receive_ice_msg_); + ice_transmission_list_[ice_username]->InitIceTransmission( + cfg_stun_server_ip_, stun_server_port_); + ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp); + } + + // if (!offer_peer_) { + // GatherCandidates(); + // } } + break; + } + case "candidate"_H: { + std::string remote_sdp_with_candidates = j["sdp"].get(); + std::string ice_username = GetIceUsername(remote_sdp_with_candidates); + LOG_INFO("Receive remote candidates from [{}]", ice_username); + // LOG_INFO("Receive candidate [{}]", candidate); + + ice_transmission_list_[ice_username]->AddRemoteCandidate( + remote_sdp_with_candidates); + break; + } + default: { + ice_transmission_->OnReceiveMessage(msg); + break; } } }; @@ -158,20 +217,26 @@ int PeerConnection::Join(PeerConnectionParams params, transmission_id_ = transmission_id; ws_transport_ = new WsTransmission(on_receive_ws_msg_); - uri_ = "ws://" + cfg_signal_server_ip + ":" + cfg_signal_server_port; + uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; if (ws_transport_) { ws_transport_->Connect(uri_); } - ice_transmission_ = + ice_transmission_list_["self"] = new IceTransmission(true, ws_transport_, on_receive_ice_msg_); - ice_transmission_->InitIceTransmission(cfg_stun_server_ip, stun_server_port); + ice_transmission_list_["self"]->InitIceTransmission(cfg_stun_server_ip_, + stun_server_port_); + // ice_transmission_ = + // new IceTransmission(true, ws_transport_, on_receive_ice_msg_); + // ice_transmission_->InitIceTransmission(cfg_stun_server_ip, + // stun_server_port); do { - LOG_INFO("GetSignalStatus = {}", GetSignalStatus()); + // LOG_INFO("GetSignalStatus = {}", GetSignalStatus()); } while (SignalStatus::Connected != GetSignalStatus()); - ice_transmission_->JoinTransmission(transmission_id_); + // ice_transmission_->JoinTransmission(transmission_id_); + ice_transmission_list_["self"]->JoinTransmission(transmission_id_); return 0; } @@ -185,6 +250,8 @@ int PeerConnection::Destroy() { SignalStatus PeerConnection::GetSignalStatus() { return signal_status_; } int PeerConnection::SendData(const char *data, size_t size) { - ice_transmission_->SendData(data, size); + for (auto ice_trans : ice_transmission_list_) { + ice_trans.second->SendData(data, size); + } return 0; } \ No newline at end of file diff --git a/src/ws/ws_transmission.cpp b/src/ws/ws_transmission.cpp index d490001..ad59c3f 100644 --- a/src/ws/ws_transmission.cpp +++ b/src/ws/ws_transmission.cpp @@ -9,7 +9,7 @@ WsTransmission::WsTransmission( WsTransmission::~WsTransmission() {} void WsTransmission::OnReceiveMessage(const std::string &msg) { - LOG_INFO("Receive msg: {}", msg); + // LOG_INFO("Receive msg: {}", msg); if (on_receive_msg_) { on_receive_msg_(msg); } diff --git a/tests/signal_server/main.cpp b/tests/signal_server/main.cpp index 0ffa79c..eb8927e 100644 --- a/tests/signal_server/main.cpp +++ b/tests/signal_server/main.cpp @@ -5,7 +5,12 @@ int main(int argc, char* argv[]) { SignalServer s; - std::string port = argv[1]; + std::string port = ""; + if (argc > 1) { + port = argv[1]; + } else { + port = "9090"; + } std::cout << "Port: " << port << std::endl; s.run(std::stoi(port)); return 0; diff --git a/tests/signal_server/signal_server.cpp b/tests/signal_server/signal_server.cpp index 19427a4..7d3c657 100644 --- a/tests/signal_server/signal_server.cpp +++ b/tests/signal_server/signal_server.cpp @@ -72,7 +72,6 @@ bool SignalServer::on_pong(websocketpp::connection_hdl hdl, std::string s) { } void SignalServer::run(uint16_t port) { - // Listen on port 9093 server_.listen(port); // Queues a connection accept operation @@ -133,32 +132,64 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl, case "offer"_H: { std::string transmission_id = j["transmission_id"].get(); std::string sdp = j["sdp"].get(); - LOG_INFO("Receive transmission id [{}] with offer sdp [{}]", - transmission_id, sdp); + // LOG_INFO("Receive transmission id [{}] with offer sdp [{}]", + // transmission_id, sdp); transmission_manager_.BindGuestToTransmission(hdl, transmission_id); + std::string guest_username = GetIceUsername(sdp); + transmission_manager_.BindGuestUsernameToWsHandle(guest_username, hdl); websocketpp::connection_hdl host_hdl = transmission_manager_.GetHostOfTransmission(transmission_id); + std::vector guest_hdl_list = + transmission_manager_.GetAllGuestsOfTransmission(transmission_id); - std::string ice_username = GetIceUsername(sdp); - transmission_manager_.BindGuestUsernameToWsHandle(ice_username, hdl); + // LOG_INFO("send offer sdp [{}]", sdp); + json message = { + {"type", "offer"}, {"sdp", sdp}, {"guest", guest_username}}; - LOG_INFO("send offer sdp [{}]", sdp.c_str()); - json message = {{"type", "offer"}, {"sdp", sdp}, {"guest", ice_username}}; + LOG_INFO("[{}] send offer sdp to host", guest_username); send_msg(host_hdl, message); + + LOG_INFO("Size of guest_hdl_list: {}", guest_hdl_list.size()); + for (auto guest_hdl : guest_hdl_list) { + if (guest_hdl.lock().get() != hdl.lock().get()) { + LOG_INFO("[{}] send offer sdp to [{}]", guest_username, + transmission_manager_.GetGuestUsername(guest_hdl)); + send_msg(guest_hdl, message); + } + } + break; + } + case "query_members"_H: { + std::string transmission_id = j["transmission_id"].get(); + std::vector member_list = + transmission_manager_.GetAllMembersOfTransmission(transmission_id); + + json message = {{"type", "transmission_members"}, + {"transmission_id", transmission_id}, + {"transmission_members", member_list}, + {"status", "success"}}; + send_msg(hdl, message); break; } case "answer"_H: { std::string transmission_id = j["transmission_id"].get(); std::string sdp = j["sdp"].get(); std::string guest_ice_username = j["guest"].get(); - LOG_INFO("Receive transmission id [{}] with answer sdp [{}]", - transmission_id, sdp); + std::string host_ice_username = GetIceUsername(sdp); + if (transmission_manager_.GetHostUsername(hdl).empty()) { + transmission_manager_.BindHostUsernameToWsHandle(host_ice_username, + hdl); + } + // LOG_INFO("Receive transmission id [{}] with answer sdp [{}]", + // transmission_id, sdp); websocketpp::connection_hdl guest_hdl = transmission_manager_.GetGuestWsHandle(guest_ice_username); - LOG_INFO("send answer sdp [{}]", sdp.c_str()); + // LOG_INFO("send answer sdp [{}]", sdp); + LOG_INFO("[{}] send answer sdp to [{}]", host_ice_username, + guest_ice_username); json message = {{"type", "remote_sdp"}, {"sdp", sdp}}; send_msg(guest_hdl, message); break; diff --git a/tests/signal_server/transmission_manager.cpp b/tests/signal_server/transmission_manager.cpp index bc82217..a992d40 100644 --- a/tests/signal_server/transmission_manager.cpp +++ b/tests/signal_server/transmission_manager.cpp @@ -21,6 +21,14 @@ bool TransmissionManager::BindHostToTransmission( bool TransmissionManager::BindGuestToTransmission( websocketpp::connection_hdl hdl, const std::string& transmission_id) { + if (transmission_guest_list_.find(transmission_id) != + transmission_guest_list_.end()) { + transmission_guest_list_[transmission_id].push_back(hdl); + } else { + std::vector guest_hdl_list; + guest_hdl_list.push_back(hdl); + transmission_guest_list_[transmission_id] = guest_hdl_list; + } return true; } @@ -34,11 +42,23 @@ bool TransmissionManager::ReleaseGuestFromTransmission( return true; } +bool TransmissionManager::BindHostUsernameToWsHandle( + const std::string& host_username, websocketpp::connection_hdl hdl) { + if (transmission_host_username_list_.find(host_username) != + transmission_host_username_list_.end()) { + LOG_ERROR("Guest already bind to username [{}]", host_username.c_str()); + return false; + } else { + transmission_host_username_list_[host_username] = hdl; + } + return true; +} + bool TransmissionManager::BindGuestUsernameToWsHandle( const std::string& guest_username, websocketpp::connection_hdl hdl) { if (transmission_guest_username_list_.find(guest_username) != transmission_guest_username_list_.end()) { - LOG_WARN("Guest already bind to username [{}]", guest_username.c_str()); + LOG_ERROR("Guest already bind to username [{}]", guest_username.c_str()); return false; } else { transmission_guest_username_list_[guest_username] = hdl; @@ -57,14 +77,32 @@ websocketpp::connection_hdl TransmissionManager::GetHostOfTransmission( } } -std::set -TransmissionManager::GetGuestOfTransmission( +std::string TransmissionManager::GetHostUsername( + websocketpp::connection_hdl hdl) { + for (auto host : transmission_host_username_list_) { + if (host.second.lock().get() == hdl.lock().get()) return host.first; + } + + return ""; +} + +std::string TransmissionManager::GetGuestUsername( + websocketpp::connection_hdl hdl) { + for (auto guest : transmission_guest_username_list_) { + if (guest.second.lock().get() == hdl.lock().get()) return guest.first; + } + + return ""; +} + +std::vector +TransmissionManager::GetAllGuestsOfTransmission( const std::string& transmission_id) { if (transmission_guest_list_.find(transmission_id) != transmission_guest_list_.end()) { return transmission_guest_list_[transmission_id]; } else { - return std::set(); + return std::vector(); } } @@ -77,4 +115,18 @@ websocketpp::connection_hdl TransmissionManager::GetGuestWsHandle( websocketpp::connection_hdl hdl; return hdl; } +} + +std::vector TransmissionManager::GetAllMembersOfTransmission( + const std::string& transmission_id) { + std::vector member_list; + + member_list.push_back( + GetHostUsername(GetHostOfTransmission(transmission_id))); + + for (auto guest_hdl : GetAllGuestsOfTransmission(transmission_id)) { + member_list.push_back(GetGuestUsername(guest_hdl)); + } + + return member_list; } \ No newline at end of file diff --git a/tests/signal_server/transmission_manager.h b/tests/signal_server/transmission_manager.h index 909a974..3a1bc18 100644 --- a/tests/signal_server/transmission_manager.h +++ b/tests/signal_server/transmission_manager.h @@ -20,21 +20,31 @@ class TransmissionManager { bool ReleaseGuestFromTransmission(websocketpp::connection_hdl hdl, const std::string& transmission_id); + bool BindHostUsernameToWsHandle(const std::string& host_username, + websocketpp::connection_hdl hdl); bool BindGuestUsernameToWsHandle(const std::string& guest_username, websocketpp::connection_hdl hdl); + std::string GetHostUsername(websocketpp::connection_hdl hdl); + std::string GetGuestUsername(websocketpp::connection_hdl hdl); + websocketpp::connection_hdl GetHostOfTransmission( const std::string& transmission_id); - std::set GetGuestOfTransmission( + std::vector GetAllGuestsOfTransmission( const std::string& transmission_id); websocketpp::connection_hdl GetGuestWsHandle( const std::string& guest_username); + std::vector GetAllMembersOfTransmission( + const std::string& transmission_id); + private: std::map transmission_host_list_; - std::map> + std::map> transmission_guest_list_; + std::map + transmission_host_username_list_; std::map transmission_guest_username_list_; };