diff --git a/application/remote_desk/remote_desk_client/remote_desk_client.cpp b/application/remote_desk/remote_desk_client/remote_desk_client.cpp new file mode 100644 index 0000000..ebbf39d --- /dev/null +++ b/application/remote_desk/remote_desk_client/remote_desk_client.cpp @@ -0,0 +1,105 @@ + + +#include + +#include +#include +#include + +#include "x.h" + +#define SDL_MAIN_HANDLED +#include "SDL2/SDL.h" + +int screen_w = 1280, screen_h = 720; +const int pixel_w = 1280, pixel_h = 720; + +unsigned char dst_buffer[pixel_w * pixel_h * 3 / 2]; +SDL_Texture *sdlTexture = nullptr; +SDL_Renderer *sdlRenderer = nullptr; +SDL_Rect sdlRect; + +// Refresh Event +#define REFRESH_EVENT (SDL_USEREVENT + 1) +#define QUIT_EVENT (SDL_USEREVENT + 2) + +int thread_exit = 0; + +int refresh_video(void *opaque) { + SDL_Event event; + while (thread_exit == 0) { + event.type = REFRESH_EVENT; + SDL_PushEvent(&event); + SDL_Delay(10); + } + + event.type = QUIT_EVENT; + SDL_PushEvent(&event); + + return 0; +} + +void GuestReceiveBuffer(const char *data, size_t size, const char *user_id, + size_t user_id_size) { + std::cout << "Receive size: " << size << std::endl; + memcpy(dst_buffer, data, size); +} + +int main() { + Params params; + params.cfg_path = "config.ini"; + params.on_receive_buffer = GuestReceiveBuffer; + + std::string transmission_id = "000000"; + std::string user_id = "Client"; + + PeerPtr *peer = CreatePeer(¶ms); + JoinConnection(peer, transmission_id.c_str(), user_id.c_str()); + + if (SDL_Init(SDL_INIT_VIDEO)) { + printf("Could not initialize SDL - %s\n", SDL_GetError()); + return -1; + } + + SDL_Window *screen; + screen = SDL_CreateWindow("RTS Receiver", SDL_WINDOWPOS_UNDEFINED, + SDL_WINDOWPOS_UNDEFINED, screen_w, screen_h, + SDL_WINDOW_RESIZABLE); + if (!screen) { + printf("SDL: could not create window - exiting:%s\n", SDL_GetError()); + return -1; + } + sdlRenderer = SDL_CreateRenderer(screen, -1, SDL_RENDERER_ACCELERATED); + + Uint32 pixformat = 0; + pixformat = SDL_PIXELFORMAT_NV12; + + sdlTexture = SDL_CreateTexture(sdlRenderer, pixformat, + SDL_TEXTUREACCESS_STREAMING, pixel_w, pixel_h); + + SDL_Thread *refresh_thread = SDL_CreateThread(refresh_video, NULL, NULL); + SDL_Event event; + while (1) { + // Wait + SDL_WaitEvent(&event); + if (event.type == REFRESH_EVENT) { + sdlRect.x = 0; + sdlRect.y = 0; + sdlRect.w = screen_w; + sdlRect.h = screen_h; + + SDL_UpdateTexture(sdlTexture, NULL, dst_buffer, pixel_w); + SDL_RenderClear(sdlRenderer); + SDL_RenderCopy(sdlRenderer, sdlTexture, NULL, &sdlRect); + SDL_RenderPresent(sdlRenderer); + } else if (event.type == SDL_WINDOWEVENT) { + // If Resize + SDL_GetWindowSize(screen, &screen_w, &screen_h); + printf("Resize windows: %dx%d\n", screen_w, screen_h); + } else if (event.type == SDL_QUIT) { + break; + } + } + + return 0; +} diff --git a/application/remote_desk/remote_desk_server/remote_desk_server.cpp b/application/remote_desk/remote_desk_server/remote_desk_server.cpp index 1f2f03c..3751d39 100644 --- a/application/remote_desk/remote_desk_server/remote_desk_server.cpp +++ b/application/remote_desk/remote_desk_server/remote_desk_server.cpp @@ -8,7 +8,7 @@ extern "C" { #include }; -#define NV12_BUFFER_SIZE 2560 * 1440 * 3 / 2 +#define NV12_BUFFER_SIZE 1280 * 720 * 3 / 2 RemoteDeskServer ::RemoteDeskServer() {} @@ -23,14 +23,14 @@ int BGRAToNV12FFmpeg(unsigned char *src_buffer, int width, int height, unsigned char *dst_buffer) { AVFrame *Input_pFrame = av_frame_alloc(); AVFrame *Output_pFrame = av_frame_alloc(); - struct SwsContext *img_convert_ctx = sws_getContext( - width, height, AV_PIX_FMT_BGRA, width, height, AV_PIX_FMT_NV12, - SWS_FAST_BILINEAR, nullptr, nullptr, nullptr); + struct SwsContext *img_convert_ctx = + sws_getContext(width, height, AV_PIX_FMT_BGRA, 1280, 720, AV_PIX_FMT_NV12, + SWS_FAST_BILINEAR, nullptr, nullptr, nullptr); av_image_fill_arrays(Input_pFrame->data, Input_pFrame->linesize, src_buffer, AV_PIX_FMT_BGRA, width, height, 1); av_image_fill_arrays(Output_pFrame->data, Output_pFrame->linesize, dst_buffer, - AV_PIX_FMT_NV12, width, height, 1); + AV_PIX_FMT_NV12, 1280, 720, 1); sws_scale(img_convert_ctx, (uint8_t const **)Input_pFrame->data, Input_pFrame->linesize, 0, height, Output_pFrame->data, @@ -85,7 +85,7 @@ int RemoteDeskServer::Init() { BGRAToNV12FFmpeg(data, width, height, (unsigned char *)nv12_buffer_); SendData(peer, DATA_TYPE::VIDEO, (const char *)nv12_buffer_, NV12_BUFFER_SIZE); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // std::this_thread::sleep_for(std::chrono::milliseconds(30)); }); screen_capture->Start(); diff --git a/application/remote_desk/xmake.lua b/application/remote_desk/xmake.lua index 713200b..a545da5 100644 --- a/application/remote_desk/xmake.lua +++ b/application/remote_desk/xmake.lua @@ -43,6 +43,17 @@ target("remote_desk_server") add_includedirs("../../src/interface") -- add_links("avformat", "swscale") +target("remote_desk_client") + set_kind("binary") + add_deps("projectx") + add_packages("log") + add_packages("ffmpeg") + add_packages("vcpkg::sdl2") + add_files("remote_desk_client/*.cpp") + add_includedirs("../../src/interface") + add_links("SDL2-static", "SDL2main", "Shell32", "gdi32", "winmm", + "setupapi", "version", "WindowsApp", "Imm32", "avutil") + -- target("remote_desk") -- set_kind("binary") -- add_deps("projectx") diff --git a/src/media/video/decode/nvcodec/nv_decoder.cpp b/src/media/video/decode/nvcodec/nv_decoder.cpp index d2a2ba9..e0a71c7 100644 --- a/src/media/video/decode/nvcodec/nv_decoder.cpp +++ b/src/media/video/decode/nvcodec/nv_decoder.cpp @@ -2,8 +2,23 @@ #include "log.h" -VideoDecoder::VideoDecoder() {} -VideoDecoder::~VideoDecoder() {} +#define SAVE_ENCODER_STREAM 0 + +VideoDecoder::VideoDecoder() { + if (SAVE_ENCODER_STREAM) { + file_ = fopen("decode_stream.h264", "w+b"); + if (!file_) { + LOG_WARN("Fail to open stream.h264"); + } + } +} +VideoDecoder::~VideoDecoder() { + if (SAVE_ENCODER_STREAM && file_) { + fflush(file_); + fclose(file_); + file_ = nullptr; + } +} int VideoDecoder::Init() { ck(cuInit(0)); @@ -34,7 +49,11 @@ int VideoDecoder::Decode(const uint8_t *pData, int nSize) { } if ((*(pData + 4) & 0x1f) == 0x07) { - // LOG_WARN("Receive key frame"); + LOG_WARN("Receive key frame"); + } + + if (SAVE_ENCODER_STREAM) { + fwrite((unsigned char *)pData, 1, nSize, file_); } int ret = decoder->Decode(pData, nSize); @@ -51,10 +70,12 @@ int VideoDecoder::GetFrame(uint8_t *yuv_data, uint32_t &width, uint32_t &height, uint8_t *data = nullptr; data = decoder->GetFrame(); if (data) { - yuv_data = data; + // yuv_data = data; + width = decoder->GetWidth(); height = decoder->GetHeight(); size = width * height * 3 / 2; + memcpy(yuv_data, data, size); return 0; return -1; diff --git a/src/media/video/decode/nvcodec/nv_decoder.h b/src/media/video/decode/nvcodec/nv_decoder.h index 20e7ecc..4bb2da3 100644 --- a/src/media/video/decode/nvcodec/nv_decoder.h +++ b/src/media/video/decode/nvcodec/nv_decoder.h @@ -16,6 +16,7 @@ class VideoDecoder { NvDecoder* decoder = nullptr; bool get_first_keyframe_ = false; bool skip_frame_ = false; + FILE* file_ = nullptr; }; #endif \ No newline at end of file diff --git a/src/media/video/encode/nvcodec/nv_encoder.cpp b/src/media/video/encode/nvcodec/nv_encoder.cpp index 592791b..6ff486e 100644 --- a/src/media/video/encode/nvcodec/nv_encoder.cpp +++ b/src/media/video/encode/nvcodec/nv_encoder.cpp @@ -8,9 +8,9 @@ VideoEncoder::VideoEncoder() { if (SAVE_ENCODER_STREAM) { - file_ = fopen("saved/stream.h264", "w+b"); + file_ = fopen("encode_stream.h264", "w+b"); if (!file_) { - LOG_WARN("Fail to open saved/stream.h264"); + LOG_WARN("Fail to open stream.h264"); } } } @@ -79,7 +79,7 @@ int VideoEncoder::Encode(const uint8_t *pData, int nSize) { return -1; } - if (0 == seq_++ % (30 * 5)) { + if (0 == seq_++ % (30)) { ForceIdr(); } diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index 34dfb59..6b0a453 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -12,7 +12,12 @@ using nlohmann::json; PeerConnection::PeerConnection(OnReceiveBuffer on_receive_buffer) : on_receive_buffer_(on_receive_buffer) {} -PeerConnection::~PeerConnection() {} +PeerConnection::~PeerConnection() { + if (nv12_data_) { + delete nv12_data_; + nv12_data_ = nullptr; + } +} int PeerConnection::Init(PeerConnectionParams params, const std::string &transmission_id, @@ -39,7 +44,17 @@ int PeerConnection::Init(PeerConnectionParams params, on_receive_ice_msg_ = [this](const char *data, size_t size, const char *user_id, size_t user_id_size) { - on_receive_buffer_(data, size, user_id, user_id_size); + int num_frame_returned = Decode((uint8_t *)data, size); + + uint32_t width = 0; + uint32_t height = 0; + uint32_t frame_size = 0; + for (size_t i = 0; i < num_frame_returned; ++i) { + int ret = GetFrame((uint8_t *)nv12_data_, width, height, frame_size); + + on_receive_buffer_(nv12_data_, width * height * 3 / 2, user_id, + user_id_size); + } }; ws_transport_ = new WsTransmission(on_receive_ws_msg_); @@ -51,6 +66,8 @@ int PeerConnection::Init(PeerConnectionParams params, do { } while (SignalStatus::Connected != GetSignalStatus()); VideoEncoder::Init(); + VideoDecoder::Init(); + nv12_data_ = new char[1280 * 720 * 3 / 2]; return 0; } diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index d09603a..6e0f866 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -75,6 +75,7 @@ class PeerConnection : public VideoEncoder, VideoDecoder { SignalStatus signal_status_ = SignalStatus::Closed; OnReceiveBuffer on_receive_buffer_; + char *nv12_data_ = nullptr; private: }; diff --git a/src/ringbuffer/ringbuffer.h b/src/ringbuffer/ringbuffer.h new file mode 100644 index 0000000..bf73ef1 --- /dev/null +++ b/src/ringbuffer/ringbuffer.h @@ -0,0 +1,101 @@ +#ifndef _RINGBUFFER_H_ +#define _RINGBUFFER_H_ + +#include + +class RingBuffer { + public: + class Data { + public: + Data() = default; + + Data(const char* data, size_t size) { + data_ = new char[size]; + memcpy(data_, data, size); + size_ = size; + } + + Data(const Data& obj) { + data_ = new char[obj.size_]; + memcpy(data_, obj.data_, obj.size_); + size_ = obj.size_; + } + + Data& operator=(const Data& obj) { + data_ = new char[obj.size_]; + memcpy(data_, obj.data_, obj.size_); + size_ = obj.size_; + return *this; + } + + ~Data() { + if (data_) { + delete data_; + data_ = nullptr; + } + size_ = 0; + } + + size_t size() const { return size_; } + char* data() const { return data_; } + + public: + char* data_ = nullptr; + size_t size_ = 0; + }; + + public: + RingBuffer(unsigned size = 128) : m_size(size), m_front(0), m_rear(0) { + m_data = new Data[size]; + } + + ~RingBuffer() { + delete[] m_data; + m_data = nullptr; + } + + inline bool isEmpty() const { return m_front == m_rear; } + + inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } + + bool push(const Data& value) { + if (isFull()) { + return false; + } + m_data[m_rear] = value; + m_rear = (m_rear + 1) % m_size; + return true; + } + + bool push(const Data* value) { + if (isFull()) { + return false; + } + m_data[m_rear] = *value; + m_rear = (m_rear + 1) % m_size; + return true; + } + + inline bool pop(Data& value) { + if (isEmpty()) { + return false; + } + value = m_data[m_front]; + m_front = (m_front + 1) % m_size; + return true; + } + + inline unsigned int front() const { return m_front; } + + inline unsigned int rear() const { return m_rear; } + + inline unsigned int size() const { return m_size; } + + private: + unsigned int m_size; + int m_front; + int m_rear; + Data* m_data; +}; + +#endif \ No newline at end of file diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp index 464f3e0..7293804 100644 --- a/src/transmission/ice_transmission.cpp +++ b/src/transmission/ice_transmission.cpp @@ -6,6 +6,7 @@ #include #include "common.h" +#include "ikcp.h" #include "log.h" #if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64) @@ -82,42 +83,67 @@ IceTransmission::~IceTransmission() { delete ice_agent_; ice_agent_ = nullptr; } - ikcp_release(kcp_); } int IceTransmission::InitIceTransmission(std::string &ip, int port) { - kcp_ = ikcp_create(0x11223344, (void *)this); - ikcp_setoutput(kcp_, - [](const char *buf, int len, ikcpcb *kcp, void *user) -> int { - IceTransmission *ice_transmission_obj = - static_cast(user); - LOG_ERROR("Real send size: {}", len); - return ice_transmission_obj->ice_agent_->Send(buf, len); - }); - // ikcp_wndsize(kcp_, 1280, 1280); - ikcp_nodelay(kcp_, 0, 40, 0, 0); - ikcp_setmtu(kcp_, 4000); - // kcp_->rx_minrto = 10; - // kcp_->fastresend = 1; std::thread kcp_update_thread([this]() { - while (1) { + int ret = 0; + ikcpcb *kcp = ikcp_create(0x11223344, (void *)this); + ikcp_setoutput( + kcp, [](const char *buf, int len, ikcpcb *kcp, void *user) -> int { + IceTransmission *ice_transmission_obj = + static_cast(user); + return ice_transmission_obj->ice_agent_->Send(buf, len); + }); + ikcp_wndsize(kcp, 1024, 1280); + ikcp_nodelay(kcp, 1, 20, 2, 1); + // ikcp_setmtu(kcp_, 4000); + // kcp_->rx_minrto = 10; + // kcp_->fastresend = 1; + + while (!kcp_stop_) { auto clock = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); - mtx_.lock(); - ikcp_update(kcp_, iclock()); + + ikcp_update(kcp, clock); + + if (!send_ringbuffer_.isEmpty()) { + RingBuffer::Data buffer; + if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) { + send_ringbuffer_.pop(buffer); + ret = ikcp_send(kcp, buffer.data(), buffer.size()); + } + } + + if (!recv_ringbuffer_.isEmpty()) { + RingBuffer::Data buffer; + recv_ringbuffer_.pop(buffer); + ret = ikcp_input(kcp, buffer.data(), buffer.size()); + } int len = 0; int total_len = 0; while (1) { - len = ikcp_recv(kcp_, kcp_complete_buffer_ + len, 1400); + len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000); + total_len += len; - if (len <= 0) break; + + if (len <= 0) { + if (on_receive_ice_msg_cb_ && total_len > 0) { + // LOG_ERROR("Receive size: {}", total_len); + on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len, + remote_user_id_.data(), + remote_user_id_.size()); + } + break; + } } - mtx_.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(2)); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); } + + ikcp_release(kcp); }); kcp_update_thread.detach(); @@ -151,6 +177,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { if (ice_transmission_obj->offer_peer_) { ice_transmission_obj->GetLocalSdp(); ice_transmission_obj->SendOffer(); + } else { ice_transmission_obj->CreateAnswer(); ice_transmission_obj->SendAnswer(); @@ -162,11 +189,12 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { IceTransmission *ice_transmission_obj = static_cast(user_ptr); if (ice_transmission_obj->on_receive_ice_msg_cb_) { - LOG_ERROR("[{}] Receive size: {}", (void *)user_ptr, size); - ice_transmission_obj->mtx_.lock(); - int ret = ikcp_input(ice_transmission_obj->kcp_, data, size); + ice_transmission_obj->recv_ringbuffer_.push( + std::move(RingBuffer::Data(data, size))); + + // int ret = ikcp_input(ice_transmission_obj->kcp_, data, size); // ikcp_update(ice_transmission_obj->kcp_, iclock()); - LOG_ERROR("ikcp_input {}", ret); + // LOG_ERROR("ikcp_input {}", ret); // auto clock = // std::chrono::duration_cast( // std::chrono::system_clock::now().time_since_epoch()) @@ -174,8 +202,6 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { // ikcp_update(ice_transmission_obj->kcp_, clock); - ice_transmission_obj->mtx_.unlock(); - // ice_transmission_obj->on_receive_ice_msg_cb_( // ice_transmission_obj->kcp_complete_buffer_, total_len, // ice_transmission_obj->remote_user_id_.data(), @@ -193,6 +219,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) { int IceTransmission::DestroyIceTransmission() { LOG_INFO("[{}->{}] Destroy ice transmission", user_id_, remote_user_id_); + kcp_stop_ = true; return ice_agent_->DestoryIceAgent(); } @@ -275,20 +302,8 @@ int IceTransmission::SendAnswer() { int IceTransmission::SendData(const char *data, size_t size) { if (JUICE_STATE_COMPLETED == state_) { - LOG_ERROR("[{}] Wanna send size: {}", (void *)this, size); - mtx_.lock(); - - if (ikcp_waitsnd(kcp_) > kcp_->snd_wnd) { - // LOG_ERROR("Skip frame"); - // mtx_.unlock(); - // return 0; - ikcp_flush(kcp_); - } - int ret = ikcp_send(kcp_, data, size / 100); - LOG_ERROR("ikcp_send {}, wnd [{} | {}]", ret, ikcp_waitsnd(kcp_), - kcp_->snd_wnd); - mtx_.unlock(); - // ice_agent_->Send(data, size); + LOG_ERROR("Send size: {}", size); + send_ringbuffer_.push(std::move(RingBuffer::Data(data, size))); } return 0; } \ No newline at end of file diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h index 974a485..f08b603 100644 --- a/src/transmission/ice_transmission.h +++ b/src/transmission/ice_transmission.h @@ -5,7 +5,7 @@ #include "congestion_control.h" #include "ice_agent.h" -#include "ikcp.h" +#include "ringbuffer.h" #include "ws_transmission.h" class IceTransmission { public: @@ -64,9 +64,12 @@ class IceTransmission { juice_state_t state_ = JUICE_STATE_DISCONNECTED; private: - ikcpcb *kcp_ = nullptr; + // ikcpcb *kcp_ = nullptr; char kcp_complete_buffer_[2560 * 1440 * 4]; std::mutex mtx_; + RingBuffer send_ringbuffer_; + RingBuffer recv_ringbuffer_; + bool kcp_stop_ = false; }; #endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index 710e64d..8a21abf 100644 --- a/xmake.lua +++ b/xmake.lua @@ -43,6 +43,10 @@ target("inih") add_files("src/inih/ini.c", "src/inih/INIReader.cpp") add_includedirs("src/inih", {public = true}) +target("ringbuffer") + set_kind("headeronly") + add_includedirs("src/ringbuffer", {public = true}) + target("ice") set_kind("static") add_deps("log", "common", "ws") @@ -79,7 +83,7 @@ target("qos") target("transmission") set_kind("static") - add_deps("log", "ws", "ice", "qos") + add_deps("log", "ws", "ice", "qos", "ringbuffer") add_files("src/transmission/*.cpp") add_packages("asio", "nlohmann_json") add_includedirs("src/ws", "src/ice", "src/qos", {public = true})