From 49b74ffcd61a509cf5282327ab804591d2ed3184 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Thu, 9 Jan 2025 17:03:08 +0800 Subject: [PATCH] [fix] fix crash caused by RtpPacketReceived --- .../rtp_channel/rtp_video_receiver.cpp | 29 +- src/rtcp/transport_feedback.cpp | 250 ------------------ src/rtcp/transport_feedback.h | 183 ------------- src/rtp/rtp_packet/rtp_packet.cpp | 16 ++ src/rtp/rtp_packet/rtp_packet.h | 3 + 5 files changed, 40 insertions(+), 441 deletions(-) delete mode 100644 src/rtcp/transport_feedback.cpp delete mode 100644 src/rtcp/transport_feedback.h diff --git a/src/channel/rtp_channel/rtp_video_receiver.cpp b/src/channel/rtp_channel/rtp_video_receiver.cpp index 9979fa1..464ff5f 100644 --- a/src/channel/rtp_channel/rtp_video_receiver.cpp +++ b/src/channel/rtp_channel/rtp_video_receiver.cpp @@ -39,15 +39,16 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { rtp_statistics_->Start(); } - RtpPacketReceived* rtp_packet_received; - rtp_packet_received = dynamic_cast(&rtp_packet); - rtp_packet_received->set_arrival_time( + RtpPacketReceived rtp_packet_received; + rtp_packet_received.Build(rtp_packet.Buffer(), rtp_packet.Size()); + + rtp_packet_received.set_arrival_time( std::chrono::system_clock::now().time_since_epoch().count()); - rtp_packet_received->set_ecn(EcnMarking::kEct0); - rtp_packet_received->set_recovered(false); - rtp_packet_received->set_payload_type_frequency(0); + rtp_packet_received.set_ecn(EcnMarking::kEct0); + rtp_packet_received.set_recovered(false); + rtp_packet_received.set_payload_type_frequency(0); receive_side_congestion_controller_.OnReceivedPacket( - *rtp_packet_received, ReceiveSideCongestionController::MediaType::VIDEO); + rtp_packet_received, ReceiveSideCongestionController::MediaType::VIDEO); last_recv_bytes_ = (uint32_t)rtp_packet.PayloadSize(); total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize(); @@ -346,7 +347,19 @@ int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) { } void RtpVideoReceiver::SendCombinedRtcpPacket( - std::vector> packets) {} + std::vector> packets) { + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); + } + + LOG_ERROR("Send combined rtcp packet"); + + for (auto& packet : packets) { + if (data_send_func_((const char*)packet->Buffer(), packet->Size())) { + LOG_ERROR("Send CCB failed"); + } + } +} bool RtpVideoReceiver::CheckIsTimeSendRR() { uint32_t now_ts = static_cast( diff --git a/src/rtcp/transport_feedback.cpp b/src/rtcp/transport_feedback.cpp deleted file mode 100644 index 75ed318..0000000 --- a/src/rtcp/transport_feedback.cpp +++ /dev/null @@ -1,250 +0,0 @@ -#include "transport_feedback.h" - -#include "byte_io.h" -#include "log.h" -#include "sequence_number_compare.h" - -// Header size: -// * 4 bytes Common RTCP Packet Header -// * 8 bytes Common Packet Format for RTCP Feedback Messages -// * 8 bytes FeedbackPacket header -constexpr size_t kTransportFeedbackHeaderSizeBytes = 4 + 8 + 8; -constexpr size_t kChunkSizeBytes = 2; -// TODO(sprang): Add support for dynamic max size for easier fragmentation, -// eg. set it to what's left in the buffer or IP_PACKET_SIZE. -// Size constraint imposed by RTCP common header: 16bit size field interpreted -// as number of four byte words minus the first header word. -constexpr size_t kMaxSizeBytes = (1 << 16) * 4; -// Payload size: -// * 8 bytes Common Packet Format for RTCP Feedback Messages -// * 8 bytes FeedbackPacket header. -// * 2 bytes for one chunk. -constexpr size_t kMinPayloadSizeBytes = 8 + 8 + 2; -constexpr int64_t kBaseTimeTick = - TransportFeedback::kDeltaTick * (1 << 8); // 64ms -constexpr int64_t kTimeWrapPeriod = kBaseTimeTick * (1 << 24); // 12.43days - -TransportFeedback::TransportFeedback() - : base_seq_no_(0), pkt_stat_cnt_(0), ref_time_(0), feedback_pkt_cnt_(0) {} - -TransportFeedback::~TransportFeedback() {} - -bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number, - int64_t timestamp) { - int16_t delta = 0; - // Convert to ticks and round. - if (last_ts_ > timestamp) { - timestamp += (last_ts_ - timestamp) + kTimeWrapPeriod; - } - - int64_t delta_full = timestamp - last_ts_ % kTimeWrapPeriod; - if (delta_full > kTimeWrapPeriod / 2) { - delta_full -= kTimeWrapPeriod; - delta_full -= kDeltaTick / 2; - } else { - delta_full += kDeltaTick / 2; - } - delta_full /= kDeltaTick; - - delta = static_cast(delta_full); - // If larger than 16bit signed, we can't represent it - need new fb packet. - if (delta != delta_full) { - LOG_WARN("Delta value too large ( >= 2^16 ticks )"); - return false; - } - - uint16_t next_seq_no = base_seq_no_ + pkt_stat_cnt_; - if (sequence_number != next_seq_no) { - uint16_t last_seq_no = next_seq_no - 1; - if (!IsNewerSequenceNumber(sequence_number, last_seq_no)) return false; - uint16_t num_missing_packets = sequence_number - next_seq_no; - if (!AddMissingPackets(num_missing_packets)) return false; - } - - DeltaSize delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2; - if (!AddDeltaSize(delta_size)) return false; - - received_packets_.emplace_back(sequence_number, delta); - last_ts_ += delta * kDeltaTick; - size_bytes_ += delta_size; - - return true; -} - -bool TransportFeedback::AddMissingPackets(size_t num_missing_packets) { - size_t new_num_seq_no = pkt_stat_cnt_ + num_missing_packets; - if (new_num_seq_no > kMaxReportedPackets) { - return false; - } - - if (!last_chunk_.Empty()) { - while (num_missing_packets > 0 && last_chunk_.CanAdd(0)) { - last_chunk_.Add(0); - --num_missing_packets; - } - if (num_missing_packets == 0) { - pkt_stat_cnt_ = new_num_seq_no; - return true; - } - encoded_chunks_.push_back(last_chunk_.Emit()); - } - - size_t full_chunks = num_missing_packets / LastChunk::kMaxRunLengthCapacity; - size_t partial_chunk = num_missing_packets % LastChunk::kMaxRunLengthCapacity; - size_t num_chunks = full_chunks + (partial_chunk > 0 ? 1 : 0); - if (size_bytes_ + kChunkSizeBytes * num_chunks > kMaxSizeBytes) { - pkt_stat_cnt_ = (new_num_seq_no - num_missing_packets); - return false; - } - size_bytes_ += kChunkSizeBytes * num_chunks; - // T = 0, S = 0, run length = kMaxRunLengthCapacity, see EncodeRunLength(). - encoded_chunks_.insert(encoded_chunks_.end(), full_chunks, - LastChunk::kMaxRunLengthCapacity); - last_chunk_.AddMissingPackets(partial_chunk); - pkt_stat_cnt_ = new_num_seq_no; - return true; -} - -bool TransportFeedback::AddDeltaSize(DeltaSize delta_size) { - if (pkt_stat_cnt_ == kMaxReportedPackets) return false; - size_t add_chunk_size = last_chunk_.Empty() ? kChunkSizeBytes : 0; - if (size_bytes_ + delta_size + add_chunk_size > kMaxSizeBytes) return false; - - if (last_chunk_.CanAdd(delta_size)) { - size_bytes_ += add_chunk_size; - last_chunk_.Add(delta_size); - ++pkt_stat_cnt_; - return true; - } - if (size_bytes_ + delta_size + kChunkSizeBytes > kMaxSizeBytes) return false; - - encoded_chunks_.push_back(last_chunk_.Emit()); - size_bytes_ += kChunkSizeBytes; - last_chunk_.Add(delta_size); - ++pkt_stat_cnt_; - return true; -} - -void TransportFeedback::Clear() { - pkt_stat_cnt_ = 0; - last_ts_ = BaseTime(); - received_packets_.clear(); - all_packets_.clear(); - encoded_chunks_.clear(); - last_chunk_.Clear(); - size_bytes_ = kTransportFeedbackHeaderSizeBytes; -} - -// Serialize packet. -bool TransportFeedback::Create(uint8_t* packet, size_t* position, - size_t max_length, - PacketReadyCallback callback) const { - if (pkt_stat_cnt_ == 0) return false; - - while (*position + BlockLength() > max_length) { - if (!OnBufferFull(packet, position, callback)) return false; - } - const size_t position_end = *position + BlockLength(); - const size_t padding_length = PaddingLength(); - bool has_padding = padding_length > 0; - CreateHeader(kFeedbackMessageType, kPacketType, HeaderLength(), has_padding, - packet, position); - CreateCommonFeedback(packet + *position); - *position += kCommonFeedbackLength; - - ByteWriter::WriteBigEndian(&packet[*position], base_seq_no_); - *position += 2; - - ByteWriter::WriteBigEndian(&packet[*position], pkt_stat_cnt_); - *position += 2; - - ByteWriter::WriteBigEndian(&packet[*position], ref_time_); - *position += 3; - - packet[(*position)++] = feedback_pkt_cnt_; - - for (uint16_t chunk : encoded_chunks_) { - ByteWriter::WriteBigEndian(&packet[*position], chunk); - *position += 2; - } - if (!last_chunk_.Empty()) { - uint16_t chunk = last_chunk_.EncodeLast(); - ByteWriter::WriteBigEndian(&packet[*position], chunk); - *position += 2; - } - - for (const auto& received_packet : received_packets_) { - int16_t delta = received_packet.delta_ticks(); - if (delta >= 0 && delta <= 0xFF) { - packet[(*position)++] = delta; - } else { - ByteWriter::WriteBigEndian(&packet[*position], delta); - *position += 2; - } - } - - if (padding_length > 0) { - for (size_t i = 0; i < padding_length - 1; ++i) { - packet[(*position)++] = 0; - } - packet[(*position)++] = padding_length; - } - - if (*position != position_end) { - LOG_FATAL("padding_length is too small"); - } - return true; -} - -void TransportFeedback::SetBaseSequenceNumber(uint16_t base_sequence) { - base_seq_no_ = base_sequence; -} - -void TransportFeedback::SetPacketStatusCount(uint16_t packet_status_count) { - pkt_stat_cnt_ = packet_status_count; -} - -void TransportFeedback::SetReferenceTime(uint32_t reference_time) { - ref_time_ = reference_time; -} - -void TransportFeedback::SetFeedbackPacketCount(uint8_t feedback_packet_count) { - feedback_pkt_cnt_ = feedback_packet_count; -} - -int64_t TransportFeedback::BaseTime() const { - // Add an extra kTimeWrapPeriod to allow add received packets arrived earlier - // than the first added packet (and thus allow to record negative deltas) - // even when ref_time_ == 0. - return 0 + kTimeWrapPeriod + int64_t{ref_time_} * kBaseTimeTick; -} - -int64_t TransportFeedback::GetBaseDelta(int64_t prev_timestamp) const { - int64_t delta = BaseTime() - prev_timestamp; - // Compensate for wrap around. - if (std::abs(delta - kTimeWrapPeriod) < std::abs(delta)) { - delta -= kTimeWrapPeriod; // Wrap backwards. - } else if (std::abs(delta + kTimeWrapPeriod) < std::abs(delta)) { - delta += kTimeWrapPeriod; // Wrap forwards. - } - return delta; -} - -size_t TransportFeedback::BlockLength() const { - // Round size_bytes_ up to multiple of 32bits. - return (size_bytes_ + 3) & (~static_cast(3)); -} - -size_t TransportFeedback::PaddingLength() const { - return BlockLength() - size_bytes_; -} - -void TransportFeedback::ParseCommonFeedback(const uint8_t* payload) { - SetSenderSsrc(ByteReader::ReadBigEndian(&payload[0])); - SetMediaSsrc(ByteReader::ReadBigEndian(&payload[4])); -} - -void TransportFeedback::CreateCommonFeedback(uint8_t* payload) const { - ByteWriter::WriteBigEndian(&payload[0], sender_ssrc()); - ByteWriter::WriteBigEndian(&payload[4], media_ssrc()); -} \ No newline at end of file diff --git a/src/rtcp/transport_feedback.h b/src/rtcp/transport_feedback.h deleted file mode 100644 index d938ff0..0000000 --- a/src/rtcp/transport_feedback.h +++ /dev/null @@ -1,183 +0,0 @@ -/* - * @Author: DI JUNKUN - * @Date: 2024-12-11 - * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. - */ - -#ifndef _TRANSPORT_FEEDBACK_H_ -#define _TRANSPORT_FEEDBACK_H_ - -// RR -// 0 1 2 3 -// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// |V=2|P| FMT=15 | PT=205 | length | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | SSRC of packet sender | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | SSRC of media source | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | base sequence number | packet status count | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | reference time | fb pkt. count | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | packet chunk | packet chunk | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// . . -// . . -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | packet chunk | recv delta | recv delta | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// . . -// . . -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | recv delta | recv delta | zero padding | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -// RTP transport sequence number -// 0 1 2 3 -// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | 0xBE | 0xDE | length=1 | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -// | ID | L=1 |transport-wide sequence number | zero padding | -// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -#include -#include - -#include "rtcp_header.h" -#include "rtcp_packet.h" -#include "rtcp_typedef.h" - -class TransportFeedback : public RtcpPacket { - public: - class ReceivedPacket { - public: - ReceivedPacket(uint16_t sequence_number, int16_t delta_ticks) - : sequence_number_(sequence_number), delta_ticks_(delta_ticks) {} - ReceivedPacket(const ReceivedPacket&) = default; - ReceivedPacket& operator=(const ReceivedPacket&) = default; - - uint16_t sequence_number() const { return sequence_number_; } - int16_t delta_ticks() const { return delta_ticks_; } - int64_t delta() const { return delta_ticks_ * kDeltaTick; } - - private: - uint16_t sequence_number_; - int16_t delta_ticks_; - }; - - static constexpr uint8_t kFeedbackMessageType = 15; - static constexpr int64_t kDeltaTick = 250; // 0.25ms - // Maximum number of packets (including missing) TransportFeedback can report. - static constexpr size_t kMaxReportedPackets = 0xffff; - - private: - // Size in bytes of a delta time in rtcp packet. - // Valid values are 0 (packet wasn't received), 1 or 2. - using DeltaSize = uint8_t; - // Keeps DeltaSizes that can be encoded into single chunk if it is last chunk. - class LastChunk { - public: - using DeltaSize = TransportFeedback::DeltaSize; - static constexpr size_t kMaxRunLengthCapacity = 0x1fff; - - LastChunk(); - - bool Empty() const; - void Clear(); - // Return if delta sizes still can be encoded into single chunk with added - // `delta_size`. - bool CanAdd(DeltaSize delta_size) const; - // Add `delta_size`, assumes `CanAdd(delta_size)`, - void Add(DeltaSize delta_size); - // Equivalent to calling Add(0) `num_missing` times. Assumes `Empty()`. - void AddMissingPackets(size_t num_missing); - - // Encode chunk as large as possible removing encoded delta sizes. - // Assume CanAdd() == false for some valid delta_size. - uint16_t Emit(); - // Encode all stored delta_sizes into single chunk, pad with 0s if needed. - uint16_t EncodeLast() const; - - // Decode up to `max_size` delta sizes from `chunk`. - void Decode(uint16_t chunk, size_t max_size); - // Appends content of the Lastchunk to `deltas`. - void AppendTo(std::vector* deltas) const; - - private: - static constexpr size_t kMaxOneBitCapacity = 14; - static constexpr size_t kMaxTwoBitCapacity = 7; - static constexpr size_t kMaxVectorCapacity = kMaxOneBitCapacity; - static constexpr DeltaSize kLarge = 2; - - uint16_t EncodeOneBit() const; - void DecodeOneBit(uint16_t chunk, size_t max_size); - - uint16_t EncodeTwoBit(size_t size) const; - void DecodeTwoBit(uint16_t chunk, size_t max_size); - - uint16_t EncodeRunLength() const; - void DecodeRunLength(uint16_t chunk, size_t max_size); - - std::array delta_sizes_; - size_t size_; - bool all_same_; - bool has_large_delta_; - }; - - public: - TransportFeedback(); - ~TransportFeedback(); - - public: - static constexpr uint8_t kPacketType = 205; - - public: - bool AddReceivedPacket(uint16_t sequence_number, int64_t timestamp); - bool AddMissingPackets(size_t num_missing_packets); - bool AddDeltaSize(DeltaSize delta_size); - void Clear(); - - bool Create(uint8_t* packet, size_t* position, size_t max_length, - PacketReadyCallback callback) const override; - - public: - void SetMediaSsrc(uint32_t ssrc) { media_ssrc_ = ssrc; } - uint32_t media_ssrc() const { return media_ssrc_; } - void ParseCommonFeedback(const uint8_t* payload); - void CreateCommonFeedback(uint8_t* payload) const; - - public: - static constexpr size_t kCommonFeedbackLength = 8; - void SetBaseSequenceNumber(uint16_t base_sequence_number); - void SetPacketStatusCount(uint16_t packet_status_count); - void SetReferenceTime(uint32_t reference_time); - void SetFeedbackPacketCount(uint8_t feedback_packet_count); - - int64_t BaseTime() const; - int64_t GetBaseDelta(int64_t prev_timestamp) const; - size_t BlockLength() const override; - size_t PaddingLength() const; - - private: - uint32_t media_ssrc_ = 0; - - private: - uint16_t base_seq_no_; - uint16_t pkt_stat_cnt_; - uint32_t ref_time_; - uint8_t feedback_pkt_cnt_; - - int64_t last_ts_; - - std::vector received_packets_; - std::vector all_packets_; - // All but last encoded packet chunks. - std::vector encoded_chunks_; - LastChunk last_chunk_; - size_t size_bytes_; -}; - -#endif \ No newline at end of file diff --git a/src/rtp/rtp_packet/rtp_packet.cpp b/src/rtp/rtp_packet/rtp_packet.cpp index 881ee38..228bf25 100644 --- a/src/rtp/rtp_packet/rtp_packet.cpp +++ b/src/rtp/rtp_packet/rtp_packet.cpp @@ -127,6 +127,22 @@ RtpPacket::~RtpPacket() { payload_size_ = 0; } +bool RtpPacket::Build(const uint8_t *buffer, uint32_t size) { + if (size > 0) { + buffer_ = (uint8_t *)malloc(size); + if (NULL == buffer_) { + LOG_ERROR("Malloc failed"); + } else { + memcpy(buffer_, buffer, size); + } + size_ = size; + + // TryToDecodeH264RtpPacket(buffer_); + return true; + } + return false; +} + const uint8_t *RtpPacket::Encode(uint8_t *payload, size_t payload_size) { buffer_[0] = (version_ << 6) | (has_padding_ << 5) | (has_extension_ << 4) | total_csrc_number_; diff --git a/src/rtp/rtp_packet/rtp_packet.h b/src/rtp/rtp_packet/rtp_packet.h index 9c78f40..22b8116 100644 --- a/src/rtp/rtp_packet/rtp_packet.h +++ b/src/rtp/rtp_packet/rtp_packet.h @@ -192,6 +192,9 @@ class RtpPacket { virtual ~RtpPacket(); + public: + bool Build(const uint8_t *buffer, uint32_t size); + public: // Set Header void SetVerion(uint8_t version) { version_ = version; }