mirror of
https://github.com/kunkundi/crossdesk.git
synced 2025-10-26 20:25:34 +08:00
[feat] use pending_frames_ instead of compelete_video_frame_queue_ in h264 frame assember
This commit is contained in:
@@ -11,7 +11,7 @@
|
|||||||
|
|
||||||
#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2)
|
#define NV12_BUFFER_SIZE (1280 * 720 * 3 / 2)
|
||||||
#define RTCP_RR_INTERVAL 1000
|
#define RTCP_RR_INTERVAL 1000
|
||||||
#define MAX_WAIT_TIME_MS 20 // 20ms
|
#define MAX_WAIT_TIME_MS 200 // 20ms
|
||||||
#define NACK_UPDATE_INTERVAL 20 // 20ms
|
#define NACK_UPDATE_INTERVAL 20 // 20ms
|
||||||
|
|
||||||
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock)
|
RtpVideoReceiver::RtpVideoReceiver(std::shared_ptr<SystemClock> clock)
|
||||||
@@ -271,15 +271,29 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264) {
|
|||||||
if (!fec_enable_) {
|
if (!fec_enable_) {
|
||||||
rtp::NAL_UNIT_TYPE nalu_type = rtp_packet_h264.NalUnitType();
|
rtp::NAL_UNIT_TYPE nalu_type = rtp_packet_h264.NalUnitType();
|
||||||
if (rtp::NAL_UNIT_TYPE::NALU == nalu_type) {
|
if (rtp::NAL_UNIT_TYPE::NALU == nalu_type) {
|
||||||
ReceivedFrame received_frame(rtp_packet_h264.Payload(),
|
// ReceivedFrame received_frame(rtp_packet_h264.Payload(),
|
||||||
|
// rtp_packet_h264.PayloadSize());
|
||||||
|
// received_frame.SetReceivedTimestamp(clock_->CurrentTime().us());
|
||||||
|
// received_frame.SetCapturedTimestamp(
|
||||||
|
// (static_cast<int64_t>(rtp_packet_h264.Timestamp()) /
|
||||||
|
// rtp::kMsToRtpTimestamp -
|
||||||
|
// delta_ntp_internal_ms_) *
|
||||||
|
// 1000);
|
||||||
|
// compelete_video_frame_queue_.push(received_frame);
|
||||||
|
|
||||||
|
std::unique_ptr<ReceivedFrame> received_frame =
|
||||||
|
std::make_unique<ReceivedFrame>(rtp_packet_h264.Payload(),
|
||||||
rtp_packet_h264.PayloadSize());
|
rtp_packet_h264.PayloadSize());
|
||||||
received_frame.SetReceivedTimestamp(clock_->CurrentTime().us());
|
received_frame->SetReceivedTimestamp(clock_->CurrentTime().us());
|
||||||
received_frame.SetCapturedTimestamp(
|
received_frame->SetCapturedTimestamp(
|
||||||
(static_cast<int64_t>(rtp_packet_h264.Timestamp()) /
|
(static_cast<int64_t>(rtp_packet_h264.Timestamp()) /
|
||||||
rtp::kMsToRtpTimestamp -
|
rtp::kMsToRtpTimestamp -
|
||||||
delta_ntp_internal_ms_) *
|
delta_ntp_internal_ms_) *
|
||||||
1000);
|
1000);
|
||||||
compelete_video_frame_queue_.push(received_frame);
|
|
||||||
|
std::lock_guard<std::mutex> lock(pending_frames_mtx_);
|
||||||
|
pending_frames_[rtp_packet_h264.Timestamp()] = {
|
||||||
|
std::move(received_frame), true, clock_->CurrentTime().ms()};
|
||||||
} else if (rtp::NAL_UNIT_TYPE::FU_A == nalu_type) {
|
} else if (rtp::NAL_UNIT_TYPE::FU_A == nalu_type) {
|
||||||
if (rtp::PAYLOAD_TYPE::H264 == rtp_packet_h264.PayloadType()) {
|
if (rtp::PAYLOAD_TYPE::H264 == rtp_packet_h264.PayloadType()) {
|
||||||
incomplete_h264_frame_list_[rtp_packet_h264.SequenceNumber()] =
|
incomplete_h264_frame_list_[rtp_packet_h264.SequenceNumber()] =
|
||||||
@@ -473,7 +487,8 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264,
|
|||||||
if (clock_->CurrentTime().ms() - missing_seqs_wait_ts_iter->second >
|
if (clock_->CurrentTime().ms() - missing_seqs_wait_ts_iter->second >
|
||||||
MAX_WAIT_TIME_MS) {
|
MAX_WAIT_TIME_MS) {
|
||||||
missing_sequence_numbers_wait_time_.erase(missing_seqs_wait_ts_iter);
|
missing_sequence_numbers_wait_time_.erase(missing_seqs_wait_ts_iter);
|
||||||
LOG_WARN("rtx packet seq {} is timeout", seq);
|
std::lock_guard<std::mutex> lock(pending_frames_mtx_);
|
||||||
|
pending_frames_.erase(timestamp);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -483,6 +498,8 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264,
|
|||||||
++sequence_number) {
|
++sequence_number) {
|
||||||
if (incomplete_h264_frame_list_.find(sequence_number) ==
|
if (incomplete_h264_frame_list_.find(sequence_number) ==
|
||||||
incomplete_h264_frame_list_.end()) {
|
incomplete_h264_frame_list_.end()) {
|
||||||
|
std::lock_guard<std::mutex> lock(pending_frames_mtx_);
|
||||||
|
pending_frames_[timestamp] = {nullptr, false, clock_->CurrentTime().ms()};
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -490,79 +507,6 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264,
|
|||||||
return PopCompleteFrame(start_seq, end_seq, timestamp);
|
return PopCompleteFrame(start_seq, end_seq, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RtpVideoReceiver::CheckIsH264FrameCompletedFuaEndReceived(
|
|
||||||
RtpPacketH264& rtp_packet_h264) {
|
|
||||||
uint32_t timestamp = rtp_packet_h264.Timestamp();
|
|
||||||
uint16_t end_seq = rtp_packet_h264.SequenceNumber();
|
|
||||||
fua_end_sequence_numbers_[timestamp] = end_seq;
|
|
||||||
uint16_t start_seq = 0;
|
|
||||||
bool has_start = false;
|
|
||||||
bool has_missing = false;
|
|
||||||
if (missing_sequence_numbers_wait_time_.find(timestamp) ==
|
|
||||||
missing_sequence_numbers_wait_time_.end()) {
|
|
||||||
missing_sequence_numbers_wait_time_[timestamp] = clock_->CurrentTime().ms();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (uint16_t seq = end_seq; seq > 0; --seq) {
|
|
||||||
auto it = incomplete_h264_frame_list_.find(seq);
|
|
||||||
if (it->second.FuAStart()) {
|
|
||||||
start_seq = seq;
|
|
||||||
has_start = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!has_start) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return PopCompleteFrame(start_seq, end_seq, timestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RtpVideoReceiver::CheckIsH264FrameCompletedMissSeqReceived(
|
|
||||||
RtpPacketH264& rtp_packet_h264) {
|
|
||||||
if (fua_end_sequence_numbers_.find(rtp_packet_h264.Timestamp()) ==
|
|
||||||
fua_end_sequence_numbers_.end()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint32_t timestamp = rtp_packet_h264.Timestamp();
|
|
||||||
uint16_t end_seq = fua_end_sequence_numbers_[timestamp];
|
|
||||||
uint16_t start_seq = 0;
|
|
||||||
bool has_start = false;
|
|
||||||
bool has_missing = false;
|
|
||||||
|
|
||||||
for (uint16_t seq = end_seq; seq > 0; --seq) {
|
|
||||||
auto it = incomplete_h264_frame_list_.find(seq);
|
|
||||||
if (it == incomplete_h264_frame_list_.end()) {
|
|
||||||
if (padding_sequence_numbers_.find(seq) ==
|
|
||||||
padding_sequence_numbers_.end()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
} else if (it->second.FuAStart()) {
|
|
||||||
start_seq = seq;
|
|
||||||
has_start = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!has_start) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (missing_sequence_numbers_wait_time_.find(timestamp) !=
|
|
||||||
missing_sequence_numbers_wait_time_.end()) {
|
|
||||||
int64_t wait_time = clock_->CurrentTime().us() -
|
|
||||||
missing_sequence_numbers_wait_time_[timestamp];
|
|
||||||
if (wait_time < MAX_WAIT_TIME_MS) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG_WARN("complete frame");
|
|
||||||
return PopCompleteFrame(start_seq, end_seq, timestamp);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
|
bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
|
||||||
uint32_t timestamp) {
|
uint32_t timestamp) {
|
||||||
size_t complete_frame_size = 0;
|
size_t complete_frame_size = 0;
|
||||||
@@ -599,9 +543,10 @@ bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ReceivedFrame received_frame(nv12_data_, complete_frame_size);
|
std::unique_ptr<ReceivedFrame> received_frame =
|
||||||
received_frame.SetReceivedTimestamp(clock_->CurrentTime().us());
|
std::make_unique<ReceivedFrame>(nv12_data_, complete_frame_size);
|
||||||
received_frame.SetCapturedTimestamp(
|
received_frame->SetReceivedTimestamp(clock_->CurrentTime().us());
|
||||||
|
received_frame->SetCapturedTimestamp(
|
||||||
(static_cast<int64_t>(timestamp) / rtp::kMsToRtpTimestamp -
|
(static_cast<int64_t>(timestamp) / rtp::kMsToRtpTimestamp -
|
||||||
delta_ntp_internal_ms_) *
|
delta_ntp_internal_ms_) *
|
||||||
1000);
|
1000);
|
||||||
@@ -609,8 +554,11 @@ bool RtpVideoReceiver::PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
|
|||||||
fua_start_sequence_numbers_.erase(timestamp);
|
fua_start_sequence_numbers_.erase(timestamp);
|
||||||
fua_end_sequence_numbers_.erase(timestamp);
|
fua_end_sequence_numbers_.erase(timestamp);
|
||||||
missing_sequence_numbers_wait_time_.erase(timestamp);
|
missing_sequence_numbers_wait_time_.erase(timestamp);
|
||||||
compelete_video_frame_queue_.push(received_frame);
|
// compelete_video_frame_queue_.push(received_frame);
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(pending_frames_mtx_);
|
||||||
|
pending_frames_[timestamp] = {std::move(received_frame), true,
|
||||||
|
clock_->CurrentTime().ms()};
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -749,24 +697,46 @@ bool RtpVideoReceiver::Process() {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!compelete_video_frame_queue_.isEmpty()) {
|
// if (!compelete_video_frame_queue_.isEmpty()) {
|
||||||
std::optional<ReceivedFrame> video_frame =
|
// std::optional<ReceivedFrame> video_frame =
|
||||||
compelete_video_frame_queue_.pop();
|
// compelete_video_frame_queue_.pop();
|
||||||
if (on_receive_complete_frame_ && video_frame) {
|
// if (on_receive_complete_frame_ && video_frame) {
|
||||||
// auto now_complete_frame_ts =
|
// // auto now_complete_frame_ts =
|
||||||
// std::chrono::duration_cast<std::chrono::milliseconds>(
|
// // std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||||
// std::chrono::system_clock::now().time_since_epoch())
|
// // std::chrono::system_clock::now().time_since_epoch())
|
||||||
// .count();
|
// // .count();
|
||||||
// uint32_t duration = now_complete_frame_ts -
|
// // uint32_t duration = now_complete_frame_ts -
|
||||||
// last_complete_frame_ts_; LOG_ERROR("Duration {}", duration);
|
// // last_complete_frame_ts_; LOG_ERROR("Duration {}", duration);
|
||||||
// last_complete_frame_ts_ = now_complete_frame_ts;
|
// // last_complete_frame_ts_ = now_complete_frame_ts;
|
||||||
|
|
||||||
on_receive_complete_frame_(*video_frame);
|
// on_receive_complete_frame_(*video_frame);
|
||||||
// #ifdef SAVE_RTP_RECV_STREAM
|
// // #ifdef SAVE_RTP_RECV_STREAM
|
||||||
// fwrite((unsigned char*)video_frame.Buffer(), 1,
|
// // fwrite((unsigned char*)video_frame.Buffer(), 1,
|
||||||
// video_frame.Size(),
|
// // video_frame.Size(),
|
||||||
// file_rtp_recv_);
|
// // file_rtp_recv_);
|
||||||
// #endif
|
// // #endif
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(pending_frames_mtx_);
|
||||||
|
if (pending_frames_.empty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto it = pending_frames_.begin();
|
||||||
|
while (it != pending_frames_.end()) {
|
||||||
|
if (it->second.is_complete) {
|
||||||
|
if (on_receive_complete_frame_) {
|
||||||
|
on_receive_complete_frame_(*(it->second.frame));
|
||||||
|
}
|
||||||
|
it = pending_frames_.erase(it);
|
||||||
|
} else {
|
||||||
|
if (clock_->CurrentTime().ms() - it->second.arrival_time >
|
||||||
|
MAX_WAIT_TIME_MS) {
|
||||||
|
it = pending_frames_.erase(it);
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
#include <functional>
|
#include <functional>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
#include <mutex>
|
||||||
#include <queue>
|
#include <queue>
|
||||||
#include <set>
|
#include <set>
|
||||||
#include <unordered_map>
|
#include <unordered_map>
|
||||||
@@ -61,8 +62,6 @@ class RtpVideoReceiver : public ThreadBase,
|
|||||||
void ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264);
|
void ProcessH264RtpPacket(RtpPacketH264& rtp_packet_h264);
|
||||||
bool CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264, bool is_start,
|
bool CheckIsH264FrameCompleted(RtpPacketH264& rtp_packet_h264, bool is_start,
|
||||||
bool is_end, bool is_rtx);
|
bool is_end, bool is_rtx);
|
||||||
bool CheckIsH264FrameCompletedFuaEndReceived(RtpPacketH264& rtp_packet_h264);
|
|
||||||
bool CheckIsH264FrameCompletedMissSeqReceived(RtpPacketH264& rtp_packet_h264);
|
|
||||||
bool PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
|
bool PopCompleteFrame(uint16_t start_seq, uint16_t end_seq,
|
||||||
uint32_t timestamp);
|
uint32_t timestamp);
|
||||||
|
|
||||||
@@ -104,6 +103,15 @@ class RtpVideoReceiver : public ThreadBase,
|
|||||||
uint32_t last_complete_frame_ts_ = 0;
|
uint32_t last_complete_frame_ts_ = 0;
|
||||||
RingBuffer<ReceivedFrame> compelete_video_frame_queue_;
|
RingBuffer<ReceivedFrame> compelete_video_frame_queue_;
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct PendingFrame {
|
||||||
|
std::unique_ptr<ReceivedFrame> frame;
|
||||||
|
bool is_complete = false;
|
||||||
|
int64_t arrival_time = 0;
|
||||||
|
};
|
||||||
|
std::map<uint32_t, PendingFrame> pending_frames_;
|
||||||
|
std::mutex pending_frames_mtx_;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
|
std::shared_ptr<IOStatistics> io_statistics_ = nullptr;
|
||||||
uint32_t last_recv_bytes_ = 0;
|
uint32_t last_recv_bytes_ = 0;
|
||||||
|
|||||||
@@ -90,7 +90,7 @@ void VideoChannelSend::Destroy() {}
|
|||||||
|
|
||||||
int VideoChannelSend::SendVideo(const EncodedFrame& encoded_frame) {
|
int VideoChannelSend::SendVideo(const EncodedFrame& encoded_frame) {
|
||||||
if (rtp_packetizer_ && packet_sender_) {
|
if (rtp_packetizer_ && packet_sender_) {
|
||||||
int32_t rtp_timestamp =
|
uint32_t rtp_timestamp =
|
||||||
delta_ntp_internal_ms_ +
|
delta_ntp_internal_ms_ +
|
||||||
static_cast<uint32_t>(encoded_frame.CapturedTimestamp() / 1000);
|
static_cast<uint32_t>(encoded_frame.CapturedTimestamp() / 1000);
|
||||||
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
|
std::vector<std::unique_ptr<RtpPacket>> rtp_packets =
|
||||||
|
|||||||
Reference in New Issue
Block a user