From 78c54136e225098409a712d3114c57b8e2549af8 Mon Sep 17 00:00:00 2001 From: kunkundi Date: Tue, 15 Jul 2025 15:38:58 +0800 Subject: [PATCH] [fix] fix crash when Stop() called in SpeakerCapturerLinux --- .../linux/speaker_capturer_linux.cpp | 353 ++++++++++-------- .../linux/speaker_capturer_linux.h | 38 +- thirdparty/minirtc | 2 +- 3 files changed, 234 insertions(+), 159 deletions(-) diff --git a/src/speaker_capturer/linux/speaker_capturer_linux.cpp b/src/speaker_capturer/linux/speaker_capturer_linux.cpp index 71797b3..777a6ec 100644 --- a/src/speaker_capturer/linux/speaker_capturer_linux.cpp +++ b/src/speaker_capturer/linux/speaker_capturer_linux.cpp @@ -1,132 +1,20 @@ #include "speaker_capturer_linux.h" #include -#include -#include +#include -#include #include -#include -#include -#include +#include #include -#include -#include #include "rd_log.h" -namespace { constexpr int kSampleRate = 48000; constexpr pa_sample_format_t kFormat = PA_SAMPLE_S16LE; constexpr int kChannels = 1; -constexpr int kFrameSize = 480; - -std::string GetDefaultMonitorSourceName() { - std::string result; - std::mutex mtx; - std::condition_variable cv; - bool done = false; - - pa_mainloop* m = pa_mainloop_new(); - pa_mainloop_api* api = pa_mainloop_get_api(m); - pa_context* context = pa_context_new(api, "MonitorSourceQuery"); - - auto user_data = - new std::tuple{&result, &done, &mtx, &cv}; - - pa_context_set_state_callback( - context, - [](pa_context* c, void* userdata) { - if (!c) return; - auto state = pa_context_get_state(c); - if (state == PA_CONTEXT_READY) { - pa_operation* op = pa_context_get_server_info( - c, - [](pa_context* c, const pa_server_info* info, void* userdata) { - if (!info) return; - - auto [result_ptr, done_ptr, mtx_ptr, cv_ptr] = - *static_cast*>( - userdata); - - std::string default_sink = info->default_sink_name; - std::string expected_monitor = default_sink + ".monitor"; - - pa_operation* o2 = pa_context_get_source_info_list( - c, - [](pa_context* c, const pa_source_info* i, int eol, - void* userdata) { - if (eol) { - auto [result_ptr, done_ptr, mtx_ptr, cv_ptr] = - *static_cast< - std::tuple*>( - userdata); - std::lock_guard lock(*mtx_ptr); - *done_ptr = true; - cv_ptr->notify_one(); - return; - } - - if (i && i->monitor_of_sink != PA_INVALID_INDEX) { - auto [result_ptr, done_ptr, mtx_ptr, cv_ptr] = - *static_cast< - std::tuple*>( - userdata); - *result_ptr = i->name; - } - }, - userdata); - - if (o2) pa_operation_unref(o2); - }, - userdata); - if (op) pa_operation_unref(op); - } else if (state == PA_CONTEXT_FAILED || - state == PA_CONTEXT_TERMINATED) { - auto [result_ptr, done_ptr, mtx_ptr, cv_ptr] = - *static_cast*>(userdata); - std::lock_guard lock(*mtx_ptr); - *done_ptr = true; - cv_ptr->notify_one(); - } - }, - user_data); - - if (pa_context_connect(context, nullptr, PA_CONTEXT_NOFLAGS, nullptr) < 0) { - LOG_ERROR("Failed to connect to PulseAudio context"); - pa_context_unref(context); - pa_mainloop_free(m); - delete user_data; - return ""; - } - - std::thread loop_thread([&]() { pa_mainloop_run(m, nullptr); }); - - { - std::unique_lock lock(mtx); - cv.wait(lock, [&] { return done; }); - } - - pa_context_disconnect(context); - pa_context_unref(context); - pa_mainloop_quit(m, 0); - loop_thread.join(); - pa_mainloop_free(m); - delete user_data; - - return result; -} -} // namespace - -// ===================== SpeakerCapturerLinux 实现 ======================== - -SpeakerCapturerLinux::SpeakerCapturerLinux() = default; +constexpr size_t kFrameSizeBytes = 480 * sizeof(int16_t); +SpeakerCapturerLinux::SpeakerCapturerLinux() {} SpeakerCapturerLinux::~SpeakerCapturerLinux() { Stop(); Destroy(); @@ -144,59 +32,230 @@ int SpeakerCapturerLinux::Destroy() { return 0; } +std::string SpeakerCapturerLinux::GetDefaultMonitorSourceName() { + std::string monitor_name; + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + + pa_mainloop* mainloop = pa_mainloop_new(); + pa_mainloop_api* api = pa_mainloop_get_api(mainloop); + pa_context* context = pa_context_new(api, "GetMonitor"); + + pa_context_set_state_callback( + context, + [](pa_context* c, void* userdata) { + auto* state = + static_cast*>(userdata); + if (pa_context_get_state(c) == PA_CONTEXT_READY) { + pa_operation* o = pa_context_get_server_info( + c, + [](pa_context*, const pa_server_info* info, void* userdata) { + auto* state = + static_cast*>( + userdata); + if (info && info->default_sink_name) { + *(std::get<0>(*state)) = + std::string(info->default_sink_name) + ".monitor"; + } + { + std::lock_guard lock(*std::get<1>(*state)); + *std::get<3>(*state) = true; + } + std::get<2>(*state)->notify_one(); + }, + userdata); + if (o) pa_operation_unref(o); + } + }, + new std::tuple(&monitor_name, &mtx, &cv, &ready)); + + pa_context_connect(context, nullptr, PA_CONTEXT_NOFLAGS, nullptr); + + std::thread loop_thread([&]() { + int ret = 0; + pa_mainloop_run(mainloop, &ret); + }); + + { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(2), [&] { return ready; }); + } + + pa_context_disconnect(context); + pa_context_unref(context); + pa_mainloop_quit(mainloop, 0); + loop_thread.join(); + pa_mainloop_free(mainloop); + + return monitor_name; +} + int SpeakerCapturerLinux::Start() { - if (!inited_ || capture_thread_.joinable()) return -1; + if (!inited_ || mainloop_thread_.joinable()) return -1; + stop_flag_ = false; - capture_thread_ = std::thread([this]() { - std::string monitor_device = GetDefaultMonitorSourceName(); - if (monitor_device.empty()) { - LOG_ERROR("Failed to find monitor source"); + mainloop_thread_ = std::thread([this]() { + std::string monitor_name = GetDefaultMonitorSourceName(); + if (monitor_name.empty()) { + LOG_ERROR("Failed to get monitor source"); return; } - LOG_INFO("Using monitor device: {}", monitor_device.c_str()); + mainloop_ = pa_threaded_mainloop_new(); + pa_mainloop_api* api = pa_threaded_mainloop_get_api(mainloop_); + context_ = pa_context_new(api, "SpeakerCapturer"); - int error = 0; - pa_sample_spec ss; - ss.format = kFormat; - ss.rate = kSampleRate; - ss.channels = kChannels; + pa_context_set_state_callback( + context_, + [](pa_context* c, void* userdata) { + auto self = static_cast(userdata); + pa_context_state_t state = pa_context_get_state(c); + if (state == PA_CONTEXT_READY || state == PA_CONTEXT_FAILED || + state == PA_CONTEXT_TERMINATED) { + pa_threaded_mainloop_signal(self->mainloop_, 0); + } + }, + this); - pa_simple* stream = pa_simple_new(nullptr, "SpeakerCapture", - PA_STREAM_RECORD, monitor_device.c_str(), - "capture", &ss, nullptr, nullptr, &error); - - if (!stream) { - LOG_ERROR("pa_simple_new() failed: {}", pa_strerror(error)); + if (pa_threaded_mainloop_start(mainloop_) < 0) { + LOG_ERROR("Failed to start mainloop"); + Cleanup(); return; } - const size_t buffer_bytes = kFrameSize * sizeof(int16_t); - std::vector buffer(buffer_bytes); + pa_threaded_mainloop_lock(mainloop_); - while (inited_) { - if (!cb_) break; + if (pa_context_connect(context_, nullptr, PA_CONTEXT_NOFLAGS, nullptr) < + 0) { + LOG_ERROR("Failed to connect context"); + pa_threaded_mainloop_unlock(mainloop_); + Cleanup(); + return; + } - if (pa_simple_read(stream, buffer.data(), buffer.size(), &error) < 0) { - LOG_ERROR("pa_simple_read() failed: {}", pa_strerror(error)); - break; + while (true) { + pa_context_state_t state = pa_context_get_state(context_); + if (state == PA_CONTEXT_READY) break; + if (!PA_CONTEXT_IS_GOOD(state) || stop_flag_) { + pa_threaded_mainloop_unlock(mainloop_); + Cleanup(); + return; } - - cb_(buffer.data(), buffer.size(), "audio"); + pa_threaded_mainloop_wait(mainloop_); } - pa_simple_free(stream); + pa_sample_spec ss = {kFormat, kSampleRate, kChannels}; + stream_ = pa_stream_new(context_, "Capture", &ss, nullptr); + + pa_stream_set_read_callback( + stream_, + [](pa_stream* s, size_t len, void* u) { + auto self = static_cast(u); + if (self->paused_ || self->stop_flag_) return; + + const void* data = nullptr; + if (pa_stream_peek(s, &data, &len) < 0 || !data) return; + + const uint8_t* p = static_cast(data); + self->frame_cache_.insert(self->frame_cache_.end(), p, p + len); + + while (self->frame_cache_.size() >= kFrameSizeBytes) { + self->cb_(self->frame_cache_.data(), kFrameSizeBytes, "audio"); + self->frame_cache_.erase( + self->frame_cache_.begin(), + self->frame_cache_.begin() + kFrameSizeBytes); + } + + pa_stream_drop(s); + }, + this); + + pa_buffer_attr attr = {.maxlength = (uint32_t)-1, + .tlength = 0, + .prebuf = 0, + .minreq = 0, + .fragsize = (uint32_t)kFrameSizeBytes}; + + if (pa_stream_connect_record(stream_, monitor_name.c_str(), &attr, + PA_STREAM_ADJUST_LATENCY) < 0) { + LOG_ERROR("Failed to connect stream"); + pa_threaded_mainloop_unlock(mainloop_); + Cleanup(); + return; + } + + while (true) { + pa_stream_state_t s = pa_stream_get_state(stream_); + if (s == PA_STREAM_READY) break; + if (!PA_STREAM_IS_GOOD(s) || stop_flag_) { + pa_threaded_mainloop_unlock(mainloop_); + Cleanup(); + return; + } + pa_threaded_mainloop_wait(mainloop_); + } + + pa_threaded_mainloop_unlock(mainloop_); + + while (!stop_flag_) + std::this_thread::sleep_for(std::chrono::milliseconds(100)); }); return 0; } int SpeakerCapturerLinux::Stop() { - if (!inited_) return -1; - if (capture_thread_.joinable()) capture_thread_.join(); + stop_flag_ = true; + + if (mainloop_) { + pa_threaded_mainloop_lock(mainloop_); + pa_threaded_mainloop_signal(mainloop_, 0); + pa_threaded_mainloop_unlock(mainloop_); + } + + if (mainloop_thread_.joinable()) { + mainloop_thread_.join(); + } + + Cleanup(); return 0; } -int SpeakerCapturerLinux::Pause() { return 0; } +void SpeakerCapturerLinux::Cleanup() { + if (mainloop_) { + pa_threaded_mainloop_stop(mainloop_); + pa_threaded_mainloop_lock(mainloop_); -int SpeakerCapturerLinux::Resume() { return 0; } + if (stream_) { + pa_stream_disconnect(stream_); + pa_stream_unref(stream_); + stream_ = nullptr; + } + + if (context_) { + pa_context_disconnect(context_); + pa_context_unref(context_); + context_ = nullptr; + } + + pa_threaded_mainloop_unlock(mainloop_); + pa_threaded_mainloop_free(mainloop_); + mainloop_ = nullptr; + } + + frame_cache_.clear(); +} + +int SpeakerCapturerLinux::Pause() { + paused_ = true; + return 0; +} + +int SpeakerCapturerLinux::Resume() { + paused_ = false; + return 0; +} \ No newline at end of file diff --git a/src/speaker_capturer/linux/speaker_capturer_linux.h b/src/speaker_capturer/linux/speaker_capturer_linux.h index dfa532e..0345add 100644 --- a/src/speaker_capturer/linux/speaker_capturer_linux.h +++ b/src/speaker_capturer/linux/speaker_capturer_linux.h @@ -1,12 +1,18 @@ /* * @Author: DI JUNKUN - * @Date: 2024-08-02 - * Copyright (c) 2024 by DI JUNKUN, All Rights Reserved. + * @Date: 2025-07-15 + * Copyright (c) 2025 by DI JUNKUN, All Rights Reserved. */ #ifndef _SPEAKER_CAPTURER_LINUX_H_ #define _SPEAKER_CAPTURER_LINUX_H_ +#include + +#include +#include +#include +#include #include #include @@ -17,22 +23,32 @@ class SpeakerCapturerLinux : public SpeakerCapturer { SpeakerCapturerLinux(); ~SpeakerCapturerLinux(); - public: - virtual int Init(speaker_data_cb cb); - virtual int Destroy(); - virtual int Start(); - virtual int Stop(); + int Init(speaker_data_cb cb) override; + int Destroy() override; + int Start() override; + int Stop() override; int Pause(); int Resume(); private: - speaker_data_cb cb_ = nullptr; + std::string GetDefaultMonitorSourceName(); + void Cleanup(); private: - bool inited_ = false; - // thread - std::thread capture_thread_; + speaker_data_cb cb_ = nullptr; + + std::atomic inited_; + std::atomic paused_; + std::atomic stop_flag_; + + std::thread mainloop_thread_; + pa_threaded_mainloop* mainloop_ = nullptr; + pa_context* context_ = nullptr; + pa_stream* stream_ = nullptr; + + std::mutex state_mtx_; + std::vector frame_cache_; }; #endif \ No newline at end of file diff --git a/thirdparty/minirtc b/thirdparty/minirtc index d237ebd..c9f8859 160000 --- a/thirdparty/minirtc +++ b/thirdparty/minirtc @@ -1 +1 @@ -Subproject commit d237ebdba45cfe60c1fdac10005a1699f1f095a0 +Subproject commit c9f88596054133aba8f9d73ca536fa5ee8863a64