// ---- src/frame/audio_frame.h (new file in this patch) ----------------------
/*
 * @Author: DI JUNKUN
 * @Date: 2023-11-24
 * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
 */

#ifndef _AUDIO_FRAME_H_
#define _AUDIO_FRAME_H_

#include <stddef.h>
#include <stdint.h>

/**
 * Owning, heap-backed byte buffer for a single frame.
 *
 * The frame stores its byte count plus two size_t tags (width, height) that
 * the class itself never interprets. Copying is deep; moving transfers the
 * allocation and leaves the source empty (null buffer, all counters zero).
 * An empty frame never owns an allocation.
 */
class AudioFrame {
 public:
  /// Creates an empty frame: null buffer, size 0.
  AudioFrame();

  /// Allocates `size` uninitialised bytes (nothing is allocated for 0).
  AudioFrame(size_t size);

  /// Deep-copies `size` bytes from `buffer`; width and height stay 0.
  /// A null `buffer` with a non-zero `size` produces a zero-filled frame.
  AudioFrame(const uint8_t *buffer, size_t size);

  /// Same as above, additionally recording `width` and `height`.
  AudioFrame(const uint8_t *buffer, size_t size, size_t width, size_t height);

  AudioFrame(const AudioFrame &audio_frame);
  AudioFrame(AudioFrame &&audio_frame) noexcept;
  AudioFrame &operator=(const AudioFrame &audio_frame);
  AudioFrame &operator=(AudioFrame &&audio_frame) noexcept;

  ~AudioFrame();

 public:
  /// Read-only view of the owned bytes; nullptr when the frame is empty.
  const uint8_t *Buffer() const { return buffer_; }

  /// Number of bytes owned by this frame.
  size_t Size() const { return size_; }

  /// Writable access to the owned bytes; nullptr when the frame is empty.
  uint8_t *GetBuffer() { return buffer_; }

 private:
  size_t width_ = 0;
  size_t height_ = 0;
  uint8_t *buffer_ = nullptr;
  size_t size_ = 0;
};

#endif

// ---- src/frame/audio_frame.cpp (new file in this patch) --------------------
// In the split file layout this translation unit starts by including
// "audio_frame.h"; the class definition above plays that role here.

#include <cstring>
#include <utility>

namespace {

// Returns a freshly allocated copy of `size` bytes starting at `src`.
// - size == 0  -> nullptr, so empty frames own nothing.
// - src == null -> zero-filled buffer rather than a read through null.
uint8_t *CloneBytes(const uint8_t *src, size_t size) {
  if (size == 0) {
    return nullptr;
  }
  if (src == nullptr) {
    return new uint8_t[size]();
  }
  uint8_t *copy = new uint8_t[size];
  std::memcpy(copy, src, size);
  return copy;
}

}  // namespace

AudioFrame::AudioFrame() {}

AudioFrame::AudioFrame(size_t size)
    : buffer_(size > 0 ? new uint8_t[size] : nullptr), size_(size) {}

AudioFrame::AudioFrame(const uint8_t *buffer, size_t size)
    : buffer_(CloneBytes(buffer, size)), size_(size) {}

AudioFrame::AudioFrame(const uint8_t *buffer, size_t size, size_t width,
                       size_t height)
    : width_(width),
      height_(height),
      buffer_(CloneBytes(buffer, size)),
      size_(size) {}

AudioFrame::AudioFrame(const AudioFrame &audio_frame)
    : width_(audio_frame.width_),
      height_(audio_frame.height_),
      buffer_(CloneBytes(audio_frame.buffer_, audio_frame.size_)),
      size_(audio_frame.size_) {}

AudioFrame::AudioFrame(AudioFrame &&audio_frame) noexcept
    : width_(std::exchange(audio_frame.width_, size_t{0})),
      height_(std::exchange(audio_frame.height_, size_t{0})),
      buffer_(std::exchange(audio_frame.buffer_, nullptr)),
      size_(std::exchange(audio_frame.size_, size_t{0})) {}

AudioFrame &AudioFrame::operator=(const AudioFrame &audio_frame) {
  if (&audio_frame != this) {
    // Allocate before releasing: if the allocation throws, *this is intact.
    uint8_t *copy = CloneBytes(audio_frame.buffer_, audio_frame.size_);
    delete[] buffer_;
    buffer_ = copy;
    size_ = audio_frame.size_;
    width_ = audio_frame.width_;
    height_ = audio_frame.height_;
  }
  return *this;
}

AudioFrame &AudioFrame::operator=(AudioFrame &&audio_frame) noexcept {
  if (&audio_frame != this) {
    // Release the bytes we currently own before adopting the other buffer;
    // otherwise the previous allocation is leaked.
    delete[] buffer_;
    buffer_ = std::exchange(audio_frame.buffer_, nullptr);
    size_ = std::exchange(audio_frame.size_, size_t{0});
    width_ = std::exchange(audio_frame.width_, size_t{0});
    height_ = std::exchange(audio_frame.height_, size_t{0});
  }
  return *this;
}

AudioFrame::~AudioFrame() {
  // The buffer always comes from new[], so it must be released with delete[].
  // delete[] on nullptr is a no-op, so no guard is required.
  delete[] buffer_;
  buffer_ = nullptr;
  size_ = 0;
  width_ = 0;
  height_ = 0;
}

// ---------------------------------------------------------------------------
// Patch text that shared the original lines with the AudioFrame sources
// (rename of frame.* to video_frame.*), kept here one patch row per line:
//
// diff --git a/src/frame/frame.cpp b/src/frame/video_frame.cpp
// similarity index 98%
// rename from src/frame/frame.cpp
// rename to src/frame/video_frame.cpp
// index eac78bc..1a201af 100644
// --- a/src/frame/frame.cpp
// +++ b/src/frame/video_frame.cpp
// @@ -1,4 +1,4 @@
// -#include "frame.h"
// +#include "video_frame.h"
//  #include
//
// diff --git a/src/frame/frame.h b/src/frame/video_frame.h
// similarity index 81%
// rename from src/frame/frame.h
// rename to src/frame/video_frame.h
// index 41af760..de55742 100644
// --- a/src/frame/frame.h
// +++ b/src/frame/video_frame.h
// @@ -1,8 +1,14 @@
// -#ifndef _FRAME_H_
// -#define _FRAME_H_
// +/*
// + * @Author: DI JUNKUN
// + * @Date: 2023-11-24
// + *
Copyright (c) 2023 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _VIDEO_FRAME_H_ +#define _VIDEO_FRAME_H_ -#include #include +#include class VideoFrame { public: diff --git a/src/ice/ice_agent.cpp b/src/ice/ice_agent.cpp index b44db99..b126f27 100644 --- a/src/ice/ice_agent.cpp +++ b/src/ice/ice_agent.cpp @@ -83,7 +83,6 @@ int IceAgent::CreateIceAgent(nice_cb_state_changed_t on_state_changed, g_main_loop_run(gloop_); exit_nice_thread_ = true; - g_main_loop_unref(gloop_); })); do { @@ -96,7 +95,6 @@ int IceAgent::CreateIceAgent(nice_cb_state_changed_t on_state_changed, } void cb_closed(GObject *src, GAsyncResult *res, gpointer data) { - LOG_ERROR("cb_closed"); NiceAgent *agent = NICE_AGENT(src); g_debug("test-turn:%s: %p", G_STRFUNC, agent); diff --git a/src/ice/ice_agent.h b/src/ice/ice_agent.h index 83f52a2..d8460b6 100644 --- a/src/ice/ice_agent.h +++ b/src/ice/ice_agent.h @@ -1,9 +1,9 @@ #ifndef _ICE_AGENT_H_ #define _ICE_AGENT_H_ +#include #include #include -#include #include "gio/gnetworking.h" #include "glib.h" diff --git a/src/interface/x.h b/src/interface/x.h index 66e4d4b..8be266d 100644 --- a/src/interface/x.h +++ b/src/interface/x.h @@ -14,11 +14,20 @@ enum DATA_TYPE { VIDEO = 0, AUDIO, DATA }; enum ConnectionStatus { Connecting = 0, Connected, + Disconnected, Failed, Closed, IncorrectPassword }; +enum SignalStatus { + SignalConnecting = 0, + SignalConnected, + SignalFailed, + SignalClosed, + SignalReconnecting +}; + #ifdef __cplusplus extern "C" { #endif @@ -27,6 +36,8 @@ typedef struct Peer PeerPtr; typedef void (*OnReceiveBuffer)(const char*, size_t, const char*, size_t); +typedef void (*OnSignalStatus)(SignalStatus status); + typedef void (*OnConnectionStatus)(ConnectionStatus status); typedef void (*NetStatusReport)(const unsigned short, const unsigned short); @@ -36,6 +47,7 @@ typedef struct { OnReceiveBuffer on_receive_video_buffer; OnReceiveBuffer on_receive_audio_buffer; OnReceiveBuffer on_receive_data_buffer; + OnSignalStatus 
on_signal_status; OnConnectionStatus on_connection_status; NetStatusReport net_status_report; } Params; diff --git a/src/media/audio/decode/audio_decoder.cpp b/src/media/audio/decode/audio_decoder.cpp new file mode 100644 index 0000000..767f25c --- /dev/null +++ b/src/media/audio/decode/audio_decoder.cpp @@ -0,0 +1,66 @@ +#include "audio_decoder.h" + +#include "log.h" + +#define MAX_FRAME_SIZE 6 * 960 +#define CHANNELS 1 +unsigned char pcm_bytes[MAX_FRAME_SIZE * CHANNELS * 2]; +opus_int16 out_data[MAX_FRAME_SIZE * CHANNELS]; + +AudioDecoder::AudioDecoder(int sample_rate, int channel_num, int frame_size) + : sample_rate_(sample_rate), + channel_num_(channel_num), + frame_size_(frame_size) {} + +AudioDecoder::~AudioDecoder() { + if (opus_decoder_) { + opus_decoder_destroy(opus_decoder_); + } +} + +int AudioDecoder::Init() { + int err; + opus_decoder_ = opus_decoder_create(sample_rate_, channel_num_, &err); + opus_decoder_ctl(opus_decoder_, OPUS_SET_LSB_DEPTH(16)); + // opus_decoder_ctl(opus_decoder_, OPUS_SET_INBAND_FEC(1)); + + if (err < 0 || opus_decoder_ == NULL) { + LOG_ERROR("Create opus opus_decoder_ failed"); + return -1; + } + + // pcm_file = fopen("decode.pcm", "wb+"); + // pcm_file1 = fopen("decode1.pcm", "wb+"); + + return 0; +} + +int AudioDecoder::Decode( + const uint8_t* data, int size, + std::function on_receive_decoded_frame) { + // LOG_ERROR("input opus size = {}", size); + auto frame_size = + opus_decode(opus_decoder_, data, size, out_data, MAX_FRAME_SIZE, 0); + + if (frame_size < 0) { + LOG_ERROR("Decode opus frame failed"); + return -1; + } + + // LOG_ERROR("frame_size = {}", frame_size); + + // for (auto i = 0; i < channel_num_ * frame_size; i++) { + // pcm_bytes[2 * i] = out_data[i] & 0xFF; + // pcm_bytes[2 * i + 1] = (out_data[i] >> 8) & 0xFF; + // } + + // fwrite(pcm_bytes, sizeof(short), frame_size * channel_num_, pcm_file); + // fflush(pcm_file); + + if (on_receive_decoded_frame) { + on_receive_decoded_frame((uint8_t*)out_data, + frame_size 
* channel_num_ * sizeof(opus_int16)); + } + + return 0; +} diff --git a/src/media/audio/decode/audio_decoder.h b/src/media/audio/decode/audio_decoder.h new file mode 100644 index 0000000..0ac7ea9 --- /dev/null +++ b/src/media/audio/decode/audio_decoder.h @@ -0,0 +1,42 @@ +/* + * @Author: DI JUNKUN + * @Date: 2023-11-24 + * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _AUDIO_DECODER_H_ +#define _AUDIO_DECODER_H_ + +#include + +#include +#include +#include +#include +#include + +#include "audio_frame.h" +#include "opus/opus.h" + +class AudioDecoder { + public: + AudioDecoder(int sample_rate, int channel_num, int frame_size); + ~AudioDecoder(); + + public: + int Init(); + int Decode(const uint8_t *data, int size, + std::function on_receive_decoded_frame); + + private: + /* data */ + OpusDecoder *opus_decoder_ = nullptr; + int sample_rate_ = 48000; + int channel_num_ = 1; + int frame_size_ = 0; + + FILE *pcm_file; + FILE *pcm_file1; +}; + +#endif \ No newline at end of file diff --git a/src/media/audio/decode/opus_decoder.cpp b/src/media/audio/decode/opus_decoder.cpp deleted file mode 100644 index e69de29..0000000 diff --git a/src/media/audio/decode/opus_decoder.h b/src/media/audio/decode/opus_decoder.h deleted file mode 100644 index e69de29..0000000 diff --git a/src/media/audio/encode/audio_encoder.cpp b/src/media/audio/encode/audio_encoder.cpp new file mode 100644 index 0000000..104a5d4 --- /dev/null +++ b/src/media/audio/encode/audio_encoder.cpp @@ -0,0 +1,93 @@ +#include "audio_encoder.h" + +#include + +#include +#include + +#include "log.h" + +#define MAX_PACKET_SIZE 4000 +unsigned char output_data[MAX_PACKET_SIZE] = {0}; +static uint32_t last_ts = 0; +static unsigned char out_data[MAX_PACKET_SIZE] = {0}; + +AudioEncoder::AudioEncoder(int sample_rate, int channel_num, int frame_size) + : sample_rate_(sample_rate), + channel_num_(channel_num), + frame_size_(frame_size) {} + +AudioEncoder::~AudioEncoder() { + if (opus_encoder_) { + 
opus_encoder_destroy(opus_encoder_); + } +} + +int AudioEncoder::Init() { + last_ts = static_cast( + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + int err; + int applications[3] = {OPUS_APPLICATION_AUDIO, OPUS_APPLICATION_VOIP, + OPUS_APPLICATION_RESTRICTED_LOWDELAY}; + + opus_encoder_ = opus_encoder_create(sample_rate_, channel_num_, + OPUS_APPLICATION_VOIP, &err); + + if (err != OPUS_OK || opus_encoder_ == NULL) { + LOG_ERROR("Create opus encoder failed"); + } + + // opus_encoder_ctl(opus_encoder_, OPUS_SET_VBR(0)); + // opus_encoder_ctl(opus_encoder_, OPUS_SET_VBR_CONSTRAINT(true)); + // opus_encoder_ctl(opus_encoder_, + // OPUS_SET_BITRATE(sample_rate_ * channel_num_)); + // opus_encoder_ctl(opus_encoder_, OPUS_SET_COMPLEXITY(0)); + // opus_encoder_ctl(opus_encoder_, OPUS_SET_SIGNAL(OPUS_APPLICATION_VOIP)); + opus_encoder_ctl(opus_encoder_, OPUS_SET_LSB_DEPTH(16)); + // opus_encoder_ctl(opus_encoder_, OPUS_SET_DTX(0)); + // opus_encoder_ctl(opus_encoder_, OPUS_SET_INBAND_FEC(1)); + opus_encoder_ctl(opus_encoder_, + OPUS_SET_EXPERT_FRAME_DURATION(OPUS_FRAMESIZE_10_MS)); + + return 0; +} + +int AudioEncoder::Encode( + const uint8_t *data, int size, + std::function + on_encoded_audio_buffer) { + if (!on_encoded_audio_buffer_) { + on_encoded_audio_buffer_ = on_encoded_audio_buffer; + } + + // uint32_t now_ts = static_cast( + // std::chrono::duration_cast( + // std::chrono::high_resolution_clock::now().time_since_epoch()) + // .count()); + + // printf("1 Time cost: %d size: %d\n", now_ts - last_ts, size); + // last_ts = now_ts; + + auto ret = opus_encode(opus_encoder_, (opus_int16 *)data, size, out_data, + MAX_PACKET_SIZE); + if (ret < 0) { + printf("opus decode failed, %d\n", ret); + return -1; + } + + if (on_encoded_audio_buffer_) { + on_encoded_audio_buffer_((char *)out_data, ret); + } else { + OnEncodedAudioBuffer((char *)out_data, ret); + } + + return 0; +} + +int 
AudioEncoder::OnEncodedAudioBuffer(char *encoded_audio_buffer, + size_t size) { + LOG_INFO("OnEncodedAudioBuffer not implemented"); + return 0; +} diff --git a/src/media/audio/encode/audio_encoder.h b/src/media/audio/encode/audio_encoder.h new file mode 100644 index 0000000..e24d593 --- /dev/null +++ b/src/media/audio/encode/audio_encoder.h @@ -0,0 +1,41 @@ +/* + * @Author: DI JUNKUN + * @Date: 2023-11-24 + * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _AUDIO_ENCODER_H_ +#define _AUDIO_ENCODER_H_ + +#include +#include +#include +#include +#include + +#include "opus/opus.h" + +class AudioEncoder { + public: + AudioEncoder(int sample_rate, int channel_num, int frame_size); + ~AudioEncoder(); + + public: + int Init(); + int Encode(const uint8_t* data, int size, + std::function + on_encoded_audio_buffer); + int OnEncodedAudioBuffer(char* encoded_audio_buffer, size_t size); + + private: + OpusEncoder* opus_encoder_ = nullptr; + int sample_rate_ = 48000; + int channel_num_ = 1; + int frame_size_ = 480; + + std::queue pcm_queue; + std::function + on_encoded_audio_buffer_ = nullptr; +}; + +#endif \ No newline at end of file diff --git a/src/media/audio/encode/opus_encoder.cpp b/src/media/audio/encode/opus_encoder.cpp deleted file mode 100644 index e69de29..0000000 diff --git a/src/media/audio/encode/opus_encoder.h b/src/media/audio/encode/opus_encoder.h deleted file mode 100644 index e69de29..0000000 diff --git a/src/media/video/decode/video_decoder.h b/src/media/video/decode/video_decoder.h index 4676c80..05b7aec 100644 --- a/src/media/video/decode/video_decoder.h +++ b/src/media/video/decode/video_decoder.h @@ -1,9 +1,15 @@ +/* + * @Author: DI JUNKUN + * @Date: 2023-11-24 + * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved. 
+ */ + #ifndef _VIDEO_DECODER_H_ #define _VIDEO_DECODER_H_ #include -#include "frame.h" +#include "video_frame.h" class VideoDecoder { public: diff --git a/src/pc/peer_connection.cpp b/src/pc/peer_connection.cpp index e1fbb66..ef57265 100644 --- a/src/pc/peer_connection.cpp +++ b/src/pc/peer_connection.cpp @@ -65,10 +65,30 @@ int PeerConnection::Init(PeerConnectionParams params, on_receive_video_buffer_ = params.on_receive_video_buffer; on_receive_audio_buffer_ = params.on_receive_audio_buffer; on_receive_data_buffer_ = params.on_receive_data_buffer; + on_signal_status_ = params.on_signal_status; on_connection_status_ = params.on_connection_status; on_receive_ws_msg_ = [this](const std::string &msg) { ProcessSignal(msg); }; + on_ws_status_ = [this](WsStatus ws_status) { + if (WsStatus::WsOpening == ws_status) { + signal_status_ = SignalStatus::SignalConnecting; + on_signal_status_(SignalStatus::SignalConnecting); + } else if (WsStatus::WsOpened == ws_status) { + signal_status_ = SignalStatus::SignalConnected; + on_signal_status_(SignalStatus::SignalConnected); + } else if (WsStatus::WsFailed == ws_status) { + signal_status_ = SignalStatus::SignalFailed; + on_signal_status_(SignalStatus::SignalFailed); + } else if (WsStatus::WsClosed == ws_status) { + signal_status_ = SignalStatus::SignalClosed; + on_signal_status_(SignalStatus::SignalClosed); + } else if (WsStatus::WsReconnecting == ws_status) { + signal_status_ = SignalStatus::SignalReconnecting; + on_signal_status_(SignalStatus::SignalReconnecting); + } + }; + on_receive_video_ = [this](const char *data, size_t size, const char *user_id, size_t user_id_size) { int num_frame_returned = video_decoder_->Decode( @@ -83,9 +103,14 @@ int PeerConnection::Init(PeerConnectionParams params, on_receive_audio_ = [this](const char *data, size_t size, const char *user_id, size_t user_id_size) { - if (on_receive_audio_buffer_) { - on_receive_audio_buffer_(data, size, user_id, user_id_size); - } + int num_frame_returned = 
audio_decoder_->Decode( + (uint8_t *)data, size, + [this, user_id, user_id_size](uint8_t *data, int size) { + if (on_receive_audio_buffer_) { + on_receive_audio_buffer_((const char *)data, size, user_id, + user_id_size); + } + }); }; on_receive_data_ = [this](const char *data, size_t size, const char *user_id, @@ -96,7 +121,12 @@ int PeerConnection::Init(PeerConnectionParams params, }; on_ice_status_change_ = [this](std::string ice_status) { - if ("completed" == ice_status || "ready" == ice_status) { + if ("connecting" == ice_status) { + on_connection_status_(ConnectionStatus::Connecting); + } else if ("disconnected" == ice_status) { + on_connection_status_(ConnectionStatus::Disconnected); + } else if ("completed" == ice_status || "ready" == ice_status || + "connected" == ice_status) { ice_ready_ = true; on_connection_status_(ConnectionStatus::Connected); b_force_i_frame_ = true; @@ -110,18 +140,25 @@ int PeerConnection::Init(PeerConnectionParams params, } }; - ws_transport_ = std::make_shared(on_receive_ws_msg_); + ws_transport_ = + std::make_shared(on_receive_ws_msg_, on_ws_status_); uri_ = "ws://" + cfg_signal_server_ip_ + ":" + cfg_signal_server_port_; if (ws_transport_) { ws_transport_->Connect(uri_); } - do { - } while (SignalStatus::SignalConnected != GetSignalStatus()); + // do { + // } while (SignalStatus::SignalConnected != GetSignalStatus()); nv12_data_ = new char[1280 * 720 * 3 / 2]; if (0 != CreateVideoCodec(hardware_acceleration_)) { + LOG_ERROR("Create video codec failed"); + return -1; + } + + if (0 != CreateAudioCodec()) { + LOG_ERROR("Create audio codec failed"); return -1; } @@ -185,9 +222,30 @@ int PeerConnection::CreateVideoCodec(bool hardware_acceleration) { return 0; } +int PeerConnection::CreateAudioCodec() { + audio_encoder_ = std::make_unique(AudioEncoder(48000, 1, 480)); + if (!audio_encoder_ || 0 != audio_encoder_->Init()) { + LOG_ERROR("Audio encoder init failed"); + return -1; + } + + audio_decoder_ = 
std::make_unique(AudioDecoder(48000, 1, 480)); + if (!audio_decoder_ || 0 != audio_decoder_->Init()) { + LOG_ERROR("Audio decoder init failed"); + return -1; + } + + return 0; +} + int PeerConnection::Create(PeerConnectionParams params, const std::string &transmission_id, const std::string &password) { + if (SignalStatus::SignalConnected != GetSignalStatus()) { + LOG_ERROR("Signal not connected"); + return -1; + } + int ret = 0; password_ = password; @@ -208,6 +266,11 @@ int PeerConnection::Create(PeerConnectionParams params, int PeerConnection::Join(PeerConnectionParams params, const std::string &transmission_id, const std::string &password) { + if (SignalStatus::SignalConnected != GetSignalStatus()) { + LOG_ERROR("Signal not connected"); + return -1; + } + int ret = 0; password_ = password; @@ -218,6 +281,11 @@ int PeerConnection::Join(PeerConnectionParams params, } int PeerConnection::Leave() { + if (SignalStatus::SignalConnected != GetSignalStatus()) { + LOG_ERROR("Signal not connected"); + return -1; + } + json message = {{"type", "leave_transmission"}, {"user_id", user_id_}, {"transmission_id", transmission_id_}}; @@ -245,8 +313,6 @@ void PeerConnection::ProcessSignal(const std::string &signal) { ws_connection_id_ = j["ws_connection_id"].get(); LOG_INFO("Receive local peer websocket connection id [{}]", ws_connection_id_); - std::lock_guard l(signal_status_mutex_); - signal_status_ = SignalStatus::SignalConnected; break; } case "transmission_id"_H: { @@ -305,7 +371,7 @@ void PeerConnection::ProcessSignal(const std::string &signal) { ice_transmission_list_[remote_user_id]->JoinTransmission(); } - on_connection_status_(ConnectionStatus::Connecting); + // on_connection_status_(ConnectionStatus::Connecting); } break; @@ -363,6 +429,8 @@ void PeerConnection::ProcessSignal(const std::string &signal) { ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); ice_transmission_list_[remote_user_id]->GatherCandidates(); + + 
on_connection_status_(ConnectionStatus::Connecting); } break; } @@ -381,6 +449,8 @@ void PeerConnection::ProcessSignal(const std::string &signal) { ice_transmission_list_.end()) { ice_transmission_list_[remote_user_id]->SetRemoteSdp(remote_sdp); } + + on_connection_status_(ConnectionStatus::Connecting); } break; } @@ -392,6 +462,11 @@ void PeerConnection::ProcessSignal(const std::string &signal) { int PeerConnection::RequestTransmissionMemberList( const std::string &transmission_id, const std::string &password) { + if (SignalStatus::SignalConnected != GetSignalStatus()) { + LOG_ERROR("Signal not connected"); + return -1; + } + LOG_INFO("Request member list"); json message = {{"type", "query_user_id_list"}, @@ -445,9 +520,17 @@ int PeerConnection::SendVideoData(const char *data, size_t size) { } int PeerConnection::SendAudioData(const char *data, size_t size) { - for (auto &ice_trans : ice_transmission_list_) { - ice_trans.second->SendData(IceTransmission::DATA_TYPE::AUDIO, data, size); - } + int ret = audio_encoder_->Encode( + (uint8_t *)data, size, + [this](char *encoded_audio_buffer, size_t size) -> int { + for (auto &ice_trans : ice_transmission_list_) { + // LOG_ERROR("opus frame size: [{}]", size); + ice_trans.second->SendData(IceTransmission::DATA_TYPE::AUDIO, + encoded_audio_buffer, size); + } + return 0; + }); + return 0; } diff --git a/src/pc/peer_connection.h b/src/pc/peer_connection.h index 8f36f05..6942708 100644 --- a/src/pc/peer_connection.h +++ b/src/pc/peer_connection.h @@ -5,17 +5,19 @@ #include #include +#include "audio_decoder.h" +#include "audio_encoder.h" #include "ice_transmission.h" #include "video_decoder_factory.h" #include "video_encoder_factory.h" #include "ws_transmission.h" #include "x.h" -enum SignalStatus { SignalConnecting = 0, SignalConnected, SignalClosed }; - typedef void (*OnReceiveBuffer)(const char *, size_t, const char *, const size_t); +typedef void (*OnSignalStatus)(SignalStatus status); + typedef void 
(*OnConnectionStatus)(ConnectionStatus status); typedef void (*NetStatusReport)(const unsigned short, const unsigned short); @@ -25,6 +27,7 @@ typedef struct { OnReceiveBuffer on_receive_video_buffer; OnReceiveBuffer on_receive_audio_buffer; OnReceiveBuffer on_receive_data_buffer; + OnSignalStatus on_signal_status; OnConnectionStatus on_connection_status; NetStatusReport net_status_report; } PeerConnectionParams; @@ -56,6 +59,7 @@ class PeerConnection { private: int CreateVideoCodec(bool hardware_acceleration); + int CreateAudioCodec(); void ProcessSignal(const std::string &signal); @@ -81,12 +85,14 @@ class PeerConnection { private: std::shared_ptr ws_transport_ = nullptr; std::function on_receive_ws_msg_ = nullptr; + std::function on_ws_status_ = nullptr; unsigned int ws_connection_id_ = 0; std::string user_id_ = ""; std::string transmission_id_ = ""; std::vector user_id_list_; SignalStatus signal_status_ = SignalStatus::SignalClosed; std::mutex signal_status_mutex_; + bool leave_ = false; private: std::map> @@ -103,6 +109,7 @@ class PeerConnection { OnReceiveBuffer on_receive_video_buffer_; OnReceiveBuffer on_receive_audio_buffer_; OnReceiveBuffer on_receive_data_buffer_; + OnSignalStatus on_signal_status_; OnConnectionStatus on_connection_status_; char *nv12_data_ = nullptr; bool inited_ = false; @@ -114,6 +121,10 @@ class PeerConnection { bool hardware_accelerated_encode_ = false; bool hardware_accelerated_decode_ = false; bool b_force_i_frame_ = false; + + private: + std::unique_ptr audio_encoder_ = nullptr; + std::unique_ptr audio_decoder_ = nullptr; }; #endif \ No newline at end of file diff --git a/src/rtc/x_inner.cpp b/src/rtc/x_inner.cpp index 89e32c8..28247a5 100644 --- a/src/rtc/x_inner.cpp +++ b/src/rtc/x_inner.cpp @@ -19,6 +19,7 @@ PeerPtr *CreatePeer(const Params *params) { peer_ptr->pc_params.on_receive_video_buffer = params->on_receive_video_buffer; peer_ptr->pc_params.on_receive_audio_buffer = params->on_receive_audio_buffer; 
peer_ptr->pc_params.on_receive_data_buffer = params->on_receive_data_buffer; + peer_ptr->pc_params.on_signal_status = params->on_signal_status; peer_ptr->pc_params.on_connection_status = params->on_connection_status; peer_ptr->pc_params.net_status_report = params->net_status_report; @@ -26,21 +27,36 @@ PeerPtr *CreatePeer(const Params *params) { } int Init(PeerPtr *peer_ptr, const char *user_id) { + if (!peer_ptr) { + LOG_ERROR("peer_ptr not created"); + return -1; + } + peer_ptr->peer_connection->Init(peer_ptr->pc_params, user_id); return 0; } int CreateConnection(PeerPtr *peer_ptr, const char *transmission_id, const char *password) { - peer_ptr->peer_connection->Create(peer_ptr->pc_params, transmission_id, - password); + if (!peer_ptr) { + LOG_ERROR("peer_ptr not created"); + return -1; + } + LOG_INFO("CreateConnection [{}] with password [{}]", transmission_id, password); - return 0; + + return peer_ptr->peer_connection->Create(peer_ptr->pc_params, transmission_id, + password); } int JoinConnection(PeerPtr *peer_ptr, const char *transmission_id, const char *password) { + if (!peer_ptr) { + LOG_ERROR("peer_ptr not created"); + return -1; + } + peer_ptr->peer_connection->Join(peer_ptr->pc_params, transmission_id, password); LOG_INFO("JoinConnection[{}] with password [{}]", transmission_id, password); @@ -48,6 +64,11 @@ int JoinConnection(PeerPtr *peer_ptr, const char *transmission_id, } int LeaveConnection(PeerPtr *peer_ptr) { + if (!peer_ptr) { + LOG_ERROR("peer_ptr not created"); + return -1; + } + peer_ptr->peer_connection->Leave(); LOG_INFO("LeaveConnection"); return 0; @@ -55,6 +76,11 @@ int LeaveConnection(PeerPtr *peer_ptr) { int SendData(PeerPtr *peer_ptr, DATA_TYPE data_type, const char *data, size_t size) { + if (!peer_ptr) { + LOG_ERROR("peer_ptr not created"); + return -1; + } + if (DATA_TYPE::VIDEO == data_type) { peer_ptr->peer_connection->SendVideoData(data, size); } else if (DATA_TYPE::AUDIO == data_type) { @@ -63,6 +89,4 @@ int SendData(PeerPtr 
*peer_ptr, DATA_TYPE data_type, const char *data, peer_ptr->peer_connection->SendUserData(data, size); } return 0; -} - -int rtc() { return 0; } \ No newline at end of file +} \ No newline at end of file diff --git a/src/rtp/rtp_audio_receiver.cpp b/src/rtp/rtp_audio_receiver.cpp new file mode 100644 index 0000000..c381173 --- /dev/null +++ b/src/rtp/rtp_audio_receiver.cpp @@ -0,0 +1,90 @@ +#include "rtp_audio_receiver.h" + +#define RTCP_RR_INTERVAL 1000 + +RtpAudioReceiver::RtpAudioReceiver() {} + +RtpAudioReceiver::~RtpAudioReceiver() { + if (rtp_statistics_) { + rtp_statistics_->Stop(); + } +} + +void RtpAudioReceiver::InsertRtpPacket(RtpPacket& rtp_packet) { + if (!rtp_statistics_) { + rtp_statistics_ = std::make_unique(); + rtp_statistics_->Start(); + } + + if (rtp_statistics_) { + rtp_statistics_->UpdateReceiveBytes(rtp_packet.Size()); + } + + if (CheckIsTimeSendRR()) { + RtcpReceiverReport rtcp_rr; + RtcpReportBlock report; + + auto duration = std::chrono::system_clock::now().time_since_epoch(); + auto seconds = std::chrono::duration_cast(duration); + uint32_t seconds_u32 = static_cast( + std::chrono::duration_cast(duration).count()); + + uint32_t fraction_u32 = static_cast( + std::chrono::duration_cast(duration - seconds) + .count()); + + report.source_ssrc = 0x00; + report.fraction_lost = 0; + report.cumulative_lost = 0; + report.extended_high_seq_num = 0; + report.jitter = 0; + report.lsr = 0; + report.dlsr = 0; + + rtcp_rr.SetReportBlock(report); + + rtcp_rr.Encode(); + + // SendRtcpRR(rtcp_rr); + } + + if (on_receive_data_) { + on_receive_data_((const char*)rtp_packet.Payload(), + rtp_packet.PayloadSize()); + } +} + +void RtpAudioReceiver::SetSendDataFunc( + std::function data_send_func) { + data_send_func_ = data_send_func; +} + +int RtpAudioReceiver::SendRtcpRR(RtcpReceiverReport& rtcp_rr) { + if (!data_send_func_) { + LOG_ERROR("data_send_func_ is nullptr"); + return -1; + } + + if (data_send_func_((const char*)rtcp_rr.Buffer(), rtcp_rr.Size())) { + 
LOG_ERROR("Send RR failed"); + return -1; + } + + // LOG_ERROR("Send RR"); + + return 0; +} + +bool RtpAudioReceiver::CheckIsTimeSendRR() { + uint32_t now_ts = static_cast( + std::chrono::duration_cast( + std::chrono::high_resolution_clock::now().time_since_epoch()) + .count()); + + if (now_ts - last_send_rtcp_rr_packet_ts_ >= RTCP_RR_INTERVAL) { + last_send_rtcp_rr_packet_ts_ = now_ts; + return true; + } else { + return false; + } +} \ No newline at end of file diff --git a/src/rtp/rtp_audio_receiver.h b/src/rtp/rtp_audio_receiver.h new file mode 100644 index 0000000..28afe89 --- /dev/null +++ b/src/rtp/rtp_audio_receiver.h @@ -0,0 +1,45 @@ +/* + * @Author: DI JUNKUN + * @Date: 2023-11-24 + * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved. + */ + +#ifndef _RTP_AUDIO_RECEIVER_H_ +#define _RTP_AUDIO_RECEIVER_H_ + +#include + +#include "rtcp_receiver_report.h" +#include "rtp_codec.h" +#include "rtp_statistics.h" + +class RtpAudioReceiver { + public: + RtpAudioReceiver(); + ~RtpAudioReceiver(); + + public: + void InsertRtpPacket(RtpPacket& rtp_packet); + + void SetSendDataFunc(std::function data_send_func); + + void SetOnReceiveData( + std::function on_receive_data) { + on_receive_data_ = on_receive_data; + } + + private: + bool CheckIsTimeSendRR(); + int SendRtcpRR(RtcpReceiverReport& rtcp_rr); + + private: + std::function on_receive_data_ = nullptr; + uint32_t last_complete_frame_ts_ = 0; + + private: + std::unique_ptr rtp_statistics_ = nullptr; + uint32_t last_send_rtcp_rr_packet_ts_ = 0; + std::function data_send_func_ = nullptr; +}; + +#endif \ No newline at end of file diff --git a/src/rtp/rtp_audio_sender.cpp b/src/rtp/rtp_audio_sender.cpp new file mode 100644 index 0000000..f90fff7 --- /dev/null +++ b/src/rtp/rtp_audio_sender.cpp @@ -0,0 +1,140 @@ +#include "rtp_audio_sender.h" + +#include + +#include "log.h" + +#define RTCP_SR_INTERVAL 1000 + +RtpAudioSender::RtpAudioSender() {} + +RtpAudioSender::~RtpAudioSender() { + if (rtp_statistics_) { + 
rtp_statistics_->Stop();
+  }
+}
+
+void RtpAudioSender::Enqueue(std::vector& rtp_packets) {
+  if (!rtp_statistics_) {
+    rtp_statistics_ = std::make_unique();
+    rtp_statistics_->Start();
+  }
+
+  for (auto& rtp_packet : rtp_packets) {
+    rtp_packe_queue_.push(rtp_packet);
+  }
+}
+
+void RtpAudioSender::SetSendDataFunc(
+    std::function data_send_func) {
+  data_send_func_ = data_send_func;
+}
+
+int RtpAudioSender::SendRtpPacket(RtpPacket& rtp_packet) {
+  if (!data_send_func_) {
+    LOG_ERROR("data_send_func_ is nullptr");
+    return -1;
+  }
+
+  // data_send_func_ returns 0 on success; any other value is treated as a
+  // failed send and the packet is not counted.
+  if (0 !=
+      data_send_func_((const char*)rtp_packet.Buffer(), rtp_packet.Size())) {
+    LOG_ERROR("Send rtp packet failed");
+    return -1;
+  }
+
+  last_send_bytes_ += rtp_packet.Size();
+  total_rtp_packets_sent_++;
+  total_rtp_payload_sent_ += rtp_packet.PayloadSize();
+
+  if (CheckIsTimeSendSR()) {
+    RtcpSenderReport rtcp_sr;
+    SenderInfo sender_info;
+    RtcpReportBlock report;
+
+    auto duration = std::chrono::system_clock::now().time_since_epoch();
+    auto seconds = std::chrono::duration_cast(duration);
+    uint32_t seconds_u32 = static_cast(
+        std::chrono::duration_cast(duration).count());
+
+    uint32_t fraction_u32 = static_cast(
+        std::chrono::duration_cast(duration - seconds)
+            .count());
+
+    sender_info.sender_ssrc = 0x00;
+    sender_info.ntp_ts_msw = (uint32_t)seconds_u32;
+    sender_info.ntp_ts_lsw = (uint32_t)fraction_u32;
+    // Raw tick count, as RtpCodec stamps packets; "* 1000000" overflowed.
+    sender_info.rtp_ts =
+        std::chrono::high_resolution_clock::now().time_since_epoch().count();
+    sender_info.sender_packet_count = total_rtp_packets_sent_;
+    sender_info.sender_octet_count = total_rtp_payload_sent_;
+
+    rtcp_sr.SetSenderInfo(sender_info);
+
+    report.source_ssrc = 0x00;
+    report.fraction_lost = 0;
+    report.cumulative_lost = 0;
+    report.extended_high_seq_num = 0;
+    report.jitter = 0;
+    report.lsr = 0;
+    report.dlsr = 0;
+
+    rtcp_sr.SetReportBlock(report);
+
+    rtcp_sr.Encode();
+
+    SendRtcpSR(rtcp_sr);
+  }
+
+  return 0;
+}
+
+int RtpAudioSender::SendRtcpSR(RtcpSenderReport& rtcp_sr) {
+  if (!data_send_func_) {
+    LOG_ERROR("data_send_func_ is nullptr");
+    return -1;
+  }
+
+  if (data_send_func_((const char*)rtcp_sr.Buffer(), rtcp_sr.Size())) {
+    LOG_ERROR("Send SR failed");
+    return -1;
+  }
+
+  // LOG_ERROR("Send SR");
+
+  return 0;
+}
+
+bool RtpAudioSender::CheckIsTimeSendSR() {
+  uint32_t now_ts = static_cast(
+      std::chrono::duration_cast(
+          std::chrono::high_resolution_clock::now().time_since_epoch())
+          .count());
+
+  if (now_ts - last_send_rtcp_sr_packet_ts_ >= RTCP_SR_INTERVAL) {
+    last_send_rtcp_sr_packet_ts_ = now_ts;
+    return true;
+  } else {
+    return false;
+  }
+}
+
+bool RtpAudioSender::Process() {
+  last_send_bytes_ = 0;
+
+  for (size_t i = 0; i < 10; i++)
+    if (!rtp_packe_queue_.isEmpty()) {
+      RtpPacket rtp_packet;
+      rtp_packe_queue_.pop(rtp_packet);
+      SendRtpPacket(rtp_packet);
+    }
+
+  if (rtp_statistics_) {
+    rtp_statistics_->UpdateSentBytes(last_send_bytes_);
+  }
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(5));
+  return true;
+}
\ No newline at end of file
diff --git a/src/rtp/rtp_audio_sender.h b/src/rtp/rtp_audio_sender.h
new file mode 100644
index 0000000..8a7a841
--- /dev/null
+++ b/src/rtp/rtp_audio_sender.h
@@ -0,0 +1,57 @@
+/*
+ * @Author: DI JUNKUN
+ * @Date: 2023-11-24
+ * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
+ */
+
+#ifndef _RTP_AUDIO_SENDER_H_
+#define _RTP_AUDIO_SENDER_H_
+
+#include
+
+#include "ringbuffer.h"
+#include "rtcp_sender_report.h"
+#include "rtp_packet.h"
+#include "rtp_statistics.h"
+#include "thread_base.h"
+
+// Sender-side RTP pipeline for audio. Packets handed to Enqueue() are copied
+// into a queue that Process() drains and passes to the send callback; an RTCP
+// sender report is emitted whenever CheckIsTimeSendSR() reports one is due.
+class RtpAudioSender : public ThreadBase {
+ public:
+  RtpAudioSender();
+  ~RtpAudioSender();
+
+ public:
+  // Copies every packet in `rtp_packets` into the send queue.
+  void Enqueue(std::vector &rtp_packets);
+  // Installs the callback used to transmit encoded RTP/RTCP bytes. A non-zero
+  // return value from the callback is treated as a failed send.
+  void SetSendDataFunc(std::function data_send_func);
+
+ private:
+  // Sends one RTP packet, updates the send counters and, when one is due,
+  // also sends an RTCP sender report. Returns 0 on success, -1 on failure.
+  int SendRtpPacket(RtpPacket &rtp_packet);
+  // Sends an already encoded sender report. Returns 0 on success, -1 on error.
+  int SendRtcpSR(RtcpSenderReport &rtcp_sr);
+
+  // True once RTCP_SR_INTERVAL has elapsed since the last sender report.
+  bool CheckIsTimeSendSR();
+
+ private:
+  // Sends up to 10 queued packets, reports the bytes sent, then sleeps 5 ms.
+  bool Process() override;
+
+ private:
+  std::function data_send_func_ = nullptr;
+  RingBuffer rtp_packe_queue_;
+  std::unique_ptr rtp_statistics_ = nullptr;
+  uint32_t last_send_bytes_ = 0;
+  uint32_t last_send_rtcp_sr_packet_ts_ = 0;
+  uint32_t total_rtp_packets_sent_ = 0;
+  uint32_t total_rtp_payload_sent_ = 0;
+};
+
+#endif
\ No newline at end of file
diff --git a/src/rtp/rtp_codec.cpp b/src/rtp/rtp_codec.cpp
index 0e6ffe5..e759bfb 100644
--- a/src/rtp/rtp_codec.cpp
+++ b/src/rtp/rtp_codec.cpp
@@ -216,6 +216,23 @@ void RtpCodec::Encode(uint8_t* buffer, size_t size,
         packets.emplace_back(rtp_packet);
       }
     }
+  } else if (RtpPacket::PAYLOAD_TYPE::OPUS == payload_type_) {
+    // An Opus frame is sent as a single RTP packet with the marker bit set.
+    RtpPacket rtp_packet;
+    rtp_packet.SetVerion(version_);
+    rtp_packet.SetHasPadding(has_padding_);
+    rtp_packet.SetHasExtension(has_extension_);
+    rtp_packet.SetMarker(1);
+    rtp_packet.SetPayloadType(RtpPacket::PAYLOAD_TYPE(payload_type_));
+    rtp_packet.SetSequenceNumber(sequence_number_++);
+
+    timestamp_ =
+        std::chrono::high_resolution_clock::now().time_since_epoch().count();
+    rtp_packet.SetTimestamp(timestamp_);
+    rtp_packet.SetSsrc(ssrc_);
+
+    rtp_packet.Encode(buffer, size);
+    packets.emplace_back(rtp_packet);
   } else if (RtpPacket::PAYLOAD_TYPE::DATA == payload_type_) {
     RtpPacket rtp_packet;
     rtp_packet.SetVerion(version_);
diff --git a/src/rtp/rtp_data_sender.h b/src/rtp/rtp_data_sender.h
index 5ab4d7e..53cc0b8 100644
---
a/src/rtp/rtp_data_sender.h
+++ b/src/rtp/rtp_data_sender.h
@@ -1,3 +1,9 @@
+/*
+ * @Author: DI JUNKUN
+ * @Date: 2023-11-24
+ * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
+ */
+
 #ifndef _RTP_DATA_SENDER_H_
 #define _RTP_DATA_SENDER_H_
 
diff --git a/src/rtp/rtp_packet.cpp b/src/rtp/rtp_packet.cpp
index 6d9d4f8..ceba633 100644
--- a/src/rtp/rtp_packet.cpp
+++ b/src/rtp/rtp_packet.cpp
@@ -17,6 +17,8 @@ void RtpPacket::TryToDecodeRtpPacket() {
     DecodeH264FecSource();
   } else if (PAYLOAD_TYPE::H264_FEC_REPAIR == PAYLOAD_TYPE(buffer_[1] & 0x7F)) {
     DecodeH264FecRepair();
+  } else if (PAYLOAD_TYPE::OPUS == PAYLOAD_TYPE(buffer_[1] & 0x7F)) {
+    DecodeOpus();
   } else if (PAYLOAD_TYPE::DATA == PAYLOAD_TYPE(buffer_[1] & 0x7F)) {
     DecodeData();
   } else {
@@ -368,6 +370,63 @@ const uint8_t *RtpPacket::EncodeH264FecRepair(
   return buffer_;
 }
 
+// Parses the RTP header of the Opus packet held in buffer_ and records where
+// the payload starts. When `payload` is non-null the payload bytes are also
+// copied into it. Returns the payload size in bytes.
+size_t RtpPacket::DecodeOpus(uint8_t *payload) {
+  version_ = (buffer_[0] >> 6) & 0x03;
+  has_padding_ = (buffer_[0] >> 5) & 0x01;
+  has_extension_ = (buffer_[0] >> 4) & 0x01;
+  total_csrc_number_ = buffer_[0] & 0x0f;
+  marker_ = (buffer_[1] >> 7) & 0x01;
+  payload_type_ = buffer_[1] & 0x7f;
+  sequence_number_ = (buffer_[2] << 8) | buffer_[3];
+  timestamp_ =
+      (buffer_[4] << 24) | (buffer_[5] << 16) | (buffer_[6] << 8) | buffer_[7];
+  ssrc_ = (buffer_[8] << 24) | (buffer_[9] << 16) | (buffer_[10] << 8) |
+          buffer_[11];
+
+  // Each CSRC identifier is 32 bits wide, so entry `index` starts at byte
+  // 12 + 4 * index; stepping by one byte re-read overlapping bytes.
+  for (uint32_t index = 0; index < total_csrc_number_; index++) {
+    const uint32_t csrc_offset = 12 + index * 4;
+    uint32_t csrc = (buffer_[csrc_offset] << 24) |
+                    (buffer_[csrc_offset + 1] << 16) |
+                    (buffer_[csrc_offset + 2] << 8) | buffer_[csrc_offset + 3];
+    csrcs_.push_back(csrc);
+  }
+
+  uint32_t extension_offset = total_csrc_number_ * 4;
+  if (has_extension_) {
+    extension_profile_ =
+        (buffer_[12 + extension_offset] << 8) | buffer_[13 + extension_offset];
+    extension_len_ =
+        (buffer_[14 + extension_offset] << 8) | buffer_[15 + extension_offset];
+
+    // extension_data_ = new uint8_t[extension_len_];
+    // memcpy(extension_data_, buffer_ + 16 + extension_offset,
+    // extension_len_);
+    extension_data_ = buffer_ + 16 + extension_offset;
+  }
+
+  // NOTE(review): extension_data_ starts 4 bytes past the extension header,
+  // but those 4 bytes are not added here - confirm extension_len_ already
+  // accounts for them in this project's encoder.
+  uint32_t payload_offset =
+      (has_extension_ ? extension_len_ : 0) + extension_offset;
+
+  // NOTE(review): nothing checks that size_ covers the header computed above;
+  // for a truncated packet this subtraction would go out of range - confirm
+  // that callers validate the length first.
+  payload_size_ = size_ - (12 + payload_offset);
+  payload_ = buffer_ + 12 + payload_offset;
+  if (payload) {
+    memcpy(payload, payload_, payload_size_);
+  }
+
+  return payload_size_;
+}
+
 size_t RtpPacket::DecodeData(uint8_t *payload) {
   version_ = (buffer_[0] >> 6) & 0x03;
   has_padding_ = (buffer_[0] >> 5) & 0x01;
diff --git a/src/rtp/rtp_packet.h b/src/rtp/rtp_packet.h
index cf85f1a..d9e8eb6 100644
--- a/src/rtp/rtp_packet.h
+++ b/src/rtp/rtp_packet.h
@@ -209,6 +209,7 @@ class RtpPacket {
   size_t DecodeH264Fua(uint8_t *payload = nullptr);
   size_t DecodeH264FecSource(uint8_t *payload = nullptr);
   size_t DecodeH264FecRepair(uint8_t *payload = nullptr);
+  size_t DecodeOpus(uint8_t *payload = nullptr);
 
  public:
   // Get Header
diff --git a/src/rtp/rtp_video_receiver.h b/src/rtp/rtp_video_receiver.h
index c6cae4f..36bb4b6 100644
--- a/src/rtp/rtp_video_receiver.h
+++ b/src/rtp/rtp_video_receiver.h
@@ -7,12 +7,12 @@
 #include
 
 #include "fec_decoder.h"
-#include "frame.h"
 #include "ringbuffer.h"
 #include "rtcp_receiver_report.h"
 #include "rtp_codec.h"
 #include "rtp_statistics.h"
 #include "thread_base.h"
+#include "video_frame.h"
 
 class RtpVideoReceiver : public ThreadBase {
  public:
diff --git a/src/thread/thread_base.cpp b/src/thread/thread_base.cpp
index b304d59..47c8cc9 100644
--- a/src/thread/thread_base.cpp
+++ b/src/thread/thread_base.cpp
@@ -4,7 +4,7 @@
 
 ThreadBase::ThreadBase() {}
 
-ThreadBase::~ThreadBase() {}
+ThreadBase::~ThreadBase() { Stop(); }
 
 void ThreadBase::Start() {
   if (!thread_) {
diff --git a/src/transmission/ice_transmission.cpp b/src/transmission/ice_transmission.cpp
index 1b0e74d..4fd22e3 100644
--- a/src/transmission/ice_transmission.cpp
+++ b/src/transmission/ice_transmission.cpp
@@ -28,8 +28,8 @@ IceTransmission::~IceTransmission() {
     rtp_video_sender_->Stop();
   }
 
-  if (rtp_video_receiver_) {
-
rtp_video_receiver_->Stop();
+  if (rtp_audio_sender_) {
+    rtp_audio_sender_->Stop();
   }
 
   if (rtp_data_sender_) {
@@ -47,6 +47,7 @@ int IceTransmission::InitIceTransmission(std::string &stun_ip, int stun_port,
                                          std::string &turn_username,
                                          std::string &turn_password) {
   video_rtp_codec_ = std::make_unique(RtpPacket::PAYLOAD_TYPE::H264);
+  audio_rtp_codec_ = std::make_unique(RtpPacket::PAYLOAD_TYPE::OPUS);
   data_rtp_codec_ = std::make_unique(RtpPacket::PAYLOAD_TYPE::DATA);
 
   rtp_video_receiver_ = std::make_unique();
@@ -82,6 +83,28 @@ int IceTransmission::InitIceTransmission(std::string &stun_ip, int stun_port,
 
   rtp_video_sender_->Start();
 
+  rtp_audio_receiver_ = std::make_unique();
+  rtp_audio_receiver_->SetOnReceiveData(
+      [this](const char *data, size_t size) -> void {
+        // Drop the frame if no audio callback has been registered.
+        if (!on_receive_audio_) return;
+        on_receive_audio_(data, size, remote_user_id_.data(),
+                          remote_user_id_.size());
+      });
+
+  rtp_audio_sender_ = std::make_unique();
+  rtp_audio_sender_->SetSendDataFunc(
+      [this](const char *data, size_t size) -> int {
+        if (!ice_agent_) {
+          LOG_ERROR("ice_agent_ is nullptr");
+          return -1;
+        }
+
+        return ice_agent_->Send(data, size);
+      });
+
+  rtp_audio_sender_->Start();
+
   rtp_data_sender_ = std::make_unique();
   rtp_data_sender_->SetSendDataFunc(
       [this](const char *data, size_t size) -> int {
@@ -160,6 +183,10 @@ int IceTransmission::InitIceTransmission(std::string &stun_ip, int stun_port,
               RtpPacket packet((uint8_t *)buffer, size);
               ice_transmission_obj->rtp_video_receiver_->InsertRtpPacket(
                   packet);
+            } else if (ice_transmission_obj->CheckIsAudioPacket(buffer, size)) {
+              RtpPacket packet((uint8_t *)buffer, size);
+              ice_transmission_obj->rtp_audio_receiver_->InsertRtpPacket(
+                  packet);
             } else if (ice_transmission_obj->CheckIsDataPacket(buffer, size)) {
               RtpPacket packet((uint8_t *)buffer, size);
               ice_transmission_obj->rtp_data_receiver_->InsertRtpPacket(packet);
@@ -269,6 +296,12 @@ int IceTransmission::SendData(DATA_TYPE type, const char *data, size_t size) {
         rtp_video_sender_->Enqueue(packets);
       }
     }
   } else if (DATA_TYPE::AUDIO == type) {
+    if (rtp_audio_sender_) {
+      if (audio_rtp_codec_) {
+        audio_rtp_codec_->Encode((uint8_t *)data, size, packets);
+        rtp_audio_sender_->Enqueue(packets);
+      }
+    }
   } else if (DATA_TYPE::DATA == type) {
     if (rtp_data_sender_) {
       if (data_rtp_codec_) {
diff --git a/src/transmission/ice_transmission.h b/src/transmission/ice_transmission.h
index f062dfb..df24915 100644
--- a/src/transmission/ice_transmission.h
+++ b/src/transmission/ice_transmission.h
@@ -1,3 +1,9 @@
+/*
+ * @Author: DI JUNKUN
+ * @Date: 2023-11-24
+ * Copyright (c) 2023 by DI JUNKUN, All Rights Reserved.
+ */
+
 #ifndef _ICE_TRANSMISSION_H_
 #define _ICE_TRANSMISSION_H_
 
@@ -6,6 +12,8 @@
 #include "congestion_control.h"
 #include "ice_agent.h"
 #include "ringbuffer.h"
+#include "rtp_audio_receiver.h"
+#include "rtp_audio_sender.h"
 #include "rtp_codec.h"
 #include "rtp_data_receiver.h"
 #include "rtp_data_sender.h"
@@ -107,6 +115,8 @@ class IceTransmission {
   std::unique_ptr data_rtp_codec_ = nullptr;
   std::unique_ptr rtp_video_receiver_ = nullptr;
   std::unique_ptr rtp_video_sender_ = nullptr;
+  std::unique_ptr rtp_audio_receiver_ = nullptr;
+  std::unique_ptr rtp_audio_sender_ = nullptr;
   std::unique_ptr rtp_data_receiver_ = nullptr;
   std::unique_ptr rtp_data_sender_ = nullptr;
   uint8_t *rtp_payload_ = nullptr;
diff --git a/src/ws/ws_core.cpp b/src/ws/ws_core.cpp
index ac7faf6..5fb5594 100644
--- a/src/ws/ws_core.cpp
+++ b/src/ws/ws_core.cpp
@@ -20,7 +20,7 @@ WsCore::WsCore() {
 WsCore::~WsCore() {
   m_endpoint_.stop_perpetual();
 
-  if (GetStatus() != "Open") {
+  if (GetStatus() != WsStatus::WsOpened) {
     // Only close open connections
     return;
   }
@@ -42,6 +42,8 @@ WsCore::~WsCore() {
 }
 
 int WsCore::Connect(std::string const &uri) {
+  uri_ = uri;
+
   websocketpp::lib::error_code ec;
 
   client::connection_ptr con = m_endpoint_.get_connection(uri, ec);
@@ -81,6 +83,11 @@ int WsCore::Connect(std::string const &uri) {
 
+  // Publish WsOpening before starting the attempt, so that a handler which
+  // fires right away cannot have its newer state overwritten afterwards.
+  ws_status_ = WsStatus::WsOpening;
+  OnWsStatus(WsStatus::WsOpening);
+
   m_endpoint_.connect(con);
 
   return 0;
 }
 
@@ -118,21 +125,28 @@ void WsCore::Ping(websocketpp::connection_hdl hdl) {
   }
 }
 
-const std::string &WsCore::GetStatus() { return connection_status_; }
+WsStatus WsCore::GetStatus() { return ws_status_; }
 
 void WsCore::OnOpen(client *c, websocketpp::connection_hdl hdl) {
-  connection_status_ = "Open";
+  ws_status_ = WsStatus::WsOpened;
+  OnWsStatus(WsStatus::WsOpened);
 
   ping_thread_ = websocketpp::lib::make_shared(
       &WsCore::Ping, this, hdl);
 }
 
 void WsCore::OnFail(client *c, websocketpp::connection_hdl hdl) {
-  connection_status_ = "Failed";
+  ws_status_ = WsStatus::WsFailed;
+  OnWsStatus(WsStatus::WsFailed);
+
+  // NOTE(review): reconnects immediately and without a retry limit; consider
+  // a backoff so an unreachable server does not cause a tight retry loop.
+  Connect(uri_);
 }
 
 void WsCore::OnClose(client *c, websocketpp::connection_hdl hdl) {
-  connection_status_ = "Closed";
+  ws_status_ = WsStatus::WsClosed;
+  OnWsStatus(WsStatus::WsClosed);
 }
 
 bool WsCore::OnPing(websocketpp::connection_hdl hdl, std::string msg) {
@@ -152,6 +166,8 @@ void WsCore::OnPongTimeout(websocketpp::connection_hdl hdl, std::string msg) {
   LOG_WARN("Pong timeout, reset connection");
   // m_endpoint_.close(hdl, websocketpp::close::status::normal,
   // "OnPongTimeout");
+  ws_status_ = WsStatus::WsReconnecting;
+  OnWsStatus(WsStatus::WsReconnecting);
   m_endpoint_.reset();
 }
 
diff --git a/src/ws/ws_core.h b/src/ws/ws_core.h
index 6373467..0d08ad8 100644
--- a/src/ws/ws_core.h
+++ b/src/ws/ws_core.h
@@ -13,6 +13,8 @@
 
 typedef websocketpp::client client;
 
+enum WsStatus { WsOpening = 0, WsOpened, WsFailed, WsClosed, WsReconnecting };
+
 class WsCore {
  public:
   WsCore();
@@ -27,7 +29,7 @@ class WsCore {
 
   void Ping(websocketpp::connection_hdl hdl);
 
-  const std::string &GetStatus();
+  WsStatus GetStatus();
 
   // Callback
   void OnOpen(client *c, websocketpp::connection_hdl hdl);
@@ -46,14 +48,17 @@ class WsCore {
 
   virtual void OnReceiveMessage(const std::string &msg) = 0;
 
+  virtual void OnWsStatus(WsStatus ws_status) = 0;
+
  private:
   client m_endpoint_;
   websocketpp::connection_hdl connection_handle_;
   websocketpp::lib::shared_ptr m_thread_;
   websocketpp::lib::shared_ptr ping_thread_;
 
-  std::string connection_status_ = "Connecting";
+  WsStatus ws_status_ = WsStatus::WsClosed;
   int timeout_count_ = 0;
+  std::string uri_;
 };
 
 #endif
\ No newline at end of file
diff --git a/src/ws/ws_transmission.cpp b/src/ws/ws_transmission.cpp
index c582794..cb32c7b 100644
--- a/src/ws/ws_transmission.cpp
+++ b/src/ws/ws_transmission.cpp
@@ -3,8 +3,9 @@
 #include "log.h"
 
 WsTransmission::WsTransmission(
-    std::function on_receive_msg_cb)
-    : on_receive_msg_(on_receive_msg_cb) {}
+    std::function on_receive_msg_cb,
+    std::function on_ws_status_cb)
+    : on_receive_msg_(on_receive_msg_cb), on_ws_status_(on_ws_status_cb) {}
 
 WsTransmission::~WsTransmission() {}
 
@@ -13,4 +14,11 @@ void WsTransmission::OnReceiveMessage(const std::string &msg) {
   if (on_receive_msg_) {
     on_receive_msg_(msg);
   }
+}
+
+void WsTransmission::OnWsStatus(WsStatus ws_status) {
+  // Forward the connection state change to the registered observer, if any.
+  if (on_ws_status_) {
+    on_ws_status_(ws_status);
+  }
 }
\ No newline at end of file
diff --git a/src/ws/ws_transmission.h b/src/ws/ws_transmission.h
index a527087..5a9a6f7 100644
--- a/src/ws/ws_transmission.h
+++ b/src/ws/ws_transmission.h
@@ -5,14 +5,18 @@
 
 class WsTransmission : public WsCore {
  public:
-  WsTransmission(std::function on_receive_msg_cb);
+  WsTransmission(std::function on_receive_msg_cb,
+                 std::function on_ws_status_cb);
   ~WsTransmission();
 
  public:
   void OnReceiveMessage(const std::string &msg);
 
+  void OnWsStatus(WsStatus ws_status) override;
+
  private:
   std::function on_receive_msg_ = nullptr;
+  std::function on_ws_status_ = nullptr;
 };
 
 #endif
\ No newline at end of file
diff --git a/tests/opus/OpusDecoderImpl.cpp b/tests/opus/OpusDecoderImpl.cpp
index 31e4f3e..4be9690 100644
--- a/tests/opus/OpusDecoderImpl.cpp
+++ b/tests/opus/OpusDecoderImpl.cpp
@@ -1,6 +1,6 @@
 #include "OpusDecoderImpl.h"
-#define MAX_FRAME_SIZE 6 * 960
-#define CHANNELS 2
+#define MAX_FRAME_SIZE 960
+#define CHANNELS 1
OpusDecoderImpl::OpusDecoderImpl(int sampleRate, int channel) {
   int err;
@@ -9,7 +9,7 @@ OpusDecoderImpl::OpusDecoderImpl(int sampleRate, int channel) {
   sample_rate = sample_rate;
   channel_num = channel;
   if (err < 0 || decoder == NULL) {
-    printf("创建解码器失败\n");
+    printf("Create opus decoder failed\n");
     return;
   }
 
@@ -22,7 +22,7 @@ bool OpusDecoderImpl::Decode(unsigned char* in_data, int len) {
   auto frame_size = opus_decode(decoder, in_data, len, out, MAX_FRAME_SIZE, 0);
 
   if (frame_size < 0) {
-    printf("解码失败\n");
+    printf("Opus decode failed\n");
     return false;
   }
 
diff --git a/tests/opus/OpusEncoderImpl.cpp b/tests/opus/OpusEncoderImpl.cpp
index bb835e9..b0240b5 100644
--- a/tests/opus/OpusEncoderImpl.cpp
+++ b/tests/opus/OpusEncoderImpl.cpp
@@ -22,7 +22,7 @@ OpusEncoderImpl::OpusEncoderImpl(int sampleRate, int channel)
   opus_encoder_ctl(encoder, OPUS_SET_VBR(0));  // 0:CBR, 1:VBR
   opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(true));
 
-  opus_encoder_ctl(encoder, OPUS_SET_BITRATE(96000));
+  opus_encoder_ctl(encoder, OPUS_SET_BITRATE(sample_rate * channel_num));
   opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(8));  // 8 0~10
   opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(OPUS_SIGNAL_VOICE));
   opus_encoder_ctl(encoder,
@@ -57,13 +57,13 @@ bool OpusEncoderImpl::PopFrame(StreamInfo &info) {
 // 48000 sample rate,48 samples/ms * 20ms * 2 channel = 1920
 void OpusEncoderImpl::EncodeRun() {
   m_thread = std::make_unique([this]() {
-    const int frame_size = 48 * 20;  // 960
-    const int input_len = sizeof(opus_int16) * frame_size * 2;
+    const int frame_size = 48 * 20;  // 960 samples per channel (20 ms)
+    int input_len = sizeof(opus_int16) * frame_size * channel_num;
 
-    OpusDecoderImpl decoder(48000, channel_num);
+    OpusDecoderImpl decoder(sample_rate, channel_num);
 
-    opus_int16 input_data[frame_size * 2] = {0};
-    unsigned char input_buffer[input_len] = {0};
+    opus_int16 input_data[frame_size * 2] = {0};  // interleaved, up to stereo
+    unsigned char *input_buffer = new unsigned char[input_len]();
     unsigned char out_data[MAX_PACKET_SIZE] = {0};
 
     while (isRuning) {
@@ -99,6 +99,8 @@ void OpusEncoderImpl::EncodeRun() {
       std::this_thread::sleep_for(std::chrono::milliseconds(1));
     }
   }
+
+  delete[] input_buffer;
 });
 }
 
diff --git a/tests/opus/OpusEncoderImpl.h b/tests/opus/OpusEncoderImpl.h
index 61732f6..baafe9d 100644
--- a/tests/opus/OpusEncoderImpl.h
+++ b/tests/opus/OpusEncoderImpl.h
@@ -6,6 +6,7 @@
 #include
 #include
 
+#include "OpusDecoderImpl.h"
 #include "base_type.h"
 #include "opus/opus.h"
 
@@ -21,6 +22,8 @@ class OpusEncoderImpl {
   std::mutex access_mutex;
   std::unique_ptr m_thread;
 
+  OpusDecoderImpl *decoder = nullptr;
+
  public:
   OpusEncoderImpl(int sampleRate, int channel);
   void Feed(unsigned char *data, int len);
diff --git a/tests/opus/main.cpp b/tests/opus/main.cpp
index 2703e6a..4decc82 100644
--- a/tests/opus/main.cpp
+++ b/tests/opus/main.cpp
@@ -1,3 +1,20 @@
+#include
+#include
+#include
+
+extern "C" {
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+};
+
 #include
 #include
 #include
@@ -5,27 +22,213 @@
 #include "OpusEncoderImpl.h"
 #include "opus/opus.h"
 
-int main() {
-  OpusEncoderImpl* opusEncoder = new OpusEncoderImpl(48000, 2);
+static SDL_AudioDeviceID input_dev;
+static SDL_AudioDeviceID output_dev;
 
-  std::ifstream inputFile("ls.pcm", std::ios::binary);
-  if (!inputFile) {
-    std::cerr << "Failed to open input file." << std::endl;
+static Uint8 *buffer = 0;
+static int in_pos = 0;
+static int out_pos = 0;
+
+const char *out = "audio_old.pcm";
+FILE *outfile = fopen(out, "wb+");
+
+static OpusEncoderImpl *opusEncoder = nullptr;
+
+int64_t src_ch_layout = AV_CH_LAYOUT_MONO;
+int src_rate = 48000;
+enum AVSampleFormat src_sample_fmt = AV_SAMPLE_FMT_FLT;
+int src_nb_channels = 0;
+uint8_t **src_data = NULL;  // pointer to pointer
+int src_linesize;
+int src_nb_samples = 480;
+
+// Output parameters
+int64_t dst_ch_layout = AV_CH_LAYOUT_STEREO;
+int dst_rate = 48000;
+enum AVSampleFormat dst_sample_fmt = AV_SAMPLE_FMT_S16;
+int dst_nb_channels = 0;
+uint8_t **dst_data = NULL;  // pointer to pointer
+int dst_linesize;
+int dst_nb_samples;
+int max_dst_nb_samples;
+
+// Output file
+const char *dst_filename = NULL;  // resampled PCM is saved here for playback
+FILE *dst_file;
+
+int dst_bufsize;
+const char *fmt;
+
+// Resampler instance
+struct SwrContext *swr_ctx;
+
+double t;
+int ret;
+
+// SDL capture callback: dumps the raw capture buffer to `outfile`, resamples
+// it to the dst_* format, writes that to `dst_file` and feeds the encoder.
+void cb_in(void *userdata, Uint8 *stream, int len) {
+  // Keep an untouched copy of what the device delivered.
+  {
+    fwrite(stream, 1, len, outfile);
+    fflush(outfile);
+  }
+  {
+    int64_t delay = swr_get_delay(swr_ctx, src_rate);
+    dst_nb_samples =
+        av_rescale_rnd(delay + src_nb_samples, dst_rate, src_rate, AV_ROUND_UP);
+    if (dst_nb_samples > max_dst_nb_samples) {
+      av_freep(&dst_data[0]);
+      ret = av_samples_alloc(dst_data, &dst_linesize, dst_nb_channels,
+                             dst_nb_samples, dst_sample_fmt, 1);
+      if (ret < 0) return;
+      max_dst_nb_samples = dst_nb_samples;
+    }
+
+    // NOTE(review): converts a fixed src_nb_samples per callback regardless
+    // of `len` - confirm the device really delivers that many mono floats.
+    ret = swr_convert(swr_ctx, dst_data, dst_nb_samples,
+                      (const uint8_t **)&stream, src_nb_samples);
+    if (ret < 0) {
+      fprintf(stderr, "Error while converting\n");
+      return;
+    }
+    dst_bufsize = av_samples_get_buffer_size(&dst_linesize, dst_nb_channels,
+                                             ret, dst_sample_fmt, 1);
+    if (dst_bufsize < 0) {
+      fprintf(stderr, "Could not get sample buffer size\n");
+      return;
+    }
+    printf("t:%f in:%d out:%d\n", t, src_nb_samples, ret);
+    fwrite(dst_data[0], 1, dst_bufsize, dst_file);
+    if (opusEncoder) opusEncoder->Feed(dst_data[0], dst_bufsize);
+  }
+}
+
+// Stores `len` bytes from `stream` at buffer + out_pos. No device in this
+// file is opened with this callback.
+void cb_out(void *userdata, Uint8 *stream, int len) {
+  // NOTE(review): no check that out_pos + len stays inside `buffer`.
+
+  SDL_memcpy(buffer + out_pos, stream, len);
+  out_pos += len;
+}
+
+// Opens the resampled-output file and sets up the resampler and its sample
+// buffers. Returns 0 on success and -1 on failure; exits the process if the
+// output file cannot be opened.
+int init() {
+  dst_filename = "res.pcm";
+
+  dst_file = fopen(dst_filename, "wb");
+  if (!dst_file) {
+    fprintf(stderr, "Could not open destination file %s\n", dst_filename);
+    exit(1);
+  }
+
+  // Create the resampler
+  /* create resampler context */
+  swr_ctx = swr_alloc();
+  if (!swr_ctx) {
+    fprintf(stderr, "Could not allocate resampler context\n");
+    ret = AVERROR(ENOMEM);
     return -1;
   }
 
-  char sample[960];
-  while (inputFile.read(sample, 960)) {
-    opusEncoder->Feed((unsigned char*)sample, 960);
+  // Set the resampling parameters
+  /* set options */
+  // Input parameters
+  av_opt_set_int(swr_ctx, "in_channel_layout", src_ch_layout, 0);
+  av_opt_set_int(swr_ctx, "in_sample_rate", src_rate, 0);
+  av_opt_set_sample_fmt(swr_ctx, "in_sample_fmt", src_sample_fmt, 0);
+  // Output parameters
+  av_opt_set_int(swr_ctx, "out_channel_layout", dst_ch_layout, 0);
+  av_opt_set_int(swr_ctx, "out_sample_rate", dst_rate, 0);
+  av_opt_set_sample_fmt(swr_ctx, "out_sample_fmt", dst_sample_fmt, 0);
+
+  // Initialize the resampler
+  /* initialize the resampling context */
+  if ((ret = swr_init(swr_ctx)) < 0) {
+    fprintf(stderr, "Failed to initialize the resampling context\n");
+    return -1;
   }
 
-  // // 读取编码后的opus,一般放在单独线程,这里只是为了方便
-  // StreamInfo info;
-  // while (opusEncoder.PopFrame(info)) {
-  // .....
-  // }
+  /* allocate source and destination samples buffers */
+  // Compute the number of channels of the input source
+  src_nb_channels = av_get_channel_layout_nb_channels(src_ch_layout);
+  // Allocate memory for the input source
+  ret = av_samples_alloc_array_and_samples(&src_data, &src_linesize,
+                                           src_nb_channels, src_nb_samples,
+                                           src_sample_fmt, 0);
+  if (ret < 0) {
+    fprintf(stderr, "Could not allocate source samples\n");
+    return -1;
+  }
+
+  /* compute the number of converted samples: buffering is avoided
+   * ensuring that the output buffer will contain at least all the
+   * converted input samples */
+  // Compute the number of output samples
+  max_dst_nb_samples = dst_nb_samples =
+      av_rescale_rnd(src_nb_samples, dst_rate, src_rate, AV_ROUND_UP);
+
+  /* buffer is going to be directly written to a rawaudio file, no alignment */
+  dst_nb_channels = av_get_channel_layout_nb_channels(dst_ch_layout);
+  // Allocate the output buffer
+  ret = av_samples_alloc_array_and_samples(&dst_data, &dst_linesize,
+                                           dst_nb_channels, dst_nb_samples,
+                                           dst_sample_fmt, 0);
+  if (ret < 0) {
+    fprintf(stderr, "Could not allocate destination samples\n");
+    return -1;
+  }
+
+  return 0;
+}
+
+int main() {
+  if (init() != 0) {
+    return 1;
+  }
+
+  SDL_Init(SDL_INIT_AUDIO);
+
+  // 16Mb should be enough; the test lasts 5 seconds
+  buffer = (Uint8 *)malloc(16777215);
+
+  SDL_AudioSpec want_in, want_out, have_in, have_out;
+
+  SDL_zero(want_in);
+  want_in.freq = 48000;
+  want_in.format = AUDIO_F32LSB;
+  want_in.channels = 2;
+  want_in.samples = 960;
+  want_in.callback = cb_in;
+
+  input_dev = SDL_OpenAudioDevice(NULL, 1, &want_in, &have_in,
+                                  SDL_AUDIO_ALLOW_ANY_CHANGE);
+  if (input_dev == 0) {
+    SDL_Log("Failed to open input: %s", SDL_GetError());
+    return 1;
+  }
+
+  // have_in is only meaningful once the device has been opened.
+  printf("%d %d %d %d\n", have_in.freq, have_in.format, have_in.channels,
+         have_in.samples);
+
+  // cb_in() feeds the encoder from the audio thread, so it must exist before
+  // capture is un-paused.
+  opusEncoder = new OpusEncoderImpl(have_in.freq, have_in.channels);
+
+  SDL_PauseAudioDevice(input_dev, 0);
+  SDL_PauseAudioDevice(output_dev, 0);
+
+  SDL_Delay(5000);
 
   opusEncoder->Stop();
+  SDL_CloseAudioDevice(output_dev);
+  SDL_CloseAudioDevice(input_dev);
+  free(buffer);
 
-  return
0; -} \ No newline at end of file + fclose(outfile); +} diff --git a/tests/opus/opus_test.cpp b/tests/opus/opus_test.cpp deleted file mode 100644 index ce9dde2..0000000 --- a/tests/opus/opus_test.cpp +++ /dev/null @@ -1,96 +0,0 @@ -#include -#include -#include - -// Opus编码函数 -#include - -#include -#include - -#define SAMPLE_RATE 48000 -#define CHANNELS 2 -#define FRAME_SIZE 960 -#define APPLICATION OPUS_APPLICATION_AUDIO - -// 编码函数 -int encode(const std::vector& pcm, - std::vector& opus) { - // 创建编码器 - int error; - OpusEncoder* encoder = - opus_encoder_create(SAMPLE_RATE, CHANNELS, APPLICATION, &error); - if (error != OPUS_OK) { - std::cerr << "Failed to create encoder: " << opus_strerror(error) - << std::endl; - return error; - } - - // 设置编码器参数 - opus_encoder_ctl(encoder, OPUS_SET_BITRATE(64000)); - - // 计算最大输出大小 - int maxOpusSize = FRAME_SIZE * CHANNELS * sizeof(opus_int16); - opus.resize(maxOpusSize); - - // 编码 - int encodedSize = - opus_encode(encoder, pcm.data(), FRAME_SIZE, opus.data(), maxOpusSize); - if (encodedSize < 0) { - std::cerr << "Encoding error: " << opus_strerror(encodedSize) << std::endl; - return encodedSize; - } - - // 清理资源 - opus_encoder_destroy(encoder); - - // 调整输出向量的大小 - opus.resize(encodedSize); - - return 0; -} - -int main(int argc, char** argv) { - if (argc != 3) { - std::cerr << "Usage: " << argv[0] << " input.pcm output.opus" << std::endl; - return -1; - } - - // 打开输入文件 - std::ifstream inputFile(argv[1], std::ios::binary); - if (!inputFile) { - std::cerr << "Failed to open input file." 
<< std::endl; - return -1; - } - - // 读取PCM数据 - std::vector pcmData; - opus_int16 sample; - while (inputFile.read(reinterpret_cast(&sample), sizeof(opus_int16))) { - pcmData.push_back(sample); - } - - // 编码为Opus格式 - std::vector opusData; - int result = encode(pcmData, opusData); - if (result != 0) { - std::cerr << "Encoding failed with error code " << result << std::endl; - return result; - } - - // 打开输出文件 - std::ofstream outputFile(argv[2], std::ios::binary); - if (!outputFile) { - std::cerr << "Failed to open output file." << std::endl; - return -1; - } - - // 写入Opus数据 - outputFile.write(reinterpret_cast(opusData.data()), - opusData.size()); - - // 完成 - std::cout << "Encoding complete. size:" << pcmData.size() * 2 << std::endl; - - return 0; -} diff --git a/xmake.lua b/xmake.lua index 48ee512..061e030 100644 --- a/xmake.lua +++ b/xmake.lua @@ -7,16 +7,12 @@ set_languages("c++17") set_installdir("$(projectdir)/out") -option("server_only") - set_showmenu(true) -option_end() - add_defines("ASIO_STANDALONE", "ASIO_HAS_STD_TYPE_TRAITS", "ASIO_HAS_STD_SHARED_PTR", "ASIO_HAS_STD_ADDRESSOF", "ASIO_HAS_STD_ATOMIC", "ASIO_HAS_STD_CHRONO", "ASIO_HAS_CSTDINT", "ASIO_HAS_STD_ARRAY", "ASIO_HAS_STD_SYSTEM_ERROR") -add_requires("asio 1.24.0", "nlohmann_json", "spdlog 1.11.0", "openfec", "libopus 1.4") -add_packages("asio", "nlohmann_json", "spdlog", "openfec", "opus") +add_requires("asio 1.24.0", "nlohmann_json", "spdlog 1.11.0", "openfec", "libopus 1.4", "sdl2") +add_packages("asio", "nlohmann_json", "spdlog", "openfec", "libopus", "sdl2") includes("thirdparty") @@ -271,10 +267,10 @@ target("projectx") -- add_files("tests/fec/simple_server.cpp") -- add_includedirs("tests/fec") --- target("opus_test") --- set_kind("binary") --- add_packages("libopus") --- add_files("tests/opus/OpusEncoderImpl.cpp", --- "tests/opus/OpusDecoderImpl.cpp", --- "tests/opus/main.cpp") --- add_includedirs("tests/opus") \ No newline at end of file +target("opus_test") + set_kind("binary") + 
add_packages("libopus", "sdl2") + add_files("tests/opus/OpusEncoderImpl.cpp", + "tests/opus/OpusDecoderImpl.cpp", + "tests/opus/main.cpp") + add_includedirs("tests/opus") \ No newline at end of file