mirror of
https://github.com/kunkundi/crossdesk.git
synced 2025-10-26 20:25:34 +08:00
[feat] put ice agent into ice worker thread and use message queue to handle events
This commit is contained in:
@@ -27,8 +27,6 @@ PeerConnection::~PeerConnection() {
|
|||||||
load_nvcodec_dll_success = false;
|
load_nvcodec_dll_success = false;
|
||||||
|
|
||||||
user_data_ = nullptr;
|
user_data_ = nullptr;
|
||||||
|
|
||||||
ice_transmission_list_.clear();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int PeerConnection::Init(PeerConnectionParams params,
|
int PeerConnection::Init(PeerConnectionParams params,
|
||||||
@@ -189,19 +187,19 @@ int PeerConnection::Init(PeerConnectionParams params,
|
|||||||
} else if ("failed" == ice_status) {
|
} else if ("failed" == ice_status) {
|
||||||
ice_ready_ = false;
|
ice_ready_ = false;
|
||||||
on_connection_status_(ConnectionStatus::Failed, user_data_);
|
on_connection_status_(ConnectionStatus::Failed, user_data_);
|
||||||
LOG_INFO("Ice failed");
|
enable_turn_ = true;
|
||||||
} else {
|
} else {
|
||||||
ice_ready_ = false;
|
ice_ready_ = false;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
on_net_status_report_ = [this](int TransmissionId,
|
on_net_status_report_ = [this](int transmission_id,
|
||||||
IceTransmission::TraversalType mode,
|
IceTransmission::TraversalType mode,
|
||||||
const unsigned short send,
|
const unsigned short send,
|
||||||
const unsigned short receive, void *user_ptr) {
|
const unsigned short receive, void *user_ptr) {
|
||||||
if (net_status_report_) {
|
if (net_status_report_) {
|
||||||
net_status_report_(atoi(transmission_id_.c_str()), TraversalMode(mode),
|
net_status_report_(transmission_id, TraversalMode(mode), send, receive,
|
||||||
send, receive, user_data_);
|
user_data_);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -212,6 +210,16 @@ int PeerConnection::Init(PeerConnectionParams params,
|
|||||||
ws_transport_->Connect(uri_);
|
ws_transport_->Connect(uri_);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
StartIceWorker();
|
||||||
|
|
||||||
|
if (0 != CreateVideoCodec(hardware_acceleration_)) {
|
||||||
|
LOG_ERROR("Create video codec failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 != CreateAudioCodec()) {
|
||||||
|
LOG_ERROR("Create audio codec failed");
|
||||||
|
}
|
||||||
|
|
||||||
// do {
|
// do {
|
||||||
// } while (SignalStatus::SignalConnected != GetSignalStatus());
|
// } while (SignalStatus::SignalConnected != GetSignalStatus());
|
||||||
|
|
||||||
@@ -339,6 +347,7 @@ int PeerConnection::Create(const std::string &transmission_id,
|
|||||||
|
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
|
local_transmission_id_ = transmission_id;
|
||||||
password_ = password;
|
password_ = password;
|
||||||
|
|
||||||
json message = {{"type", "create_transmission"},
|
json message = {{"type", "create_transmission"},
|
||||||
@@ -366,8 +375,8 @@ int PeerConnection::Join(const std::string &transmission_id,
|
|||||||
password_ = password;
|
password_ = password;
|
||||||
leave_ = false;
|
leave_ = false;
|
||||||
|
|
||||||
transmission_id_ = transmission_id;
|
remote_transmission_id_ = transmission_id;
|
||||||
ret = RequestTransmissionMemberList(transmission_id_, password);
|
ret = RequestTransmissionMemberList(remote_transmission_id_, password);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -389,220 +398,32 @@ int PeerConnection::Leave(const std::string &transmission_id) {
|
|||||||
ice_ready_ = false;
|
ice_ready_ = false;
|
||||||
leave_ = true;
|
leave_ = true;
|
||||||
|
|
||||||
for (auto &user_id_it : ice_transmission_list_) {
|
IceWorkMsg msg;
|
||||||
user_id_it.second->DestroyIceTransmission();
|
msg.type = IceWorkMsg::Type::Destroy;
|
||||||
}
|
PushIceWorkMsg(msg);
|
||||||
|
|
||||||
ice_transmission_list_.clear();
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void PeerConnection::ProcessSignal(const std::string &signal) {
|
int PeerConnection::Destroy() {
|
||||||
auto j = json::parse(signal);
|
StopIceWorker();
|
||||||
std::string type = j["type"];
|
if (ws_transport_) {
|
||||||
// LOG_INFO("signal type: {}", type);
|
LOG_INFO("Close websocket")
|
||||||
switch (HASH_STRING_PIECE(type.c_str())) {
|
ws_transport_->Close();
|
||||||
case "login"_H: {
|
|
||||||
if (j["status"].get<std::string>() == "success") {
|
|
||||||
user_id_ = j["user_id"].get<std::string>();
|
|
||||||
net_status_report_(atoi(user_id_.c_str()), TraversalMode::UnknownMode,
|
|
||||||
0, 0, user_data_);
|
|
||||||
LOG_INFO("Login success with id [{}]", user_id_);
|
|
||||||
signal_status_ = SignalStatus::SignalConnected;
|
|
||||||
on_signal_status_(SignalStatus::SignalConnected, user_data_);
|
|
||||||
} else if (j["status"].get<std::string>() == "fail") {
|
|
||||||
LOG_WARN("Login failed with id [{}]", transmission_id_);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "transmission_id"_H: {
|
|
||||||
if (j["status"].get<std::string>() == "success") {
|
|
||||||
transmission_id_ = j["transmission_id"].get<std::string>();
|
|
||||||
user_id_ = transmission_id_;
|
|
||||||
LOG_INFO("Create transmission success with id [{}]", transmission_id_);
|
|
||||||
} else if (j["status"].get<std::string>() == "fail") {
|
|
||||||
LOG_WARN("Create transmission failed with id [{}], due to [{}]",
|
|
||||||
transmission_id_, j["reason"].get<std::string>().c_str());
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "user_id_list"_H: {
|
|
||||||
user_id_list_ = j["user_id_list"];
|
|
||||||
|
|
||||||
std::string transmission_id = j["transmission_id"].get<std::string>();
|
|
||||||
std::string status = j["status"].get<std::string>();
|
|
||||||
if (status == "failed") {
|
|
||||||
std::string reason = j["reason"].get<std::string>();
|
|
||||||
LOG_ERROR("{}", reason);
|
|
||||||
if ("Incorrect password" == reason) {
|
|
||||||
on_connection_status_(ConnectionStatus::IncorrectPassword,
|
|
||||||
user_data_);
|
|
||||||
} else if ("No such transmission id" == reason) {
|
|
||||||
on_connection_status_(ConnectionStatus::NoSuchTransmissionId,
|
|
||||||
user_data_);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (leave_) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (user_id_list_.empty()) {
|
|
||||||
LOG_WARN("Wait for host create transmission [{}]", transmission_id);
|
|
||||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
|
||||||
RequestTransmissionMemberList(transmission_id, password_);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_INFO("Transmission [{}] members: [", transmission_id);
|
|
||||||
for (auto user_id : user_id_list_) {
|
|
||||||
LOG_INFO("{}", user_id);
|
|
||||||
}
|
|
||||||
LOG_INFO("]");
|
|
||||||
|
|
||||||
if (0 != CreateVideoCodec(hardware_acceleration_)) {
|
|
||||||
LOG_ERROR("Create video codec failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 != CreateAudioCodec()) {
|
|
||||||
LOG_ERROR("Create audio codec failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto &remote_user_id : user_id_list_) {
|
|
||||||
// if (remote_user_id == user_id_) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
ice_transmission_list_[remote_user_id] =
|
|
||||||
std::make_unique<IceTransmission>(
|
|
||||||
enable_turn_, trickle_ice_, true, transmission_id, user_id_,
|
|
||||||
remote_user_id, ws_transport_, on_ice_status_change_);
|
|
||||||
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveVideoFunc(
|
|
||||||
on_receive_video_);
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveAudioFunc(
|
|
||||||
on_receive_audio_);
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveDataFunc(
|
|
||||||
on_receive_data_);
|
|
||||||
ice_transmission_list_[remote_user_id]
|
|
||||||
->SetOnReceiveNetStatusReportFunc(on_net_status_report_);
|
|
||||||
|
|
||||||
ice_transmission_list_[remote_user_id]->InitIceTransmission(
|
|
||||||
cfg_stun_server_ip_, stun_server_port_, cfg_turn_server_ip_,
|
|
||||||
turn_server_port_, cfg_turn_server_username_,
|
|
||||||
cfg_turn_server_password_,
|
|
||||||
av1_encoding_ ? RtpPacket::AV1 : RtpPacket::H264);
|
|
||||||
ice_transmission_list_[remote_user_id]->JoinTransmission();
|
|
||||||
}
|
|
||||||
|
|
||||||
// on_connection_status_(ConnectionStatus::Connecting);
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "user_leave_transmission"_H: {
|
|
||||||
std::string user_id = j["user_id"];
|
|
||||||
LOG_INFO("Receive notification: user id [{}] leave transmission",
|
|
||||||
user_id);
|
|
||||||
auto user_id_it = ice_transmission_list_.find(user_id);
|
|
||||||
if (user_id_it != ice_transmission_list_.end()) {
|
|
||||||
user_id_it->second->DestroyIceTransmission();
|
|
||||||
ice_transmission_list_.erase(user_id_it);
|
|
||||||
ice_ready_ = false;
|
|
||||||
LOG_INFO("Terminate transmission to user [{}]", user_id);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "offer"_H: {
|
|
||||||
std::string transmission_id = j["transmission_id"].get<std::string>();
|
|
||||||
std::string remote_user_id = j["remote_user_id"].get<std::string>();
|
|
||||||
LOG_INFO("[{}] receive offer from [{}]", user_id_, remote_user_id);
|
|
||||||
|
|
||||||
if (0 != CreateVideoCodec(hardware_acceleration_)) {
|
|
||||||
LOG_ERROR("Create video codec failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 != CreateAudioCodec()) {
|
|
||||||
LOG_ERROR("Create audio codec failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
ice_transmission_list_[remote_user_id] =
|
|
||||||
std::make_unique<IceTransmission>(
|
|
||||||
enable_turn_, trickle_ice_, false, transmission_id, user_id_,
|
|
||||||
remote_user_id, ws_transport_, on_ice_status_change_);
|
|
||||||
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveVideoFunc(
|
|
||||||
on_receive_video_);
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveAudioFunc(
|
|
||||||
on_receive_audio_);
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveDataFunc(
|
|
||||||
on_receive_data_);
|
|
||||||
ice_transmission_list_[remote_user_id]->SetOnReceiveNetStatusReportFunc(
|
|
||||||
on_net_status_report_);
|
|
||||||
|
|
||||||
ice_transmission_list_[remote_user_id]->InitIceTransmission(
|
|
||||||
cfg_stun_server_ip_, stun_server_port_, cfg_turn_server_ip_,
|
|
||||||
turn_server_port_, cfg_turn_server_username_,
|
|
||||||
cfg_turn_server_password_,
|
|
||||||
av1_encoding_ ? RtpPacket::AV1 : RtpPacket::H264);
|
|
||||||
ice_transmission_list_[remote_user_id]->SetTransmissionId(
|
|
||||||
transmission_id_);
|
|
||||||
|
|
||||||
if (j.contains("sdp")) {
|
|
||||||
on_connection_status_(ConnectionStatus::Connecting, user_data_);
|
|
||||||
std::string remote_sdp = j["sdp"].get<std::string>();
|
|
||||||
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
|
|
||||||
if (trickle_ice_) {
|
|
||||||
sdp_without_cands_ = remote_sdp;
|
|
||||||
ice_transmission_list_[remote_user_id]->SendAnswer();
|
|
||||||
}
|
|
||||||
ice_transmission_list_[remote_user_id]->GatherCandidates();
|
|
||||||
} else {
|
|
||||||
LOG_ERROR("Invalid offer msg");
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "answer"_H: {
|
|
||||||
on_connection_status_(ConnectionStatus::Connecting, user_data_);
|
|
||||||
std::string transmission_id = j["transmission_id"].get<std::string>();
|
|
||||||
std::string remote_user_id = j["remote_user_id"].get<std::string>();
|
|
||||||
LOG_INFO("[{}] receive answer from [{}]", user_id_, remote_user_id);
|
|
||||||
|
|
||||||
if (j.contains("sdp")) {
|
|
||||||
std::string remote_sdp = j["sdp"].get<std::string>();
|
|
||||||
if (ice_transmission_list_.find(remote_user_id) !=
|
|
||||||
ice_transmission_list_.end()) {
|
|
||||||
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
|
|
||||||
if (trickle_ice_) {
|
|
||||||
sdp_without_cands_ = remote_sdp;
|
|
||||||
ice_transmission_list_[remote_user_id]->GatherCandidates();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG_ERROR("Invalid answer msg");
|
|
||||||
}
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case "new_candidate"_H: {
|
|
||||||
std::string transmission_id = j["transmission_id"].get<std::string>();
|
|
||||||
std::string new_candidate = j["sdp"].get<std::string>();
|
|
||||||
std::string remote_user_id = j["remote_user_id"].get<std::string>();
|
|
||||||
|
|
||||||
// LOG_INFO("[{}] receive new candidate from [{}]:[{}]", user_id_,
|
|
||||||
// remote_user_id, new_candidate);
|
|
||||||
|
|
||||||
if (ice_transmission_list_.find(remote_user_id) !=
|
|
||||||
ice_transmission_list_.end()) {
|
|
||||||
ice_transmission_list_[remote_user_id]->SetRemoteSdp(
|
|
||||||
sdp_without_cands_ + new_candidate);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default: {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (nv12_data_) {
|
||||||
|
delete nv12_data_;
|
||||||
|
nv12_data_ = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef __APPLE__
|
||||||
|
#else
|
||||||
|
if (hardware_acceleration_ && load_nvcodec_dll_success) {
|
||||||
|
ReleaseNvCodecDll();
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int PeerConnection::RequestTransmissionMemberList(
|
int PeerConnection::RequestTransmissionMemberList(
|
||||||
@@ -615,7 +436,7 @@ int PeerConnection::RequestTransmissionMemberList(
|
|||||||
LOG_INFO("[{}] Request member list", user_id_);
|
LOG_INFO("[{}] Request member list", user_id_);
|
||||||
|
|
||||||
json message = {{"type", "query_user_id_list"},
|
json message = {{"type", "query_user_id_list"},
|
||||||
{"transmission_id", transmission_id_},
|
{"transmission_id", transmission_id},
|
||||||
{"password", password}};
|
{"password", password}};
|
||||||
|
|
||||||
if (ws_transport_) {
|
if (ws_transport_) {
|
||||||
@@ -624,27 +445,6 @@ int PeerConnection::RequestTransmissionMemberList(
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int PeerConnection::Destroy() {
|
|
||||||
if (ws_transport_) {
|
|
||||||
LOG_INFO("Close websocket")
|
|
||||||
ws_transport_->Close();
|
|
||||||
}
|
|
||||||
|
|
||||||
ice_transmission_list_.clear();
|
|
||||||
if (nv12_data_) {
|
|
||||||
delete nv12_data_;
|
|
||||||
nv12_data_ = nullptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __APPLE__
|
|
||||||
#else
|
|
||||||
if (hardware_acceleration_ && load_nvcodec_dll_success) {
|
|
||||||
ReleaseNvCodecDll();
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
SignalStatus PeerConnection::GetSignalStatus() {
|
SignalStatus PeerConnection::GetSignalStatus() {
|
||||||
std::lock_guard<std::mutex> l(signal_status_mutex_);
|
std::lock_guard<std::mutex> l(signal_status_mutex_);
|
||||||
return signal_status_;
|
return signal_status_;
|
||||||
@@ -671,8 +471,6 @@ int PeerConnection::SendVideoData(const char *data, size_t size) {
|
|||||||
VideoEncoder::VideoFrameType frame_type) -> int {
|
VideoEncoder::VideoFrameType frame_type) -> int {
|
||||||
for (auto &ice_trans : ice_transmission_list_) {
|
for (auto &ice_trans : ice_transmission_list_) {
|
||||||
// LOG_ERROR("Send frame size: [{}]", size);
|
// LOG_ERROR("Send frame size: [{}]", size);
|
||||||
// ice_trans.second->SendData(IceTransmission::DATA_TYPE::VIDEO,
|
|
||||||
// encoded_frame, size);
|
|
||||||
ice_trans.second->SendVideoData(
|
ice_trans.second->SendVideoData(
|
||||||
static_cast<IceTransmission::VideoFrameType>(frame_type),
|
static_cast<IceTransmission::VideoFrameType>(frame_type),
|
||||||
encoded_frame, size);
|
encoded_frame, size);
|
||||||
@@ -716,4 +514,300 @@ int PeerConnection::SendUserData(const char *data, size_t size) {
|
|||||||
ice_trans.second->SendData(IceTransmission::DATA_TYPE::DATA, data, size);
|
ice_trans.second->SendData(IceTransmission::DATA_TYPE::DATA, data, size);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PeerConnection::ProcessSignal(const std::string &signal) {
|
||||||
|
auto j = json::parse(signal);
|
||||||
|
std::string type = j["type"];
|
||||||
|
// LOG_INFO("signal type: {}", type);
|
||||||
|
switch (HASH_STRING_PIECE(type.c_str())) {
|
||||||
|
case "login"_H: {
|
||||||
|
if (j["status"].get<std::string>() == "success") {
|
||||||
|
user_id_ = j["user_id"].get<std::string>();
|
||||||
|
net_status_report_(atoi(user_id_.c_str()), TraversalMode::UnknownMode,
|
||||||
|
0, 0, user_data_);
|
||||||
|
LOG_INFO("Login success with id [{}]", user_id_);
|
||||||
|
signal_status_ = SignalStatus::SignalConnected;
|
||||||
|
on_signal_status_(SignalStatus::SignalConnected, user_data_);
|
||||||
|
} else if (j["status"].get<std::string>() == "fail") {
|
||||||
|
LOG_WARN("Login failed with id [{}]", user_id_);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "transmission_id"_H: {
|
||||||
|
if (j["status"].get<std::string>() == "success") {
|
||||||
|
local_transmission_id_ = j["transmission_id"].get<std::string>();
|
||||||
|
user_id_ = local_transmission_id_;
|
||||||
|
LOG_INFO("Create transmission success with id [{}]",
|
||||||
|
local_transmission_id_);
|
||||||
|
} else if (j["status"].get<std::string>() == "fail") {
|
||||||
|
LOG_WARN("Create transmission failed with id [{}], due to [{}]",
|
||||||
|
local_transmission_id_,
|
||||||
|
j["reason"].get<std::string>().c_str());
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "user_id_list"_H: {
|
||||||
|
user_id_list_ = j["user_id_list"];
|
||||||
|
|
||||||
|
std::string transmission_id = j["transmission_id"].get<std::string>();
|
||||||
|
std::string status = j["status"].get<std::string>();
|
||||||
|
if (status == "failed") {
|
||||||
|
std::string reason = j["reason"].get<std::string>();
|
||||||
|
LOG_ERROR("{}", reason);
|
||||||
|
if ("Incorrect password" == reason) {
|
||||||
|
on_connection_status_(ConnectionStatus::IncorrectPassword,
|
||||||
|
user_data_);
|
||||||
|
} else if ("No such transmission id" == reason) {
|
||||||
|
on_connection_status_(ConnectionStatus::NoSuchTransmissionId,
|
||||||
|
user_data_);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (leave_) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
IceWorkMsg msg;
|
||||||
|
msg.type = IceWorkMsg::Type::UserIdList;
|
||||||
|
msg.transmission_id = transmission_id;
|
||||||
|
msg.user_id_list = user_id_list_;
|
||||||
|
PushIceWorkMsg(msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "user_leave_transmission"_H: {
|
||||||
|
std::string user_id = j["user_id"];
|
||||||
|
IceWorkMsg msg;
|
||||||
|
msg.type = IceWorkMsg::Type::UserLeaveTransmission;
|
||||||
|
msg.user_id = user_id;
|
||||||
|
PushIceWorkMsg(msg);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "offer"_H: {
|
||||||
|
std::string transmission_id = j["transmission_id"].get<std::string>();
|
||||||
|
std::string remote_user_id = j["remote_user_id"].get<std::string>();
|
||||||
|
if (j.contains("sdp")) {
|
||||||
|
std::string remote_sdp = j["sdp"].get<std::string>();
|
||||||
|
LOG_INFO("[{}] receive offer from [{}]", user_id_, remote_user_id);
|
||||||
|
|
||||||
|
IceWorkMsg msg;
|
||||||
|
msg.type = IceWorkMsg::Type::Offer;
|
||||||
|
msg.transmission_id = transmission_id;
|
||||||
|
msg.remote_user_id = remote_user_id;
|
||||||
|
msg.remote_sdp = remote_sdp;
|
||||||
|
PushIceWorkMsg(msg);
|
||||||
|
on_connection_status_(ConnectionStatus::Connecting, user_data_);
|
||||||
|
} else {
|
||||||
|
LOG_ERROR("Invalid offer msg");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "answer"_H: {
|
||||||
|
std::string transmission_id = j["transmission_id"].get<std::string>();
|
||||||
|
std::string remote_user_id = j["remote_user_id"].get<std::string>();
|
||||||
|
|
||||||
|
if (j.contains("sdp")) {
|
||||||
|
std::string remote_sdp = j["sdp"].get<std::string>();
|
||||||
|
LOG_INFO("[{}] receive answer from [{}]", user_id_, remote_user_id);
|
||||||
|
|
||||||
|
IceWorkMsg msg;
|
||||||
|
msg.type = IceWorkMsg::Type::Answer;
|
||||||
|
msg.transmission_id = transmission_id;
|
||||||
|
msg.remote_user_id = remote_user_id;
|
||||||
|
msg.remote_sdp = remote_sdp;
|
||||||
|
PushIceWorkMsg(msg);
|
||||||
|
on_connection_status_(ConnectionStatus::Connecting, user_data_);
|
||||||
|
} else {
|
||||||
|
LOG_ERROR("Invalid answer msg");
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case "new_candidate"_H: {
|
||||||
|
std::string transmission_id = j["transmission_id"].get<std::string>();
|
||||||
|
std::string new_candidate = j["sdp"].get<std::string>();
|
||||||
|
std::string remote_user_id = j["remote_user_id"].get<std::string>();
|
||||||
|
|
||||||
|
IceWorkMsg msg;
|
||||||
|
msg.type = IceWorkMsg::Type::NewCandidate;
|
||||||
|
msg.transmission_id = transmission_id;
|
||||||
|
msg.remote_user_id = remote_user_id;
|
||||||
|
msg.new_candidate = new_candidate;
|
||||||
|
PushIceWorkMsg(msg);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PeerConnection::StartIceWorker() {
|
||||||
|
ice_worker_.reset(new std::thread([this]() {
|
||||||
|
while (ice_worker_running_) {
|
||||||
|
std::unique_lock<std::mutex> lck(ice_work_mutex_);
|
||||||
|
while (ice_work_msg_queue_.empty()) {
|
||||||
|
ice_work_cv_.wait(lck, [this] { return !ice_work_msg_queue_.empty(); });
|
||||||
|
}
|
||||||
|
IceWorkMsg msg = ice_work_msg_queue_.front();
|
||||||
|
ice_work_msg_queue_.pop();
|
||||||
|
ProcessIceWorkMsg(msg);
|
||||||
|
}
|
||||||
|
std::queue<IceWorkMsg> empty_queue;
|
||||||
|
std::swap(ice_work_msg_queue_, empty_queue);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
void PeerConnection::StopIceWorker() {
|
||||||
|
ice_worker_running_ = false;
|
||||||
|
if (ice_worker_->joinable()) {
|
||||||
|
ice_worker_->join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void PeerConnection::PushIceWorkMsg(const IceWorkMsg &msg) {
|
||||||
|
std::lock_guard<std::mutex> lck(ice_work_mutex_);
|
||||||
|
ice_work_msg_queue_.push(msg);
|
||||||
|
ice_work_cv_.notify_one();
|
||||||
|
}
|
||||||
|
|
||||||
|
void PeerConnection::ProcessIceWorkMsg(const IceWorkMsg &msg) {
|
||||||
|
switch (msg.type) {
|
||||||
|
case IceWorkMsg::Type::Login: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IceWorkMsg::Type::UserIdList: {
|
||||||
|
std::vector<std::string> user_id_list = msg.user_id_list;
|
||||||
|
std::string transmission_id = msg.transmission_id;
|
||||||
|
|
||||||
|
if (user_id_list.empty()) {
|
||||||
|
LOG_WARN("Wait for host create transmission [{}]", transmission_id);
|
||||||
|
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||||
|
RequestTransmissionMemberList(transmission_id, password_);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG_INFO("Transmission [{}] members: [", transmission_id);
|
||||||
|
for (auto user_id : user_id_list) {
|
||||||
|
LOG_INFO("{}", user_id);
|
||||||
|
}
|
||||||
|
LOG_INFO("]");
|
||||||
|
|
||||||
|
for (auto &remote_user_id : user_id_list) {
|
||||||
|
ice_transmission_list_[remote_user_id] =
|
||||||
|
std::make_unique<IceTransmission>(
|
||||||
|
enable_turn_, trickle_ice_, true, transmission_id, user_id_,
|
||||||
|
remote_user_id, ws_transport_, on_ice_status_change_);
|
||||||
|
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveVideoFunc(
|
||||||
|
on_receive_video_);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveAudioFunc(
|
||||||
|
on_receive_audio_);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveDataFunc(
|
||||||
|
on_receive_data_);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveNetStatusReportFunc(
|
||||||
|
on_net_status_report_);
|
||||||
|
|
||||||
|
ice_transmission_list_[remote_user_id]->InitIceTransmission(
|
||||||
|
cfg_stun_server_ip_, stun_server_port_, cfg_turn_server_ip_,
|
||||||
|
turn_server_port_, cfg_turn_server_username_,
|
||||||
|
cfg_turn_server_password_,
|
||||||
|
av1_encoding_ ? RtpPacket::AV1 : RtpPacket::H264);
|
||||||
|
ice_transmission_list_[remote_user_id]->JoinTransmission();
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IceWorkMsg::Type::UserLeaveTransmission: {
|
||||||
|
std::string user_id = msg.user_id;
|
||||||
|
LOG_INFO("Receive notification: user id [{}] leave transmission",
|
||||||
|
user_id);
|
||||||
|
auto user_id_it = ice_transmission_list_.find(user_id);
|
||||||
|
if (user_id_it != ice_transmission_list_.end()) {
|
||||||
|
user_id_it->second->DestroyIceTransmission();
|
||||||
|
ice_transmission_list_.erase(user_id_it);
|
||||||
|
ice_ready_ = false;
|
||||||
|
LOG_INFO("Terminate transmission to user [{}]", user_id);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IceWorkMsg::Type::Offer: {
|
||||||
|
std::string transmission_id = msg.transmission_id;
|
||||||
|
std::string remote_user_id = msg.remote_user_id;
|
||||||
|
ice_transmission_list_[remote_user_id] =
|
||||||
|
std::make_unique<IceTransmission>(
|
||||||
|
enable_turn_, trickle_ice_, false, transmission_id, user_id_,
|
||||||
|
remote_user_id, ws_transport_, on_ice_status_change_);
|
||||||
|
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveVideoFunc(
|
||||||
|
on_receive_video_);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveAudioFunc(
|
||||||
|
on_receive_audio_);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveDataFunc(
|
||||||
|
on_receive_data_);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetOnReceiveNetStatusReportFunc(
|
||||||
|
on_net_status_report_);
|
||||||
|
|
||||||
|
ice_transmission_list_[remote_user_id]->InitIceTransmission(
|
||||||
|
cfg_stun_server_ip_, stun_server_port_, cfg_turn_server_ip_,
|
||||||
|
turn_server_port_, cfg_turn_server_username_,
|
||||||
|
cfg_turn_server_password_,
|
||||||
|
av1_encoding_ ? RtpPacket::AV1 : RtpPacket::H264);
|
||||||
|
ice_transmission_list_[remote_user_id]->SetTransmissionId(
|
||||||
|
transmission_id);
|
||||||
|
|
||||||
|
std::string remote_sdp = msg.remote_sdp;
|
||||||
|
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
|
||||||
|
if (trickle_ice_) {
|
||||||
|
sdp_without_cands_ = remote_sdp;
|
||||||
|
ice_transmission_list_[remote_user_id]->SendAnswer();
|
||||||
|
}
|
||||||
|
ice_transmission_list_[remote_user_id]->GatherCandidates();
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IceWorkMsg::Type::Answer: {
|
||||||
|
std::string remote_user_id = msg.remote_user_id;
|
||||||
|
std::string remote_sdp = msg.remote_sdp;
|
||||||
|
if (ice_transmission_list_.find(remote_user_id) !=
|
||||||
|
ice_transmission_list_.end()) {
|
||||||
|
ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp);
|
||||||
|
if (trickle_ice_) {
|
||||||
|
sdp_without_cands_ = remote_sdp;
|
||||||
|
ice_transmission_list_[remote_user_id]->GatherCandidates();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IceWorkMsg::Type::NewCandidate: {
|
||||||
|
std::string transmission_id = msg.transmission_id;
|
||||||
|
std::string new_candidate = msg.new_candidate;
|
||||||
|
std::string remote_user_id = msg.remote_user_id;
|
||||||
|
|
||||||
|
// LOG_INFO("[{}] receive new candidate from [{}]:[{}]", user_id_,
|
||||||
|
// remote_user_id, new_candidate);
|
||||||
|
|
||||||
|
if (ice_transmission_list_.find(remote_user_id) !=
|
||||||
|
ice_transmission_list_.end()) {
|
||||||
|
ice_transmission_list_[remote_user_id]->SetRemoteSdp(
|
||||||
|
sdp_without_cands_ + new_candidate);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IceWorkMsg::Type::Destroy: {
|
||||||
|
for (auto &user_id_it : ice_transmission_list_) {
|
||||||
|
user_id_it.second->DestroyIceTransmission();
|
||||||
|
}
|
||||||
|
ice_transmission_list_.clear();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -48,6 +48,26 @@ typedef struct {
|
|||||||
void *user_data;
|
void *user_data;
|
||||||
} PeerConnectionParams;
|
} PeerConnectionParams;
|
||||||
|
|
||||||
|
struct IceWorkMsg {
|
||||||
|
enum Type {
|
||||||
|
Login = 0,
|
||||||
|
UserIdList,
|
||||||
|
UserLeaveTransmission,
|
||||||
|
Offer,
|
||||||
|
Answer,
|
||||||
|
NewCandidate,
|
||||||
|
Destroy
|
||||||
|
};
|
||||||
|
|
||||||
|
Type type;
|
||||||
|
std::vector<std::string> user_id_list;
|
||||||
|
std::string user_id;
|
||||||
|
std::string transmission_id;
|
||||||
|
std::string remote_user_id;
|
||||||
|
std::string new_candidate;
|
||||||
|
std::string remote_sdp;
|
||||||
|
};
|
||||||
|
|
||||||
class PeerConnection {
|
class PeerConnection {
|
||||||
public:
|
public:
|
||||||
PeerConnection();
|
PeerConnection();
|
||||||
@@ -83,6 +103,12 @@ class PeerConnection {
|
|||||||
int RequestTransmissionMemberList(const std::string &transmission_id,
|
int RequestTransmissionMemberList(const std::string &transmission_id,
|
||||||
const std::string &password);
|
const std::string &password);
|
||||||
|
|
||||||
|
private:
|
||||||
|
void StartIceWorker();
|
||||||
|
void StopIceWorker();
|
||||||
|
void ProcessIceWorkMsg(const IceWorkMsg &msg);
|
||||||
|
void PushIceWorkMsg(const IceWorkMsg &msg);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string uri_ = "";
|
std::string uri_ = "";
|
||||||
std::string cfg_signal_server_ip_;
|
std::string cfg_signal_server_ip_;
|
||||||
@@ -110,7 +136,8 @@ class PeerConnection {
|
|||||||
std::function<void(WsStatus)> on_ws_status_ = nullptr;
|
std::function<void(WsStatus)> on_ws_status_ = nullptr;
|
||||||
unsigned int ws_connection_id_ = 0;
|
unsigned int ws_connection_id_ = 0;
|
||||||
std::string user_id_ = "";
|
std::string user_id_ = "";
|
||||||
std::string transmission_id_ = "";
|
std::string local_transmission_id_ = "";
|
||||||
|
std::string remote_transmission_id_ = "";
|
||||||
std::vector<std::string> user_id_list_;
|
std::vector<std::string> user_id_list_;
|
||||||
WsStatus ws_status_ = WsStatus::WsClosed;
|
WsStatus ws_status_ = WsStatus::WsClosed;
|
||||||
SignalStatus signal_status_ = SignalStatus::SignalClosed;
|
SignalStatus signal_status_ = SignalStatus::SignalClosed;
|
||||||
@@ -158,6 +185,13 @@ class PeerConnection {
|
|||||||
std::unique_ptr<AudioEncoder> audio_encoder_ = nullptr;
|
std::unique_ptr<AudioEncoder> audio_encoder_ = nullptr;
|
||||||
std::unique_ptr<AudioDecoder> audio_decoder_ = nullptr;
|
std::unique_ptr<AudioDecoder> audio_decoder_ = nullptr;
|
||||||
bool audio_codec_inited_ = false;
|
bool audio_codec_inited_ = false;
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::unique_ptr<std::thread> ice_worker_ = nullptr;
|
||||||
|
std::atomic<bool> ice_worker_running_{true};
|
||||||
|
std::queue<IceWorkMsg> ice_work_msg_queue_;
|
||||||
|
std::condition_variable ice_work_cv_;
|
||||||
|
std::mutex ice_work_mutex_;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
@@ -87,6 +87,7 @@ void WsCore::Close(websocketpp::close::status::value code, std::string reason) {
|
|||||||
if (ec) {
|
if (ec) {
|
||||||
LOG_ERROR("Initiating close error: {}", ec.message());
|
LOG_ERROR("Initiating close error: {}", ec.message());
|
||||||
}
|
}
|
||||||
|
OnWsStatus(WsStatus::WsClosed);
|
||||||
}
|
}
|
||||||
|
|
||||||
void WsCore::Send(std::string message) {
|
void WsCore::Send(std::string message) {
|
||||||
|
|||||||
Reference in New Issue
Block a user