Remove user from transmission when websocket closed

This commit is contained in:
dijunkun
2023-08-23 17:16:20 +08:00
parent 5a38aabb55
commit 932944ad86
7 changed files with 110 additions and 442 deletions

View File

@@ -19,8 +19,6 @@ int IceAgent::CreateIceAgent(juice_cb_state_changed_t on_state_changed,
juice_config_t config;
memset(&config, 0, sizeof(config));
LOG_INFO("stun server ip[{}] port[{}]", ip_, port_);
// STUN server example
config.stun_server_host = ip_.c_str();
config.stun_server_port = port_;
@@ -38,7 +36,6 @@ int IceAgent::CreateIceAgent(juice_cb_state_changed_t on_state_changed,
int IceAgent::DestoryIceAgent() {
juice_destroy(agent_);
return 0;
}
@@ -50,25 +47,15 @@ char *IceAgent::GenerateLocalSdp() {
juice_get_local_description(agent_, local_sdp_, JUICE_MAX_SDP_STRING_LEN);
// LOG_INFO("Generate local sdp:[\n{}]", local_sdp_);
LOG_INFO("Generate local sdp");
return local_sdp_;
}
int IceAgent::SetRemoteSdp(const char *remote_sdp) {
LOG_INFO("[{}] Set remote sdp", (void *)this);
juice_set_remote_description(agent_, remote_sdp);
// LOG_INFO("Remote description:[\n{}]", remote_sdp);
return 0;
return juice_set_remote_description(agent_, remote_sdp);
}
int IceAgent::GatherCandidates() {
LOG_INFO("[{}] Gather candidates", (void *)this);
juice_gather_candidates(agent_);
return 0;
}
int IceAgent::GatherCandidates() { return juice_gather_candidates(agent_); }
juice_state_t IceAgent::GetIceState() {
state_ = juice_get_state(agent_);
@@ -110,18 +97,13 @@ bool IceAgent::GetSelectedAddresses() {
}
int IceAgent::AddRemoteCandidates(const char *remote_candidates) {
juice_add_remote_candidate(agent_, remote_candidates);
return 0;
return juice_add_remote_candidate(agent_, remote_candidates);
}
int IceAgent::SetRemoteGatheringDone() {
juice_set_remote_gathering_done(agent_);
return 0;
return juice_set_remote_gathering_done(agent_);
}
int IceAgent::Send(const char *data, size_t size) {
juice_send(agent_, data, size);
return 0;
return juice_send(agent_, data, size);
}

View File

@@ -32,7 +32,14 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
ice_agent_->CreateIceAgent(
[](juice_agent_t *agent, juice_state_t state, void *user_ptr) {
LOG_INFO("state_change: {}", ice_status[state]);
if (user_ptr) {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr);
LOG_INFO("[{}->{}] state_change: {}", ice_transmission_obj->user_id_,
ice_transmission_obj->remote_user_id_, ice_status[state]);
} else {
LOG_INFO("state_change: {}", ice_status[state]);
}
},
[](juice_agent_t *agent, const char *sdp, void *user_ptr) {
// LOG_INFO("candadite: {}", sdp);
@@ -41,15 +48,15 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
// *>(user_ptr)->SendOfferLocalCandidate(sdp);
},
[](juice_agent_t *agent, void *user_ptr) {
LOG_INFO("gather_done");
// non-trickle
if (user_ptr) {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr);
LOG_INFO("[{}] gather_done", ice_transmission_obj->user_id_);
if (ice_transmission_obj->offer_peer_) {
ice_transmission_obj->GetLocalSdp();
ice_transmission_obj->SendOffer();
LOG_INFO("[{}] SendOffer", (void *)ice_transmission_obj)
} else {
ice_transmission_obj->CreateAnswer();
ice_transmission_obj->SendAnswer();
@@ -104,29 +111,25 @@ int IceTransmission::SetTransmissionId(const std::string &transmission_id) {
int IceTransmission::JoinTransmission() {
LOG_INFO("Join transport");
// if (SignalStatus::Connected != signal_status_) {
// LOG_ERROR("Not connect to signalserver");
// return -1;
// }
// QueryRemoteSdp(transmission_id);
CreateOffer();
return 0;
}
int IceTransmission::GatherCandidates() {
ice_agent_->GatherCandidates();
LOG_INFO("[{}] Gather candidates", user_id_);
return 0;
}
int IceTransmission::GetLocalSdp() {
local_sdp_ = ice_agent_->GenerateLocalSdp();
LOG_INFO("Local ice username: [{}]", GetIceUsername(local_sdp_));
LOG_INFO("[{}] generate local sdp", user_id_);
return 0;
}
int IceTransmission::SetRemoteSdp(const std::string &remote_sdp) {
ice_agent_->SetRemoteSdp(remote_sdp.c_str());
LOG_INFO("[{}] set remote sdp", user_id_);
remote_ice_username_ = GetIceUsername(remote_sdp);
return 0;
}
@@ -137,7 +140,7 @@ int IceTransmission::AddRemoteCandidate(const std::string &remote_candidate) {
}
int IceTransmission::CreateOffer() {
LOG_INFO("[{}] Create offer", (void *)this);
LOG_INFO("[{}] create offer", user_id_);
GatherCandidates();
return 0;
}
@@ -148,11 +151,11 @@ int IceTransmission::SendOffer() {
{"user_id", user_id_},
{"remote_user_id", remote_user_id_},
{"sdp", local_sdp_}};
LOG_INFO("Send offer:\n{}", message.dump());
// LOG_INFO("Send offer");
// LOG_INFO("Send offer:\n{}", message.dump());
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
LOG_INFO("[{}->{}] send offer", user_id_, remote_user_id_);
}
return 0;
}
@@ -160,7 +163,7 @@ int IceTransmission::SendOffer() {
int IceTransmission::QueryRemoteSdp(std::string transmission_id) {
json message = {{"type", "query_remote_sdp"},
{"transmission_id", transmission_id_}};
LOG_INFO("Query remote sdp");
LOG_INFO("[{}] query remote sdp", user_id_);
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
@@ -180,11 +183,9 @@ int IceTransmission::SendAnswer() {
{"user_id", user_id_},
{"remote_user_id", remote_user_id_}};
LOG_INFO("[{}] Send answer to [{}]", GetIceUsername(local_sdp_),
remote_ice_username_);
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
LOG_INFO("[{}->{}] send answer", user_id_, remote_user_id_);
}
return 0;
}
@@ -195,7 +196,7 @@ int IceTransmission::SendOfferLocalCandidate(
{"transmission_id", transmission_id_},
{"sdp", remote_candidate}};
// LOG_INFO("Send candidate:\n{}", message.dump().c_str());
LOG_INFO("Send candidate");
LOG_INFO("[{}] send candidate", user_id_);
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
@@ -209,7 +210,7 @@ int IceTransmission::SendAnswerLocalCandidate(
{"transmission_id", transmission_id_},
{"sdp", remote_candidate}};
// LOG_INFO("Send candidate:\n{}", message.dump().c_str());
LOG_INFO("Send candidate");
LOG_INFO("[{}] send candidate", user_id_);
if (ice_ws_transport_) {
ice_ws_transport_->Send(message.dump());
@@ -223,63 +224,17 @@ int IceTransmission::SendData(const char *data, size_t size) {
}
void IceTransmission::OnReceiveMessage(const std::string &msg) {
auto j = json::parse(msg);
// auto j = json::parse(msg);
// LOG_INFO("msg: {}", msg.c_str());
std::string type = j["type"];
// std::string type = j["type"];
switch (HASH_STRING_PIECE(type.c_str())) {
case "offer"_H: {
remote_sdp_ = j["sdp"].get<std::string>();
if (remote_sdp_.empty()) {
LOG_INFO("Invalid remote sdp");
} else {
// LOG_INFO("Receive remote sdp [{}]", remote_sdp_);
LOG_INFO("Receive remote sdp");
SetRemoteSdp(remote_sdp_);
GatherCandidates();
}
break;
}
case "transmission_id"_H: {
if (j["status"].get<std::string>() == "success") {
transmission_id_ = j["transmission_id"].get<std::string>();
LOG_INFO("Create transmission success with id [{}]", transmission_id_);
// SendOffer();
} else if (j["status"].get<std::string>() == "fail") {
LOG_WARN("Create transmission failed with id [{}], due to [{}]",
transmission_id_, j["reason"].get<std::string>().c_str());
}
break;
}
case "remote_sdp"_H: {
remote_sdp_ = j["sdp"].get<std::string>();
if (remote_sdp_.empty()) {
LOG_INFO("Offer peer not ready, wait 1 second and requery remote sdp");
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
QueryRemoteSdp(transmission_id_);
} else {
// LOG_INFO("Receive remote sdp [{}]", remote_sdp_);
LOG_INFO("Receive remote sdp");
SetRemoteSdp(remote_sdp_);
if (!offer_peer_) {
GatherCandidates();
}
}
break;
}
case "candidate"_H: {
std::string candidate = j["sdp"].get<std::string>();
// LOG_INFO("Receive candidate [{}]", candidate);
LOG_INFO("Receive candidate");
AddRemoteCandidate(candidate);
break;
}
default:
break;
}
// switch (HASH_STRING_PIECE(type.c_str())) {
// case "offer"_H: {
// remote_sdp_ = j["sdp"].get<std::string>();
// break;
// }
// default:
// break;
// }
}

View File

@@ -37,10 +37,13 @@ int PeerConnection::Create(PeerConnectionParams params,
signal_server_port_ = stoi(cfg_signal_server_port_);
stun_server_port_ = stoi(cfg_stun_server_port_);
LOG_INFO("stun server ip [{}] port [{}]", cfg_stun_server_ip_,
stun_server_port_);
on_receive_ws_msg_ = [this](const std::string &msg) {
auto j = json::parse(msg);
std::string type = j["type"];
LOG_INFO("msg type :{}", type.c_str());
LOG_INFO("msg type: {}", type.c_str());
switch (HASH_STRING_PIECE(type.c_str())) {
case "ws_connection_id"_H: {
ws_connection_id_ = j["ws_connection_id"].get<unsigned int>();
@@ -107,7 +110,7 @@ int PeerConnection::Create(PeerConnectionParams params,
}
do {
LOG_INFO("GetSignalStatus = {}", GetSignalStatus());
// LOG_INFO("GetSignalStatus = {}", GetSignalStatus());
} while (SignalStatus::Connected != GetSignalStatus());
json message = {{"type", "create_transmission"},
@@ -115,7 +118,9 @@ int PeerConnection::Create(PeerConnectionParams params,
{"transmission_id", transmission_id}};
if (ws_transport_) {
ws_transport_->Send(message.dump());
LOG_INFO("Send create transmission request: {}", message.dump().c_str());
// LOG_INFO("Send create transmission request: {}", message.dump());
LOG_INFO("Send create transmission request, transmission_id [{}]",
transmission_id);
}
return 0;
}
@@ -139,42 +144,21 @@ int PeerConnection::Join(PeerConnectionParams params,
stun_server_port_ = stoi(cfg_stun_server_port_);
on_receive_ws_msg_ = [this](const std::string &msg) {
// do {
// } while (ice_transmission_list_.empty());
auto j = json::parse(msg);
std::string type = j["type"];
LOG_INFO("msg type :{}", type);
LOG_INFO("msg type: {}", type);
switch (HASH_STRING_PIECE(type.c_str())) {
case "transmission_members"_H: {
transmission_member_list_ = j["transmission_members"];
case "user_id_list"_H: {
user_id_list_ = j["user_id_list"];
std::string transmission_id = j["transmission_id"];
LOG_INFO("Transmission [{}] members: [", transmission_id);
for (auto member : transmission_member_list_) {
LOG_INFO("{}", member);
for (auto user_id : user_id_list_) {
LOG_INFO("{}", user_id);
}
LOG_INFO("]");
// if (transmission_member_list_.size() == 1 &&
// transmission_member_list_[0] == "host") {
// ice_transmission_list_["host"] = new IceTransmission(
// true, "host", ws_transport_, on_receive_ice_msg_);
// ice_transmission_list_["host"]->InitIceTransmission(
// cfg_stun_server_ip_, stun_server_port_);
// ice_transmission_list_["host"]->JoinTransmission(transmission_id,
// user_id_);
// } else {
// for (auto &member : transmission_member_list_) {
// ice_transmission_list_[member] = new IceTransmission(
// true, member, ws_transport_, on_receive_ice_msg_);
// ice_transmission_list_[member]->InitIceTransmission(
// cfg_stun_server_ip_, stun_server_port_);
// ice_transmission_list_[member]->JoinTransmission(transmission_id,
// user_id_);
// }
// }
for (auto &remote_user_id : transmission_member_list_) {
for (auto &remote_user_id : user_id_list_) {
ice_transmission_list_[remote_user_id] = new IceTransmission(
true, transmission_id, user_id_, remote_user_id, ws_transport_,
on_receive_ice_msg_);
@@ -235,25 +219,11 @@ int PeerConnection::Join(PeerConnectionParams params,
ice_transmission_list_.end()) {
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
}
// if (!offer_peer_) {
// GatherCandidates();
// }
}
break;
}
case "candidate"_H: {
std::string remote_sdp_with_candidates = j["sdp"].get<std::string>();
std::string ice_username = GetIceUsername(remote_sdp_with_candidates);
LOG_INFO("Receive remote candidates from [{}]", ice_username);
// LOG_INFO("Receive candidate [{}]", candidate);
ice_transmission_list_[ice_username]->AddRemoteCandidate(
remote_sdp_with_candidates);
break;
}
default: {
ice_transmission_->OnReceiveMessage(msg);
// ice_transmission_->OnReceiveMessage(msg);
break;
}
}
@@ -272,22 +242,11 @@ int PeerConnection::Join(PeerConnectionParams params,
ws_transport_->Connect(uri_);
}
// ice_transmission_list_["self"] =
// new IceTransmission(true, ws_transport_, on_receive_ice_msg_);
// ice_transmission_list_["self"]->InitIceTransmission(cfg_stun_server_ip_,
// stun_server_port_);
// ice_transmission_ =
// new IceTransmission(true, ws_transport_, on_receive_ice_msg_);
// ice_transmission_->InitIceTransmission(cfg_stun_server_ip,
// stun_server_port);
do {
// LOG_INFO("GetSignalStatus = {}", GetSignalStatus());
} while (SignalStatus::Connected != GetSignalStatus());
RequestTransmissionMemberList(transmission_id_);
// ice_transmission_->JoinTransmission(transmission_id_);
// ice_transmission_list_["self"]->JoinTransmission(transmission_id_);
return 0;
}
@@ -295,7 +254,7 @@ int PeerConnection::RequestTransmissionMemberList(
const std::string &transmission_id) {
LOG_INFO("Request member list");
json message = {{"type", "query_members"},
json message = {{"type", "query_user_id_list"},
{"transmission_id", transmission_id_}};
if (ws_transport_) {

View File

@@ -49,13 +49,13 @@ class PeerConnection {
int stun_server_port_ = 0;
WsTransmission *ws_transport_ = nullptr;
IceTransmission *ice_transmission_ = nullptr;
std::vector<std::string> transmission_member_list_;
std::map<std::string, IceTransmission *> ice_transmission_list_;
std::function<void(const std::string &)> on_receive_ws_msg_ = nullptr;
std::function<void(const char *, size_t)> on_receive_ice_msg_ = nullptr;
unsigned int ws_connection_id_ = 0;
std::string user_id_ = "";
std::string transmission_id_ = "";
std::vector<std::string> user_id_list_;
SignalStatus signal_status_ = SignalStatus::Closed;
};