[fix] fix crash caused by RtpPacketReceived

This commit is contained in:
dijunkun
2025-01-09 17:03:08 +08:00
parent de212a8e75
commit 49b74ffcd6
5 changed files with 40 additions and 441 deletions

View File

@@ -39,15 +39,16 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
rtp_statistics_->Start();
}
RtpPacketReceived* rtp_packet_received;
rtp_packet_received = dynamic_cast<RtpPacketReceived*>(&rtp_packet);
rtp_packet_received->set_arrival_time(
RtpPacketReceived rtp_packet_received;
rtp_packet_received.Build(rtp_packet.Buffer(), rtp_packet.Size());
rtp_packet_received.set_arrival_time(
std::chrono::system_clock::now().time_since_epoch().count());
rtp_packet_received->set_ecn(EcnMarking::kEct0);
rtp_packet_received->set_recovered(false);
rtp_packet_received->set_payload_type_frequency(0);
rtp_packet_received.set_ecn(EcnMarking::kEct0);
rtp_packet_received.set_recovered(false);
rtp_packet_received.set_payload_type_frequency(0);
receive_side_congestion_controller_.OnReceivedPacket(
*rtp_packet_received, ReceiveSideCongestionController::MediaType::VIDEO);
rtp_packet_received, ReceiveSideCongestionController::MediaType::VIDEO);
last_recv_bytes_ = (uint32_t)rtp_packet.PayloadSize();
total_rtp_payload_recv_ += (uint32_t)rtp_packet.PayloadSize();
@@ -346,7 +347,19 @@ int RtpVideoReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) {
}
void RtpVideoReceiver::SendCombinedRtcpPacket(
std::vector<std::unique_ptr<RtcpPacket>> packets) {}
std::vector<std::unique_ptr<RtcpPacket>> packets) {
if (!data_send_func_) {
LOG_ERROR("data_send_func_ is nullptr");
}
LOG_ERROR("Send combined rtcp packet");
for (auto& packet : packets) {
if (data_send_func_((const char*)packet->Buffer(), packet->Size())) {
LOG_ERROR("Send CCB failed");
}
}
}
bool RtpVideoReceiver::CheckIsTimeSendRR() {
uint32_t now_ts = static_cast<uint32_t>(

View File

@@ -1,250 +0,0 @@
#include "transport_feedback.h"
#include "byte_io.h"
#include "log.h"
#include "sequence_number_compare.h"
// Header size:
// * 4 bytes Common RTCP Packet Header
// * 8 bytes Common Packet Format for RTCP Feedback Messages
// * 8 bytes FeedbackPacket header
constexpr size_t kTransportFeedbackHeaderSizeBytes = 4 + 8 + 8;
constexpr size_t kChunkSizeBytes = 2;
// TODO(sprang): Add support for dynamic max size for easier fragmentation,
// eg. set it to what's left in the buffer or IP_PACKET_SIZE.
// Size constraint imposed by RTCP common header: 16bit size field interpreted
// as number of four byte words minus the first header word.
constexpr size_t kMaxSizeBytes = (1 << 16) * 4;
// Payload size:
// * 8 bytes Common Packet Format for RTCP Feedback Messages
// * 8 bytes FeedbackPacket header.
// * 2 bytes for one chunk.
constexpr size_t kMinPayloadSizeBytes = 8 + 8 + 2;
constexpr int64_t kBaseTimeTick =
TransportFeedback::kDeltaTick * (1 << 8); // 64ms
constexpr int64_t kTimeWrapPeriod = kBaseTimeTick * (1 << 24); // 12.43days
TransportFeedback::TransportFeedback()
: base_seq_no_(0), pkt_stat_cnt_(0), ref_time_(0), feedback_pkt_cnt_(0) {}
TransportFeedback::~TransportFeedback() {}
bool TransportFeedback::AddReceivedPacket(uint16_t sequence_number,
int64_t timestamp) {
int16_t delta = 0;
// Convert to ticks and round.
if (last_ts_ > timestamp) {
timestamp += (last_ts_ - timestamp) + kTimeWrapPeriod;
}
int64_t delta_full = timestamp - last_ts_ % kTimeWrapPeriod;
if (delta_full > kTimeWrapPeriod / 2) {
delta_full -= kTimeWrapPeriod;
delta_full -= kDeltaTick / 2;
} else {
delta_full += kDeltaTick / 2;
}
delta_full /= kDeltaTick;
delta = static_cast<int16_t>(delta_full);
// If larger than 16bit signed, we can't represent it - need new fb packet.
if (delta != delta_full) {
LOG_WARN("Delta value too large ( >= 2^16 ticks )");
return false;
}
uint16_t next_seq_no = base_seq_no_ + pkt_stat_cnt_;
if (sequence_number != next_seq_no) {
uint16_t last_seq_no = next_seq_no - 1;
if (!IsNewerSequenceNumber(sequence_number, last_seq_no)) return false;
uint16_t num_missing_packets = sequence_number - next_seq_no;
if (!AddMissingPackets(num_missing_packets)) return false;
}
DeltaSize delta_size = (delta >= 0 && delta <= 0xff) ? 1 : 2;
if (!AddDeltaSize(delta_size)) return false;
received_packets_.emplace_back(sequence_number, delta);
last_ts_ += delta * kDeltaTick;
size_bytes_ += delta_size;
return true;
}
bool TransportFeedback::AddMissingPackets(size_t num_missing_packets) {
size_t new_num_seq_no = pkt_stat_cnt_ + num_missing_packets;
if (new_num_seq_no > kMaxReportedPackets) {
return false;
}
if (!last_chunk_.Empty()) {
while (num_missing_packets > 0 && last_chunk_.CanAdd(0)) {
last_chunk_.Add(0);
--num_missing_packets;
}
if (num_missing_packets == 0) {
pkt_stat_cnt_ = new_num_seq_no;
return true;
}
encoded_chunks_.push_back(last_chunk_.Emit());
}
size_t full_chunks = num_missing_packets / LastChunk::kMaxRunLengthCapacity;
size_t partial_chunk = num_missing_packets % LastChunk::kMaxRunLengthCapacity;
size_t num_chunks = full_chunks + (partial_chunk > 0 ? 1 : 0);
if (size_bytes_ + kChunkSizeBytes * num_chunks > kMaxSizeBytes) {
pkt_stat_cnt_ = (new_num_seq_no - num_missing_packets);
return false;
}
size_bytes_ += kChunkSizeBytes * num_chunks;
// T = 0, S = 0, run length = kMaxRunLengthCapacity, see EncodeRunLength().
encoded_chunks_.insert(encoded_chunks_.end(), full_chunks,
LastChunk::kMaxRunLengthCapacity);
last_chunk_.AddMissingPackets(partial_chunk);
pkt_stat_cnt_ = new_num_seq_no;
return true;
}
bool TransportFeedback::AddDeltaSize(DeltaSize delta_size) {
if (pkt_stat_cnt_ == kMaxReportedPackets) return false;
size_t add_chunk_size = last_chunk_.Empty() ? kChunkSizeBytes : 0;
if (size_bytes_ + delta_size + add_chunk_size > kMaxSizeBytes) return false;
if (last_chunk_.CanAdd(delta_size)) {
size_bytes_ += add_chunk_size;
last_chunk_.Add(delta_size);
++pkt_stat_cnt_;
return true;
}
if (size_bytes_ + delta_size + kChunkSizeBytes > kMaxSizeBytes) return false;
encoded_chunks_.push_back(last_chunk_.Emit());
size_bytes_ += kChunkSizeBytes;
last_chunk_.Add(delta_size);
++pkt_stat_cnt_;
return true;
}
void TransportFeedback::Clear() {
pkt_stat_cnt_ = 0;
last_ts_ = BaseTime();
received_packets_.clear();
all_packets_.clear();
encoded_chunks_.clear();
last_chunk_.Clear();
size_bytes_ = kTransportFeedbackHeaderSizeBytes;
}
// Serialize packet.
bool TransportFeedback::Create(uint8_t* packet, size_t* position,
size_t max_length,
PacketReadyCallback callback) const {
if (pkt_stat_cnt_ == 0) return false;
while (*position + BlockLength() > max_length) {
if (!OnBufferFull(packet, position, callback)) return false;
}
const size_t position_end = *position + BlockLength();
const size_t padding_length = PaddingLength();
bool has_padding = padding_length > 0;
CreateHeader(kFeedbackMessageType, kPacketType, HeaderLength(), has_padding,
packet, position);
CreateCommonFeedback(packet + *position);
*position += kCommonFeedbackLength;
ByteWriter<uint16_t>::WriteBigEndian(&packet[*position], base_seq_no_);
*position += 2;
ByteWriter<uint16_t>::WriteBigEndian(&packet[*position], pkt_stat_cnt_);
*position += 2;
ByteWriter<uint32_t, 3>::WriteBigEndian(&packet[*position], ref_time_);
*position += 3;
packet[(*position)++] = feedback_pkt_cnt_;
for (uint16_t chunk : encoded_chunks_) {
ByteWriter<uint16_t>::WriteBigEndian(&packet[*position], chunk);
*position += 2;
}
if (!last_chunk_.Empty()) {
uint16_t chunk = last_chunk_.EncodeLast();
ByteWriter<uint16_t>::WriteBigEndian(&packet[*position], chunk);
*position += 2;
}
for (const auto& received_packet : received_packets_) {
int16_t delta = received_packet.delta_ticks();
if (delta >= 0 && delta <= 0xFF) {
packet[(*position)++] = delta;
} else {
ByteWriter<int16_t>::WriteBigEndian(&packet[*position], delta);
*position += 2;
}
}
if (padding_length > 0) {
for (size_t i = 0; i < padding_length - 1; ++i) {
packet[(*position)++] = 0;
}
packet[(*position)++] = padding_length;
}
if (*position != position_end) {
LOG_FATAL("padding_length is too small");
}
return true;
}
void TransportFeedback::SetBaseSequenceNumber(uint16_t base_sequence) {
base_seq_no_ = base_sequence;
}
void TransportFeedback::SetPacketStatusCount(uint16_t packet_status_count) {
pkt_stat_cnt_ = packet_status_count;
}
void TransportFeedback::SetReferenceTime(uint32_t reference_time) {
ref_time_ = reference_time;
}
void TransportFeedback::SetFeedbackPacketCount(uint8_t feedback_packet_count) {
feedback_pkt_cnt_ = feedback_packet_count;
}
int64_t TransportFeedback::BaseTime() const {
// Add an extra kTimeWrapPeriod to allow add received packets arrived earlier
// than the first added packet (and thus allow to record negative deltas)
// even when ref_time_ == 0.
return 0 + kTimeWrapPeriod + int64_t{ref_time_} * kBaseTimeTick;
}
int64_t TransportFeedback::GetBaseDelta(int64_t prev_timestamp) const {
int64_t delta = BaseTime() - prev_timestamp;
// Compensate for wrap around.
if (std::abs(delta - kTimeWrapPeriod) < std::abs(delta)) {
delta -= kTimeWrapPeriod; // Wrap backwards.
} else if (std::abs(delta + kTimeWrapPeriod) < std::abs(delta)) {
delta += kTimeWrapPeriod; // Wrap forwards.
}
return delta;
}
size_t TransportFeedback::BlockLength() const {
// Round size_bytes_ up to multiple of 32bits.
return (size_bytes_ + 3) & (~static_cast<size_t>(3));
}
size_t TransportFeedback::PaddingLength() const {
return BlockLength() - size_bytes_;
}
void TransportFeedback::ParseCommonFeedback(const uint8_t* payload) {
SetSenderSsrc(ByteReader<uint32_t>::ReadBigEndian(&payload[0]));
SetMediaSsrc(ByteReader<uint32_t>::ReadBigEndian(&payload[4]));
}
void TransportFeedback::CreateCommonFeedback(uint8_t* payload) const {
ByteWriter<uint32_t>::WriteBigEndian(&payload[0], sender_ssrc());
ByteWriter<uint32_t>::WriteBigEndian(&payload[4], media_ssrc());
}

View File

@@ -1,183 +0,0 @@
/*
* @Author: DI JUNKUN
* @Date: 2024-12-11
* Copyright (c) 2024 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _TRANSPORT_FEEDBACK_H_
#define _TRANSPORT_FEEDBACK_H_
// RR
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// |V=2|P| FMT=15 | PT=205 | length |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of packet sender |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | SSRC of media source |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | base sequence number | packet status count |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | reference time | fb pkt. count |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | packet chunk | packet chunk |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// . .
// . .
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | packet chunk | recv delta | recv delta |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// . .
// . .
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | recv delta | recv delta | zero padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// RTP transport sequence number
// 0 1 2 3
// 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | 0xBE | 0xDE | length=1 |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
// | ID | L=1 |transport-wide sequence number | zero padding |
// +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
#include <array>
#include <vector>
#include "rtcp_header.h"
#include "rtcp_packet.h"
#include "rtcp_typedef.h"
class TransportFeedback : public RtcpPacket {
public:
class ReceivedPacket {
public:
ReceivedPacket(uint16_t sequence_number, int16_t delta_ticks)
: sequence_number_(sequence_number), delta_ticks_(delta_ticks) {}
ReceivedPacket(const ReceivedPacket&) = default;
ReceivedPacket& operator=(const ReceivedPacket&) = default;
uint16_t sequence_number() const { return sequence_number_; }
int16_t delta_ticks() const { return delta_ticks_; }
int64_t delta() const { return delta_ticks_ * kDeltaTick; }
private:
uint16_t sequence_number_;
int16_t delta_ticks_;
};
static constexpr uint8_t kFeedbackMessageType = 15;
static constexpr int64_t kDeltaTick = 250; // 0.25ms
// Maximum number of packets (including missing) TransportFeedback can report.
static constexpr size_t kMaxReportedPackets = 0xffff;
private:
// Size in bytes of a delta time in rtcp packet.
// Valid values are 0 (packet wasn't received), 1 or 2.
using DeltaSize = uint8_t;
// Keeps DeltaSizes that can be encoded into single chunk if it is last chunk.
class LastChunk {
public:
using DeltaSize = TransportFeedback::DeltaSize;
static constexpr size_t kMaxRunLengthCapacity = 0x1fff;
LastChunk();
bool Empty() const;
void Clear();
// Return if delta sizes still can be encoded into single chunk with added
// `delta_size`.
bool CanAdd(DeltaSize delta_size) const;
// Add `delta_size`, assumes `CanAdd(delta_size)`,
void Add(DeltaSize delta_size);
// Equivalent to calling Add(0) `num_missing` times. Assumes `Empty()`.
void AddMissingPackets(size_t num_missing);
// Encode chunk as large as possible removing encoded delta sizes.
// Assume CanAdd() == false for some valid delta_size.
uint16_t Emit();
// Encode all stored delta_sizes into single chunk, pad with 0s if needed.
uint16_t EncodeLast() const;
// Decode up to `max_size` delta sizes from `chunk`.
void Decode(uint16_t chunk, size_t max_size);
// Appends content of the Lastchunk to `deltas`.
void AppendTo(std::vector<DeltaSize>* deltas) const;
private:
static constexpr size_t kMaxOneBitCapacity = 14;
static constexpr size_t kMaxTwoBitCapacity = 7;
static constexpr size_t kMaxVectorCapacity = kMaxOneBitCapacity;
static constexpr DeltaSize kLarge = 2;
uint16_t EncodeOneBit() const;
void DecodeOneBit(uint16_t chunk, size_t max_size);
uint16_t EncodeTwoBit(size_t size) const;
void DecodeTwoBit(uint16_t chunk, size_t max_size);
uint16_t EncodeRunLength() const;
void DecodeRunLength(uint16_t chunk, size_t max_size);
std::array<DeltaSize, kMaxVectorCapacity> delta_sizes_;
size_t size_;
bool all_same_;
bool has_large_delta_;
};
public:
TransportFeedback();
~TransportFeedback();
public:
static constexpr uint8_t kPacketType = 205;
public:
bool AddReceivedPacket(uint16_t sequence_number, int64_t timestamp);
bool AddMissingPackets(size_t num_missing_packets);
bool AddDeltaSize(DeltaSize delta_size);
void Clear();
bool Create(uint8_t* packet, size_t* position, size_t max_length,
PacketReadyCallback callback) const override;
public:
void SetMediaSsrc(uint32_t ssrc) { media_ssrc_ = ssrc; }
uint32_t media_ssrc() const { return media_ssrc_; }
void ParseCommonFeedback(const uint8_t* payload);
void CreateCommonFeedback(uint8_t* payload) const;
public:
static constexpr size_t kCommonFeedbackLength = 8;
void SetBaseSequenceNumber(uint16_t base_sequence_number);
void SetPacketStatusCount(uint16_t packet_status_count);
void SetReferenceTime(uint32_t reference_time);
void SetFeedbackPacketCount(uint8_t feedback_packet_count);
int64_t BaseTime() const;
int64_t GetBaseDelta(int64_t prev_timestamp) const;
size_t BlockLength() const override;
size_t PaddingLength() const;
private:
uint32_t media_ssrc_ = 0;
private:
uint16_t base_seq_no_;
uint16_t pkt_stat_cnt_;
uint32_t ref_time_;
uint8_t feedback_pkt_cnt_;
int64_t last_ts_;
std::vector<ReceivedPacket> received_packets_;
std::vector<ReceivedPacket> all_packets_;
// All but last encoded packet chunks.
std::vector<uint16_t> encoded_chunks_;
LastChunk last_chunk_;
size_t size_bytes_;
};
#endif

View File

@@ -127,6 +127,22 @@ RtpPacket::~RtpPacket() {
payload_size_ = 0;
}
bool RtpPacket::Build(const uint8_t *buffer, uint32_t size) {
if (size > 0) {
buffer_ = (uint8_t *)malloc(size);
if (NULL == buffer_) {
LOG_ERROR("Malloc failed");
} else {
memcpy(buffer_, buffer, size);
}
size_ = size;
// TryToDecodeH264RtpPacket(buffer_);
return true;
}
return false;
}
const uint8_t *RtpPacket::Encode(uint8_t *payload, size_t payload_size) {
buffer_[0] = (version_ << 6) | (has_padding_ << 5) | (has_extension_ << 4) |
total_csrc_number_;

View File

@@ -192,6 +192,9 @@ class RtpPacket {
virtual ~RtpPacket();
public:
bool Build(const uint8_t *buffer, uint32_t size);
public:
// Set Header
void SetVerion(uint8_t version) { version_ = version; }