[feat] use receiver ack to calculate file transfer progress

This commit is contained in:
dijunkun
2025-12-25 18:13:18 +08:00
parent b322181853
commit eea37424c9
9 changed files with 227 additions and 127 deletions

View File

@@ -208,6 +208,7 @@ int Render::ConnectTo(const std::string& remote_id, const char* password,
AddAudioStream(props->peer_, props->audio_label_.c_str());
AddDataStream(props->peer_, props->data_label_.c_str(), false);
AddDataStream(props->peer_, props->file_label_.c_str(), true);
AddDataStream(props->peer_, props->file_feedback_label_.c_str(), true);
props->connection_status_ = ConnectionStatus::Connecting;

View File

@@ -715,6 +715,7 @@ int Render::CreateConnectionPeer() {
AddAudioStream(peer_, audio_label_.c_str());
AddDataStream(peer_, data_label_.c_str(), false);
AddDataStream(peer_, file_label_.c_str(), true);
AddDataStream(peer_, file_feedback_label_.c_str(), true);
return 0;
} else {
return -1;

View File

@@ -46,6 +46,7 @@ class Render {
std::string audio_label_ = "control_audio";
std::string data_label_ = "control_data";
std::string file_label_ = "file";
std::string file_feedback_label_ = "file_feedback";
std::string local_id_ = "";
std::string remote_id_ = "";
bool exit_ = false;
@@ -136,6 +137,8 @@ class Render {
uint64_t file_send_last_bytes_ = 0;
bool file_transfer_window_visible_ = false;
bool file_transfer_completed_ = false;
std::atomic<uint32_t> current_file_id_{
0}; // Track current file transfer ID
};
public:
@@ -482,8 +485,15 @@ class Render {
std::string video_secondary_label_ = "secondary_display";
std::string audio_label_ = "audio";
std::string data_label_ = "data";
std::string info_label_ = "info";
std::string control_data_label_ = "control_data";
std::string file_label_ = "file";
std::string file_feedback_label_ = "file_feedback";
Params params_;
// Map file_id to props for tracking file transfer progress via ACK
std::unordered_map<uint32_t, std::weak_ptr<SubStreamWindowProperties>>
file_id_to_props_;
std::shared_mutex file_id_to_props_mutex_;
SDL_AudioDeviceID input_dev_;
SDL_AudioDeviceID output_dev_;
ScreenCapturerFactory* screen_capturer_factory_ = nullptr;

View File

@@ -1,12 +1,15 @@
#include <chrono>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <unordered_map>
#include "device_controller.h"
#include "file_transfer.h"
#include "localization.h"
#include "minirtc.h"
#include "platform.h"
#include "rd_log.h"
#include "render.h"
@@ -312,10 +315,90 @@ void Render::OnReceiveDataBufferCb(const char* data, size_t size,
}
std::string source_id = std::string(src_id, src_id_size);
if (source_id == "file") {
// try to parse as file-transfer chunk first
if (source_id == render->file_label_) {
std::string remote_user_id = std::string(user_id, user_id_size);
static FileReceiver receiver;
receiver.SetOnSendAck([render](const FileTransferAck& ack) -> int {
return SendReliableDataFrame(
render->peer_, reinterpret_cast<const char*>(&ack),
sizeof(FileTransferAck), render->file_feedback_label_.c_str());
});
receiver.OnData(data, size);
return;
} else if (source_id == render->file_feedback_label_) {
if (size < sizeof(FileTransferAck)) {
LOG_ERROR("FileTransferAck: buffer too small, size={}", size);
return;
}
FileTransferAck ack{};
memcpy(&ack, data, sizeof(FileTransferAck));
if (ack.magic != kFileAckMagic) {
LOG_ERROR(
"FileTransferAck: invalid magic, got 0x{:08X}, expected 0x{:08X}",
ack.magic, kFileAckMagic);
return;
}
std::shared_ptr<SubStreamWindowProperties> props = nullptr;
{
std::shared_lock lock(render->file_id_to_props_mutex_);
auto it = render->file_id_to_props_.find(ack.file_id);
if (it != render->file_id_to_props_.end()) {
props = it->second.lock();
}
}
if (!props) {
LOG_WARN("FileTransferAck: no props found for file_id={}", ack.file_id);
return;
}
// Update progress based on ACK
props->file_sent_bytes_ = ack.acked_offset;
props->file_total_bytes_ = ack.total_size;
// Check if transfer is completed
if ((ack.flags & 0x01) != 0) {
// Transfer completed - receiver has finished receiving the file
props->file_transfer_completed_ = true;
props->file_transfer_window_visible_ = true;
props->file_sending_ = false; // Mark sending as finished
LOG_INFO(
"File transfer completed via ACK, file_id={}, total_size={}, "
"acked_offset={}",
ack.file_id, ack.total_size, ack.acked_offset);
// Unregister file_id mapping after completion
{
std::lock_guard<std::shared_mutex> lock(
render->file_id_to_props_mutex_);
render->file_id_to_props_.erase(ack.file_id);
}
}
// Update rate calculation
auto now = std::chrono::steady_clock::now();
{
std::lock_guard<std::mutex> lock(props->file_transfer_mutex_);
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
now - props->file_send_last_update_time_)
.count();
if (elapsed >= 100) {
uint64_t bytes_sent_since_last =
ack.acked_offset - props->file_send_last_bytes_;
uint32_t rate_bps =
static_cast<uint32_t>((bytes_sent_since_last * 8 * 1000) / elapsed);
props->file_send_rate_bps_ = rate_bps;
props->file_send_last_bytes_ = ack.acked_offset;
props->file_send_last_update_time_ = now;
}
}
return;
}

View File

@@ -213,8 +213,9 @@ int Render::ControlBar(std::shared_ptr<SubStreamWindowProperties>& props) {
std::filesystem::path file_path = std::filesystem::path(path);
std::string file_label = file_label_;
auto props_weak = std::weak_ptr<SubStreamWindowProperties>(props);
Render* render_ptr = this;
std::thread([peer, file_path, file_label, props_weak]() {
std::thread([peer, file_path, file_label, props_weak, render_ptr]() {
auto props_locked = props_weak.lock();
if (!props_locked) {
return;
@@ -254,94 +255,63 @@ int Render::ControlBar(std::shared_ptr<SubStreamWindowProperties>& props) {
props_locked->file_total_bytes_.load());
FileSender sender;
auto last_progress_update = std::chrono::steady_clock::now();
auto last_rate_update = std::chrono::steady_clock::now();
uint64_t last_actual_sent_bytes = 0;
uint32_t file_id = FileSender::NextFileId();
{
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_props_mutex_);
render_ptr->file_id_to_props_[file_id] = props_weak;
}
props_locked->current_file_id_ = file_id;
// Progress will be updated via ACK from receiver
// Don't update file_sent_bytes_ here, let ACK control the progress
int ret = sender.SendFile(
file_path, file_path.filename().string(),
[peer, file_label, props_weak, &last_progress_update,
&last_rate_update, &last_actual_sent_bytes,
total_size](const char* buf, size_t sz) -> int {
int send_ret =
SendReliableDataFrame(peer, buf, sz, file_label.c_str());
if (send_ret == 0) {
// Update progress periodically (every 50ms) by querying
// actual sent bytes
auto now = std::chrono::steady_clock::now();
auto elapsed_progress =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_progress_update)
.count();
[peer, file_label](const char* buf, size_t sz) -> int {
return SendReliableDataFrame(peer, buf, sz, file_label.c_str());
},
64 * 1024, // chunk_size
file_id); // file_id
if (elapsed_progress >= 50) {
// Query actual sent bytes from the transport layer
uint64_t actual_sent_bytes =
GetDataChannelSentBytes(peer, file_label.c_str());
auto props_locked = props_weak.lock();
if (props_locked) {
props_locked->file_sent_bytes_ = actual_sent_bytes;
// Update rate every 100ms
auto elapsed_rate =
std::chrono::duration_cast<std::chrono::milliseconds>(
now - last_rate_update)
.count();
if (elapsed_rate >= 100) {
std::lock_guard<std::mutex> lock(
props_locked->file_transfer_mutex_);
uint64_t bytes_sent_since_last =
actual_sent_bytes - last_actual_sent_bytes;
// Calculate rate in bits per second
uint32_t rate_bps = static_cast<uint32_t>(
(bytes_sent_since_last * 8 * 1000) / elapsed_rate);
props_locked->file_send_rate_bps_ = rate_bps;
last_actual_sent_bytes = actual_sent_bytes;
last_rate_update = now;
}
}
last_progress_update = now;
}
}
return send_ret;
});
// Reset file transfer progress and show completion
// Mark sending thread as finished, but don't set completion flag yet
// Completion will be set when we receive the final ACK from receiver
auto props_locked_final = props_weak.lock();
if (props_locked_final) {
// Reset atomic variables first
props_locked_final->file_sending_ = false;
if (ret != 0) {
// On error, clean up immediately
props_locked_final->file_transfer_completed_ = false;
props_locked_final->file_transfer_window_visible_ = false;
props_locked_final->file_sent_bytes_ = 0;
props_locked_final->file_total_bytes_ = 0;
props_locked_final->file_send_rate_bps_ = 0;
props_locked_final->current_file_id_ = 0;
// Show completion window
if (ret == 0) {
props_locked_final->file_transfer_completed_ = true;
props_locked_final->file_transfer_window_visible_ = true;
} else {
props_locked_final->file_transfer_completed_ = false;
props_locked_final->file_transfer_window_visible_ = false;
// Unregister file_id mapping on error
{
std::lock_guard<std::shared_mutex> lock(
render_ptr->file_id_to_props_mutex_);
render_ptr->file_id_to_props_.erase(file_id);
}
{
std::lock_guard<std::mutex> lock(
props_locked_final->file_transfer_mutex_);
// Keep file name for completion message
if (ret != 0) {
props_locked_final->file_sending_name_ = "";
}
}
LOG_INFO("File transfer progress reset, completed={}", ret == 0);
}
if (ret != 0) {
LOG_ERROR("FileSender::SendFile failed for [{}], ret={}",
file_path.string().c_str(), ret);
} else {
LOG_INFO("File send finished: {}", file_path.string().c_str());
// On success, keep file_id mapping and wait for final ACK
// Don't set completion flag here - wait for ACK with completed
// flag
LOG_INFO("File send finished (waiting for ACK): {}",
file_path.string().c_str());
}
}
}).detach();
}
@@ -450,16 +420,6 @@ int Render::ControlBar(std::shared_ptr<SubStreamWindowProperties>& props) {
if (props->net_traffic_stats_button_pressed_ && props->control_bar_expand_) {
NetTrafficStats(props);
} else {
// Debug: log why NetTrafficStats is not being called
static bool logged_once = false;
if (!logged_once && props->file_sending_.load()) {
LOG_INFO(
"NetTrafficStats not called: button_pressed={}, "
"control_bar_expand={}",
props->net_traffic_stats_button_pressed_, props->control_bar_expand_);
logged_once = true;
}
}
ImGui::PopStyleVar();

View File

@@ -38,8 +38,8 @@ int Render::FileTransferWindow(
ImGuiIO& io = ImGui::GetIO();
// Position window at bottom-left of stream window
float window_width = 400.0f;
float window_height = 120.0f;
float window_width = main_window_width_ * 0.35f;
float window_height = main_window_height_ * 0.25f;
float pos_x = 10.0f;
float pos_y = 10.0f;
@@ -56,20 +56,22 @@ int Render::FileTransferWindow(
ImGui::SetNextWindowSize(ImVec2(window_width, window_height),
ImGuiCond_Always);
ImGui::PushStyleVar(ImGuiStyleVar_WindowRounding, 5.0f);
ImGui::PushStyleVar(ImGuiStyleVar_WindowRounding, 3.0f);
ImGui::PushStyleVar(ImGuiStyleVar_WindowBorderSize, 1.0f);
ImGui::PushStyleColor(ImGuiCol_WindowBg, ImVec4(0.2f, 0.2f, 0.2f, 0.95f));
ImGui::PushStyleColor(ImGuiCol_Border, ImVec4(0.5f, 0.5f, 0.5f, 1.0f));
ImGui::PushStyleColor(ImGuiCol_TitleBg, ImVec4(0.3f, 0.3f, 0.3f, 1.0f));
ImGui::PushStyleColor(ImGuiCol_WindowBg, ImVec4(1.0f, 1.0f, 1.0f, 0.8f));
ImGui::PushStyleColor(ImGuiCol_Border, ImVec4(1.0f, 1.0f, 1.0f, 1.0f));
ImGui::PushStyleColor(ImGuiCol_TitleBg, ImVec4(1.0f, 1.0f, 1.0f, 1.0f));
ImGui::PushStyleColor(ImGuiCol_TitleBgActive, ImVec4(0.4f, 0.4f, 0.4f, 1.0f));
std::string window_title = "File Transfer";
bool window_opened = true;
// ImGui::SetWindowFontScale(0.5f);
if (ImGui::Begin("FileTransferWindow", &window_opened,
ImGuiWindowFlags_NoCollapse | ImGuiWindowFlags_NoResize |
ImGuiWindowFlags_NoMove |
ImGuiWindowFlags_NoSavedSettings)) {
ImGui::SetWindowFontScale(0.5f);
ImGui::PopStyleColor(4);
ImGui::PopStyleVar(2);
@@ -81,14 +83,14 @@ int Render::FileTransferWindow(
return 0;
}
ImGui::SetWindowFontScale(0.6f);
bool is_sending = props->file_sending_.load();
uint64_t total = props->file_total_bytes_.load();
bool has_transfer = total > 0; // Check if there's an active transfer
if (props->file_transfer_completed_ && !is_sending) {
// Show completion message
ImGui::SetCursorPos(ImVec2(10, 30));
ImGui::TextColored(ImVec4(0.0f, 1.0f, 0.0f, 1.0f),
ImGui::TextColored(ImVec4(0.0f, 0.0f, 1.0f, 1.0f),
"%s File transfer completed!", ICON_FA_CHECK);
std::string file_name;
@@ -106,12 +108,13 @@ int Render::FileTransferWindow(
props->file_transfer_completed_ = false;
props->file_transfer_window_visible_ = false;
}
} else if (is_sending) {
// Show transfer progress
} else if (has_transfer && !props->file_transfer_completed_) {
// Show transfer progress (either sending or waiting for ACK)
uint64_t sent = props->file_sent_bytes_.load();
uint64_t total = props->file_total_bytes_.load();
float progress =
total > 0 ? static_cast<float>(sent) / static_cast<float>(total)
// Re-read total in case it was updated
uint64_t current_total = props->file_total_bytes_.load();
float progress = current_total > 0 ? static_cast<float>(sent) /
static_cast<float>(current_total)
: 0.0f;
progress = (std::max)(0.0f, (std::min)(1.0f, progress));
@@ -144,24 +147,25 @@ int Render::FileTransferWindow(
ImGui::SameLine(0, 20);
// Format size display
char size_str[64];
if (total < 1024) {
if (current_total < 1024) {
snprintf(size_str, sizeof(size_str), "%llu B",
(unsigned long long)total);
} else if (total < 1024 * 1024) {
snprintf(size_str, sizeof(size_str), "%.2f KB", total / 1024.0f);
(unsigned long long)current_total);
} else if (current_total < 1024 * 1024) {
snprintf(size_str, sizeof(size_str), "%.2f KB",
current_total / 1024.0f);
} else {
snprintf(size_str, sizeof(size_str), "%.2f MB",
total / (1024.0f * 1024.0f));
current_total / (1024.0f * 1024.0f));
}
ImGui::Text("Size: %s", size_str);
}
ImGui::SetWindowFontScale(1.0f);
ImGui::End();
} else {
ImGui::PopStyleColor(4);
ImGui::PopStyleVar(2);
}
ImGui::SetWindowFontScale(1.0f);
return 0;
}

View File

@@ -9,14 +9,13 @@ namespace crossdesk {
namespace {
std::atomic<uint32_t> g_next_file_id{1};
constexpr uint32_t kFileChunkMagic = 0x4A4E544D; // 'JNTM'
} // namespace
uint32_t FileSender::NextFileId() { return g_next_file_id.fetch_add(1); }
int FileSender::SendFile(const std::filesystem::path& path,
const std::string& label, const SendFunc& send,
std::size_t chunk_size) {
std::size_t chunk_size, uint32_t file_id) {
if (!send) {
LOG_ERROR("FileSender::SendFile: send function is empty");
return -1;
@@ -43,10 +42,13 @@ int FileSender::SendFile(const std::filesystem::path& path,
path.string().c_str());
return -1;
}
LOG_INFO("FileSender send file {}, total size {}", path.string().c_str(),
total_size);
LOG_INFO("FileSender send file {}, total size {}, file_id={}",
path.string().c_str(), total_size, file_id);
const uint32_t file_id = NextFileId();
if (file_id == 0) {
file_id = NextFileId();
}
const uint32_t final_file_id = file_id;
uint64_t offset = 0;
bool is_first = true;
std::string file_name = label.empty() ? path.filename().string() : label;
@@ -69,7 +71,7 @@ int FileSender::SendFile(const std::filesystem::path& path,
const std::string* name_ptr = is_first ? &file_name : nullptr;
std::vector<char> chunk = BuildChunk(
file_id, offset, total_size, buffer.data(),
final_file_id, offset, total_size, buffer.data(),
static_cast<uint32_t>(bytes_read), name_ptr, is_first, is_last);
int ret = send(chunk.data(), chunk.size());
@@ -264,6 +266,27 @@ bool FileReceiver::HandleChunk(const FileChunkHeader& header,
ctx.received += static_cast<uint64_t>(payload_size);
}
// Send ACK after processing chunk
if (on_send_ack_) {
FileTransferAck ack{};
ack.magic = kFileAckMagic;
ack.file_id = header.file_id;
ack.acked_offset = header.offset + static_cast<uint64_t>(payload_size);
ack.total_size = header.total_size;
ack.flags = 0;
bool is_last = (header.flags & 0x02) != 0;
if (is_last || ctx.received >= ctx.total_size) {
ack.flags |= 0x01; // completed
}
int ret = on_send_ack_(ack);
if (ret != 0) {
LOG_ERROR("FileReceiver: failed to send ACK for file_id={}, ret={}",
header.file_id, ret);
}
}
bool is_last = (header.flags & 0x02) != 0;
if (is_last || ctx.received >= ctx.total_size) {
ctx.ofs.close();

View File

@@ -17,6 +17,10 @@
namespace crossdesk {
// Magic constants for file transfer protocol
constexpr uint32_t kFileChunkMagic = 0x4A4E544D; // 'JNTM'
constexpr uint32_t kFileAckMagic = 0x4A4E5443; // 'JNTC'
#pragma pack(push, 1)
struct FileChunkHeader {
uint32_t magic; // magic to identify file-transfer chunks
@@ -27,6 +31,14 @@ struct FileChunkHeader {
uint16_t name_len; // filename length (bytes), only set on first chunk
uint8_t flags; // bit0: is_first, bit1: is_last, others reserved
};
struct FileTransferAck {
uint32_t magic; // magic to identify file-transfer ack
uint32_t file_id; // must match FileChunkHeader.file_id
uint64_t acked_offset; // received offset
uint64_t total_size; // total file size
uint32_t flags; // bit0: completed, bit1: error
};
#pragma pack(pop)
class FileSender {
@@ -43,9 +55,11 @@ class FileSender {
// `path` : full path to the local file.
// `label` : logical filename to send (usually path.filename()).
// `send` : callback that pushes one encoded chunk into the data channel.
// `file_id` : file id to use (0 means auto-generate).
// Return 0 on success, <0 on error.
int SendFile(const std::filesystem::path& path, const std::string& label,
const SendFunc& send, std::size_t chunk_size = 64 * 1024);
const SendFunc& send, std::size_t chunk_size = 64 * 1024,
uint32_t file_id = 0);
// build a single encoded chunk buffer according to FileChunkHeader protocol.
static std::vector<char> BuildChunk(uint32_t file_id, uint64_t offset,
@@ -66,6 +80,7 @@ class FileReceiver {
using OnFileComplete =
std::function<void(const std::filesystem::path& saved_path)>;
using OnSendAck = std::function<int(const FileTransferAck& ack)>;
public:
// save to default desktop directory.
@@ -80,6 +95,8 @@ class FileReceiver {
void SetOnFileComplete(OnFileComplete cb) { on_file_complete_ = cb; }
void SetOnSendAck(OnSendAck cb) { on_send_ack_ = cb; }
const std::filesystem::path& OutputDir() const { return output_dir_; }
private:
@@ -92,6 +109,7 @@ class FileReceiver {
std::filesystem::path output_dir_;
std::unordered_map<uint32_t, FileContext> contexts_;
OnFileComplete on_file_complete_ = nullptr;
OnSendAck on_send_ack_ = nullptr;
};
} // namespace crossdesk