Optimize PeerConnection::Create() and PeerConnection::Join()

This commit is contained in:
dijunkun
2023-08-24 10:25:23 +08:00
parent e31405c78b
commit 1c0d80fa3a
3 changed files with 161 additions and 232 deletions

View File

@@ -9,137 +9,11 @@
using nlohmann::json;
static const std::map<std::string, unsigned int> siganl_types{
{"ws_connection_id", 1},
{"offer", 2},
{"transmission_id", 3},
{"remote_sdp", 4},
{"candidate", 5}};
PeerConnection::PeerConnection() {}
PeerConnection::~PeerConnection() {}
int PeerConnection::Create(PeerConnectionParams params,
const std::string &transmission_id,
const std::string &user_id) {
user_id_ = user_id;
INIReader reader(params.cfg_path);
cfg_signal_server_ip_ = reader.Get("signal server", "ip", "-1");
cfg_signal_server_port_ = reader.Get("signal server", "port", "-1");
cfg_stun_server_ip_ = reader.Get("stun server", "ip", "-1");
cfg_stun_server_port_ = reader.Get("stun server", "port", "-1");
std::regex regex("\n");
LOG_INFO("Read config success");
signal_server_port_ = stoi(cfg_signal_server_port_);
stun_server_port_ = stoi(cfg_stun_server_port_);
LOG_INFO("stun server ip [{}] port [{}]", cfg_stun_server_ip_,
stun_server_port_);
on_receive_ws_msg_ = [this](const std::string &msg) {
auto j = json::parse(msg);
std::string type = j["type"];
LOG_INFO("msg type: {}", type.c_str());
switch (HASH_STRING_PIECE(type.c_str())) {
case "ws_connection_id"_H: {
ws_connection_id_ = j["ws_connection_id"].get<unsigned int>();
LOG_INFO("Receive local peer websocket connection id [{}]",
ws_connection_id_);
signal_status_ = SignalStatus::Connected;
break;
}
case "transmission_id"_H: {
if (j["status"].get<std::string>() == "success") {
transmission_id_ = j["transmission_id"].get<std::string>();
LOG_INFO("Create transmission success with id [{}]",
transmission_id_);
} else if (j["status"].get<std::string>() == "fail") {
LOG_WARN("Create transmission failed with id [{}], due to [{}]",
transmission_id_, j["reason"].get<std::string>().c_str());
}
break;
}
case "user_leave_transmission"_H: {
std::string user_id = j["user_id"];
LOG_INFO("Receive notification: user id [{}] leave transmission",
user_id);
auto user_id_it = ice_transmission_list_.find(user_id);
if (user_id_it != ice_transmission_list_.end()) {
user_id_it->second->DestroyIceTransmission();
delete user_id_it->second;
user_id_it->second = nullptr;
ice_transmission_list_.erase(user_id_it);
LOG_INFO("Terminate transmission to user [{}]", user_id);
}
break;
}
case "offer"_H: {
std::string remote_sdp = j["sdp"].get<std::string>();
if (remote_sdp.empty()) {
LOG_INFO("Invalid remote sdp");
} else {
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
LOG_INFO("Receive remote sdp from [{}]", remote_user_id);
ice_transmission_list_[remote_user_id] = new IceTransmission(
false, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_);
ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_[remote_user_id]->SetTransmissionId(
transmission_id_);
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
ice_transmission_list_[remote_user_id]->GatherCandidates();
}
break;
}
default: {
// ice_transmission_->OnReceiveMessage(msg);
break;
}
}
};
on_receive_ice_msg_ = [this](const char *data, size_t size) {
std::string msg(data, size);
LOG_INFO("Receive data: [{}]", msg.c_str());
};
ws_transport_ = new WsTransmission(on_receive_ws_msg_);
uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_;
if (ws_transport_) {
ws_transport_->Connect(uri_);
}
do {
// LOG_INFO("GetSignalStatus = {}", GetSignalStatus());
} while (SignalStatus::Connected != GetSignalStatus());
json message = {{"type", "create_transmission"},
{"user_id", user_id},
{"transmission_id", transmission_id}};
if (ws_transport_) {
ws_transport_->Send(message.dump());
// LOG_INFO("Send create transmission request: {}", message.dump());
LOG_INFO("Send create transmission request, transmission_id [{}]",
transmission_id);
}
return 0;
}
int PeerConnection::Join(PeerConnectionParams params,
int PeerConnection::Init(PeerConnectionParams params,
const std::string &transmission_id,
const std::string &user_id) {
// Todo: checkout user_id unique or not
@@ -157,11 +31,81 @@ int PeerConnection::Join(PeerConnectionParams params,
signal_server_port_ = stoi(cfg_signal_server_port_);
stun_server_port_ = stoi(cfg_stun_server_port_);
on_receive_ws_msg_ = [this](const std::string &msg) {
auto j = json::parse(msg);
LOG_INFO("stun server ip [{}] port [{}]", cfg_stun_server_ip_,
stun_server_port_);
on_receive_ws_msg_ = [this](const std::string &msg) { ProcessSignal(msg); };
on_receive_ice_msg_ = [this](const char *data, size_t size) {
std::string msg(data, size);
LOG_INFO("Receive data: [{}]", msg.c_str());
};
ws_transport_ = new WsTransmission(on_receive_ws_msg_);
uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_;
if (ws_transport_) {
ws_transport_->Connect(uri_);
}
do {
} while (SignalStatus::Connected != GetSignalStatus());
return 0;
}
int PeerConnection::Create(PeerConnectionParams params,
const std::string &transmission_id,
const std::string &user_id) {
int ret = 0;
ret = Init(params, transmission_id, user_id);
json message = {{"type", "create_transmission"},
{"user_id", user_id},
{"transmission_id", transmission_id}};
if (ws_transport_) {
ws_transport_->Send(message.dump());
LOG_INFO("Send create transmission request, transmission_id [{}]",
transmission_id);
}
return ret;
}
int PeerConnection::Join(PeerConnectionParams params,
const std::string &transmission_id,
const std::string &user_id) {
int ret = 0;
ret = Init(params, transmission_id, user_id);
transmission_id_ = transmission_id;
ret = RequestTransmissionMemberList(transmission_id_);
return ret;
}
void PeerConnection::ProcessSignal(const std::string &signal) {
auto j = json::parse(signal);
std::string type = j["type"];
LOG_INFO("msg type: {}", type);
LOG_INFO("signal type: {}", type);
switch (HASH_STRING_PIECE(type.c_str())) {
case "ws_connection_id"_H: {
ws_connection_id_ = j["ws_connection_id"].get<unsigned int>();
LOG_INFO("Receive local peer websocket connection id [{}]",
ws_connection_id_);
signal_status_ = SignalStatus::Connected;
break;
}
case "transmission_id"_H: {
if (j["status"].get<std::string>() == "success") {
transmission_id_ = j["transmission_id"].get<std::string>();
LOG_INFO("Create transmission success with id [{}]", transmission_id_);
} else if (j["status"].get<std::string>() == "fail") {
LOG_WARN("Create transmission failed with id [{}], due to [{}]",
transmission_id_, j["reason"].get<std::string>().c_str());
}
break;
}
case "user_id_list"_H: {
user_id_list_ = j["user_id_list"];
std::string transmission_id = j["transmission_id"];
@@ -173,9 +117,9 @@ int PeerConnection::Join(PeerConnectionParams params,
LOG_INFO("]");
for (auto &remote_user_id : user_id_list_) {
ice_transmission_list_[remote_user_id] = new IceTransmission(
true, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_);
ice_transmission_list_[remote_user_id] =
new IceTransmission(true, transmission_id, user_id_, remote_user_id,
ws_transport_, on_receive_ice_msg_);
ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_[remote_user_id]->JoinTransmission();
@@ -197,13 +141,6 @@ int PeerConnection::Join(PeerConnectionParams params,
}
break;
}
case "ws_connection_id"_H: {
ws_connection_id_ = j["ws_connection_id"].get<unsigned int>();
LOG_INFO("Receive local peer websocket connection id [{}]",
ws_connection_id_);
signal_status_ = SignalStatus::Connected;
break;
}
case "offer"_H: {
std::string remote_sdp = j["sdp"].get<std::string>();
@@ -213,7 +150,7 @@ int PeerConnection::Join(PeerConnectionParams params,
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
LOG_INFO("Receive remote sdp from [{}]", remote_user_id);
LOG_INFO("[{}] receive offer from [{}]", user_id_, remote_user_id);
ice_transmission_list_[remote_user_id] = new IceTransmission(
false, transmission_id, user_id_, remote_user_id, ws_transport_,
@@ -231,7 +168,7 @@ int PeerConnection::Join(PeerConnectionParams params,
}
break;
}
case "remote_sdp"_H: {
case "answer"_H: {
std::string remote_sdp = j["sdp"].get<std::string>();
if (remote_sdp.empty()) {
LOG_INFO("remote_sdp is empty");
@@ -240,8 +177,7 @@ int PeerConnection::Join(PeerConnectionParams params,
std::string sdp = j["sdp"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
LOG_INFO("Receive remote sdp from [{}]", remote_user_id);
// LOG_INFO("Receive remote sdp [{}]", remote_sdp);
LOG_INFO("[{}] receive answer from [{}]", user_id_, remote_user_id);
if (ice_transmission_list_.find(remote_user_id) !=
ice_transmission_list_.end()) {
@@ -255,27 +191,6 @@ int PeerConnection::Join(PeerConnectionParams params,
break;
}
}
};
on_receive_ice_msg_ = [this](const char *data, size_t size) {
std::string msg(data, size);
LOG_INFO("Receive data: [{}]", msg.c_str());
};
transmission_id_ = transmission_id;
ws_transport_ = new WsTransmission(on_receive_ws_msg_);
uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_;
if (ws_transport_) {
ws_transport_->Connect(uri_);
}
do {
// LOG_INFO("GetSignalStatus = {}", GetSignalStatus());
} while (SignalStatus::Connected != GetSignalStatus());
RequestTransmissionMemberList(transmission_id_);
return 0;
}
int PeerConnection::RequestTransmissionMemberList(

View File

@@ -29,16 +29,24 @@ class PeerConnection {
int Create(PeerConnectionParams params,
const std::string &transmission_id = "",
const std::string &user_id = "");
int Join(PeerConnectionParams params, const std::string &transmission_id,
const std::string &user_id = "");
int Destroy();
int RequestTransmissionMemberList(const std::string &transmission_id);
int Destroy();
SignalStatus GetSignalStatus();
int SendData(const char *data, size_t size);
private:
int Init(PeerConnectionParams params, const std::string &transmission_id,
const std::string &user_id);
void ProcessSignal(const std::string &signal);
int RequestTransmissionMemberList(const std::string &transmission_id);
private:
std::string uri_ = "";
std::string cfg_signal_server_ip_;

View File

@@ -66,6 +66,12 @@ bool SignalServer::on_close(websocketpp::connection_hdl hdl) {
std::vector<std::string> user_id_list =
transmission_manager_.GetAllUserIdOfTransmission(transmission_id);
if (user_id_list.empty()) {
transmission_list_.erase(transmission_id);
LOG_INFO("Release transmission [{}] due to no user in this transmission",
transmission_id);
}
for (const auto& user_id : user_id_list) {
send_msg(transmission_manager_.GetWsHandle(user_id), message);
}
@@ -196,7 +202,7 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl,
// LOG_INFO("send answer sdp [{}]", sdp);
LOG_INFO("[{}] send answer to [{}]", user_id, remote_user_id);
json message = {{"type", "remote_sdp"},
json message = {{"type", "answer"},
{"sdp", sdp},
{"remote_user_id", user_id},
{"transmission_id", transmission_id}};