Fix kcp transmission error

This commit is contained in:
dijunkun
2023-08-31 17:49:08 +08:00
parent 3c1f7973d0
commit af285f4b5b
12 changed files with 338 additions and 59 deletions

View File

@@ -2,8 +2,23 @@
#include "log.h"
VideoDecoder::VideoDecoder() {}
VideoDecoder::~VideoDecoder() {}
#define SAVE_ENCODER_STREAM 0
VideoDecoder::VideoDecoder() {
if (SAVE_ENCODER_STREAM) {
file_ = fopen("decode_stream.h264", "w+b");
if (!file_) {
LOG_WARN("Fail to open stream.h264");
}
}
}
VideoDecoder::~VideoDecoder() {
if (SAVE_ENCODER_STREAM && file_) {
fflush(file_);
fclose(file_);
file_ = nullptr;
}
}
int VideoDecoder::Init() {
ck(cuInit(0));
@@ -34,7 +49,11 @@ int VideoDecoder::Decode(const uint8_t *pData, int nSize) {
}
if ((*(pData + 4) & 0x1f) == 0x07) {
// LOG_WARN("Receive key frame");
LOG_WARN("Receive key frame");
}
if (SAVE_ENCODER_STREAM) {
fwrite((unsigned char *)pData, 1, nSize, file_);
}
int ret = decoder->Decode(pData, nSize);
@@ -51,10 +70,12 @@ int VideoDecoder::GetFrame(uint8_t *yuv_data, uint32_t &width, uint32_t &height,
uint8_t *data = nullptr;
data = decoder->GetFrame();
if (data) {
yuv_data = data;
// yuv_data = data;
width = decoder->GetWidth();
height = decoder->GetHeight();
size = width * height * 3 / 2;
memcpy(yuv_data, data, size);
return 0;
return -1;

View File

@@ -16,6 +16,7 @@ class VideoDecoder {
NvDecoder* decoder = nullptr;
bool get_first_keyframe_ = false;
bool skip_frame_ = false;
FILE* file_ = nullptr;
};
#endif

View File

@@ -8,9 +8,9 @@
VideoEncoder::VideoEncoder() {
if (SAVE_ENCODER_STREAM) {
file_ = fopen("saved/stream.h264", "w+b");
file_ = fopen("encode_stream.h264", "w+b");
if (!file_) {
LOG_WARN("Fail to open saved/stream.h264");
LOG_WARN("Fail to open stream.h264");
}
}
}
@@ -79,7 +79,7 @@ int VideoEncoder::Encode(const uint8_t *pData, int nSize) {
return -1;
}
if (0 == seq_++ % (30 * 5)) {
if (0 == seq_++ % (30)) {
ForceIdr();
}

View File

@@ -12,7 +12,12 @@ using nlohmann::json;
PeerConnection::PeerConnection(OnReceiveBuffer on_receive_buffer)
: on_receive_buffer_(on_receive_buffer) {}
PeerConnection::~PeerConnection() {}
PeerConnection::~PeerConnection() {
if (nv12_data_) {
delete nv12_data_;
nv12_data_ = nullptr;
}
}
int PeerConnection::Init(PeerConnectionParams params,
const std::string &transmission_id,
@@ -39,7 +44,17 @@ int PeerConnection::Init(PeerConnectionParams params,
on_receive_ice_msg_ = [this](const char *data, size_t size,
const char *user_id, size_t user_id_size) {
on_receive_buffer_(data, size, user_id, user_id_size);
int num_frame_returned = Decode((uint8_t *)data, size);
uint32_t width = 0;
uint32_t height = 0;
uint32_t frame_size = 0;
for (size_t i = 0; i < num_frame_returned; ++i) {
int ret = GetFrame((uint8_t *)nv12_data_, width, height, frame_size);
on_receive_buffer_(nv12_data_, width * height * 3 / 2, user_id,
user_id_size);
}
};
ws_transport_ = new WsTransmission(on_receive_ws_msg_);
@@ -51,6 +66,8 @@ int PeerConnection::Init(PeerConnectionParams params,
do {
} while (SignalStatus::Connected != GetSignalStatus());
VideoEncoder::Init();
VideoDecoder::Init();
nv12_data_ = new char[1280 * 720 * 3 / 2];
return 0;
}

View File

@@ -75,6 +75,7 @@ class PeerConnection : public VideoEncoder, VideoDecoder {
SignalStatus signal_status_ = SignalStatus::Closed;
OnReceiveBuffer on_receive_buffer_;
char *nv12_data_ = nullptr;
private:
};

101
src/ringbuffer/ringbuffer.h Normal file
View File

@@ -0,0 +1,101 @@
#ifndef _RINGBUFFER_H_
#define _RINGBUFFER_H_
#include <string.h>
class RingBuffer {
public:
class Data {
public:
Data() = default;
Data(const char* data, size_t size) {
data_ = new char[size];
memcpy(data_, data, size);
size_ = size;
}
Data(const Data& obj) {
data_ = new char[obj.size_];
memcpy(data_, obj.data_, obj.size_);
size_ = obj.size_;
}
Data& operator=(const Data& obj) {
data_ = new char[obj.size_];
memcpy(data_, obj.data_, obj.size_);
size_ = obj.size_;
return *this;
}
~Data() {
if (data_) {
delete data_;
data_ = nullptr;
}
size_ = 0;
}
size_t size() const { return size_; }
char* data() const { return data_; }
public:
char* data_ = nullptr;
size_t size_ = 0;
};
public:
RingBuffer(unsigned size = 128) : m_size(size), m_front(0), m_rear(0) {
m_data = new Data[size];
}
~RingBuffer() {
delete[] m_data;
m_data = nullptr;
}
inline bool isEmpty() const { return m_front == m_rear; }
inline bool isFull() const { return m_front == (m_rear + 1) % m_size; }
bool push(const Data& value) {
if (isFull()) {
return false;
}
m_data[m_rear] = value;
m_rear = (m_rear + 1) % m_size;
return true;
}
bool push(const Data* value) {
if (isFull()) {
return false;
}
m_data[m_rear] = *value;
m_rear = (m_rear + 1) % m_size;
return true;
}
inline bool pop(Data& value) {
if (isEmpty()) {
return false;
}
value = m_data[m_front];
m_front = (m_front + 1) % m_size;
return true;
}
inline unsigned int front() const { return m_front; }
inline unsigned int rear() const { return m_rear; }
inline unsigned int size() const { return m_size; }
private:
unsigned int m_size;
int m_front;
int m_rear;
Data* m_data;
};
#endif

View File

@@ -6,6 +6,7 @@
#include <thread>
#include "common.h"
#include "ikcp.h"
#include "log.h"
#if defined(WIN32) || defined(_WIN32) || defined(WIN64) || defined(_WIN64)
@@ -82,42 +83,67 @@ IceTransmission::~IceTransmission() {
delete ice_agent_;
ice_agent_ = nullptr;
}
ikcp_release(kcp_);
}
int IceTransmission::InitIceTransmission(std::string &ip, int port) {
kcp_ = ikcp_create(0x11223344, (void *)this);
ikcp_setoutput(kcp_,
[](const char *buf, int len, ikcpcb *kcp, void *user) -> int {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user);
LOG_ERROR("Real send size: {}", len);
return ice_transmission_obj->ice_agent_->Send(buf, len);
});
// ikcp_wndsize(kcp_, 1280, 1280);
ikcp_nodelay(kcp_, 0, 40, 0, 0);
ikcp_setmtu(kcp_, 4000);
// kcp_->rx_minrto = 10;
// kcp_->fastresend = 1;
std::thread kcp_update_thread([this]() {
while (1) {
int ret = 0;
ikcpcb *kcp = ikcp_create(0x11223344, (void *)this);
ikcp_setoutput(
kcp, [](const char *buf, int len, ikcpcb *kcp, void *user) -> int {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user);
return ice_transmission_obj->ice_agent_->Send(buf, len);
});
ikcp_wndsize(kcp, 1024, 1280);
ikcp_nodelay(kcp, 1, 20, 2, 1);
// ikcp_setmtu(kcp_, 4000);
// kcp_->rx_minrto = 10;
// kcp_->fastresend = 1;
while (!kcp_stop_) {
auto clock = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
mtx_.lock();
ikcp_update(kcp_, iclock());
ikcp_update(kcp, clock);
if (!send_ringbuffer_.isEmpty()) {
RingBuffer::Data buffer;
if (ikcp_waitsnd(kcp) <= kcp->snd_wnd * 2) {
send_ringbuffer_.pop(buffer);
ret = ikcp_send(kcp, buffer.data(), buffer.size());
}
}
if (!recv_ringbuffer_.isEmpty()) {
RingBuffer::Data buffer;
recv_ringbuffer_.pop(buffer);
ret = ikcp_input(kcp, buffer.data(), buffer.size());
}
int len = 0;
int total_len = 0;
while (1) {
len = ikcp_recv(kcp_, kcp_complete_buffer_ + len, 1400);
len = ikcp_recv(kcp, kcp_complete_buffer_ + len, 400000);
total_len += len;
if (len <= 0) break;
if (len <= 0) {
if (on_receive_ice_msg_cb_ && total_len > 0) {
// LOG_ERROR("Receive size: {}", total_len);
on_receive_ice_msg_cb_(kcp_complete_buffer_, total_len,
remote_user_id_.data(),
remote_user_id_.size());
}
break;
}
}
mtx_.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(2));
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
ikcp_release(kcp);
});
kcp_update_thread.detach();
@@ -151,6 +177,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
if (ice_transmission_obj->offer_peer_) {
ice_transmission_obj->GetLocalSdp();
ice_transmission_obj->SendOffer();
} else {
ice_transmission_obj->CreateAnswer();
ice_transmission_obj->SendAnswer();
@@ -162,11 +189,12 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
IceTransmission *ice_transmission_obj =
static_cast<IceTransmission *>(user_ptr);
if (ice_transmission_obj->on_receive_ice_msg_cb_) {
LOG_ERROR("[{}] Receive size: {}", (void *)user_ptr, size);
ice_transmission_obj->mtx_.lock();
int ret = ikcp_input(ice_transmission_obj->kcp_, data, size);
ice_transmission_obj->recv_ringbuffer_.push(
std::move(RingBuffer::Data(data, size)));
// int ret = ikcp_input(ice_transmission_obj->kcp_, data, size);
// ikcp_update(ice_transmission_obj->kcp_, iclock());
LOG_ERROR("ikcp_input {}", ret);
// LOG_ERROR("ikcp_input {}", ret);
// auto clock =
// std::chrono::duration_cast<std::chrono::milliseconds>(
// std::chrono::system_clock::now().time_since_epoch())
@@ -174,8 +202,6 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
// ikcp_update(ice_transmission_obj->kcp_, clock);
ice_transmission_obj->mtx_.unlock();
// ice_transmission_obj->on_receive_ice_msg_cb_(
// ice_transmission_obj->kcp_complete_buffer_, total_len,
// ice_transmission_obj->remote_user_id_.data(),
@@ -193,6 +219,7 @@ int IceTransmission::InitIceTransmission(std::string &ip, int port) {
int IceTransmission::DestroyIceTransmission() {
LOG_INFO("[{}->{}] Destroy ice transmission", user_id_, remote_user_id_);
kcp_stop_ = true;
return ice_agent_->DestoryIceAgent();
}
@@ -275,20 +302,8 @@ int IceTransmission::SendAnswer() {
int IceTransmission::SendData(const char *data, size_t size) {
if (JUICE_STATE_COMPLETED == state_) {
LOG_ERROR("[{}] Wanna send size: {}", (void *)this, size);
mtx_.lock();
if (ikcp_waitsnd(kcp_) > kcp_->snd_wnd) {
// LOG_ERROR("Skip frame");
// mtx_.unlock();
// return 0;
ikcp_flush(kcp_);
}
int ret = ikcp_send(kcp_, data, size / 100);
LOG_ERROR("ikcp_send {}, wnd [{} | {}]", ret, ikcp_waitsnd(kcp_),
kcp_->snd_wnd);
mtx_.unlock();
// ice_agent_->Send(data, size);
LOG_ERROR("Send size: {}", size);
send_ringbuffer_.push(std::move(RingBuffer::Data(data, size)));
}
return 0;
}

View File

@@ -5,7 +5,7 @@
#include "congestion_control.h"
#include "ice_agent.h"
#include "ikcp.h"
#include "ringbuffer.h"
#include "ws_transmission.h"
class IceTransmission {
public:
@@ -64,9 +64,12 @@ class IceTransmission {
juice_state_t state_ = JUICE_STATE_DISCONNECTED;
private:
ikcpcb *kcp_ = nullptr;
// ikcpcb *kcp_ = nullptr;
char kcp_complete_buffer_[2560 * 1440 * 4];
std::mutex mtx_;
RingBuffer send_ringbuffer_;
RingBuffer recv_ringbuffer_;
bool kcp_stop_ = false;
};
#endif