[feat] add support for displaying multiple controller info and file transfer to controllers

This commit is contained in:
dijunkun
2026-01-26 17:47:31 +08:00
parent 7bbd10a50c
commit b6a52dbcd4
9 changed files with 3028 additions and 2808 deletions

View File

@@ -829,7 +829,7 @@ int Render::CreateConnectionPeer() {
params_.on_signal_status = OnSignalStatusCb;
params_.on_connection_status = OnConnectionStatusCb;
params_.net_status_report = NetStatusReport;
params_.on_net_status_report = OnNetStatusReport;
params_.user_data = this;
@@ -1873,16 +1873,35 @@ void Render::CleanSubStreamWindowProperties(
}
}
std::shared_ptr<Render::SubStreamWindowProperties>
Render::GetSubStreamWindowPropertiesByRemoteId(const std::string& remote_id) {
if (remote_id.empty()) {
return nullptr;
}
std::shared_lock lock(client_properties_mutex_);
auto it = client_properties_.find(remote_id);
if (it == client_properties_.end()) {
return nullptr;
}
return it->second;
}
void Render::StartFileTransfer(std::shared_ptr<SubStreamWindowProperties> props,
const std::filesystem::path& file_path,
const std::string& file_label) {
if (!props || !props->peer_) {
LOG_ERROR("StartFileTransfer: invalid props or peer");
const std::string& file_label,
const std::string& remote_id) {
const bool is_global = (props == nullptr);
PeerPtr* peer = is_global ? peer_ : props->peer_;
if (!peer) {
LOG_ERROR("StartFileTransfer: invalid peer");
return;
}
bool expected = false;
if (!props->file_sending_.compare_exchange_strong(expected, true)) {
if (!(is_global ? file_transfer_.file_sending_
: props->file_transfer_.file_sending_)
.compare_exchange_strong(expected, true)) {
// Already sending, this should not happen if called correctly
LOG_WARN(
"StartFileTransfer called but file_sending_ is already true, "
@@ -1891,13 +1910,19 @@ void Render::StartFileTransfer(std::shared_ptr<SubStreamWindowProperties> props,
return;
}
auto peer = props->peer_;
auto props_weak = std::weak_ptr<SubStreamWindowProperties>(props);
Render* render_ptr = this;
std::thread([peer, file_path, file_label, props_weak, render_ptr]() {
std::thread([peer, file_path, file_label, props_weak, render_ptr, remote_id,
is_global]() {
auto props_locked = props_weak.lock();
if (!props_locked) {
FileTransferState* state = nullptr;
if (props_locked) {
state = &props_locked->file_transfer_;
} else if (is_global) {
state = &render_ptr->file_transfer_;
} else {
return;
}
@@ -1905,49 +1930,50 @@ void Render::StartFileTransfer(std::shared_ptr<SubStreamWindowProperties> props,
uint64_t total_size = std::filesystem::file_size(file_path, ec);
if (ec) {
LOG_ERROR("Failed to get file size: {}", ec.message().c_str());
props_locked->file_sending_ = false;
state->file_sending_ = false;
return;
}
props_locked->file_sent_bytes_ = 0;
props_locked->file_total_bytes_ = total_size;
props_locked->file_send_rate_bps_ = 0;
props_locked->file_transfer_window_visible_ = true;
state->file_sent_bytes_ = 0;
state->file_total_bytes_ = total_size;
state->file_send_rate_bps_ = 0;
state->file_transfer_window_visible_ = true;
{
std::lock_guard<std::mutex> lock(props_locked->file_transfer_mutex_);
props_locked->file_send_start_time_ = std::chrono::steady_clock::now();
props_locked->file_send_last_update_time_ =
props_locked->file_send_start_time_;
props_locked->file_send_last_bytes_ = 0;
std::lock_guard<std::mutex> lock(state->file_transfer_mutex_);
state->file_send_start_time_ = std::chrono::steady_clock::now();
state->file_send_last_update_time_ = state->file_send_start_time_;
state->file_send_last_bytes_ = 0;
}
LOG_INFO(
"File transfer started: {} ({} bytes), file_sending_={}, "
"total_bytes_={}",
file_path.filename().string(), total_size,
props_locked->file_sending_.load(),
props_locked->file_total_bytes_.load());
file_path.filename().string(), total_size, state->file_sending_.load(),
state->file_total_bytes_.load());
FileSender sender;
uint32_t file_id = FileSender::NextFileId();
{
if (props_locked) {
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_props_mutex_);
render_ptr->file_id_to_props_[file_id] = props_weak;
} else {
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_transfer_state_mutex_);
render_ptr->file_id_to_transfer_state_[file_id] = state;
}
props_locked->current_file_id_ = file_id;
state->current_file_id_ = file_id;
// Update file transfer list: mark as sending
// Find the queued file that matches the exact file path
{
std::lock_guard<std::mutex> lock(props_locked->file_transfer_list_mutex_);
for (auto& info : props_locked->file_transfer_list_) {
std::lock_guard<std::mutex> lock(state->file_transfer_list_mutex_);
for (auto& info : state->file_transfer_list_) {
if (info.file_path == file_path &&
info.status ==
SubStreamWindowProperties::FileTransferStatus::Queued) {
info.status = SubStreamWindowProperties::FileTransferStatus::Sending;
info.status == FileTransferState::FileTransferStatus::Queued) {
info.status = FileTransferState::FileTransferStatus::Sending;
info.file_id = file_id;
info.file_size = total_size;
info.sent_bytes = 0;
@@ -1956,81 +1982,88 @@ void Render::StartFileTransfer(std::shared_ptr<SubStreamWindowProperties> props,
}
}
props_locked->file_transfer_window_visible_ = true;
state->file_transfer_window_visible_ = true;
// Progress will be updated via ACK from receiver
int ret = sender.SendFile(
file_path, file_path.filename().string(),
[peer, file_label](const char* buf, size_t sz) -> int {
return SendReliableDataFrame(peer, buf, sz, file_label.c_str());
[peer, file_label, remote_id](const char* buf, size_t sz) -> int {
if (remote_id.empty()) {
return SendReliableDataFrame(peer, buf, sz, file_label.c_str());
} else {
return SendReliableDataFrameToPeer(
peer, buf, sz, file_label.c_str(), remote_id.c_str(),
remote_id.size());
}
},
64 * 1024, file_id);
// file_sending_ should remain true until we receive the final ACK from
// receiver
auto props_locked_final = props_weak.lock();
if (props_locked_final) {
// On error, set file_sending_ to false immediately to allow next file
if (ret != 0) {
props_locked_final->file_sending_ = false;
props_locked_final->file_transfer_window_visible_ = false;
props_locked_final->file_sent_bytes_ = 0;
props_locked_final->file_total_bytes_ = 0;
props_locked_final->file_send_rate_bps_ = 0;
props_locked_final->current_file_id_ = 0;
// On error, set file_sending_ to false immediately to allow next file
if (ret != 0) {
state->file_sending_ = false;
state->file_transfer_window_visible_ = false;
state->file_sent_bytes_ = 0;
state->file_total_bytes_ = 0;
state->file_send_rate_bps_ = 0;
state->current_file_id_ = 0;
// Unregister file_id mapping on error
{
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_props_mutex_);
render_ptr->file_id_to_props_.erase(file_id);
}
// Unregister file_id mapping on error
if (props_locked) {
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_props_mutex_);
render_ptr->file_id_to_props_.erase(file_id);
} else {
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_transfer_state_mutex_);
render_ptr->file_id_to_transfer_state_.erase(file_id);
}
// Update file transfer list: mark as failed
{
std::lock_guard<std::mutex> lock(
props_locked_final->file_transfer_list_mutex_);
for (auto& info : props_locked_final->file_transfer_list_) {
if (info.file_id == file_id) {
info.status =
SubStreamWindowProperties::FileTransferStatus::Failed;
break;
}
// Update file transfer list: mark as failed
{
std::lock_guard<std::mutex> lock(state->file_transfer_list_mutex_);
for (auto& info : state->file_transfer_list_) {
if (info.file_id == file_id) {
info.status = FileTransferState::FileTransferStatus::Failed;
break;
}
}
LOG_ERROR("FileSender::SendFile failed for [{}], ret={}",
file_path.string().c_str(), ret);
render_ptr->ProcessFileQueue(props_locked_final);
}
LOG_ERROR("FileSender::SendFile failed for [{}], ret={}",
file_path.string().c_str(), ret);
render_ptr->ProcessFileQueue(props_locked);
}
}).detach();
}
void Render::ProcessFileQueue(
std::shared_ptr<SubStreamWindowProperties> props) {
if (!props) {
FileTransferState* state = props ? &props->file_transfer_ : &file_transfer_;
if (!state) {
return;
}
if (props->file_sending_.load()) {
if (state->file_sending_.load()) {
return;
}
SubStreamWindowProperties::QueuedFile queued_file;
FileTransferState::QueuedFile queued_file;
{
std::lock_guard<std::mutex> lock(props->file_queue_mutex_);
if (props->file_send_queue_.empty()) {
std::lock_guard<std::mutex> lock(state->file_queue_mutex_);
if (state->file_send_queue_.empty()) {
return;
}
queued_file = props->file_send_queue_.front();
props->file_send_queue_.pop();
queued_file = state->file_send_queue_.front();
state->file_send_queue_.pop();
}
LOG_INFO("Processing next file in queue: {}",
queued_file.file_path.string().c_str());
StartFileTransfer(props, queued_file.file_path, queued_file.file_label);
StartFileTransfer(props, queued_file.file_path, queued_file.file_label,
queued_file.remote_id);
}
void Render::UpdateRenderRect() {