[fix] fix websocket ping thread blocking main thread during program exit

This commit is contained in:
dijunkun
2024-08-14 16:36:32 +08:00
parent 5f8e60d1c8
commit c056bb6f7d
2 changed files with 42 additions and 33 deletions

View File

@@ -13,32 +13,23 @@ WsCore::WsCore() {
m_endpoint_.init_asio(); m_endpoint_.init_asio();
m_endpoint_.start_perpetual(); m_endpoint_.start_perpetual();
std::thread t(&client::run, &m_endpoint_); m_thread_ = std::thread(&client::run, &m_endpoint_);
m_thread_ = std::move(t);
} }
WsCore::~WsCore() { WsCore::~WsCore() {
destructed_ = true;
running_ = false;
cond_var_.notify_one();
if (ping_thread_.joinable()) {
ping_thread_.join();
}
m_endpoint_.stop_perpetual(); m_endpoint_.stop_perpetual();
if (GetStatus() != WsStatus::WsOpened) {
// Only close open connections
return;
}
websocketpp::lib::error_code ec;
m_endpoint_.close(connection_handle_, websocketpp::close::status::going_away,
"", ec);
if (ec) {
LOG_INFO("Closing connection error: {}", ec.message());
}
if (m_thread_.joinable()) { if (m_thread_.joinable()) {
m_thread_.join(); m_thread_.join();
} }
if (ping_thread_.joinable()) {
ping_thread_.join();
}
} }
int WsCore::Connect(std::string const &uri) { int WsCore::Connect(std::string const &uri) {
@@ -51,7 +42,7 @@ int WsCore::Connect(std::string const &uri) {
connection_handle_ = con->get_handle(); connection_handle_ = con->get_handle();
if (ec) { if (ec) {
LOG_INFO("Connect initialization error: {}", ec.message()); LOG_ERROR("Connect initialization error: {}", ec.message());
return -1; return -1;
} }
@@ -94,7 +85,7 @@ void WsCore::Close(websocketpp::close::status::value code, std::string reason) {
m_endpoint_.close(connection_handle_, code, reason, ec); m_endpoint_.close(connection_handle_, code, reason, ec);
if (ec) { if (ec) {
LOG_INFO("Initiating close error: {}", ec.message()); LOG_ERROR("Initiating close error: {}", ec.message());
} }
} }
@@ -104,22 +95,30 @@ void WsCore::Send(std::string message) {
m_endpoint_.send(connection_handle_, message, m_endpoint_.send(connection_handle_, message,
websocketpp::frame::opcode::text, ec); websocketpp::frame::opcode::text, ec);
if (ec) { if (ec) {
LOG_INFO("Sending message error: {}", ec.message()); LOG_ERROR("Sending message error: {}", ec.message());
return; return;
} }
} }
void WsCore::Ping(websocketpp::connection_hdl hdl) { void WsCore::Ping(websocketpp::connection_hdl hdl) {
while (running_) {
{
std::unique_lock<std::mutex> lock(mtx_);
cond_var_.wait_for(lock, std::chrono::seconds(interval_),
[this] { return !running_; });
}
auto con = m_endpoint_.get_con_from_hdl(hdl); auto con = m_endpoint_.get_con_from_hdl(hdl);
while (con->get_state() == websocketpp::session::state::open) { if (con && con->get_state() == websocketpp::session::state::open) {
websocketpp::lib::error_code ec; websocketpp::lib::error_code ec;
m_endpoint_.ping(hdl, "", ec); m_endpoint_.ping(hdl, "", ec);
if (ec) { if (ec) {
LOG_ERROR("Ping error: {}", ec.message()); LOG_ERROR("Ping error: {}", ec.message());
break; break;
} }
}
std::this_thread::sleep_for(std::chrono::seconds(3)); if (!running_) break;
} }
} }
@@ -129,8 +128,7 @@ void WsCore::OnOpen(client *c, websocketpp::connection_hdl hdl) {
ws_status_ = WsStatus::WsOpened; ws_status_ = WsStatus::WsOpened;
OnWsStatus(WsStatus::WsOpened); OnWsStatus(WsStatus::WsOpened);
std::thread t(&WsCore::Ping, this, hdl); ping_thread_ = std::thread(&WsCore::Ping, this, hdl);
ping_thread_ = std::move(t);
} }
void WsCore::OnFail(client *c, websocketpp::connection_hdl hdl) { void WsCore::OnFail(client *c, websocketpp::connection_hdl hdl) {
@@ -142,8 +140,10 @@ void WsCore::OnFail(client *c, websocketpp::connection_hdl hdl) {
void WsCore::OnClose(client *c, websocketpp::connection_hdl hdl) { void WsCore::OnClose(client *c, websocketpp::connection_hdl hdl) {
ws_status_ = WsStatus::WsClosed; ws_status_ = WsStatus::WsClosed;
if (running_) {
OnWsStatus(WsStatus::WsClosed); OnWsStatus(WsStatus::WsClosed);
} }
}
bool WsCore::OnPing(websocketpp::connection_hdl hdl, std::string msg) { bool WsCore::OnPing(websocketpp::connection_hdl hdl, std::string msg) {
return true; return true;

View File

@@ -1,7 +1,9 @@
#ifndef _WS_CORE_H_ #ifndef _WS_CORE_H_
#define _WS_CORE_H_ #define _WS_CORE_H_
#include <condition_variable>
#include <map> #include <map>
#include <mutex>
#include <sstream> #include <sstream>
#include <string> #include <string>
#include <thread> #include <thread>
@@ -23,7 +25,9 @@ class WsCore {
int Connect(std::string const &uri); int Connect(std::string const &uri);
void Close(websocketpp::close::status::value code, std::string reason); void Close(websocketpp::close::status::value code =
websocketpp::close::status::going_away,
std::string reason = "");
void Send(std::string message); void Send(std::string message);
@@ -55,10 +59,15 @@ class WsCore {
websocketpp::connection_hdl connection_handle_; websocketpp::connection_hdl connection_handle_;
std::thread m_thread_; std::thread m_thread_;
std::thread ping_thread_; std::thread ping_thread_;
bool running_ = true;
std::mutex mtx_;
unsigned int interval_ = 3;
std::condition_variable cond_var_;
WsStatus ws_status_ = WsStatus::WsClosed; WsStatus ws_status_ = WsStatus::WsClosed;
int timeout_count_ = 0; int timeout_count_ = 0;
std::string uri_; std::string uri_;
bool destructed_ = false;
}; };
#endif #endif