Complete AV1 frame RTP packetizer and depacketizer

This commit is contained in:
dijunkun
2024-05-07 17:13:14 +08:00
parent 95da7ff52d
commit 114c80cd72
7 changed files with 96 additions and 43 deletions

View File

@@ -76,14 +76,6 @@ int Dav1dAv1Decoder::Init() {
int Dav1dAv1Decoder::Decode( int Dav1dAv1Decoder::Decode(
const uint8_t *data, int size, const uint8_t *data, int size,
std::function<void(VideoFrame)> on_receive_decoded_frame) { std::function<void(VideoFrame)> on_receive_decoded_frame) {
if (!first_) {
if ((*(data + 4) & 0x1f) != 0x07) {
return -1;
} else {
first_ = true;
}
}
ScopedDav1dData scoped_dav1d_data; ScopedDav1dData scoped_dav1d_data;
Dav1dData &dav1d_data = scoped_dav1d_data.Data(); Dav1dData &dav1d_data = scoped_dav1d_data.Data();
dav1d_data_wrap(&dav1d_data, data, size, dav1d_data_wrap(&dav1d_data, data, size,

View File

@@ -531,7 +531,7 @@ int PeerConnection::SendVideoData(const char *data, size_t size) {
[this](char *encoded_frame, size_t size, [this](char *encoded_frame, size_t size,
VideoEncoder::VideoFrameType frame_type) -> int { VideoEncoder::VideoFrameType frame_type) -> int {
for (auto &ice_trans : ice_transmission_list_) { for (auto &ice_trans : ice_transmission_list_) {
// LOG_ERROR("H264 frame size: [{}]", size); LOG_ERROR("Send frame size: [{}]", size);
// ice_trans.second->SendData(IceTransmission::DATA_TYPE::VIDEO, // ice_trans.second->SendData(IceTransmission::DATA_TYPE::VIDEO,
// encoded_frame, size); // encoded_frame, size);
ice_trans.second->SendVideoData( ice_trans.second->SendVideoData(

View File

@@ -511,9 +511,14 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size,
} else if (RtpPacket::PAYLOAD_TYPE::AV1 == payload_type_) { } else if (RtpPacket::PAYLOAD_TYPE::AV1 == payload_type_) {
std::vector<Obu> obus = ParseObus(buffer, size); std::vector<Obu> obus = ParseObus(buffer, size);
// LOG_ERROR("Total size = [{}]", size); // LOG_ERROR("Total size = [{}]", size);
uint32_t timestamp =
std::chrono::high_resolution_clock::now().time_since_epoch().count();
for (int i = 0; i < obus.size(); i++) { for (int i = 0; i < obus.size(); i++) {
LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_, // LOG_ERROR("1 [{}] Obu size = [{}], Obu type [{}]", i, obus[i].size_,
ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_))); // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_)));
if (obus[i].size_ <= MAX_NALU_LEN) { if (obus[i].size_ <= MAX_NALU_LEN) {
RtpPacket rtp_packet; RtpPacket rtp_packet;
rtp_packet.SetVerion(version_); rtp_packet.SetVerion(version_);
@@ -522,11 +527,7 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size,
rtp_packet.SetMarker(1); rtp_packet.SetMarker(1);
rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
rtp_packet.SetSequenceNumber(sequence_number_++); rtp_packet.SetSequenceNumber(sequence_number_++);
rtp_packet.SetTimestamp(timestamp);
timestamp_ = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
rtp_packet.SetTimestamp(timestamp_);
rtp_packet.SetSsrc(ssrc_); rtp_packet.SetSsrc(ssrc_);
if (!csrcs_.empty()) { if (!csrcs_.empty()) {
@@ -540,15 +541,13 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size,
rtp_packet.SetAv1AggrHeader(0, 0, 1, 0); rtp_packet.SetAv1AggrHeader(0, 0, 1, 0);
rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_); rtp_packet.EncodeAv1(obus[i].data_, obus[i].size_);
LOG_ERROR("enc payload size = {}", rtp_packet.PayloadSize()); // LOG_ERROR("enc payload size = {}", rtp_packet.PayloadSize());
packets.emplace_back(rtp_packet); packets.emplace_back(rtp_packet);
} else { } else {
size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN; size_t last_packet_size = obus[i].size_ % MAX_NALU_LEN;
size_t packet_num = size_t packet_num =
obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0); obus[i].size_ / MAX_NALU_LEN + (last_packet_size ? 1 : 0);
timestamp_ = std::chrono::high_resolution_clock::now()
.time_since_epoch()
.count();
for (size_t index = 0; index < packet_num; index++) { for (size_t index = 0; index < packet_num; index++) {
RtpPacket rtp_packet; RtpPacket rtp_packet;
rtp_packet.SetVerion(version_); rtp_packet.SetVerion(version_);
@@ -557,7 +556,7 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size,
rtp_packet.SetMarker(index == packet_num - 1 ? 1 : 0); rtp_packet.SetMarker(index == packet_num - 1 ? 1 : 0);
rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
rtp_packet.SetSequenceNumber(sequence_number_++); rtp_packet.SetSequenceNumber(sequence_number_++);
rtp_packet.SetTimestamp(timestamp_); rtp_packet.SetTimestamp(timestamp);
rtp_packet.SetSsrc(ssrc_); rtp_packet.SetSsrc(ssrc_);
if (!csrcs_.empty()) { if (!csrcs_.empty()) {
rtp_packet.SetCsrcs(csrcs_); rtp_packet.SetCsrcs(csrcs_);
@@ -584,7 +583,7 @@ void RtpCodec::Encode(VideoFrameType frame_type, uint8_t* buffer, size_t size,
MAX_NALU_LEN); MAX_NALU_LEN);
} }
LOG_ERROR("enc payload size = {}", rtp_packet.PayloadSize()); // LOG_ERROR("enc payload size = {}", rtp_packet.PayloadSize());
packets.emplace_back(rtp_packet); packets.emplace_back(rtp_packet);
} }
} }

View File

@@ -346,6 +346,20 @@ class RtpPacket {
return fu_header_.end; return fu_header_.end;
} }
// Returns true when both the Z and Y values read from this packet's AV1
// aggregation header are zero.
// NOTE(review): in the AV1 RTP payload format Z marks "continues a previous
// packet" and Y marks "continues in the next packet" -- presumably the same
// meaning applies here; confirm against GetAv1AggrHeader.
bool Av1FrameStart() {
  // Make sure the header fields are parsed before they are queried.
  ParseRtpData();

  int z_bit, y_bit, w_field, n_bit;
  GetAv1AggrHeader(z_bit, y_bit, w_field, n_bit);

  if (z_bit != 0) {
    return false;
  }
  return y_bit == 0;
}
// Returns true when the Z value read from this packet's AV1 aggregation header
// is non-zero and the Y value is zero.
// NOTE(review): in the AV1 RTP payload format that combination denotes the
// last fragment of a fragmented OBU -- presumably the intent here; confirm
// against GetAv1AggrHeader.
bool Av1FrameEnd() {
  // Make sure the header fields are parsed before they are queried.
  ParseRtpData();

  int z_bit, y_bit, w_field, n_bit;
  GetAv1AggrHeader(z_bit, y_bit, w_field, n_bit);

  if (z_bit == 0) {
    return false;
  }
  return y_bit == 0;
}
private: private:
void TryToDecodeRtpPacket(); void TryToDecodeRtpPacket();
void ParseRtpData(); void ParseRtpData();

View File

@@ -54,7 +54,7 @@ void RtpVideoReceiver::InsertRtpPacket(RtpPacket& rtp_packet) {
// SendRtcpRR(rtcp_rr); // SendRtcpRR(rtcp_rr);
} }
if (rtp_packet.PayloadType() == RtpPacket::PAYLOAD_TYPE::AV1) { if (rtp_packet.PayloadType() == RtpPacket::PAYLOAD_TYPE::AV1) {
ProcessAV1RtpPacket(rtp_packet); ProcessAv1RtpPacket(rtp_packet);
} else { } else {
ProcessH264RtpPacket(rtp_packet); ProcessH264RtpPacket(rtp_packet);
} }
@@ -173,27 +173,22 @@ void RtpVideoReceiver::ProcessH264RtpPacket(RtpPacket& rtp_packet) {
} }
} }
void RtpVideoReceiver::ProcessAV1RtpPacket(RtpPacket& rtp_packet) { void RtpVideoReceiver::ProcessAv1RtpPacket(RtpPacket& rtp_packet) {
LOG_ERROR("recv payload size = {}, sequence_number_ = {}", // LOG_ERROR("recv payload size = {}, sequence_number_ = {}",
rtp_packet.PayloadSize(), rtp_packet.SequenceNumber()); // rtp_packet.PayloadSize(), rtp_packet.SequenceNumber());
int z, y, w, n; if (RtpPacket::PAYLOAD_TYPE::AV1 == rtp_packet.PayloadType()) {
rtp_packet.GetAv1AggrHeader(z, y, w, n); incomplete_frame_list_[rtp_packet.SequenceNumber()] = rtp_packet;
LOG_ERROR("z = {}, y = {}, w = {}, n = {}", z, y, w, n); bool complete = CheckIsAv1FrameCompleted(rtp_packet);
if (z == 0) {
} }
if (y == 0) { // std::vector<Obu> obus =
} // ParseObus((uint8_t*)rtp_packet.Payload(), rtp_packet.PayloadSize());
// for (int i = 0; i < obus.size(); i++) {
std::vector<Obu> obus = // LOG_ERROR("2 [{}|{}] Obu size = [{}], Obu type [{}]", i, obus.size(),
ParseObus((uint8_t*)rtp_packet.Payload(), rtp_packet.PayloadSize()); // obus[i].size_,
for (int i = 0; i < obus.size(); i++) { // ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_)));
LOG_ERROR("2 [{}|{}] Obu size = [{}], Obu type [{}]", i, obus.size(), // }
obus[i].size_,
ObuTypeToString((OBU_TYPE)ObuType(obus[i].header_)));
}
} }
bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacket& rtp_packet) { bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacket& rtp_packet) {
@@ -243,6 +238,58 @@ bool RtpVideoReceiver::CheckIsH264FrameCompleted(RtpPacket& rtp_packet) {
return false; return false;
} }
// Checks whether |rtp_packet| completes an AV1 frame and, if it does,
// reassembles that frame and queues it.
//
// Only a packet for which Av1FrameEnd() is true can complete a frame. Starting
// just below that packet's sequence number, the packets buffered in
// incomplete_frame_list_ are scanned backwards until one reports
// Av1FrameStart(). The payloads of every packet from that start packet up to
// and including |rtp_packet| are concatenated into nv12_data_, those packets
// are erased from incomplete_frame_list_, and the assembled buffer is pushed
// onto compelete_video_frame_queue_.
//
// Returns true when a frame was queued (or when the early shortcut below
// matches). Returns false when |rtp_packet| is not a frame end, when a packet
// is missing between start and end, when no start packet is found, or when the
// assembled frame would not fit into nv12_data_.
bool RtpVideoReceiver::CheckIsAv1FrameCompleted(RtpPacket& rtp_packet) {
  if (!rtp_packet.Av1FrameEnd()) {
    return false;
  }

  const uint16_t last_seq = rtp_packet.SequenceNumber();

  // NOTE(review): kept unchanged from the original -- this reports "complete"
  // without assembling or queueing anything when the number of buffered
  // packets happens to equal the end sequence number. Confirm the intent.
  if (incomplete_frame_list_.size() == last_seq) {
    return true;
  }

  // Walk backwards from the packet just before the end packet, looking for
  // the packet that starts the frame.
  // NOTE(review): the scan stops at sequence number 0, so a frame whose
  // packets straddle the 16-bit sequence wrap-around is never completed.
  size_t first_seq = last_seq;
  for (uint16_t seq = last_seq; seq-- > 0;) {
    auto it = incomplete_frame_list_.find(seq);
    if (it == incomplete_frame_list_.end()) {
      // The last fragment has already been received. If all fragments arrive
      // in order, then some fragments were lost in transmission and need to
      // be repaired using FEC.
      return false;
    }
    if (it->second.Av1FrameStart()) {
      // (original note: skip temporal delimiter OBU)
      first_seq = it->second.SequenceNumber();
      break;
    }
  }

  // No start packet was found below the end packet.
  if (first_seq == last_seq) {
    return false;
  }

  // Sum the payload sizes first so that an oversized frame is rejected before
  // a single byte is copied; payload sizes come from the network and must not
  // be allowed to overrun the fixed-size reassembly buffer.
  size_t frame_size = 0;
  for (size_t seq = first_seq; seq <= last_seq; ++seq) {
    frame_size += incomplete_frame_list_[seq].PayloadSize();
  }

  if (frame_size > static_cast<size_t>(NV12_BUFFER_SIZE)) {
    LOG_WARN("AV1 frame size [{}] exceeds reassembly buffer, frame dropped",
             frame_size);
    // Drop the packets of the unusable frame so they do not accumulate.
    for (size_t seq = first_seq; seq <= last_seq; ++seq) {
      incomplete_frame_list_.erase(seq);
    }
    return false;
  }

  // The reassembly buffer is allocated lazily on first use.
  if (!nv12_data_) {
    nv12_data_ = new uint8_t[NV12_BUFFER_SIZE];
  }

  size_t write_offset = 0;
  for (size_t seq = first_seq; seq <= last_seq; ++seq) {
    auto& fragment = incomplete_frame_list_[seq];
    const size_t fragment_size = fragment.PayloadSize();
    memcpy(nv12_data_ + write_offset, fragment.Payload(), fragment_size);
    write_offset += fragment_size;
    incomplete_frame_list_.erase(seq);
  }

  compelete_video_frame_queue_.push(VideoFrame(nv12_data_, write_offset));
  return true;
}
bool RtpVideoReceiver::Process() { bool RtpVideoReceiver::Process() {
if (!compelete_video_frame_queue_.isEmpty()) { if (!compelete_video_frame_queue_.isEmpty()) {
VideoFrame video_frame; VideoFrame video_frame;

View File

@@ -30,7 +30,8 @@ class RtpVideoReceiver : public ThreadBase {
} }
private: private:
void ProcessAV1RtpPacket(RtpPacket& rtp_packet); void ProcessAv1RtpPacket(RtpPacket& rtp_packet);
bool CheckIsAv1FrameCompleted(RtpPacket& rtp_packet);
private: private:
void ProcessH264RtpPacket(RtpPacket& rtp_packet); void ProcessH264RtpPacket(RtpPacket& rtp_packet);

View File

@@ -62,7 +62,7 @@ int IceTransmission::InitIceTransmission(
}); });
rtp_video_receiver_->SetOnReceiveCompleteFrame( rtp_video_receiver_->SetOnReceiveCompleteFrame(
[this](VideoFrame &video_frame) -> void { [this](VideoFrame &video_frame) -> void {
// LOG_ERROR("OnReceiveCompleteFrame {}", video_frame.Size()); LOG_ERROR("OnReceiveCompleteFrame {}", video_frame.Size());
on_receive_video_((const char *)video_frame.Buffer(), on_receive_video_((const char *)video_frame.Buffer(),
video_frame.Size(), remote_user_id_.data(), video_frame.Size(), remote_user_id_.data(),
remote_user_id_.size()); remote_user_id_.size());