diff --git a/src/rtp/obu.cpp b/src/rtp/obu.cpp index 88c0d68..68bebec 100644 --- a/src/rtp/obu.cpp +++ b/src/rtp/obu.cpp @@ -1,109 +1 @@ #include "obu.h" - -#include - -#include "log.h" - -Obu::Obu() {} - -Obu::Obu(const Obu &obu) { - if (obu.size_ > 0) { - data_ = (uint8_t *)malloc(obu.size_); - memcpy(data_, obu.data_, obu.size_); - size_ = obu.size_; - } - - if (obu.payload_size_ > 0) { - payload_ = (uint8_t *)malloc(obu.payload_size_); - memcpy(payload_, obu.payload_, obu.payload_size_); - payload_size_ = obu.payload_size_; - } - - header_ = obu.header_; - extension_header_ = obu.extension_header_; -} - -Obu::Obu(Obu &&obu) - : data_(std::move(obu.data_)), - size_(obu.size_), - payload_((uint8_t *)std::move(obu.payload_)), - payload_size_(obu.payload_size_), - header_(obu.header_), - extension_header_(obu.extension_header_) { - obu.data_ = nullptr; - obu.size_ = 0; - obu.payload_ = nullptr; - obu.payload_size_ = 0; -} - -Obu &Obu::operator=(const Obu &obu) { - if (&obu != this) { - if (obu.size_ > 0) { - data_ = (uint8_t *)realloc(data_, obu.size_); - memcpy(data_, obu.data_, obu.size_); - size_ = obu.size_; - } - - if (obu.payload_size_ > 0) { - payload_ = (uint8_t *)realloc(payload_, obu.payload_size_); - memcpy(payload_, obu.payload_, obu.payload_size_); - payload_size_ = obu.payload_size_; - } - - header_ = obu.header_; - extension_header_ = obu.extension_header_; - } - return *this; -} - -Obu &Obu::operator=(Obu &&obu) { - if (&obu != this) { - data_ = std::move(obu.data_); - obu.data_ = nullptr; - size_ = obu.size_; - obu.size_ = 0; - - payload_ = std::move(obu.payload_); - obu.payload_ = nullptr; - payload_size_ = obu.payload_size_; - obu.payload_size_ = 0; - - header_ = obu.header_; - obu.header_ = 0; - extension_header_ = obu.extension_header_; - obu.extension_header_ = 0; - } - return *this; -} - -Obu::~Obu() { - if (data_) { - free(data_); - data_ = nullptr; - } - size_ = 0; - - if (payload_) { - free(payload_); - payload_ = nullptr; - } - payload_size_ = 0; - - header_ = 0; - extension_header_ = 0; -} - -bool Obu::SetPayload(const uint8_t *payload, int size) { - if (payload_) { - free(payload_); - payload_ = nullptr; - } - payload_ = (uint8_t *)malloc(size); - memcpy(payload_, payload, size); - payload_size_ = size; - - if (payload_) - return true; - else - return false; -} \ No newline at end of file diff --git a/src/rtp/obu.h b/src/rtp/obu.h index bfd582a..f95ce45 100644 --- a/src/rtp/obu.h +++ b/src/rtp/obu.h @@ -9,27 +9,41 @@ #include #include +#include -#include "aom/aom_codec.h" - -class Obu { - public: - Obu(); - Obu(const Obu &obu); - Obu(Obu &&obu); - Obu &operator=(const Obu &obu); - Obu &operator=(Obu &&obu); - - ~Obu(); - - bool SetPayload(const uint8_t *payload, int size); - - uint8_t header_ = 0; - uint8_t extension_header_ = 0; // undefined if (header & kXbit) == 0 - uint8_t *payload_ = nullptr; - int payload_size_ = 0; - uint8_t *data_ = nullptr; - int size_ = 0; // size of the header and payload combined. +namespace obu { +struct PayloadSizeLimits { + int max_payload_len = 1200; + int first_packet_reduction_len = 0; + int last_packet_reduction_len = 0; + // Reduction len for packet that is first & last at the same time. + int single_packet_reduction_len = 0; }; +enum class VideoFrameType { + kEmptyFrame = 0, + kVideoFrameKey = 3, + kVideoFrameDelta = 4, +}; + +struct Obu { + uint8_t header; + uint8_t extension_header; // undefined if (header & kXbit) == 0 + std::vector payload; + int size; // size of the header and payload combined. +}; + +struct Packet { + explicit Packet(int first_obu_index) : first_obu(first_obu_index) {} + // Indexes into obus_ vector of the first and last obus that should put into + // the packet. + int first_obu; + int num_obu_elements = 0; + int first_obu_offset = 0; + int last_obu_size; + // Total size consumed by the packet. + int packet_size = 0; +}; +} // namespace obu + #endif \ No newline at end of file diff --git a/src/rtp/obu_parser.cpp b/src/rtp/obu_parser.cpp index d2a1952..8ce677c 100644 --- a/src/rtp/obu_parser.cpp +++ b/src/rtp/obu_parser.cpp @@ -2,6 +2,7 @@ #include "byte_buffer.h" #include "log.h" +namespace obu { constexpr int kAggregationHeaderSize = 1; // when there are 3 or less OBU (fragments) in a packet, size of the last one @@ -13,6 +14,52 @@ constexpr int kObuTypeTemporalDelimiter = 2; constexpr int kObuTypeTileList = 8; constexpr int kObuTypePadding = 15; +int Leb128Size(uint64_t value) { + int size = 0; + while (value >= 0x80) { + ++size; + value >>= 7; + } + return size + 1; +} + +uint64_t ReadLeb128(const uint8_t*& read_at, const uint8_t* end) { + uint64_t value = 0; + int fill_bits = 0; + while (read_at != end && fill_bits < 64 - 7) { + uint8_t leb128_byte = *read_at; + value |= uint64_t{leb128_byte & 0x7Fu} << fill_bits; + ++read_at; + fill_bits += 7; + if ((leb128_byte & 0x80) == 0) { + return value; + } + } + // Read 9 bytes and didn't find the terminator byte. Check if 10th byte + // is that terminator, however to fit result into uint64_t it may carry only + // single bit. + if (read_at != end && *read_at <= 1) { + value |= uint64_t{*read_at} << fill_bits; + ++read_at; + return value; + } + // Failed to find terminator leb128 byte. + read_at = nullptr; + return 0; +} + +int WriteLeb128(uint64_t value, uint8_t* buffer) { + int size = 0; + while (value >= 0x80) { + buffer[size] = 0x80 | (value & 0x7F); + ++size; + value >>= 7; + } + buffer[size] = value; + ++size; + return size; +} + const char* ObuTypeToString(OBU_TYPE type) { switch (type) { case OBU_SEQUENCE_HEADER: @@ -45,18 +92,28 @@ bool ObuHasSize(uint8_t obu_header) { return obu_header & kObuSizePresentBit; } int ObuType(uint8_t obu_header) { return (obu_header & 0b0'1111'000) >> 3; } +int MaxFragmentSize(int remaining_bytes) { + if (remaining_bytes <= 1) { + return 0; + } + for (int i = 1;; ++i) { + if (remaining_bytes < (1 << 7 * i) + i) { + return remaining_bytes - i; + } + } +} + +// ParseObus + std::vector ParseObus(uint8_t* payload, int payload_size) { std::vector result; ByteBufferReader payload_reader(reinterpret_cast(payload), payload_size); - - int pos = 0; while (payload_reader.Length() > 0) { Obu obu; - payload_reader.ReadUInt8(&obu.header_); - - obu.size_ = 1; - if (ObuHasExtension(obu.header_)) { + payload_reader.ReadUInt8(&obu.header); + obu.size = 1; + if (ObuHasExtension(obu.header)) { if (payload_reader.Length() == 0) { LOG_ERROR( "Malformed AV1 input: expected extension_header, no more bytes in " @@ -64,37 +121,32 @@ std::vector ParseObus(uint8_t* payload, int payload_size) { (payload_size - payload_reader.Length())); return {}; } - payload_reader.ReadUInt8(&obu.extension_header_); - ++obu.size_; + payload_reader.ReadUInt8(&obu.extension_header); + ++obu.size; } - if (!ObuHasSize(obu.header_)) { - obu.SetPayload(reinterpret_cast(payload_reader.Data()), - payload_reader.Length()); + if (!ObuHasSize(obu.header)) { + obu.payload = std::vector( + reinterpret_cast(payload_reader.Data()), + payload_reader.Length()); payload_reader.Consume(payload_reader.Length()); } else { - uint64_t payload_size = 0; + uint64_t size = 0; size_t len = 0; - if (!payload_reader.ReadUVarint(&payload_size, &len) || - payload_size > payload_reader.Length()) { + if (!payload_reader.ReadUVarint(&size, &len) || + size > payload_reader.Length()) { LOG_ERROR( "Malformed AV1 input: declared payload_size {} is larger than " - "remaining " - "buffer size {}", - payload_size, payload_reader.Length()); + "remaining buffer size {}", + size, payload_reader.Length()); return {}; } - obu.SetPayload(reinterpret_cast(payload_reader.Data()), - payload_size); - obu.size_ += len; - payload_reader.Consume(payload_size); + obu.payload = std::vector( + reinterpret_cast(payload_reader.Data()), size); + payload_reader.Consume(size); } - obu.size_ += obu.payload_size_; - obu.data_ = (uint8_t*)malloc(obu.size_); - memcpy(obu.data_, payload + pos, obu.size_); - pos += obu.size_; - + obu.size += obu.payload.size(); // Skip obus that shouldn't be transfered over rtp. - int obu_type = ObuType(obu.header_); + int obu_type = ObuType(obu.header); // if (obu_type != kObuTypeTemporalDelimiter && obu_type != kObuTypeTileList // && // obu_type != kObuTypePadding) { @@ -105,5 +157,302 @@ std::vector ParseObus(uint8_t* payload, int payload_size) { } } + // for (int i = 0; i < result.size(); i++) { + // LOG_ERROR("[{}] Obu size = [{}], Obu type [{}]", i, + // result[i].payload_size_, + // ObuTypeToString((OBU_TYPE)ObuType(result[i].header_))); + // } + return result; -} \ No newline at end of file +} + +int AdditionalBytesForPreviousObuElement(const Packet& packet) { + if (packet.packet_size == 0) { + // Packet is still empty => no last OBU element, no need to reserve space + // for it. + return 0; + } + if (packet.num_obu_elements > kMaxNumObusToOmitSize) { + // There is so many obu elements in the packet, all of them must be + // prepended with the length field. That imply space for the length of the + // last obu element is already reserved. + return 0; + } + // No space was reserved for length field of the last OBU element, but that + // element becoming non-last, so it now requires explicit length field. + // Calculate how many bytes are needed to store the length in leb128 format. + return Leb128Size(packet.last_obu_size); +} + +std::vector Packetize(std::vector obus, PayloadSizeLimits limits) { + int max_payload_len = 1200; + std::vector packets; + if (obus.empty()) { + return packets; + } + + // Aggregation header is present in all packets. + max_payload_len -= kAggregationHeaderSize; + + // Assemble packets. Push to current packet as much as it can hold before + // considering next one. That would normally cause uneven distribution across + // packets, specifically last one would be generally smaller. + packets.emplace_back(/*first_obu_index=*/0); + int packet_remaining_bytes = max_payload_len; + for (size_t obu_index = 0; obu_index < obus.size(); ++obu_index) { + const bool is_last_obu = obu_index == obus.size() - 1; + const Obu& obu = obus[obu_index]; + + // Putting `obu` into the last packet would make last obu element stored in + // that packet not last. All not last OBU elements must be prepend with the + // element length. AdditionalBytesForPreviousObuElement calculates how many + // bytes are needed to store that length. + int previous_obu_extra_size = + AdditionalBytesForPreviousObuElement(packets.back()); + int min_required_size = + packets.back().num_obu_elements >= kMaxNumObusToOmitSize ? 2 : 1; + if (packet_remaining_bytes < previous_obu_extra_size + min_required_size) { + // Start a new packet. + packets.emplace_back(/*first_obu_index=*/obu_index); + packet_remaining_bytes = max_payload_len; + previous_obu_extra_size = 0; + } + Packet& packet = packets.back(); + // Start inserting current obu into the packet. + packet.packet_size += previous_obu_extra_size; + packet_remaining_bytes -= previous_obu_extra_size; + packet.num_obu_elements++; + + bool must_write_obu_element_size = + packet.num_obu_elements > kMaxNumObusToOmitSize; + // Can fit all of the obu into the packet? + int required_bytes = obu.size; + if (must_write_obu_element_size) { + required_bytes += Leb128Size(obu.size); + } + int available_bytes = packet_remaining_bytes; + // if (is_last_obu) { + // // If this packet would be the last packet, available size is smaller. + // if (packets.size() == 1) { + // available_bytes += limits.first_packet_reduction_len; + // available_bytes -= limits.single_packet_reduction_len; + // } else { + // available_bytes -= limits.last_packet_reduction_len; + // } + // } + if (required_bytes <= available_bytes) { + // Insert the obu into the packet unfragmented. + packet.last_obu_size = obu.size; + packet.packet_size += required_bytes; + packet_remaining_bytes -= required_bytes; + continue; + } + + // required_bytes larger than available_bytes, fragment the obu. + int max_first_fragment_size = must_write_obu_element_size + ? MaxFragmentSize(packet_remaining_bytes) + : packet_remaining_bytes; + // Because available_bytes might be different than + // packet_remaining_bytes it might happen that max_first_fragment_size >= + // obu.size. Also, since checks above verified `obu` should not be put + // completely into the `packet`, leave at least 1 byte for later packet. + int first_fragment_size = std::min(obu.size - 1, max_first_fragment_size); + if (first_fragment_size == 0) { + // Rather than writing 0-size element at the tail of the packet, + // 'uninsert' the `obu` from the `packet`. + packet.num_obu_elements--; + packet.packet_size -= previous_obu_extra_size; + } else { + packet.packet_size += first_fragment_size; + if (must_write_obu_element_size) { + packet.packet_size += Leb128Size(first_fragment_size); + } + packet.last_obu_size = first_fragment_size; + } + + // Add middle fragments that occupy all of the packet. + // These are easy because + // - one obu per packet imply no need to store the size of the obu. + // - this packets are nor the first nor the last packets of the frame, so + // packet capacity is always limits.max_payload_len. + int obu_offset; + for (obu_offset = first_fragment_size; + obu_offset + max_payload_len < obu.size; + obu_offset += max_payload_len) { + packets.emplace_back(/*first_obu_index=*/obu_index); + Packet& packet = packets.back(); + packet.num_obu_elements = 1; + packet.first_obu_offset = obu_offset; + int middle_fragment_size = max_payload_len; + packet.last_obu_size = middle_fragment_size; + packet.packet_size = middle_fragment_size; + } + + // Add the last fragment of the obu. + int last_fragment_size = obu.size - obu_offset; + // Check for corner case where last fragment of the last obu is too large + // to fit into last packet, but may fully fit into semi-last packet. + if (is_last_obu && + last_fragment_size > + limits.max_payload_len - limits.last_packet_reduction_len) { + // Split last fragments into two. + if (last_fragment_size < 2) { + LOG_FATAL("last_fragment_size small than 2"); + return {}; + } + // Try to even packet sizes rather than payload sizes across the last + // two packets. + int semi_last_fragment_size = last_fragment_size / 2; + // But leave at least one payload byte for the last packet to avoid + // weird scenarios where size of the fragment is zero and rtp payload has + // nothing except for an aggregation header. + if (semi_last_fragment_size >= last_fragment_size) { + semi_last_fragment_size = last_fragment_size - 1; + } + last_fragment_size -= semi_last_fragment_size; + + packets.emplace_back(/*first_obu_index=*/obu_index); + Packet& packet = packets.back(); + packet.num_obu_elements = 1; + packet.first_obu_offset = obu_offset; + packet.last_obu_size = semi_last_fragment_size; + packet.packet_size = semi_last_fragment_size; + obu_offset += semi_last_fragment_size; + } + packets.emplace_back(/*first_obu_index=*/obu_index); + Packet& last_packet = packets.back(); + last_packet.num_obu_elements = 1; + last_packet.first_obu_offset = obu_offset; + last_packet.last_obu_size = last_fragment_size; + last_packet.packet_size = last_fragment_size; + packet_remaining_bytes = max_payload_len - last_fragment_size; + } + return packets; +} + +uint8_t AggregationHeader() const { + const Packet& packet = packets_[packet_index_]; + uint8_t aggregation_header = 0; + + // Set Z flag: first obu element is continuation of the previous OBU. + bool first_obu_element_is_fragment = packet.first_obu_offset > 0; + if (first_obu_element_is_fragment) aggregation_header |= (1 << 7); + + // Set Y flag: last obu element will be continuated in the next packet. + int last_obu_offset = + packet.num_obu_elements == 1 ? packet.first_obu_offset : 0; + bool last_obu_is_fragment = + last_obu_offset + packet.last_obu_size < + obus_[packet.first_obu + packet.num_obu_elements - 1].size; + if (last_obu_is_fragment) aggregation_header |= (1 << 6); + + // Set W field: number of obu elements in the packet (when not too large). + if (packet.num_obu_elements <= kMaxNumObusToOmitSize) + aggregation_header |= packet.num_obu_elements << 4; + + // Set N flag: beginning of a new coded video sequence. + // Encoder may produce key frame without a sequence header, thus double check + // incoming frame includes the sequence header. Since Temporal delimiter is + // already filtered out, sequence header should be the first obu when present. + if (frame_type_ == VideoFrameType::kVideoFrameKey && packet_index_ == 0 && + ObuType(obus_.front().header) == kObuTypeSequenceHeader) { + aggregation_header |= (1 << 3); + } + return aggregation_header; +} + +bool NextPacket(RtpPacket* packet) { + if (packet_index_ >= packets_.size()) { + return false; + } + const Packet& next_packet = packets_[packet_index_]; + + if (next_packet.num_obu_elements < 0) { + LOG_FATAL("NextPacket: num_obu_elements < 0"); + return false; + } + + if (next_packet.first_obu_offset >= obus_[next_packet.first_obu].size) { + LOG_FATAL( + "next_packet.first_obu_offset >= obus_[next_packet.first_obu].size"); + return false; + } + + if (next_packet.last_obu_size > + obus_[next_packet.first_obu + next_packet.num_obu_elements - 1].size) { + LOG_FATAL( + "next_packet.last_obu_size>obus_[next_packet.first_obu+" + "next_packet.num_obu_elements-1].size"); + return false; + } + + uint8_t* const rtp_payload = + packet->AllocatePayload(kAggregationHeaderSize + next_packet.packet_size); + uint8_t* write_at = rtp_payload; + + *write_at++ = AggregationHeader(); + + int obu_offset = next_packet.first_obu_offset; + // Store all OBU elements except the last one. + for (int i = 0; i < next_packet.num_obu_elements - 1; ++i) { + const Obu& obu = obus_[next_packet.first_obu + i]; + size_t fragment_size = obu.size - obu_offset; + write_at += WriteLeb128(fragment_size, write_at); + if (obu_offset == 0) { + *write_at++ = obu.header & ~kObuSizePresentBit; + } + if (obu_offset <= 1 && ObuHasExtension(obu.header)) { + *write_at++ = obu.extension_header; + } + int payload_offset = + std::max(0, obu_offset - (ObuHasExtension(obu.header) ? 2 : 1)); + size_t payload_size = obu.payload.size() - payload_offset; + if (!obu.payload.empty() && payload_size > 0) { + memcpy(write_at, obu.payload.data() + payload_offset, payload_size); + } + write_at += payload_size; + // All obus are stored from the beginning, except, may be, the first one. + obu_offset = 0; + } + // Store the last OBU element. + const Obu& last_obu = + obus_[next_packet.first_obu + next_packet.num_obu_elements - 1]; + int fragment_size = next_packet.last_obu_size; + RTC_DCHECK_GT(fragment_size, 0); + if (next_packet.num_obu_elements > kMaxNumObusToOmitSize) { + write_at += WriteLeb128(fragment_size, write_at); + } + if (obu_offset == 0 && fragment_size > 0) { + *write_at++ = last_obu.header & ~kObuSizePresentBit; + --fragment_size; + } + if (obu_offset <= 1 && ObuHasExtension(last_obu.header) && + fragment_size > 0) { + *write_at++ = last_obu.extension_header; + --fragment_size; + } + if (write_at - rtp_payload + fragment_size != + kAggregationHeaderSize + next_packet.packet_size) { + LOG_FATAL("write_at - rtp_payload + fragment_size!= + kAggregationHeaderSize + next_packet.packet_size"); + return false; + } + int payload_offset = + std::max(0, obu_offset - (ObuHasExtension(last_obu.header) ? 2 : 1)); + memcpy(write_at, last_obu.payload.data() + payload_offset, fragment_size); + write_at += fragment_size; + + if (write_at - rtp_payload != + kAggregationHeaderSize + next_packet.packet_size) { + LOG_FATAL("write_at - rtp_payload!= + kAggregationHeaderSize + next_packet.packet_size"); + return false; + } + + ++packet_index_; + bool is_last_packet_in_frame = packet_index_ == packets_.size(); + packet->SetMarker(is_last_packet_in_frame && is_last_frame_in_picture_); + return true; +} + +} // namespace obu \ No newline at end of file diff --git a/src/rtp/obu_parser.h b/src/rtp/obu_parser.h index 8e0620d..7bddbe4 100644 --- a/src/rtp/obu_parser.h +++ b/src/rtp/obu_parser.h @@ -13,6 +13,7 @@ #include "obu.h" +namespace obu { std::vector ParseObus(uint8_t* payload, int payload_size); const char* ObuTypeToString(OBU_TYPE type); @@ -22,5 +23,6 @@ bool ObuHasExtension(uint8_t obu_header); bool ObuHasSize(uint8_t obu_header); int ObuType(uint8_t obu_header); +} // namespace obu #endif \ No newline at end of file diff --git a/src/rtp/rtp_codec.cpp b/src/rtp/rtp_codec.cpp index c85c33e..b3b3de6 100644 --- a/src/rtp/rtp_codec.cpp +++ b/src/rtp/rtp_codec.cpp @@ -134,7 +134,6 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, fec_encoder_.ReleaseFecPackets(fec_packets, size); return; } - if (size <= MAX_NALU_LEN) { RtpPacket rtp_packet; rtp_packet.SetVerion(version_); @@ -518,86 +517,7 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size, } } } else if (RtpPacket::PAYLOAD_TYPE::AV1 == payload_type_) { - std::vector obus = ParseObus(buffer, size); - // LOG_ERROR("Total size = [{}]", size); - - uint32_t timestamp = - std::chrono::duration_cast( - std::chrono::system_clock::now().time_since_epoch()) - .count(); - - for (int i = 0; i < obus.size(); i++) { - // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, - // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); - - if (obus[i].size_ <= MAX_NALU_LEN) { - RtpPacket rtp_packet; - rtp_packet.SetVerion(version_); - rtp_packet.SetHasPadding(has_padding_); - rtp_packet.SetHasExtension(has_extension_); - rtp_packet.SetMarker(1); - rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); - rtp_packet.SetSequenceNumber(sequence_number_++); - rtp_packet.SetTimestamp(timestamp); - rtp_packet.SetSsrc(ssrc_); - - if (!csrcs_.empty()) { - rtp_packet.SetCsrcs(csrcs_); - } - - if (has_extension_) { - rtp_packet.SetExtensionProfile(extension_profile_); - rtp_packet.SetExtensionData(extension_data_, extension_len_); - } - - rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); - rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_); - - packets.emplace_back(rtp_packet); - } else { - size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN; - size_t packet_num = - obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); - - for (size_t index = 0; index < packet_num; index++) { - RtpPacket rtp_packet; - rtp_packet.SetVerion(version_); - rtp_packet.SetHasPadding(has_padding_); - rtp_packet.SetHasExtension(has_extension_); - rtp_packet.SetMarker(index == packet_num - 1 ? 1 : 0); - rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); - rtp_packet.SetSequenceNumber(sequence_number_++); - rtp_packet.SetTimestamp(timestamp); - rtp_packet.SetSsrc(ssrc_); - if (!csrcs_.empty()) { - rtp_packet.SetCsrcs(csrcs_); - } - if (has_extension_) { - rtp_packet.SetExtensionProfile(extension_profile_); - rtp_packet.SetExtensionData(extension_data_, extension_len_); - } - - int z = index != 0 ? 1 : 0; - int y = index != packet_num - 1 ? 1 : 0; - int w = 1; - int n = (frame_type == VideoFrameType::kVideoFrameKey) && - (ObuType(obus[i].header_) == kObuTypeSequenceHeader) - ? 1 - : 0; - rtp_packet.SetAv1AggrHeader(z, y, w, n); - - if (index == packet_num - 1 && last_packet_size > 0) { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, - last_packet_size); - } else { - rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, - MAX_NALU_LEN); - } - - packets.emplace_back(rtp_packet); - } - } - } + EncodeAv1(buffer, size, packets); } } diff --git a/src/rtp/rtp_codec.h b/src/rtp/rtp_codec.h index 68971c3..a9c83bf 100644 --- a/src/rtp/rtp_codec.h +++ b/src/rtp/rtp_codec.h @@ -31,6 +31,8 @@ class RtpCodec { private: bool IsKeyFrame(const uint8_t* buffer, size_t size); + void EncodeAv1(uint8_t* buffer, size_t size, std::vector& packets); + private: uint32_t version_ = 0; bool has_padding_ = false; diff --git a/src/rtp/rtp_codec_av1.cpp b/src/rtp/rtp_codec_av1.cpp new file mode 100644 index 0000000..f39be1c --- /dev/null +++ b/src/rtp/rtp_codec_av1.cpp @@ -0,0 +1,85 @@ +#include "byte_buffer.h" +#include "log.h" +#include "rtp_codec.h" + +void EncodeAv1(uint8_t* buffer, size_t size, std::vector& packets) { + std::vector obus = ParseObus(buffer, size); + // LOG_ERROR("Total size = [{}]", size); + + uint32_t timestamp = std::chrono::duration_cast( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + for (int i = 0; i < obus.size(); i++) { + // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, + // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); + + if (obus[i].size_ <= MAX_NALU_LEN) { + RtpPacket rtp_packet; + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(1); + rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); + rtp_packet.SetTimestamp(timestamp); + rtp_packet.SetSsrc(ssrc_); + + if (!csrcs_.empty()) { + rtp_packet.SetCsrcs(csrcs_); + } + + if (has_extension_) { + rtp_packet.SetExtensionProfile(extension_profile_); + rtp_packet.SetExtensionData(extension_data_, extension_len_); + } + + rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); + rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_); + + packets.emplace_back(rtp_packet); + } else { + size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN; + size_t packet_num = + obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); + + for (size_t index = 0; index < packet_num; index++) { + RtpPacket rtp_packet; + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(index == packet_num - 1 ? 1 : 0); + rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); + rtp_packet.SetTimestamp(timestamp); + rtp_packet.SetSsrc(ssrc_); + if (!csrcs_.empty()) { + rtp_packet.SetCsrcs(csrcs_); + } + if (has_extension_) { + rtp_packet.SetExtensionProfile(extension_profile_); + rtp_packet.SetExtensionData(extension_data_, extension_len_); + } + + int z = index != 0 ? 1 : 0; + int y = index != packet_num - 1 ? 1 : 0; + int w = 1; + int n = (frame_type == VideoFrameType::kVideoFrameKey) && + (ObuType(obus[i].header_) == kObuTypeSequenceHeader) + ? 1 + : 0; + rtp_packet.SetAv1AggrHeader(z, y, w, n); + + if (index == packet_num - 1 && last_packet_size > 0) { + rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, + last_packet_size); + } else { + rtp_packet.EncodeAv1(obus[i].data_ + index * MAX_NALU_LEN, + MAX_NALU_LEN); + } + + packets.emplace_back(rtp_packet); + } + } + } +} \ No newline at end of file