Support mesh connection

This commit is contained in:
dijunkun
2023-08-23 09:43:05 +08:00
parent d5c1c26fc9
commit 5a38aabb55
10 changed files with 151 additions and 114 deletions

View File

@@ -15,11 +15,13 @@ const std::vector<std::string> ice_status = {
"JUICE_STATE_COMPLETED", "JUICE_STATE_FAILED"};
IceTransmission::IceTransmission(
bool offer_peer, std::string remote_ice_username,
WsTransmission *ice_ws_transmission,
bool offer_peer, std::string &transmission_id, std::string &user_id,
std::string &remote_user_id, WsTransmission *ice_ws_transmission,
std::function<void(const char *, size_t)> on_receive_ice_msg)
: offer_peer_(offer_peer),
remote_ice_username_(remote_ice_username),
transmission_id_(transmission_id),
user_id_(user_id),
remote_user_id_(remote_user_id),
ice_ws_transport_(ice_ws_transmission),
on_receive_ice_msg_cb_(on_receive_ice_msg) {}
@@ -99,12 +101,8 @@ int IceTransmission::SetTransmissionId(const std::string &transmission_id) {
return 0;
}
int IceTransmission::JoinTransmission(const std::string &transmission_id,
const std::string &user_id) {
int IceTransmission::JoinTransmission() {
LOG_INFO("Join transport");
offer_peer_ = true;
transmission_id_ = transmission_id;
user_id_ = user_id;
// if (SignalStatus::Connected != signal_status_) {
// LOG_ERROR("Not connect to signalserver");
@@ -148,10 +146,10 @@ int IceTransmission::SendOffer() {
json message = {{"type", "offer"},
{"transmission_id", transmission_id_},
{"user_id", user_id_},
{"remote_peer", remote_ice_username_},
{"remote_user_id", remote_user_id_},
{"sdp", local_sdp_}};
// LOG_INFO("Send offer:\n{}", message.dump().c_str());
LOG_INFO("Send offer");
LOG_INFO("Send offer:\n{}", message.dump());
// LOG_INFO("Send offer");
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
@@ -179,7 +177,8 @@ int IceTransmission::SendAnswer() {
json message = {{"type", "answer"},
{"transmission_id", transmission_id_},
{"sdp", local_sdp_},
{"guest", remote_ice_username_}};
{"user_id", user_id_},
{"remote_user_id", remote_user_id_}};
LOG_INFO("[{}] Send answer to [{}]", GetIceUsername(local_sdp_),
remote_ice_username_);

View File

@@ -8,7 +8,8 @@
class IceTransmission {
public:
IceTransmission(bool offer_peer, std::string remote_ice_username,
IceTransmission(bool offer_peer, std::string &transmission_id,
std::string &user_id, std::string &remote_user_id,
WsTransmission *ice_ws_transmission,
std::function<void(const char *, size_t)> on_receive_ice_msg);
@@ -19,8 +20,7 @@ class IceTransmission {
int DestroyIceTransmission();
int CreateTransmission(const std::string &transmission_id);
int JoinTransmission(const std::string &transmission_id,
const std::string &user_id);
int JoinTransmission();
int SetTransmissionId(const std::string &transmission_id);
@@ -64,6 +64,7 @@ class IceTransmission {
unsigned int connection_id_ = 0;
std::string transmission_id_ = "";
std::string user_id_ = "";
std::string remote_user_id_ = "";
bool offer_peer_ = true;
std::string remote_ice_username_ = "";
};

View File

@@ -25,7 +25,8 @@ typedef struct {
PeerPtr* CreatePeer(const Params* params);
int CreateConnection(PeerPtr* peer_ptr);
int CreateConnection(PeerPtr* peer_ptr, const char* transmission_id,
const char* user_id);
int JoinConnection(PeerPtr* peer_ptr, const char* transmission_id,
const char* user_id);

View File

@@ -67,24 +67,24 @@ int PeerConnection::Create(PeerConnectionParams params,
if (remote_sdp.empty()) {
LOG_INFO("Invalid remote sdp");
} else {
std::string ice_username = GetIceUsername(remote_sdp);
LOG_INFO("Receive remote sdp from [{}]", ice_username);
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
LOG_INFO("Receive remote sdp from [{}]", remote_user_id);
// IceTransmission *ice_transmission =
// new IceTransmission(false, ws_transport_, on_receive_ice_msg_);
ice_transmission_list_[remote_user_id] = new IceTransmission(
false, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_);
ice_transmission_list_[ice_username] = new IceTransmission(
false, ice_username, ws_transport_, on_receive_ice_msg_);
ice_transmission_list_[ice_username]->InitIceTransmission(
ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_[ice_username]->SetTransmissionId(
ice_transmission_list_[remote_user_id]->SetTransmissionId(
transmission_id_);
ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp);
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
ice_transmission_list_[ice_username]->GatherCandidates();
ice_transmission_list_[remote_user_id]->GatherCandidates();
}
break;
}
@@ -155,23 +155,32 @@ int PeerConnection::Join(PeerConnectionParams params,
}
LOG_INFO("]");
if (transmission_member_list_.size() == 1 &&
transmission_member_list_[0] == "host") {
ice_transmission_list_["host"] = new IceTransmission(
true, "host", ws_transport_, on_receive_ice_msg_);
ice_transmission_list_["host"]->InitIceTransmission(
// if (transmission_member_list_.size() == 1 &&
// transmission_member_list_[0] == "host") {
// ice_transmission_list_["host"] = new IceTransmission(
// true, "host", ws_transport_, on_receive_ice_msg_);
// ice_transmission_list_["host"]->InitIceTransmission(
// cfg_stun_server_ip_, stun_server_port_);
// ice_transmission_list_["host"]->JoinTransmission(transmission_id,
// user_id_);
// } else {
// for (auto &member : transmission_member_list_) {
// ice_transmission_list_[member] = new IceTransmission(
// true, member, ws_transport_, on_receive_ice_msg_);
// ice_transmission_list_[member]->InitIceTransmission(
// cfg_stun_server_ip_, stun_server_port_);
// ice_transmission_list_[member]->JoinTransmission(transmission_id,
// user_id_);
// }
// }
for (auto &remote_user_id : transmission_member_list_) {
ice_transmission_list_[remote_user_id] = new IceTransmission(
true, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_);
ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_["host"]->JoinTransmission(transmission_id,
user_id_);
} else {
for (auto &member : transmission_member_list_) {
ice_transmission_list_[member] = new IceTransmission(
true, member, ws_transport_, on_receive_ice_msg_);
ice_transmission_list_[member]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_[member]->JoinTransmission(transmission_id,
user_id_);
}
ice_transmission_list_[remote_user_id]->JoinTransmission();
}
break;
@@ -189,20 +198,24 @@ int PeerConnection::Join(PeerConnectionParams params,
if (remote_sdp.empty()) {
LOG_INFO("Invalid remote sdp");
} else {
std::string ice_username = GetIceUsername(remote_sdp);
LOG_INFO("Receive remote sdp from [{}]", ice_username);
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
LOG_INFO("Receive remote sdp from [{}]", remote_user_id);
// IceTransmission *ice_transmission =
// new IceTransmission(false, ws_transport_, on_receive_ice_msg_);
ice_transmission_list_[remote_user_id] = new IceTransmission(
false, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_);
ice_transmission_list_[ice_username] = new IceTransmission(
false, ice_username, ws_transport_, on_receive_ice_msg_);
ice_transmission_list_[ice_username]->InitIceTransmission(
ice_transmission_list_[remote_user_id]->InitIceTransmission(
cfg_stun_server_ip_, stun_server_port_);
ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp);
ice_transmission_list_[remote_user_id]->SetTransmissionId(
transmission_id_);
ice_transmission_list_[ice_username]->GatherCandidates();
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
ice_transmission_list_[remote_user_id]->GatherCandidates();
}
break;
}
@@ -211,28 +224,16 @@ int PeerConnection::Join(PeerConnectionParams params,
if (remote_sdp.empty()) {
LOG_INFO("remote_sdp is empty");
} else {
std::string ice_username = GetIceUsername(remote_sdp);
LOG_INFO("Receive remote sdp from [{}]", ice_username);
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
LOG_INFO("Receive remote sdp from [{}]", remote_user_id);
// LOG_INFO("Receive remote sdp [{}]", remote_sdp);
// if (ice_transmission_list_.size() == 1 &&
// ice_transmission_list_.begin()->first == "host") {
// ice_transmission_list_["host"]->SetRemoteSdp(remote_sdp);
// } else if (ice_transmission_list_.find(ice_username) ==
// ice_transmission_list_.end()) {
// ice_transmission_list_[ice_username] = new IceTransmission(
// false, ice_username, ws_transport_, on_receive_ice_msg_);
// ice_transmission_list_[ice_username]->InitIceTransmission(
// cfg_stun_server_ip_, stun_server_port_);
// ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp);
// }
if (ice_transmission_list_.size() == 1 &&
ice_transmission_list_.begin()->first == "host") {
ice_transmission_list_["host"]->SetRemoteSdp(remote_sdp);
} else if (ice_transmission_list_.find(ice_username) !=
if (ice_transmission_list_.find(remote_user_id) !=
ice_transmission_list_.end()) {
ice_transmission_list_[ice_username]->SetRemoteSdp(remote_sdp);
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
}
// if (!offer_peer_) {

View File

@@ -22,10 +22,10 @@ PeerPtr *CreatePeer(const Params *params) {
return peer_ptr;
}
int CreateConnection(PeerPtr *peer_ptr) {
peer_ptr->peer_connection->Create(peer_ptr->pc_params);
return 0;
}
// int CreateConnection(PeerPtr *peer_ptr) {
// peer_ptr->peer_connection->Create(peer_ptr->pc_params);
// return 0;
// }
int CreateConnection(PeerPtr *peer_ptr, const char *transmission_id,
const char *user_id) {

View File

@@ -14,7 +14,7 @@ int main(int argc, char** argv) {
PeerPtr* peer = CreatePeer(&params);
JoinConnection(peer, transmission_id.c_str(), user_id.c_str());
std::string msg = "Offer peer";
std::string msg = "[" + user_id + "] Offer peer";
int i = 100;
while (i--) {

View File

@@ -6,10 +6,12 @@ int main(int argc, char** argv) {
Params params;
params.cfg_path = "../../../../config/config.ini";
std::string transmission_id = "000000";
std::string user_id = argv[1];
PeerPtr* peer = CreatePeer(&params);
CreateConnection(peer);
CreateConnection(peer, transmission_id.c_str(), user_id.c_str());
std::string msg = "Answer peer";
std::string msg = "[" + user_id + "] Answer peer";
int i = 100;
while (i--) {

View File

@@ -97,8 +97,8 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl,
case "create_transmission"_H: {
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string user_id = j["user_id"].get<std::string>();
LOG_INFO("Receive create transmission request with id [{}]",
transmission_id);
LOG_INFO("Receive user id [{}] create transmission request with id [{}]",
user_id, transmission_id);
if (transmission_list_.find(transmission_id) ==
transmission_list_.end()) {
if (transmission_id.empty()) {
@@ -114,11 +114,12 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl,
}
transmission_list_.insert(transmission_id);
transmission_manager_.BindWsHandleToTransmission(hdl, transmission_id);
// transmission_manager_.BindWsHandleToTransmission(hdl,
// transmission_id);
transmission_manager_.BindUserIdToTransmission(user_id,
transmission_id);
transmission_manager_.BindUserIdToWsHandle(user_id, hdl);
transmission_manager_.BindUserNameToUserId("host", user_id);
// transmission_manager_.BindUserNameToUserId("host", user_id);
// if (transmission_manager_.GetUsername(hdl).empty()) {
// transmission_manager_.BindUsernameToWsHandle("host", hdl);
@@ -157,19 +158,25 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl,
case "offer"_H: {
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string remote_peer = j["remote_peer"].get<std::string>();
// LOG_INFO("Receive transmission id [{}] with offer sdp [{}]",
// transmission_id, sdp);
transmission_manager_.BindWsHandleToTransmission(hdl, transmission_id);
std::string offer_peer = GetIceUsername(sdp);
transmission_manager_.BindUsernameToWsHandle(offer_peer, hdl);
std::string user_id = j["user_id"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
// transmission_manager_.BindWsHandleToTransmission(hdl, transmission_id);
// std::string offer_peer = GetIceUsername(sdp);
// transmission_manager_.BindUsernameToWsHandle(offer_peer, hdl);
transmission_manager_.BindUserIdToTransmission(user_id, transmission_id);
transmission_manager_.BindUserIdToWsHandle(user_id, hdl);
websocketpp::connection_hdl destination_hdl =
transmission_manager_.GetWsHandle(remote_peer);
transmission_manager_.GetWsHandle(remote_user_id);
json message = {{"type", "offer"}, {"sdp", sdp}};
json message = {{"type", "offer"},
{"sdp", sdp},
{"remote_user_id", user_id},
{"transmission_id", transmission_id}};
LOG_INFO("[{}] send offer sdp to [{}]", offer_peer, remote_peer);
LOG_INFO("[{}] send offer sdp to [{}]", user_id, remote_user_id);
send_msg(destination_hdl, message);
break;
@@ -177,22 +184,33 @@ void SignalServer::on_message(websocketpp::connection_hdl hdl,
case "answer"_H: {
std::string transmission_id = j["transmission_id"].get<std::string>();
std::string sdp = j["sdp"].get<std::string>();
std::string guest_ice_username = j["guest"].get<std::string>();
std::string host_ice_username = GetIceUsername(sdp);
if (transmission_manager_.GetUsername(hdl) == "host") {
LOG_INFO("Update transmission [{}] [host] to [{}]", transmission_id,
host_ice_username);
transmission_manager_.UpdateUsernameToWsHandle(host_ice_username, hdl);
}
std::string user_id = j["user_id"].get<std::string>();
std::string remote_user_id = j["remote_user_id"].get<std::string>();
websocketpp::connection_hdl guest_hdl =
transmission_manager_.GetWsHandle(guest_ice_username);
// transmission_manager_.BindUserIdToTransmission(user_id,
// transmission_id); transmission_manager_.BindUserIdToWsHandle(user_id,
// hdl);
websocketpp::connection_hdl destination_hdl =
transmission_manager_.GetWsHandle(remote_user_id);
// if (transmission_manager_.GetUsername(hdl) == "host") {
// LOG_INFO("Update transmission [{}] [host] to [{}]", transmission_id,
// host_ice_username);
// transmission_manager_.UpdateUsernameToWsHandle(host_ice_username,
// hdl);
// }
// websocketpp::connection_hdl guest_hdl =
// transmission_manager_.GetWsHandle(guest_ice_username);
// LOG_INFO("send answer sdp [{}]", sdp);
LOG_INFO("[{}] send answer sdp to [{}]", host_ice_username,
guest_ice_username);
json message = {{"type", "remote_sdp"}, {"sdp", sdp}};
send_msg(guest_hdl, message);
LOG_INFO("[{}] send answer sdp to [{}]", user_id, remote_user_id);
json message = {{"type", "remote_sdp"},
{"sdp", sdp},
{"remote_user_id", user_id},
{"transmission_id", transmission_id}};
send_msg(destination_hdl, message);
break;
}
case "offer_candidate"_H: {

View File

@@ -132,13 +132,15 @@ websocketpp::connection_hdl TransmissionManager::GetGuestWsHandle(
std::vector<std::string> TransmissionManager::GetAllMembersOfTransmission(
const std::string& transmission_id) {
std::vector<std::string> member_list;
for (auto guest_hdl : GetAllGuestsOfTransmission(transmission_id)) {
member_list.push_back(GetUsername(guest_hdl));
// for (auto guest_hdl : GetAllGuestsOfTransmission(transmission_id)) {
// member_list.push_back(GetUsername(guest_hdl));
// }
if (transmission_user_id_list_.find(transmission_id) !=
transmission_user_id_list_.end()) {
return transmission_user_id_list_[transmission_id];
}
return member_list;
return std::vector<std::string>();
}
bool TransmissionManager::BindWsHandleToTransmission(
@@ -165,6 +167,7 @@ bool TransmissionManager::BindUserIdToTransmission(
const std::string& user_id, const std::string& transmission_id) {
if (transmission_user_id_list_.find(transmission_id) ==
transmission_user_id_list_.end()) {
LOG_INFO("Add user id [{}] to transmission [{}]", user_id, transmission_id);
transmission_user_id_list_[transmission_id].push_back(user_id);
return true;
} else {
@@ -177,6 +180,7 @@ bool TransmissionManager::BindUserIdToTransmission(
}
}
transmission_user_id_list_[transmission_id].push_back(user_id);
LOG_INFO("Add user id [{}] to transmission [{}]", user_id, transmission_id);
}
return true;
}
@@ -249,10 +253,20 @@ std::string TransmissionManager::GetUsername(websocketpp::connection_hdl hdl) {
return "";
}
// websocketpp::connection_hdl TransmissionManager::GetWsHandle(
// const std::string& username) {
// if (username_ws_hdl_list_.find(username) != username_ws_hdl_list_.end()) {
// return username_ws_hdl_list_[username];
// } else {
// websocketpp::connection_hdl hdl;
// return hdl;
// }
// }
websocketpp::connection_hdl TransmissionManager::GetWsHandle(
const std::string& username) {
if (username_ws_hdl_list_.find(username) != username_ws_hdl_list_.end()) {
return username_ws_hdl_list_[username];
const std::string& user_id) {
if (user_id_ws_hdl_list_.find(user_id) != user_id_ws_hdl_list_.end()) {
return user_id_ws_hdl_list_[user_id];
} else {
websocketpp::connection_hdl hdl;
return hdl;

View File

@@ -56,7 +56,8 @@ class TransmissionManager {
bool UpdateUsernameToWsHandle(const std::string& username,
websocketpp::connection_hdl hdl);
std::string GetUsername(websocketpp::connection_hdl hdl);
websocketpp::connection_hdl GetWsHandle(const std::string& username);
// websocketpp::connection_hdl GetWsHandle(const std::string& username);
websocketpp::connection_hdl GetWsHandle(const std::string& user_id);
private:
std::map<std::string, websocketpp::connection_hdl> transmission_host_list_;