From eea37424c94e5b1a7f61abf57ed16087f4402b1b Mon Sep 17 00:00:00 2001 From: dijunkun Date: Thu, 25 Dec 2025 18:13:18 +0800 Subject: [PATCH] [feat] use receiver ack to calculate file transfer progress --- src/gui/panels/remote_peer_panel.cpp | 1 + src/gui/render.cpp | 1 + src/gui/render.h | 10 ++ src/gui/render_callback.cpp | 87 ++++++++++++- src/gui/toolbars/control_bar.cpp | 152 +++++++++-------------- src/gui/windows/file_transfer_window.cpp | 46 +++---- src/tools/file_transfer.cpp | 35 +++++- src/tools/file_transfer.h | 20 ++- submodules/minirtc | 2 +- 9 files changed, 227 insertions(+), 127 deletions(-) diff --git a/src/gui/panels/remote_peer_panel.cpp b/src/gui/panels/remote_peer_panel.cpp index 606e8af..95a2ddc 100644 --- a/src/gui/panels/remote_peer_panel.cpp +++ b/src/gui/panels/remote_peer_panel.cpp @@ -208,6 +208,7 @@ int Render::ConnectTo(const std::string& remote_id, const char* password, AddAudioStream(props->peer_, props->audio_label_.c_str()); AddDataStream(props->peer_, props->data_label_.c_str(), false); AddDataStream(props->peer_, props->file_label_.c_str(), true); + AddDataStream(props->peer_, props->file_feedback_label_.c_str(), true); props->connection_status_ = ConnectionStatus::Connecting; diff --git a/src/gui/render.cpp b/src/gui/render.cpp index 95ea3cc..9b88d63 100644 --- a/src/gui/render.cpp +++ b/src/gui/render.cpp @@ -715,6 +715,7 @@ int Render::CreateConnectionPeer() { AddAudioStream(peer_, audio_label_.c_str()); AddDataStream(peer_, data_label_.c_str(), false); AddDataStream(peer_, file_label_.c_str(), true); + AddDataStream(peer_, file_feedback_label_.c_str(), true); return 0; } else { return -1; diff --git a/src/gui/render.h b/src/gui/render.h index 0a2084b..b1017e8 100644 --- a/src/gui/render.h +++ b/src/gui/render.h @@ -46,6 +46,7 @@ class Render { std::string audio_label_ = "control_audio"; std::string data_label_ = "control_data"; std::string file_label_ = "file"; + std::string file_feedback_label_ = "file_feedback"; std::string local_id_ = ""; std::string remote_id_ = ""; bool exit_ = false; @@ -136,6 +137,8 @@ class Render { uint64_t file_send_last_bytes_ = 0; bool file_transfer_window_visible_ = false; bool file_transfer_completed_ = false; + std::atomic current_file_id_{ + 0}; // Track current file transfer ID }; public: @@ -482,8 +485,15 @@ class Render { std::string video_secondary_label_ = "secondary_display"; std::string audio_label_ = "audio"; std::string data_label_ = "data"; + std::string info_label_ = "info"; + std::string control_data_label_ = "control_data"; std::string file_label_ = "file"; + std::string file_feedback_label_ = "file_feedback"; Params params_; + // Map file_id to props for tracking file transfer progress via ACK + std::unordered_map> + file_id_to_props_; + std::shared_mutex file_id_to_props_mutex_; SDL_AudioDeviceID input_dev_; SDL_AudioDeviceID output_dev_; ScreenCapturerFactory* screen_capturer_factory_ = nullptr; diff --git a/src/gui/render_callback.cpp b/src/gui/render_callback.cpp index 0c347eb..efdc92b 100644 --- a/src/gui/render_callback.cpp +++ b/src/gui/render_callback.cpp @@ -1,12 +1,15 @@ #include #include #include +#include #include #include +#include #include "device_controller.h" #include "file_transfer.h" #include "localization.h" +#include "minirtc.h" #include "platform.h" #include "rd_log.h" #include "render.h" @@ -312,10 +315,90 @@ void Render::OnReceiveDataBufferCb(const char* data, size_t size, } std::string source_id = std::string(src_id, src_id_size); - if (source_id == "file") { - // try to parse as file-transfer chunk first + if (source_id == render->file_label_) { + std::string remote_user_id = std::string(user_id, user_id_size); + static FileReceiver receiver; + receiver.SetOnSendAck([render](const FileTransferAck& ack) -> int { + return SendReliableDataFrame( + render->peer_, reinterpret_cast(&ack), + sizeof(FileTransferAck), render->file_feedback_label_.c_str()); + }); + receiver.OnData(data, size); + return; + } else if (source_id == render->file_feedback_label_) { + if (size < sizeof(FileTransferAck)) { + LOG_ERROR("FileTransferAck: buffer too small, size={}", size); + return; + } + + FileTransferAck ack{}; + memcpy(&ack, data, sizeof(FileTransferAck)); + + if (ack.magic != kFileAckMagic) { + LOG_ERROR( + "FileTransferAck: invalid magic, got 0x{:08X}, expected 0x{:08X}", + ack.magic, kFileAckMagic); + return; + } + + std::shared_ptr props = nullptr; + { + std::shared_lock lock(render->file_id_to_props_mutex_); + auto it = render->file_id_to_props_.find(ack.file_id); + if (it != render->file_id_to_props_.end()) { + props = it->second.lock(); + } + } + + if (!props) { + LOG_WARN("FileTransferAck: no props found for file_id={}", ack.file_id); + return; + } + + // Update progress based on ACK + props->file_sent_bytes_ = ack.acked_offset; + props->file_total_bytes_ = ack.total_size; + + // Check if transfer is completed + if ((ack.flags & 0x01) != 0) { + // Transfer completed - receiver has finished receiving the file + props->file_transfer_completed_ = true; + props->file_transfer_window_visible_ = true; + props->file_sending_ = false; // Mark sending as finished + LOG_INFO( + "File transfer completed via ACK, file_id={}, total_size={}, " + "acked_offset={}", + ack.file_id, ack.total_size, ack.acked_offset); + + // Unregister file_id mapping after completion + { + std::lock_guard lock( + render->file_id_to_props_mutex_); + render->file_id_to_props_.erase(ack.file_id); + } + } + + // Update rate calculation + auto now = std::chrono::steady_clock::now(); + { + std::lock_guard lock(props->file_transfer_mutex_); + auto elapsed = std::chrono::duration_cast( + now - props->file_send_last_update_time_) + .count(); + + if (elapsed >= 100) { + uint64_t bytes_sent_since_last = + ack.acked_offset - props->file_send_last_bytes_; + uint32_t rate_bps = + static_cast((bytes_sent_since_last * 8 * 1000) / elapsed); + props->file_send_rate_bps_ = rate_bps; + props->file_send_last_bytes_ = ack.acked_offset; + props->file_send_last_update_time_ = now; + } + } + return; } diff --git a/src/gui/toolbars/control_bar.cpp b/src/gui/toolbars/control_bar.cpp index 3751982..91415ce 100644 --- a/src/gui/toolbars/control_bar.cpp +++ b/src/gui/toolbars/control_bar.cpp @@ -213,8 +213,9 @@ int Render::ControlBar(std::shared_ptr& props) { std::filesystem::path file_path = std::filesystem::path(path); std::string file_label = file_label_; auto props_weak = std::weak_ptr(props); + Render* render_ptr = this; - std::thread([peer, file_path, file_label, props_weak]() { + std::thread([peer, file_path, file_label, props_weak, render_ptr]() { auto props_locked = props_weak.lock(); if (!props_locked) { return; @@ -254,94 +255,63 @@ int Render::ControlBar(std::shared_ptr& props) { props_locked->file_total_bytes_.load()); FileSender sender; - auto last_progress_update = std::chrono::steady_clock::now(); - auto last_rate_update = std::chrono::steady_clock::now(); - uint64_t last_actual_sent_bytes = 0; + uint32_t file_id = FileSender::NextFileId(); - int ret = sender.SendFile( - file_path, file_path.filename().string(), - [peer, file_label, props_weak, &last_progress_update, - &last_rate_update, &last_actual_sent_bytes, - total_size](const char* buf, size_t sz) -> int { - int send_ret = - SendReliableDataFrame(peer, buf, sz, file_label.c_str()); - if (send_ret == 0) { - // Update progress periodically (every 50ms) by querying - // actual sent bytes - auto now = std::chrono::steady_clock::now(); - auto elapsed_progress = - std::chrono::duration_cast( - now - last_progress_update) - .count(); - - if (elapsed_progress >= 50) { - // Query actual sent bytes from the transport layer - uint64_t actual_sent_bytes = - GetDataChannelSentBytes(peer, file_label.c_str()); - - auto props_locked = props_weak.lock(); - if (props_locked) { - props_locked->file_sent_bytes_ = actual_sent_bytes; - - // Update rate every 100ms - auto elapsed_rate = - std::chrono::duration_cast( - now - last_rate_update) - .count(); - - if (elapsed_rate >= 100) { - std::lock_guard lock( - props_locked->file_transfer_mutex_); - uint64_t bytes_sent_since_last = - actual_sent_bytes - last_actual_sent_bytes; - // Calculate rate in bits per second - uint32_t rate_bps = static_cast( - (bytes_sent_since_last * 8 * 1000) / elapsed_rate); - props_locked->file_send_rate_bps_ = rate_bps; - last_actual_sent_bytes = actual_sent_bytes; - last_rate_update = now; - } - } - last_progress_update = now; - } - } - return send_ret; - }); - - // Reset file transfer progress and show completion - auto props_locked_final = props_weak.lock(); - if (props_locked_final) { - // Reset atomic variables first - props_locked_final->file_sending_ = false; - props_locked_final->file_sent_bytes_ = 0; - props_locked_final->file_total_bytes_ = 0; - props_locked_final->file_send_rate_bps_ = 0; - - // Show completion window - if (ret == 0) { - props_locked_final->file_transfer_completed_ = true; - props_locked_final->file_transfer_window_visible_ = true; - } else { - props_locked_final->file_transfer_completed_ = false; - props_locked_final->file_transfer_window_visible_ = false; - } - - { - std::lock_guard lock( - props_locked_final->file_transfer_mutex_); - // Keep file name for completion message - if (ret != 0) { - props_locked_final->file_sending_name_ = ""; - } - } - LOG_INFO("File transfer progress reset, completed={}", ret == 0); + { + std::lock_guard lock( + render_ptr->file_id_to_props_mutex_); + render_ptr->file_id_to_props_[file_id] = props_weak; } - if (ret != 0) { - LOG_ERROR("FileSender::SendFile failed for [{}], ret={}", - file_path.string().c_str(), ret); - } else { - LOG_INFO("File send finished: {}", file_path.string().c_str()); + props_locked->current_file_id_ = file_id; + + // Progress will be updated via ACK from receiver + // Don't update file_sent_bytes_ here, let ACK control the progress + int ret = sender.SendFile( + file_path, file_path.filename().string(), + [peer, file_label](const char* buf, size_t sz) -> int { + return SendReliableDataFrame(peer, buf, sz, file_label.c_str()); + }, + 64 * 1024, // chunk_size + file_id); // file_id + + // Mark sending thread as finished, but don't set completion flag yet + // Completion will be set when we receive the final ACK from receiver + auto props_locked_final = props_weak.lock(); + if (props_locked_final) { + props_locked_final->file_sending_ = false; + + if (ret != 0) { + // On error, clean up immediately + props_locked_final->file_transfer_completed_ = false; + props_locked_final->file_transfer_window_visible_ = false; + props_locked_final->file_sent_bytes_ = 0; + props_locked_final->file_total_bytes_ = 0; + props_locked_final->file_send_rate_bps_ = 0; + props_locked_final->current_file_id_ = 0; + + // Unregister file_id mapping on error + { + std::lock_guard lock( + render_ptr->file_id_to_props_mutex_); + render_ptr->file_id_to_props_.erase(file_id); + } + + { + std::lock_guard lock( + props_locked_final->file_transfer_mutex_); + props_locked_final->file_sending_name_ = ""; + } + + LOG_ERROR("FileSender::SendFile failed for [{}], ret={}", + file_path.string().c_str(), ret); + } else { + // On success, keep file_id mapping and wait for final ACK + // Don't set completion flag here - wait for ACK with completed + // flag + LOG_INFO("File send finished (waiting for ACK): {}", + file_path.string().c_str()); + } } }).detach(); } @@ -450,16 +420,6 @@ int Render::ControlBar(std::shared_ptr& props) { if (props->net_traffic_stats_button_pressed_ && props->control_bar_expand_) { NetTrafficStats(props); - } else { - // Debug: log why NetTrafficStats is not being called - static bool logged_once = false; - if (!logged_once && props->file_sending_.load()) { - LOG_INFO( - "NetTrafficStats not called: button_pressed={}, " - "control_bar_expand={}", - props->net_traffic_stats_button_pressed_, props->control_bar_expand_); - logged_once = true; - } } ImGui::PopStyleVar(); diff --git a/src/gui/windows/file_transfer_window.cpp b/src/gui/windows/file_transfer_window.cpp index 0264858..013e1ee 100644 --- a/src/gui/windows/file_transfer_window.cpp +++ b/src/gui/windows/file_transfer_window.cpp @@ -38,8 +38,8 @@ int Render::FileTransferWindow( ImGuiIO& io = ImGui::GetIO(); // Position window at bottom-left of stream window - float window_width = 400.0f; - float window_height = 120.0f; + float window_width = main_window_width_ * 0.35f; + float window_height = main_window_height_ * 0.25f; float pos_x = 10.0f; float pos_y = 10.0f; @@ -56,20 +56,22 @@ int Render::FileTransferWindow( ImGui::SetNextWindowSize(ImVec2(window_width, window_height), ImGuiCond_Always); - ImGui::PushStyleVar(ImGuiStyleVar_WindowRounding, 5.0f); + ImGui::PushStyleVar(ImGuiStyleVar_WindowRounding, 3.0f); ImGui::PushStyleVar(ImGuiStyleVar_WindowBorderSize, 1.0f); - ImGui::PushStyleColor(ImGuiCol_WindowBg, ImVec4(0.2f, 0.2f, 0.2f, 0.95f)); - ImGui::PushStyleColor(ImGuiCol_Border, ImVec4(0.5f, 0.5f, 0.5f, 1.0f)); - ImGui::PushStyleColor(ImGuiCol_TitleBg, ImVec4(0.3f, 0.3f, 0.3f, 1.0f)); + ImGui::PushStyleColor(ImGuiCol_WindowBg, ImVec4(1.0f, 1.0f, 1.0f, 0.8f)); + ImGui::PushStyleColor(ImGuiCol_Border, ImVec4(1.0f, 1.0f, 1.0f, 1.0f)); + ImGui::PushStyleColor(ImGuiCol_TitleBg, ImVec4(1.0f, 1.0f, 1.0f, 1.0f)); ImGui::PushStyleColor(ImGuiCol_TitleBgActive, ImVec4(0.4f, 0.4f, 0.4f, 1.0f)); std::string window_title = "File Transfer"; bool window_opened = true; + // ImGui::SetWindowFontScale(0.5f); if (ImGui::Begin("FileTransferWindow", &window_opened, ImGuiWindowFlags_NoCollapse | ImGuiWindowFlags_NoResize | ImGuiWindowFlags_NoMove | ImGuiWindowFlags_NoSavedSettings)) { + ImGui::SetWindowFontScale(0.5f); ImGui::PopStyleColor(4); ImGui::PopStyleVar(2); @@ -81,14 +83,14 @@ int Render::FileTransferWindow( return 0; } - ImGui::SetWindowFontScale(0.6f); - bool is_sending = props->file_sending_.load(); + uint64_t total = props->file_total_bytes_.load(); + bool has_transfer = total > 0; // Check if there's an active transfer if (props->file_transfer_completed_ && !is_sending) { // Show completion message ImGui::SetCursorPos(ImVec2(10, 30)); - ImGui::TextColored(ImVec4(0.0f, 1.0f, 0.0f, 1.0f), + ImGui::TextColored(ImVec4(0.0f, 0.0f, 1.0f, 1.0f), "%s File transfer completed!", ICON_FA_CHECK); std::string file_name; @@ -106,13 +108,14 @@ int Render::FileTransferWindow( props->file_transfer_completed_ = false; props->file_transfer_window_visible_ = false; } - } else if (is_sending) { - // Show transfer progress + } else if (has_transfer && !props->file_transfer_completed_) { + // Show transfer progress (either sending or waiting for ACK) uint64_t sent = props->file_sent_bytes_.load(); - uint64_t total = props->file_total_bytes_.load(); - float progress = - total > 0 ? static_cast(sent) / static_cast(total) - : 0.0f; + // Re-read total in case it was updated + uint64_t current_total = props->file_total_bytes_.load(); + float progress = current_total > 0 ? static_cast(sent) / + static_cast(current_total) + : 0.0f; progress = (std::max)(0.0f, (std::min)(1.0f, progress)); // File name @@ -144,24 +147,25 @@ int Render::FileTransferWindow( ImGui::SameLine(0, 20); // Format size display char size_str[64]; - if (total < 1024) { + if (current_total < 1024) { snprintf(size_str, sizeof(size_str), "%llu B", - (unsigned long long)total); - } else if (total < 1024 * 1024) { - snprintf(size_str, sizeof(size_str), "%.2f KB", total / 1024.0f); + (unsigned long long)current_total); + } else if (current_total < 1024 * 1024) { + snprintf(size_str, sizeof(size_str), "%.2f KB", + current_total / 1024.0f); } else { snprintf(size_str, sizeof(size_str), "%.2f MB", - total / (1024.0f * 1024.0f)); + current_total / (1024.0f * 1024.0f)); } ImGui::Text("Size: %s", size_str); } - ImGui::SetWindowFontScale(1.0f); ImGui::End(); } else { ImGui::PopStyleColor(4); ImGui::PopStyleVar(2); } + ImGui::SetWindowFontScale(1.0f); return 0; } diff --git a/src/tools/file_transfer.cpp b/src/tools/file_transfer.cpp index 15787ca..fc8f8f1 100644 --- a/src/tools/file_transfer.cpp +++ b/src/tools/file_transfer.cpp @@ -9,14 +9,13 @@ namespace crossdesk { namespace { std::atomic g_next_file_id{1}; -constexpr uint32_t kFileChunkMagic = 0x4A4E544D; // 'JNTM' } // namespace uint32_t FileSender::NextFileId() { return g_next_file_id.fetch_add(1); } int FileSender::SendFile(const std::filesystem::path& path, const std::string& label, const SendFunc& send, - std::size_t chunk_size) { + std::size_t chunk_size, uint32_t file_id) { if (!send) { LOG_ERROR("FileSender::SendFile: send function is empty"); return -1; @@ -43,10 +42,13 @@ int FileSender::SendFile(const std::filesystem::path& path, path.string().c_str()); return -1; } - LOG_INFO("FileSender send file {}, total size {}", path.string().c_str(), - total_size); + LOG_INFO("FileSender send file {}, total size {}, file_id={}", + path.string().c_str(), total_size, file_id); - const uint32_t file_id = NextFileId(); + if (file_id == 0) { + file_id = NextFileId(); + } + const uint32_t final_file_id = file_id; uint64_t offset = 0; bool is_first = true; std::string file_name = label.empty() ? path.filename().string() : label; @@ -69,7 +71,7 @@ int FileSender::SendFile(const std::filesystem::path& path, const std::string* name_ptr = is_first ? &file_name : nullptr; std::vector chunk = BuildChunk( - file_id, offset, total_size, buffer.data(), + final_file_id, offset, total_size, buffer.data(), static_cast(bytes_read), name_ptr, is_first, is_last); int ret = send(chunk.data(), chunk.size()); @@ -264,6 +266,27 @@ bool FileReceiver::HandleChunk(const FileChunkHeader& header, ctx.received += static_cast(payload_size); } + // Send ACK after processing chunk + if (on_send_ack_) { + FileTransferAck ack{}; + ack.magic = kFileAckMagic; + ack.file_id = header.file_id; + ack.acked_offset = header.offset + static_cast(payload_size); + ack.total_size = header.total_size; + ack.flags = 0; + + bool is_last = (header.flags & 0x02) != 0; + if (is_last || ctx.received >= ctx.total_size) { + ack.flags |= 0x01; // completed + } + + int ret = on_send_ack_(ack); + if (ret != 0) { + LOG_ERROR("FileReceiver: failed to send ACK for file_id={}, ret={}", + header.file_id, ret); + } + } + bool is_last = (header.flags & 0x02) != 0; if (is_last || ctx.received >= ctx.total_size) { ctx.ofs.close(); diff --git a/src/tools/file_transfer.h b/src/tools/file_transfer.h index 5a001ce..eef05a5 100644 --- a/src/tools/file_transfer.h +++ b/src/tools/file_transfer.h @@ -17,6 +17,10 @@ namespace crossdesk { +// Magic constants for file transfer protocol +constexpr uint32_t kFileChunkMagic = 0x4A4E544D; // 'JNTM' +constexpr uint32_t kFileAckMagic = 0x4A4E5443; // 'JNTC' + #pragma pack(push, 1) struct FileChunkHeader { uint32_t magic; // magic to identify file-transfer chunks @@ -27,6 +31,14 @@ struct FileChunkHeader { uint16_t name_len; // filename length (bytes), only set on first chunk uint8_t flags; // bit0: is_first, bit1: is_last, others reserved }; + +struct FileTransferAck { + uint32_t magic; // magic to identify file-transfer ack + uint32_t file_id; // must match FileChunkHeader.file_id + uint64_t acked_offset; // received offset + uint64_t total_size; // total file size + uint32_t flags; // bit0: completed, bit1: error +}; #pragma pack(pop) class FileSender { @@ -43,9 +55,11 @@ class FileSender { // `path` : full path to the local file. // `label` : logical filename to send (usually path.filename()). // `send` : callback that pushes one encoded chunk into the data channel. + // `file_id` : file id to use (0 means auto-generate). // Return 0 on success, <0 on error. int SendFile(const std::filesystem::path& path, const std::string& label, - const SendFunc& send, std::size_t chunk_size = 64 * 1024); + const SendFunc& send, std::size_t chunk_size = 64 * 1024, + uint32_t file_id = 0); // build a single encoded chunk buffer according to FileChunkHeader protocol. static std::vector BuildChunk(uint32_t file_id, uint64_t offset, @@ -66,6 +80,7 @@ class FileReceiver { using OnFileComplete = std::function; + using OnSendAck = std::function; public: // save to default desktop directory. @@ -80,6 +95,8 @@ class FileReceiver { void SetOnFileComplete(OnFileComplete cb) { on_file_complete_ = cb; } + void SetOnSendAck(OnSendAck cb) { on_send_ack_ = cb; } + const std::filesystem::path& OutputDir() const { return output_dir_; } private: @@ -92,6 +109,7 @@ class FileReceiver { std::filesystem::path output_dir_; std::unordered_map contexts_; OnFileComplete on_file_complete_ = nullptr; + OnSendAck on_send_ack_ = nullptr; }; } // namespace crossdesk diff --git a/submodules/minirtc b/submodules/minirtc index 235b86b..e049099 160000 --- a/submodules/minirtc +++ b/submodules/minirtc @@ -1 +1 @@ -Subproject commit 235b86b5bf84e8359d513dbf91512a02e7bd7a86 +Subproject commit e049099e43f38a52f53ce63a6fe39e9f71865366