From 1c0d80fa3a0af13a0710422b3131e40ddd4dff6a Mon Sep 17 00:00:00 2001 From: dijunkun Date: Thu, 24 Aug 2023 10:25:23 +0800 Subject: [PATCH] Optimize PeerConnection::Create() and PeerConnection::Join() --- src/pc/peer_connection.cpp | 373 ++++++++++---------------- src/pc/peer_connection.h | 12 +- tests/signal_server/signal_server.cpp | 8 +- 3 files changed, 161 insertions(+), 232 deletions(-) diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 2c1fe98..c0f8c11 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -9,137 +9,11 @@ using nlohmann::json; -static const std::map siganl_types{ - {"ws_connection_id", 1}, - {"offer", 2}, - {"transmission_id", 3}, - {"remote_sdp", 4}, - {"candidate", 5}}; - PeerConnection::PeerConnection() {} PeerConnection::~PeerConnection() {} -int PeerConnection::Create(PeerConnectionParams params, - const std::string &transmission_id, - const std::string &user_id) { - user_id_ = user_id; - - INIReader reader(params.cfg_path); - cfg_signal_server_ip_ = reader.Get("signal server", "ip", "-1"); - cfg_signal_server_port_ = reader.Get("signal server", "port", "-1"); - cfg_stun_server_ip_ = reader.Get("stun server", "ip", "-1"); - cfg_stun_server_port_ = reader.Get("stun server", "port", "-1"); - std::regex regex("\n"); - - LOG_INFO("Read config success"); - - signal_server_port_ = stoi(cfg_signal_server_port_); - stun_server_port_ = stoi(cfg_stun_server_port_); - - LOG_INFO("stun server ip [{}] port [{}]", cfg_stun_server_ip_, - stun_server_port_); - - on_receive_ws_msg_ = [this](const std::string &msg) { - auto j = json::parse(msg); - std::string type = j["type"]; - LOG_INFO("msg type: {}", type.c_str()); - switch (HASH_STRING_PIECE(type.c_str())) { - case "ws_connection_id"_H: { - ws_connection_id_ = j["ws_connection_id"].get(); - LOG_INFO("Receive local peer websocket connection id [{}]", - ws_connection_id_); - signal_status_ = SignalStatus::Connected; - break; - } - case "transmission_id"_H: { - if (j["status"].get() == "success") { - transmission_id_ = j["transmission_id"].get(); - LOG_INFO("Create transmission success with id [{}]", - transmission_id_); - - } else if (j["status"].get() == "fail") { - LOG_WARN("Create transmission failed with id [{}], due to [{}]", - transmission_id_, j["reason"].get().c_str()); - } - break; - } - case "user_leave_transmission"_H: { - std::string user_id = j["user_id"]; - LOG_INFO("Receive notification: user id [{}] leave transmission", - user_id); - auto user_id_it = ice_transmission_list_.find(user_id); - if (user_id_it != ice_transmission_list_.end()) { - user_id_it->second->DestroyIceTransmission(); - delete user_id_it->second; - user_id_it->second = nullptr; - ice_transmission_list_.erase(user_id_it); - LOG_INFO("Terminate transmission to user [{}]", user_id); - } - break; - } - case "offer"_H: { - std::string remote_sdp = j["sdp"].get(); - - if (remote_sdp.empty()) { - LOG_INFO("Invalid remote sdp"); - } else { - std::string transmission_id = j["transmission_id"].get(); - std::string sdp = j["sdp"].get(); - std::string remote_user_id = j["remote_user_id"].get(); - LOG_INFO("Receive remote sdp from [{}]", remote_user_id); - - ice_transmission_list_[remote_user_id] = new IceTransmission( - false, transmission_id, user_id_, remote_user_id, ws_transport_, - on_receive_ice_msg_); - - ice_transmission_list_[remote_user_id]->InitIceTransmission( - cfg_stun_server_ip_, stun_server_port_); - - ice_transmission_list_[remote_user_id]->SetTransmissionId( - transmission_id_); - - ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); - - ice_transmission_list_[remote_user_id]->GatherCandidates(); - } - break; - } - default: { - // ice_transmission_->OnReceiveMessage(msg); - break; - } - } - }; - - on_receive_ice_msg_ = [this](const char *data, size_t size) { - std::string msg(data, size); - LOG_INFO("Receive data: [{}]", msg.c_str()); - }; - - ws_transport_ = new WsTransmission(on_receive_ws_msg_); - uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; - if (ws_transport_) { - ws_transport_->Connect(uri_); - } - - do { - // LOG_INFO("GetSignalStatus = {}", GetSignalStatus()); - } while (SignalStatus::Connected != GetSignalStatus()); - - json message = {{"type", "create_transmission"}, - {"user_id", user_id}, - {"transmission_id", transmission_id}}; - if (ws_transport_) { - ws_transport_->Send(message.dump()); - // LOG_INFO("Send create transmission request: {}", message.dump()); - LOG_INFO("Send create transmission request, transmission_id [{}]", - transmission_id); - } - return 0; -} - -int PeerConnection::Join(PeerConnectionParams params, +int PeerConnection::Init(PeerConnectionParams params, const std::string &transmission_id, const std::string &user_id) { // Todo: checkout user_id unique or not @@ -157,113 +31,16 @@ int PeerConnection::Join(PeerConnectionParams params, signal_server_port_ = stoi(cfg_signal_server_port_); stun_server_port_ = stoi(cfg_stun_server_port_); - on_receive_ws_msg_ = [this](const std::string &msg) { - auto j = json::parse(msg); - std::string type = j["type"]; - LOG_INFO("msg type: {}", type); - switch (HASH_STRING_PIECE(type.c_str())) { - case "user_id_list"_H: { - user_id_list_ = j["user_id_list"]; - std::string transmission_id = j["transmission_id"]; + LOG_INFO("stun server ip [{}] port [{}]", cfg_stun_server_ip_, + stun_server_port_); - LOG_INFO("Transmission [{}] members: [", transmission_id); - for (auto user_id : user_id_list_) { - LOG_INFO("{}", user_id); - } - LOG_INFO("]"); - - for (auto &remote_user_id : user_id_list_) { - ice_transmission_list_[remote_user_id] = new IceTransmission( - true, transmission_id, user_id_, remote_user_id, ws_transport_, - on_receive_ice_msg_); - ice_transmission_list_[remote_user_id]->InitIceTransmission( - cfg_stun_server_ip_, stun_server_port_); - ice_transmission_list_[remote_user_id]->JoinTransmission(); - } - - break; - } - case "user_leave_transmission"_H: { - std::string user_id = j["user_id"]; - LOG_INFO("Receive notification: user id [{}] leave transmission", - user_id); - auto user_id_it = ice_transmission_list_.find(user_id); - if (user_id_it != ice_transmission_list_.end()) { - user_id_it->second->DestroyIceTransmission(); - delete user_id_it->second; - user_id_it->second = nullptr; - ice_transmission_list_.erase(user_id_it); - LOG_INFO("Terminate transmission to user [{}]", user_id); - } - break; - } - case "ws_connection_id"_H: { - ws_connection_id_ = j["ws_connection_id"].get(); - LOG_INFO("Receive local peer websocket connection id [{}]", - ws_connection_id_); - signal_status_ = SignalStatus::Connected; - break; - } - case "offer"_H: { - std::string remote_sdp = j["sdp"].get(); - - if (remote_sdp.empty()) { - LOG_INFO("Invalid remote sdp"); - } else { - std::string transmission_id = j["transmission_id"].get(); - std::string sdp = j["sdp"].get(); - std::string remote_user_id = j["remote_user_id"].get(); - LOG_INFO("Receive remote sdp from [{}]", remote_user_id); - - ice_transmission_list_[remote_user_id] = new IceTransmission( - false, transmission_id, user_id_, remote_user_id, ws_transport_, - on_receive_ice_msg_); - - ice_transmission_list_[remote_user_id]->InitIceTransmission( - cfg_stun_server_ip_, stun_server_port_); - - ice_transmission_list_[remote_user_id]->SetTransmissionId( - transmission_id_); - - ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); - - ice_transmission_list_[remote_user_id]->GatherCandidates(); - } - break; - } - case "remote_sdp"_H: { - std::string remote_sdp = j["sdp"].get(); - if (remote_sdp.empty()) { - LOG_INFO("remote_sdp is empty"); - } else { - std::string transmission_id = j["transmission_id"].get(); - std::string sdp = j["sdp"].get(); - std::string remote_user_id = j["remote_user_id"].get(); - - LOG_INFO("Receive remote sdp from [{}]", remote_user_id); - // LOG_INFO("Receive remote sdp [{}]", remote_sdp); - - if (ice_transmission_list_.find(remote_user_id) != - ice_transmission_list_.end()) { - ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); - } - } - break; - } - default: { - // ice_transmission_->OnReceiveMessage(msg); - break; - } - } - }; + on_receive_ws_msg_ = [this](const std::string &msg) { ProcessSignal(msg); }; on_receive_ice_msg_ = [this](const char *data, size_t size) { std::string msg(data, size); LOG_INFO("Receive data: [{}]", msg.c_str()); }; - transmission_id_ = transmission_id; - ws_transport_ = new WsTransmission(on_receive_ws_msg_); uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; if (ws_transport_) { @@ -271,13 +48,151 @@ int PeerConnection::Join(PeerConnectionParams params, } do { - // LOG_INFO("GetSignalStatus = {}", GetSignalStatus()); } while (SignalStatus::Connected != GetSignalStatus()); - RequestTransmissionMemberList(transmission_id_); return 0; } +int PeerConnection::Create(PeerConnectionParams params, + const std::string &transmission_id, + const std::string &user_id) { + int ret = 0; + + ret = Init(params, transmission_id, user_id); + + json message = {{"type", "create_transmission"}, + {"user_id", user_id}, + {"transmission_id", transmission_id}}; + if (ws_transport_) { + ws_transport_->Send(message.dump()); + LOG_INFO("Send create transmission request, transmission_id [{}]", + transmission_id); + } + return ret; +} + +int PeerConnection::Join(PeerConnectionParams params, + const std::string &transmission_id, + const std::string &user_id) { + int ret = 0; + + ret = Init(params, transmission_id, user_id); + + transmission_id_ = transmission_id; + ret = RequestTransmissionMemberList(transmission_id_); + return ret; +} + +void PeerConnection::ProcessSignal(const std::string &signal) { + auto j = json::parse(signal); + std::string type = j["type"]; + LOG_INFO("signal type: {}", type); + switch (HASH_STRING_PIECE(type.c_str())) { + case "ws_connection_id"_H: { + ws_connection_id_ = j["ws_connection_id"].get(); + LOG_INFO("Receive local peer websocket connection id [{}]", + ws_connection_id_); + signal_status_ = SignalStatus::Connected; + break; + } + case "transmission_id"_H: { + if (j["status"].get() == "success") { + transmission_id_ = j["transmission_id"].get(); + LOG_INFO("Create transmission success with id [{}]", transmission_id_); + + } else if (j["status"].get() == "fail") { + LOG_WARN("Create transmission failed with id [{}], due to [{}]", + transmission_id_, j["reason"].get().c_str()); + } + break; + } + case "user_id_list"_H: { + user_id_list_ = j["user_id_list"]; + std::string transmission_id = j["transmission_id"]; + + LOG_INFO("Transmission [{}] members: [", transmission_id); + for (auto user_id : user_id_list_) { + LOG_INFO("{}", user_id); + } + LOG_INFO("]"); + + for (auto &remote_user_id : user_id_list_) { + ice_transmission_list_[remote_user_id] = + new IceTransmission(true, transmission_id, user_id_, remote_user_id, + ws_transport_, on_receive_ice_msg_); + ice_transmission_list_[remote_user_id]->InitIceTransmission( + cfg_stun_server_ip_, stun_server_port_); + ice_transmission_list_[remote_user_id]->JoinTransmission(); + } + + break; + } + case "user_leave_transmission"_H: { + std::string user_id = j["user_id"]; + LOG_INFO("Receive notification: user id [{}] leave transmission", + user_id); + auto user_id_it = ice_transmission_list_.find(user_id); + if (user_id_it != ice_transmission_list_.end()) { + user_id_it->second->DestroyIceTransmission(); + delete user_id_it->second; + user_id_it->second = nullptr; + ice_transmission_list_.erase(user_id_it); + LOG_INFO("Terminate transmission to user [{}]", user_id); + } + break; + } + case "offer"_H: { + std::string remote_sdp = j["sdp"].get(); + + if (remote_sdp.empty()) { + LOG_INFO("Invalid remote sdp"); + } else { + std::string transmission_id = j["transmission_id"].get(); + std::string sdp = j["sdp"].get(); + std::string remote_user_id = j["remote_user_id"].get(); + LOG_INFO("[{}] receive offer from [{}]", user_id_, remote_user_id); + + ice_transmission_list_[remote_user_id] = new IceTransmission( + false, transmission_id, user_id_, remote_user_id, ws_transport_, + on_receive_ice_msg_); + + ice_transmission_list_[remote_user_id]->InitIceTransmission( + cfg_stun_server_ip_, stun_server_port_); + + ice_transmission_list_[remote_user_id]->SetTransmissionId( + transmission_id_); + + ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); + + ice_transmission_list_[remote_user_id]->GatherCandidates(); + } + break; + } + case "answer"_H: { + std::string remote_sdp = j["sdp"].get(); + if (remote_sdp.empty()) { + LOG_INFO("remote_sdp is empty"); + } else { + std::string transmission_id = j["transmission_id"].get(); + std::string sdp = j["sdp"].get(); + std::string remote_user_id = j["remote_user_id"].get(); + + LOG_INFO("[{}] receive answer from [{}]", user_id_, remote_user_id); + + if (ice_transmission_list_.find(remote_user_id) != + ice_transmission_list_.end()) { + ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); + } + } + break; + } + default: { + // ice_transmission_->OnReceiveMessage(msg); + break; + } + } +} + int PeerConnection::RequestTransmissionMemberList( const std::string &transmission_id) { LOG_INFO("Request member list"); diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index 6258430..baa5d50 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -29,16 +29,24 @@ class PeerConnection { int Create(PeerConnectionParams params, const std::string &transmission_id = "", const std::string &user_id = ""); + int Join(PeerConnectionParams params, const std::string &transmission_id, const std::string &user_id = ""); - int Destroy(); - int RequestTransmissionMemberList(const std::string &transmission_id); + int Destroy(); SignalStatus GetSignalStatus(); int SendData(const char *data, size_t size); + private: + int Init(PeerConnectionParams params, const std::string &transmission_id, + const std::string &user_id); + + void ProcessSignal(const std::string &signal); + + int RequestTransmissionMemberList(const std::string &transmission_id); + private: std::string uri_ = ""; std::string cfg_signal_server_ip_; diff --git a/tests/signal_server/signal_server.cpp b/tests/signal_server/signal_server.cpp index 3933e68..08c5c33 100644 --- a/tests/signal_server/signal_server.cpp +++ b/tests/signal_server/signal_server.cpp @@ -66,6 +66,12 @@ bool SignalServer::on_close(websocketpp::connection_hdl hdl) { std::vector user_id_list = transmission_manager_.GetAllUserIdOfTransmission(transmission_id); + if (user_id_list.empty()) { + transmission_list_.erase(transmission_id); + LOG_INFO("Release transmission [{}] due to no user in this transmission", + transmission_id); + } + for (const auto& user_id : user_id_list) { send_msg(transmission_manager_.GetWsHandle(user_id), message); } @@ -196,7 +202,7 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl, // LOG_INFO("send answer sdp [{}]", sdp); LOG_INFO("[{}] send answer to [{}]", user_id, remote_user_id); - json message = {{"type", "remote_sdp"}, + json message = {{"type", "answer"}, {"sdp", sdp}, {"remote_user_id", user_id}, {"transmission_id", transmission_id}};