From 270ad8df43d3b38bc60f0912e9a0e2fffda9811f Mon Sep 17 00:00:00 2001 From: dijunkun Date: Mon, 13 Nov 2023 22:46:07 -0800 Subject: [PATCH] Implementation for FEC encoder module --- src/fec/fec_encoder.cpp | 149 ++++++++++++++++++++++++++++++++++++++++ src/fec/fec_encoder.h | 49 +++++++++++++ src/rtp/rtp_codec.cpp | 78 ++++++++++++++++++++- src/rtp/rtp_codec.h | 5 ++ xmake.lua | 13 ++-- 5 files changed, 287 insertions(+), 7 deletions(-) create mode 100644 src/fec/fec_encoder.cpp create mode 100644 src/fec/fec_encoder.h diff --git a/src/fec/fec_encoder.cpp b/src/fec/fec_encoder.cpp new file mode 100644 index 0000000..41e0f96 --- /dev/null +++ b/src/fec/fec_encoder.cpp @@ -0,0 +1,149 @@ +#include "fec_encoder.h" + +#include "log.h" + +FecEncoder::FecEncoder() {} + +FecEncoder::~FecEncoder() {} + +int FecEncoder::Init() { + fec_codec_id_ = OF_CODEC_REED_SOLOMON_GF_2_M_STABLE; + + fec_rs_params_ = (of_rs_2_m_parameters_t *)calloc(1, sizeof(*fec_params_)); + if (nullptr == fec_rs_params_) { + LOG_ERROR("Create FEC codec params failed"); + return -1; + } + + fec_rs_params_->m = 8; + fec_params_ = (of_parameters_t *)fec_rs_params_; + + if (OF_STATUS_OK != + of_create_codec_instance(&fec_session_, fec_codec_id_, OF_ENCODER, 2)) { + LOG_ERROR("Create FEC codec instance failed"); + return -1; + } + + return 0; +} + +int FecEncoder::Release() { + if (!fec_session_) { + LOG_ERROR("Invalid FEC codec instance"); + return -1; + } + + { + if (OF_STATUS_OK != of_release_codec_instance(fec_session_)) { + LOG_ERROR("Release FEC codec instance failed"); + return -1; + } + } + + return 0; +} + +uint8_t **FecEncoder::Encode(const char *data, size_t len) { + uint8_t **fec_packets = nullptr; + + unsigned int last_packet_size = len % max_size_of_packet_; + unsigned int num_of_source_packets = + len / max_size_of_packet_ + (last_packet_size ? 1 : 0); + unsigned int num_of_total_packets = + (unsigned int)floor((double)num_of_source_packets / code_rate_); + + fec_params_->nb_source_symbols = num_of_source_packets; + fec_params_->nb_repair_symbols = num_of_total_packets - num_of_source_packets; + + fec_params_->encoding_symbol_length = max_size_of_packet_; + + if (OF_STATUS_OK != of_set_fec_parameters(fec_session_, fec_params_)) { + LOG_ERROR("Set FEC params failed for codec_id {}", fec_codec_id_); + return nullptr; + } + + fec_packets = (uint8_t **)calloc(num_of_total_packets, sizeof(uint8_t *)); + + if (nullptr == fec_packets) { + LOG_ERROR("Calloc failed for fec_packets with size [{}])", + num_of_total_packets); + return nullptr; + } + + for (unsigned int esi = 0; esi < num_of_source_packets; esi++) { + if (esi != num_of_source_packets - 1) { + fec_packets[esi] = + (uint8_t *)calloc(max_size_of_packet_, sizeof(uint8_t)); + if (nullptr == fec_packets[esi]) { + LOG_ERROR("Calloc failed for fec_packets[{}] with size [{}])", esi, + max_size_of_packet_); + ReleaseFecPackets(fec_packets, len); + return nullptr; + } + memcpy(fec_packets[esi], data + esi * max_size_of_packet_, + max_size_of_packet_); + } else { + fec_packets[esi] = + (uint8_t *)calloc(max_size_of_packet_, sizeof(uint8_t)); + if (nullptr == fec_packets[esi]) { + LOG_ERROR("Calloc failed for fec_packets[{}] with size [{}])", esi, + last_packet_size); + ReleaseFecPackets(fec_packets, len); + return nullptr; + } + memcpy(fec_packets[esi], data + esi * max_size_of_packet_, + last_packet_size); + } + } + + for (unsigned int esi = num_of_source_packets; esi < num_of_total_packets; + esi++) { + fec_packets[esi] = (uint8_t *)calloc(max_size_of_packet_, sizeof(uint8_t)); + if (nullptr == fec_packets[esi]) { + LOG_ERROR("Calloc failed for fec_packets[{}] with size [{}])", esi, + max_size_of_packet_); + ReleaseFecPackets(fec_packets, len); + return nullptr; + } + if (OF_STATUS_OK != + of_build_repair_symbol(fec_session_, (void **)fec_packets, esi)) { + LOG_ERROR("Build repair symbols failed for esi [{}]", esi); + ReleaseFecPackets(fec_packets, len); + return nullptr; + } + } + + return fec_packets; +} + +int FecEncoder::ReleaseFecPackets(uint8_t **fec_packets, size_t len) { + if (nullptr == fec_packets) { + LOG_ERROR("Release Fec packets failed, due to fec_packets is nullptr"); + return -1; + } + unsigned int last_packet_size = len % max_size_of_packet_; + unsigned int num_of_source_packets = + len / max_size_of_packet_ + (last_packet_size ? 1 : 0); + unsigned int num_of_total_packets = + (unsigned int)floor((double)num_of_source_packets / code_rate_); + + for (unsigned int esi = 0; esi < num_of_total_packets; esi++) { + if (fec_packets[esi]) { + free(fec_packets[esi]); + } + } + free(fec_packets); + + return 0; +} + +void FecEncoder::GetFecPacketsParams(unsigned int source_length, + unsigned int &num_of_total_packets, + unsigned int &num_of_source_packets, + unsigned int &last_packet_size) { + last_packet_size = source_length % max_size_of_packet_; + num_of_source_packets = + source_length / max_size_of_packet_ + (last_packet_size ? 1 : 0); + num_of_total_packets = + (unsigned int)floor((double)num_of_source_packets / code_rate_); +} \ No newline at end of file diff --git a/src/fec/fec_encoder.h b/src/fec/fec_encoder.h new file mode 100644 index 0000000..20913b8 --- /dev/null +++ b/src/fec/fec_encoder.h @@ -0,0 +1,49 @@ +/* + * @Author: DI JUNKUN + * @Date: 2023-11-13 + * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _FEC_ENCODER_H_ +#define _FEC_ENCODER_H_ + +#include + +#include + +#ifdef __cplusplus +extern "C" { +#endif +#include "lib_common/of_openfec_api.h" +#ifdef __cplusplus +}; +#endif + +class FecEncoder { + public: + FecEncoder(); + ~FecEncoder(); + + public: + int Init(); + int Release(); + uint8_t **Encode(const char *data, size_t len); + int ReleaseFecPackets(uint8_t **fec_packets, size_t len); + void GetFecPacketsParams(unsigned int source_length, + unsigned int &num_of_total_packets, + unsigned int &num_of_source_packets, + unsigned int &last_packet_size); + + private: + double code_rate_ = 0.667; + int max_size_of_packet_ = 1400; + + private: + of_codec_id_t fec_codec_id_ = OF_CODEC_REED_SOLOMON_GF_2_M_STABLE; + of_session_t *fec_session_ = nullptr; + of_parameters_t *fec_params_ = nullptr; + of_rs_2_m_parameters_t *fec_rs_params_ = nullptr; + of_ldpc_parameters_t *fec_ldpc_params_ = nullptr; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_codec.cpp b/src/rtp/rtp_codec.cpp index 2bb539c..cefd106 100644 --- a/src/rtp/rtp_codec.cpp +++ b/src/rtp/rtp_codec.cpp @@ -14,7 +14,9 @@ RtpCodec ::RtpCodec(RtpPacket::PAYLOAD_TYPE payload_type) has_padding_(false), has_extension_(false), payload_type_(payload_type), - sequence_number_(0) {} + sequence_number_(0) { + fec_encoder_.Init(); +} RtpCodec ::~RtpCodec() { if (extension_data_) { @@ -34,10 +36,71 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, // rtp_packet_ = new RtpPacket(); // } - RtpPacket rtp_packet; - if (RtpPacket::PAYLOAD_TYPE::H264 == payload_type_) { + if (fec_enable_ && IsKeyFrame((const uint8_t*)buffer, size)) { + uint8_t** fec_packets = fec_encoder_.Encode((const char*)buffer, size); + if (nullptr == fec_packets) { + LOG_ERROR("Invalid fec_packets"); + return; + } + unsigned int num_of_total_packets = 0; + unsigned int num_of_source_packets = 0; + unsigned int last_packet_size = 0; + fec_encoder_.GetFecPacketsParams(size, num_of_total_packets, + num_of_source_packets, last_packet_size); + + for (size_t index = 0; index < num_of_total_packets; index++) { + RtpPacket rtp_packet; + rtp_packet.SetVerion(version_); + rtp_packet.SetHasPadding(has_padding_); + rtp_packet.SetHasExtension(has_extension_); + rtp_packet.SetMarker(index == num_of_total_packets ? 1 : 0); + rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_)); + rtp_packet.SetSequenceNumber(sequence_number_++); + + timestamp_ = std::chrono::high_resolution_clock::now() + .time_since_epoch() + .count(); + rtp_packet.SetTimestamp(timestamp_); + rtp_packet.SetSsrc(ssrc_); + + if (!csrcs_.empty()) { + rtp_packet.SetCsrcs(csrcs_); + } + + if (has_extension_) { + rtp_packet.SetExtensionProfile(extension_profile_); + rtp_packet.SetExtensionData(extension_data_, extension_len_); + } + + RtpPacket::FU_INDICATOR fu_indicator; + fu_indicator.forbidden_bit = 0; + fu_indicator.nal_reference_idc = 0; + fu_indicator.nal_unit_type = FU_A; + + RtpPacket::FU_HEADER fu_header; + fu_header.start = index == 0 ? 1 : 0; + fu_header.end = index == num_of_total_packets - 1 ? 1 : 0; + fu_header.remain_bit = 0; + fu_header.nal_unit_type = FU_A; + + rtp_packet.SetFuIndicator(fu_indicator); + rtp_packet.SetFuHeader(fu_header); + + if (index == num_of_source_packets - 1 && last_packet_size > 0) { + rtp_packet.EncodeH264Fua(fec_packets[index], last_packet_size); + } else { + rtp_packet.EncodeH264Fua(fec_packets[index], MAX_NALU_LEN); + } + packets.emplace_back(rtp_packet); + } + + fec_encoder_.ReleaseFecPackets(fec_packets, size); + return; + } + if (size <= MAX_NALU_LEN) { + RtpPacket rtp_packet; rtp_packet.SetVerion(version_); rtp_packet.SetHasPadding(has_padding_); rtp_packet.SetHasExtension(has_extension_); @@ -73,6 +136,7 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, size_t packet_num = size / MAX_NALU_LEN + (last_packet_size ? 1 : 0); for (size_t index = 0; index < packet_num; index++) { + RtpPacket rtp_packet; rtp_packet.SetVerion(version_); rtp_packet.SetHasPadding(has_padding_); rtp_packet.SetHasExtension(has_extension_); @@ -119,6 +183,7 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size, } } } else if (RtpPacket::PAYLOAD_TYPE::DATA == payload_type_) { + RtpPacket rtp_packet; rtp_packet.SetVerion(version_); rtp_packet.SetHasPadding(has_padding_); rtp_packet.SetHasExtension(has_extension_); @@ -156,4 +221,11 @@ size_t RtpCodec::Decode(RtpPacket& packet, uint8_t* payload) { LOG_ERROR("Default"); return packet.DecodeData(payload); } +} + +bool RtpCodec::IsKeyFrame(const uint8_t* buffer, size_t size) { + if (buffer != nullptr && size != 0 && (*(buffer + 4) & 0x1f) == 0x07) { + return true; + } + return false; } \ No newline at end of file diff --git a/src/rtp/rtp_codec.h b/src/rtp/rtp_codec.h index 73e263b..eb7659c 100644 --- a/src/rtp/rtp_codec.h +++ b/src/rtp/rtp_codec.h @@ -5,6 +5,7 @@ #include +#include "fec_encoder.h" #include "rtp_packet.h" class RtpCodec { @@ -18,6 +19,8 @@ class RtpCodec { // protected: // void OnReceiveFrame(uint8_t* payload) = 0; + private: + bool IsKeyFrame(const uint8_t* buffer, size_t size); private: uint32_t version_ = 0; @@ -38,6 +41,8 @@ class RtpCodec { private: // RtpPacket* rtp_packet_ = nullptr; RtpPacket::FU_INDICATOR fu_indicator_; + bool fec_enable_ = true; + FecEncoder fec_encoder_; }; #endif \ No newline at end of file diff --git a/xmake.lua b/xmake.lua index a36c8b8..ee5c90a 100644 --- a/xmake.lua +++ b/xmake.lua @@ -26,14 +26,13 @@ elseif is_os("macosx") then add_ldflags("-ld_classic", {force = true}) end -add_requires("asio 1.24.0", "nlohmann_json", "spdlog 1.11.0") -add_packages("spdlog") +add_requires("asio 1.24.0", "nlohmann_json", "spdlog 1.11.0", "openfec") +add_packages("spdlog", "openfec") includes("thirdparty") if has_config("server_only") then includes("application/signal_server") else - add_requires("openfec") if is_os("windows") then add_requires("vcpkg::ffmpeg 5.1.2", {configs = {shared = false}}) add_requires("vcpkg::libnice 0.1.21") @@ -84,6 +83,12 @@ else add_files("src/frame/*.cpp") add_includedirs("src/frame", {public = true}) + target("fec") + set_kind("static") + add_deps("log") + add_files("src/fec/*.cpp") + add_includedirs("src/fec", {public = true}) + target("rtcp") set_kind("static") add_deps("log") @@ -92,7 +97,7 @@ else target("rtp") set_kind("static") - add_deps("log", "frame", "ringbuffer", "thread", "rtcp") + add_deps("log", "frame", "ringbuffer", "thread", "rtcp", "fec") add_files("src/rtp/*.cpp") add_includedirs("src/rtp", {public = true})