From 6bda59b1a72d0b1628ff0ff14c399c6a527e3a84 Mon Sep 17 00:00:00 2001 From: dijunkun Date: Sat, 12 Oct 2024 17:30:29 +0800 Subject: [PATCH] [fix] obu data corrupted after transmission --- .../video/decode/dav1d/dav1d_av1_decoder.cpp | 3 +- .../video/encode/aom/aom_av1_encoder.cpp | 2 +- src/rtp/byte_buffer.cpp | 5 +- src/rtp/byte_buffer.h | 2 +- src/rtp/obu_parser.cpp | 386 ++---------------- src/rtp/obu_parser.h | 13 + src/rtp/rtp_codec.cpp | 55 ++- src/rtp/rtp_codec_av1.cpp | 149 +++---- src/rtp/rtp_packet.cpp | 1 - src/rtp/rtp_video_receiver.cpp | 20 +- 10 files changed, 170 insertions(+), 466 deletions(-) diff --git a/src/media/video/decode/dav1d/dav1d_av1_decoder.cpp b/src/media/video/decode/dav1d/dav1d_av1_decoder.cpp index a465a83..036b0a5 100644 --- a/src/media/video/decode/dav1d/dav1d_av1_decoder.cpp +++ b/src/media/video/decode/dav1d/dav1d_av1_decoder.cpp @@ -2,7 +2,7 @@ #include "log.h" -#define SAVE_RECEIVED_AV1_STREAM 0 +#define SAVE_RECEIVED_AV1_STREAM 1 #define SAVE_DECODED_NV12_STREAM 0 #include "libyuv.h" @@ -98,6 +98,7 @@ int Dav1dAv1Decoder::Init() { int Dav1dAv1Decoder::Decode( const uint8_t *data, int size, std::function on_receive_decoded_frame) { + LOG_ERROR("frame size = {}", size); if (SAVE_RECEIVED_AV1_STREAM) { fwrite((unsigned char *)data, 1, size, file_av1_); } diff --git a/src/media/video/encode/aom/aom_av1_encoder.cpp b/src/media/video/encode/aom/aom_av1_encoder.cpp index 2a66f65..440bfc2 100644 --- a/src/media/video/encode/aom/aom_av1_encoder.cpp +++ b/src/media/video/encode/aom/aom_av1_encoder.cpp @@ -6,7 +6,7 @@ #include "log.h" #define SAVE_RECEIVED_NV12_STREAM 0 -#define SAVE_ENCODED_AV1_STREAM 0 +#define SAVE_ENCODED_AV1_STREAM 1 #define SET_ENCODER_PARAM_OR_RETURN_ERROR(param_id, param_value) \ do { \ diff --git a/src/rtp/byte_buffer.cpp b/src/rtp/byte_buffer.cpp index 2bc2e34..8530cc4 100644 --- a/src/rtp/byte_buffer.cpp +++ b/src/rtp/byte_buffer.cpp @@ -29,7 +29,7 @@ bool ByteBufferReader::ReadUInt8(uint8_t* val) { return ReadBytes(reinterpret_cast(val), 1); } -bool ByteBufferReader::ReadUVarint(uint64_t* val, size_t* len) { +bool ByteBufferReader::ReadUVarint(uint64_t* val) { if (!val) { return false; } @@ -46,9 +46,6 @@ bool ByteBufferReader::ReadUVarint(uint64_t* val, size_t* len) { // True if the msb is not a continuation byte. if (static_cast(byte) < 0x80) { *val = v; - if (len) { - *len = i / 8 + (i % 8 ? 1 : 0) + 1; - } return true; } } diff --git a/src/rtp/byte_buffer.h b/src/rtp/byte_buffer.h index dca6b8f..374154c 100644 --- a/src/rtp/byte_buffer.h +++ b/src/rtp/byte_buffer.h @@ -24,7 +24,7 @@ class ByteBufferReader { bool ReadBytes(char* val, size_t len); bool ReadUInt8(uint8_t* val); - bool ReadUVarint(uint64_t* val, size_t* len); + bool ReadUVarint(uint64_t* val); bool Consume(size_t size); diff --git a/src/rtp/obu_parser.cpp b/src/rtp/obu_parser.cpp index b16c708..ed71696 100644 --- a/src/rtp/obu_parser.cpp +++ b/src/rtp/obu_parser.cpp @@ -14,52 +14,6 @@ constexpr int kObuTypeTemporalDelimiter = 2; constexpr int kObuTypeTileList = 8; constexpr int kObuTypePadding = 15; -int Leb128Size(uint64_t value) { - int size = 0; - while (value >= 0x80) { - ++size; - value >>= 7; - } - return size + 1; -} - -uint64_t ReadLeb128(const uint8_t*& read_at, const uint8_t* end) { - uint64_t value = 0; - int fill_bits = 0; - while (read_at != end && fill_bits < 64 - 7) { - uint8_t leb128_byte = *read_at; - value |= uint64_t{leb128_byte & 0x7Fu} << fill_bits; - ++read_at; - fill_bits += 7; - if ((leb128_byte & 0x80) == 0) { - return value; - } - } - // Read 9 bytes and didn't find the terminator byte. Check if 10th byte - // is that terminator, however to fit result into uint64_t it may carry only - // single bit. - if (read_at != end && *read_at <= 1) { - value |= uint64_t{*read_at} << fill_bits; - ++read_at; - return value; - } - // Failed to find terminator leb128 byte. - read_at = nullptr; - return 0; -} - -int WriteLeb128(uint64_t value, uint8_t* buffer) { - int size = 0; - while (value >= 0x80) { - buffer[size] = 0x80 | (value & 0x7F); - ++size; - value >>= 7; - } - buffer[size] = value; - ++size; - return size; -} - const char* ObuTypeToString(OBU_TYPE type) { switch (type) { case OBU_SEQUENCE_HEADER: @@ -92,25 +46,13 @@ bool ObuHasSize(uint8_t obu_header) { return obu_header & kObuSizePresentBit; } int ObuType(uint8_t obu_header) { return (obu_header & 0b0'1111'000) >> 3; } -int MaxFragmentSize(int remaining_bytes) { - if (remaining_bytes <= 1) { - return 0; - } - for (int i = 1;; ++i) { - if (remaining_bytes < (1 << 7 * i) + i) { - return remaining_bytes - i; - } - } -} - -// ParseObus - std::vector ParseObus(uint8_t* payload, int payload_size) { std::vector result; ByteBufferReader payload_reader(reinterpret_cast(payload), payload_size); while (payload_reader.Length() > 0) { Obu obu; + bool has_ext_header = false; payload_reader.ReadUInt8(&obu.header); obu.size = 1; if (ObuHasExtension(obu.header)) { @@ -123,16 +65,17 @@ std::vector ParseObus(uint8_t* payload, int payload_size) { } payload_reader.ReadUInt8(&obu.extension_header); ++obu.size; + has_ext_header = true; } if (!ObuHasSize(obu.header)) { obu.payload = std::vector( reinterpret_cast(payload_reader.Data()), - payload_reader.Length()); + reinterpret_cast(payload_reader.Data()) + + payload_reader.Length()); payload_reader.Consume(payload_reader.Length()); } else { uint64_t size = 0; - size_t len = 0; - if (!payload_reader.ReadUVarint(&size, &len) || + if (!payload_reader.ReadUVarint(&size) || size > payload_reader.Length()) { LOG_ERROR( "Malformed AV1 input: declared payload_size {} is larger than " @@ -140,319 +83,38 @@ std::vector ParseObus(uint8_t* payload, int payload_size) { size, payload_reader.Length()); return {}; } + obu.payload = std::vector( - reinterpret_cast(payload_reader.Data()), size); + reinterpret_cast(payload_reader.Data()), + reinterpret_cast(payload_reader.Data()) + size); payload_reader.Consume(size); + LOG_ERROR("Has size = {}", size); } obu.size += obu.payload.size(); // Skip obus that shouldn't be transfered over rtp. int obu_type = ObuType(obu.header); - // if (obu_type != kObuTypeTemporalDelimiter && obu_type != kObuTypeTileList - // && + if (has_ext_header) { + obu.payload.insert(obu.payload.begin(), obu.extension_header); + } + obu.payload.insert(obu.payload.begin(), obu.header); + if (obu_type != kObuTypeTileList && // + obu_type != kObuTypePadding) { + result.push_back(obu); + } + // if (obu_type != kObuTypeTemporalDelimiter && // + // obu_type != kObuTypeTileList && // // obu_type != kObuTypePadding) { // result.push_back(obu); // } - if (obu_type != kObuTypeTileList && obu_type != kObuTypePadding) { - result.push_back(obu); - } } - // for (int i = 0; i < result.size(); i++) { - // LOG_ERROR("[{}] Obu size = [{}], Obu type [{}]", i, - // result[i].payload_size_, - // ObuTypeToString((OBU_TYPE)ObuType(result[i].header_))); - // } + for (int i = 0; i < result.size(); i++) { + LOG_ERROR("[{}] Obu size = [{}], Obu type [{}|{}]", i, result[i].size, + ObuType(result[i].payload[0]), + ObuTypeToString((OBU_TYPE)ObuType(result[i].header))); + } return result; } -int AdditionalBytesForPreviousObuElement(const Packet& packet) { - if (packet.packet_size == 0) { - // Packet is still empty => no last OBU element, no need to reserve space - // for it. - return 0; - } - if (packet.num_obu_elements > kMaxNumObusToOmitSize) { - // There is so many obu elements in the packet, all of them must be - // prepended with the length field. That imply space for the length of the - // last obu element is already reserved. - return 0; - } - // No space was reserved for length field of the last OBU element, but that - // element becoming non-last, so it now requires explicit length field. - // Calculate how many bytes are needed to store the length in leb128 format. - return Leb128Size(packet.last_obu_size); -} - -std::vector Packetize(std::vector obus) { - int max_payload_len = 1200; - std::vector packets; - if (obus.empty()) { - return packets; - } - - // Aggregation header is present in all packets. - max_payload_len -= kAggregationHeaderSize; - - // Assemble packets. Push to current packet as much as it can hold before - // considering next one. That would normally cause uneven distribution across - // packets, specifically last one would be generally smaller. - packets.emplace_back(/*first_obu_index=*/0); - int packet_remaining_bytes = max_payload_len; - for (size_t obu_index = 0; obu_index < obus.size(); ++obu_index) { - const bool is_last_obu = obu_index == obus.size() - 1; - const Obu& obu = obus[obu_index]; - - // Putting `obu` into the last packet would make last obu element stored in - // that packet not last. All not last OBU elements must be prepend with the - // element length. AdditionalBytesForPreviousObuElement calculates how many - // bytes are needed to store that length. - int previous_obu_extra_size = - AdditionalBytesForPreviousObuElement(packets.back()); - int min_required_size = - packets.back().num_obu_elements >= kMaxNumObusToOmitSize ? 2 : 1; - if (packet_remaining_bytes < previous_obu_extra_size + min_required_size) { - // Start a new packet. - packets.emplace_back(/*first_obu_index=*/obu_index); - packet_remaining_bytes = max_payload_len; - previous_obu_extra_size = 0; - } - Packet& packet = packets.back(); - // Start inserting current obu into the packet. - packet.packet_size += previous_obu_extra_size; - packet_remaining_bytes -= previous_obu_extra_size; - packet.num_obu_elements++; - - bool must_write_obu_element_size = - packet.num_obu_elements > kMaxNumObusToOmitSize; - // Can fit all of the obu into the packet? - int required_bytes = obu.size; - if (must_write_obu_element_size) { - required_bytes += Leb128Size(obu.size); - } - int available_bytes = packet_remaining_bytes; - // if (is_last_obu) { - // // If this packet would be the last packet, available size is smaller. - // if (packets.size() == 1) { - // available_bytes += limits.first_packet_reduction_len; - // available_bytes -= limits.single_packet_reduction_len; - // } else { - // available_bytes -= limits.last_packet_reduction_len; - // } - // } - if (required_bytes <= available_bytes) { - // Insert the obu into the packet unfragmented. - packet.last_obu_size = obu.size; - packet.packet_size += required_bytes; - packet_remaining_bytes -= required_bytes; - continue; - } - - // required_bytes larger than available_bytes, fragment the obu. - int max_first_fragment_size = must_write_obu_element_size - ? MaxFragmentSize(packet_remaining_bytes) - : packet_remaining_bytes; - // Because available_bytes might be different than - // packet_remaining_bytes it might happen that max_first_fragment_size >= - // obu.size. Also, since checks above verified `obu` should not be put - // completely into the `packet`, leave at least 1 byte for later packet. - int first_fragment_size = std::min(obu.size - 1, max_first_fragment_size); - if (first_fragment_size == 0) { - // Rather than writing 0-size element at the tail of the packet, - // 'uninsert' the `obu` from the `packet`. - packet.num_obu_elements--; - packet.packet_size -= previous_obu_extra_size; - } else { - packet.packet_size += first_fragment_size; - if (must_write_obu_element_size) { - packet.packet_size += Leb128Size(first_fragment_size); - } - packet.last_obu_size = first_fragment_size; - } - - // Add middle fragments that occupy all of the packet. - // These are easy because - // - one obu per packet imply no need to store the size of the obu. - // - this packets are nor the first nor the last packets of the frame, so - // packet capacity is always limits.max_payload_len. - int obu_offset; - for (obu_offset = first_fragment_size; - obu_offset + max_payload_len < obu.size; - obu_offset += max_payload_len) { - packets.emplace_back(/*first_obu_index=*/obu_index); - Packet& packet = packets.back(); - packet.num_obu_elements = 1; - packet.first_obu_offset = obu_offset; - int middle_fragment_size = max_payload_len; - packet.last_obu_size = middle_fragment_size; - packet.packet_size = middle_fragment_size; - } - - // Add the last fragment of the obu. - int last_fragment_size = obu.size - obu_offset; - // Check for corner case where last fragment of the last obu is too large - // to fit into last packet, but may fully fit into semi-last packet. - if (is_last_obu && - last_fragment_size > - limits.max_payload_len - limits.last_packet_reduction_len) { - // Split last fragments into two. - if (last_fragment_size < 2) { - LOG_FATAL("last_fragment_size small than 2"); - return {}; - } - // Try to even packet sizes rather than payload sizes across the last - // two packets. - int semi_last_fragment_size = last_fragment_size / 2; - // But leave at least one payload byte for the last packet to avoid - // weird scenarios where size of the fragment is zero and rtp payload has - // nothing except for an aggregation header. - if (semi_last_fragment_size >= last_fragment_size) { - semi_last_fragment_size = last_fragment_size - 1; - } - last_fragment_size -= semi_last_fragment_size; - - packets.emplace_back(/*first_obu_index=*/obu_index); - Packet& packet = packets.back(); - packet.num_obu_elements = 1; - packet.first_obu_offset = obu_offset; - packet.last_obu_size = semi_last_fragment_size; - packet.packet_size = semi_last_fragment_size; - obu_offset += semi_last_fragment_size; - } - packets.emplace_back(/*first_obu_index=*/obu_index); - Packet& last_packet = packets.back(); - last_packet.num_obu_elements = 1; - last_packet.first_obu_offset = obu_offset; - last_packet.last_obu_size = last_fragment_size; - last_packet.packet_size = last_fragment_size; - packet_remaining_bytes = max_payload_len - last_fragment_size; - } - return packets; -} - -uint8_t AggregationHeader() const { - const Packet& packet = packets_[packet_index_]; - uint8_t aggregation_header = 0; - - // Set Z flag: first obu element is continuation of the previous OBU. - bool first_obu_element_is_fragment = packet.first_obu_offset > 0; - if (first_obu_element_is_fragment) aggregation_header |= (1 << 7); - - // Set Y flag: last obu element will be continuated in the next packet. - int last_obu_offset = - packet.num_obu_elements == 1 ? packet.first_obu_offset : 0; - bool last_obu_is_fragment = - last_obu_offset + packet.last_obu_size < - obus_[packet.first_obu + packet.num_obu_elements - 1].size; - if (last_obu_is_fragment) aggregation_header |= (1 << 6); - - // Set W field: number of obu elements in the packet (when not too large). - if (packet.num_obu_elements <= kMaxNumObusToOmitSize) - aggregation_header |= packet.num_obu_elements << 4; - - // Set N flag: beginning of a new coded video sequence. - // Encoder may produce key frame without a sequence header, thus double check - // incoming frame includes the sequence header. Since Temporal delimiter is - // already filtered out, sequence header should be the first obu when present. - if (frame_type_ == VideoFrameType::kVideoFrameKey && packet_index_ == 0 && - ObuType(obus_.front().header) == kObuTypeSequenceHeader) { - aggregation_header |= (1 << 3); - } - return aggregation_header; -} - -bool NextPacket(RtpPacket* packet) { - if (packet_index_ >= packets_.size()) { - return false; - } - const Packet& next_packet = packets_[packet_index_]; - - if (next_packet.num_obu_elements < 0) { - LOG_FATAL("NextPacket: num_obu_elements < 0"); - return false; - } - - if (next_packet.first_obu_offset >= obus_[next_packet.first_obu].size) { - LOG_FATAL( - "next_packet.first_obu_offset >= obus_[next_packet.first_obu].size"); - return false; - } - - if (next_packet.last_obu_size > - obus_[next_packet.first_obu + next_packet.num_obu_elements - 1].size) { - LOG_FATAL( - "next_packet.last_obu_size>obus_[next_packet.first_obu+" - "next_packet.num_obu_elements-1].size"); - return false; - } - - uint8_t* const rtp_payload = - packet->AllocatePayload(kAggregationHeaderSize + next_packet.packet_size); - uint8_t* write_at = rtp_payload; - - *write_at++ = AggregationHeader(); - - int obu_offset = next_packet.first_obu_offset; - // Store all OBU elements except the last one. - for (int i = 0; i < next_packet.num_obu_elements - 1; ++i) { - const Obu& obu = obus_[next_packet.first_obu + i]; - size_t fragment_size = obu.size - obu_offset; - write_at += WriteLeb128(fragment_size, write_at); - if (obu_offset == 0) { - *write_at++ = obu.header & ~kObuSizePresentBit; - } - if (obu_offset <= 1 && ObuHasExtension(obu.header)) { - *write_at++ = obu.extension_header; - } - int payload_offset = - std::max(0, obu_offset - (ObuHasExtension(obu.header) ? 2 : 1)); - size_t payload_size = obu.payload.size() - payload_offset; - if (!obu.payload.empty() && payload_size > 0) { - memcpy(write_at, obu.payload.data() + payload_offset, payload_size); - } - write_at += payload_size; - // All obus are stored from the beginning, except, may be, the first one. - obu_offset = 0; - } - // Store the last OBU element. - const Obu& last_obu = - obus_[next_packet.first_obu + next_packet.num_obu_elements - 1]; - int fragment_size = next_packet.last_obu_size; - RTC_DCHECK_GT(fragment_size, 0); - if (next_packet.num_obu_elements > kMaxNumObusToOmitSize) { - write_at += WriteLeb128(fragment_size, write_at); - } - if (obu_offset == 0 && fragment_size > 0) { - *write_at++ = last_obu.header & ~kObuSizePresentBit; - --fragment_size; - } - if (obu_offset <= 1 && ObuHasExtension(last_obu.header) && - fragment_size > 0) { - *write_at++ = last_obu.extension_header; - --fragment_size; - } - if (write_at - rtp_payload + fragment_size != - kAggregationHeaderSize + next_packet.packet_size) { - LOG_FATAL("write_at - rtp_payload + fragment_size!= - kAggregationHeaderSize + next_packet.packet_size"); - return false; - } - int payload_offset = - std::max(0, obu_offset - (ObuHasExtension(last_obu.header) ? 2 : 1)); - memcpy(write_at, last_obu.payload.data() + payload_offset, fragment_size); - write_at += fragment_size; - - if (write_at - rtp_payload != - kAggregationHeaderSize + next_packet.packet_size) { - LOG_FATAL("write_at - rtp_payload!= - kAggregationHeaderSize + next_packet.packet_size"); - return false; - } - - ++packet_index_; - bool is_last_packet_in_frame = packet_index_ == packets_.size(); - packet->SetMarker(is_last_packet_in_frame && is_last_frame_in_picture_); - return true; -} - } // namespace obu \ No newline at end of file diff --git a/src/rtp/obu_parser.h b/src/rtp/obu_parser.h index 47281fb..2b3de84 100644 --- a/src/rtp/obu_parser.h +++ b/src/rtp/obu_parser.h @@ -15,6 +15,19 @@ #include "rtp_packet.h" namespace obu { + +typedef enum { + OBU_SEQUENCE_HEADER = 1, + OBU_TEMPORAL_DELIMITER = 2, + OBU_FRAME_HEADER = 3, + OBU_TILE_GROUP = 4, + OBU_METADATA = 5, + OBU_FRAME = 6, + OBU_REDUNDANT_FRAME_HEADER = 7, + OBU_TILE_LIST = 8, + OBU_PADDING = 15, +} OBU_TYPE; + std::vector ParseObus(uint8_t* payload, int payload_size); std::vector Packetize(std::vector obus); diff --git a/src/rtp/rtp_codec.cpp b/src/rtp/rtp_codec.cpp index ccb87f2..6b908a6 100644 --- a/src/rtp/rtp_codec.cpp +++ b/src/rtp/rtp_codec.cpp @@ -12,6 +12,8 @@ constexpr int kObuTypeSequenceHeader = 1; +using namespace obu; + RtpCodec ::RtpCodec(RtpPacket::PAYLOAD_TYPE payload_type) : version_(RTP_VERSION), has_padding_(false), @@ -221,9 +223,9 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, std::vector obus = ParseObus(buffer, size); LOG_ERROR("Total size = [{}]", size); for (int i = 0; i < obus.size(); i++) { - LOG_ERROR("[{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, - ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); - if (obus[i].size_ <= MAX_NALU_LEN) { + LOG_ERROR("[{}] Obu size = [{}], Obu type [{}]", i, obus[i].size, + ObuTypeToString((OBU_TYPE)ObuType(obus[i].header))); + if (obus[i].size <= MAX_NALU_LEN) { RtpPacket rtp_packet; rtp_packet.SetVerion(version_); rtp_packet.SetHasPadding(has_padding_); @@ -249,12 +251,12 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); - rtp_packet.EncodeAv1(obus[i].data_, obus[i].payload_size_); + rtp_packet.EncodeAv1(obus[i].payload.data(), obus[i].payload.size()); packets.emplace_back(rtp_packet); } else { - size_t last_packet_size = obus[i].payload_size_ % MAX_NALU_LEN; + size_t last_packet_size = obus[i].payload.size() % MAX_NALU_LEN; size_t packet_num = - obus[i].payload_size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); + obus[i].payload.size() / MAX_NALU_LEN + (last_packet_size ? 1 : 0); timestamp_ = std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch()) .count(); @@ -283,10 +285,10 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, rtp_packet.SetAv1AggrHeader(z, y, w, n); if (index == packet_num - 1 && last_packet_size > 0) { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, + rtp_packet.EncodeAv1(obus[i].payload.data() + index * MAX_NALU_LEN, last_packet_size); } else { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, + rtp_packet.EncodeAv1(obus[i].payload.data() + index * MAX_NALU_LEN, MAX_NALU_LEN); } packets.emplace_back(rtp_packet); @@ -526,10 +528,10 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size, .count(); for (int i = 0; i < obus.size(); i++) { - // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, - // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); + // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size, + // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header))); - if (obus[i].size_ <= MAX_NALU_LEN) { + if (obus[i].size <= MAX_NALU_LEN) { RtpPacket rtp_packet; rtp_packet.SetVerion(version_); rtp_packet.SetHasPadding(has_padding_); @@ -550,13 +552,25 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size, } rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); - rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_); + if (obus[i].payload.data() == nullptr) { + LOG_ERROR("obus[i].payload.data() is nullptr"); + } + if (obus[i].size == 0) { + LOG_ERROR("obus[i].size == 0"); + } + + rtp_packet.EncodeAv1(obus[i].payload.data(), obus[i].size); + // int type = (obus[i].payload[0] & 0b0'1111'000) >> 3; + int type = (rtp_packet.Payload()[0] & 0b0'1111'000) >> 3; + LOG_ERROR("!!!!! Obu type = [{}]", type); + LOG_ERROR("output [{:X} {:X}]", rtp_packet.Payload()[0], + rtp_packet.Payload()[1]); packets.emplace_back(rtp_packet); } else { - size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN; + size_t last_packet_size = obus[i].size % MAX_NALU_LEN; size_t packet_num = - obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); + obus[i].size / MAX_NALU_LEN + (last_packet_size ? 1 : 0); for (size_t index = 0; index < packet_num; index++) { RtpPacket rtp_packet; @@ -580,16 +594,21 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size, int y = index != packet_num - 1 ? 1 : 0; int w = 1; int n = (frame_type == VideoFrameType::kVideoFrameKey) && - (ObuType(obus[i].header_) == kObuTypeSequenceHeader) + (ObuType(obus[i].header) == kObuTypeSequenceHeader) ? 1 : 0; rtp_packet.SetAv1AggrHeader(z, y, w, n); - + if (obus[i].payload.data() == nullptr) { + LOG_ERROR("obus[i].payload.data() is nullptr"); + } + if (obus[i].size == 0) { + LOG_ERROR("obus[i].size == 0"); + } if (index == packet_num - 1 && last_packet_size > 0) { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, + rtp_packet.EncodeAv1(obus[i].payload.data() + index * MAX_NALU_LEN, last_packet_size); } else { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, + rtp_packet.EncodeAv1(obus[i].payload.data() + index * MAX_NALU_LEN, MAX_NALU_LEN); } diff --git a/src/rtp/rtp_codec_av1.cpp b/src/rtp/rtp_codec_av1.cpp index 78eb11c..3b02353 100644 --- a/src/rtp/rtp_codec_av1.cpp +++ b/src/rtp/rtp_codec_av1.cpp @@ -3,91 +3,92 @@ #include "obu_parser.h" #include "rtp_codec.h" -void EncodeAv1(uint8_t* buffer, size_t size, std::vector& packets) { - std::vector obus = obu::ParseObus(buffer, size); - std::vector packets = obu::Packetizer(obus); +// void EncodeAv1(uint8_t* buffer, size_t size, std::vector& packets) +// { +// std::vector obus = obu::ParseObus(buffer, size); +// std::vector packets = obu::Packetizer(obus); - int num_packets = packets.size(); +// int num_packets = packets.size(); - if (1 == num_packets) { - } +// if (1 == num_packets) { +// } - // LOG_ERROR("Total size = [{}]", size); +// // LOG_ERROR("Total size = [{}]", size); - uint32_t timestamp = std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); +// uint32_t timestamp = std::chrono::duration_cast( +// std::chrono::system_clock::now().time_since_epoch()) +// .count(); - for (int i = 0; i < obus.size(); i++) { - // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, - // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); +// for (int i = 0; i < obus.size(); i++) { +// // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, +// // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); - if (obus[i].size_ <= MAX_NALU_LEN) { - RtpPacket rtp_packet; - rtp_packet.SetVerion(version_); - rtp_packet.SetHasPadding(has_padding_); - rtp_packet.SetHasExtension(has_extension_); - rtp_packet.SetMarker(1); - rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); - rtp_packet.SetSequenceNumber(sequence_number_++); - rtp_packet.SetTimestamp(timestamp); - rtp_packet.SetSsrc(ssrc_); +// if (obus[i].size_ <= MAX_NALU_LEN) { +// RtpPacket rtp_packet; +// rtp_packet.SetVerion(version_); +// rtp_packet.SetHasPadding(has_padding_); +// rtp_packet.SetHasExtension(has_extension_); +// rtp_packet.SetMarker(1); +// rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); +// rtp_packet.SetSequenceNumber(sequence_number_++); +// rtp_packet.SetTimestamp(timestamp); +// rtp_packet.SetSsrc(ssrc_); - if (!csrcs_.empty()) { - rtp_packet.SetCsrcs(csrcs_); - } +// if (!csrcs_.empty()) { +// rtp_packet.SetCsrcs(csrcs_); +// } - if (has_extension_) { - rtp_packet.SetExtensionProfile(extension_profile_); - rtp_packet.SetExtensionData(extension_data_, extension_len_); - } +// if (has_extension_) { +// rtp_packet.SetExtensionProfile(extension_profile_); +// rtp_packet.SetExtensionData(extension_data_, extension_len_); +// } - rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); - rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_); +// rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); +// rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_); - packets.emplace_back(rtp_packet); - } else { - size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN; - size_t packet_num = - obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); +// packets.emplace_back(rtp_packet); +// } else { +// size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN; +// size_t packet_num = +// obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); - for (size_t index = 0; index < packet_num; index++) { - RtpPacket rtp_packet; - rtp_packet.SetVerion(version_); - rtp_packet.SetHasPadding(has_padding_); - rtp_packet.SetHasExtension(has_extension_); - rtp_packet.SetMarker(index == packet_num - 1 ? 1 : 0); - rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); - rtp_packet.SetSequenceNumber(sequence_number_++); - rtp_packet.SetTimestamp(timestamp); - rtp_packet.SetSsrc(ssrc_); - if (!csrcs_.empty()) { - rtp_packet.SetCsrcs(csrcs_); - } - if (has_extension_) { - rtp_packet.SetExtensionProfile(extension_profile_); - rtp_packet.SetExtensionData(extension_data_, extension_len_); - } +// for (size_t index = 0; index < packet_num; index++) { +// RtpPacket rtp_packet; +// rtp_packet.SetVerion(version_); +// rtp_packet.SetHasPadding(has_padding_); +// rtp_packet.SetHasExtension(has_extension_); +// rtp_packet.SetMarker(index == packet_num - 1 ? 1 : 0); +// rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); +// rtp_packet.SetSequenceNumber(sequence_number_++); +// rtp_packet.SetTimestamp(timestamp); +// rtp_packet.SetSsrc(ssrc_); +// if (!csrcs_.empty()) { +// rtp_packet.SetCsrcs(csrcs_); +// } +// if (has_extension_) { +// rtp_packet.SetExtensionProfile(extension_profile_); +// rtp_packet.SetExtensionData(extension_data_, extension_len_); +// } - int z = index != 0 ? 1 : 0; - int y = index != packet_num - 1 ? 1 : 0; - int w = 1; - int n = (frame_type == VideoFrameType::kVideoFrameKey) && - (ObuType(obus[i].header_) == kObuTypeSequenceHeader) - ? 1 - : 0; - rtp_packet.SetAv1AggrHeader(z, y, w, n); +// int z = index != 0 ? 1 : 0; +// int y = index != packet_num - 1 ? 1 : 0; +// int w = 1; +// int n = (frame_type == VideoFrameType::kVideoFrameKey) && +// (ObuType(obus[i].header_) == kObuTypeSequenceHeader) +// ? 1 +// : 0; +// rtp_packet.SetAv1AggrHeader(z, y, w, n); - if (index == packet_num - 1 && last_packet_size > 0) { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, - last_packet_size); - } else { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, - MAX_NALU_LEN); - } +// if (index == packet_num - 1 && last_packet_size > 0) { +// rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, +// last_packet_size); +// } else { +// rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, +// MAX_NALU_LEN); +// } - packets.emplace_back(rtp_packet); - } - } - } -} \ No newline at end of file +// packets.emplace_back(rtp_packet); +// } +// } +// } +// } \ No newline at end of file diff --git a/src/rtp/rtp_packet.cpp b/src/rtp/rtp_packet.cpp index 469d4f8..731d460 100644 --- a/src/rtp/rtp_packet.cpp +++ b/src/rtp/rtp_packet.cpp @@ -430,7 +430,6 @@ const uint8_t *RtpPacket::EncodeAv1(uint8_t *payload, size_t payload_size) { uint32_t payload_offset = aggr_header_offset; memcpy(buffer_ + 13 + payload_offset, payload, payload_size); size_ = payload_size + (13 + payload_offset); - return buffer_; } diff --git a/src/rtp/rtp_video_receiver.cpp b/src/rtp/rtp_video_receiver.cpp index be2f162..2f40fd7 100644 --- a/src/rtp/rtp_video_receiver.cpp +++ b/src/rtp/rtp_video_receiver.cpp @@ -1,5 +1,6 @@ #include "rtp_video_receiver.h" +#include "byte_buffer.h" #include "log.h" #include "obu_parser.h" @@ -236,6 +237,9 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacket& rtp_packet) { } bool RtpVideoReceiver::CheckIsAv1FrameCompleted(RtpPacket& rtp_packet) { + LOG_ERROR("input [{:X} {:X}]", rtp_packet.Payload()[0], + rtp_packet.Payload()[1]); + if (rtp_packet.Av1FrameEnd()) { uint16_t end_seq = rtp_packet.SequenceNumber(); size_t start = end_seq; @@ -265,14 +269,22 @@ bool RtpVideoReceiver::CheckIsAv1FrameCompleted(RtpPacket& rtp_packet) { size_t complete_frame_size = 0; for (; start <= rtp_packet.SequenceNumber(); start++) { - memcpy(nv12_data_ + complete_frame_size, - incomplete_frame_list_[start].Payload(), - incomplete_frame_list_[start].PayloadSize()); + const uint8_t* obu_frame = incomplete_frame_list_[start].Payload(); + size_t obu_frame_size = incomplete_frame_list_[start].PayloadSize(); + memcpy(nv12_data_ + complete_frame_size, obu_frame, obu_frame_size); - complete_frame_size += incomplete_frame_list_[start].PayloadSize(); + complete_frame_size += obu_frame_size; incomplete_frame_list_.erase(start); } + obu::Obu obu; + ByteBufferReader payload_reader(reinterpret_cast(nv12_data_), + complete_frame_size); + payload_reader.ReadUInt8(&obu.header); + int type = (nv12_data_[0] & 0b0'1111'000) >> 3; + + LOG_ERROR("complete_frame_size = {}, type = {}", complete_frame_size, + type); compelete_video_frame_queue_.push( VideoFrame(nv12_data_, complete_frame_size));