C++ で, スレッドプールを実装する
本稿では, スレッドプールの仕組みを理解して, C++を用いて, スレッドプールを自身で実装できることを目指します. 対応環境は, C++14 からを想定しています.
はじめに
スレッドプールは, ソフトウェアのデザインパターンの一つで, コンピュータプログラムにおいて非同期処理を実現するためのものです[1]. スレッドプールは, 同時で実行されるタスクの割り当てを待つ複数のスレッドを持ちます[1]. スレッドプールを維持することにより, すぐに終わるタスクのためにスレッドを頻繁に生成と破棄することによるオーバーヘッドを回避することができます[1].
C++ では, C++11 ~ C++20 において, 標準でスレッドプールが用意されておらず, スレッドプールを使いたい場合は, 外部のライブラリを使うか自前で実装する必要があります.
スレッドプールを実現するため, 外部ライブラリをそのまま使うこともできますが, 自身で作れた方が, スレッドプールの仕組みを知ることができ, カスタマイズできることや, C++のように標準でサポートされていない多くの言語へ対応できるため, より良い方法といえます.
本稿では, スレッドプールの仕組みを理解して, C++を用いて, スレッドプールを自身で実装できることを目指します. 対応環境は, C++14 からを想定しています.
スレッドプールの仕組み
スレッドプールには, タスクの割り当てを待つ複数のスレッドがあります. 各スレッドは, 待ち行列 (Task Queue) に積まれているタスクを取り出し, 他スレッドと並列に処理を行います. 待ち状態のタスクがない場合は, 新しいタスクが割り当てられるまで待機します.
各機能ごとの実装
スレッドプールは大きく分けて以下の4つの機能を持ちます.
- スレッドプールの作成
- タスクの取り出しと実行
- Task Queue へのタスク追加
- スレッドプールの終了
各機能ごとに, 実装していきます. 実装において, 以下の実装を参考にしました.
- "bshoshany/thread-pool". github. accessed at 2022-01-15
- "progschj/ThreadPool". github. accessed at 2022-01-15
クラス名はThreadPoolExecutor
とします[注 1].
スレッドプールの作成
初期化処理で, スレッドプールを構成する各スレッドを作成します.
まずメンバ変数で, スレッドを保持する配列を用意しましょう.
/** * @brief A smart pointer to manage the memory allocated for the threads. */ std::unique_ptr<std::thread[]> threads;
次に, コンストラクタで指定された数でスレッドを作成します. 指定されていない場合は, 処理系によりサポートされるスレッド並行数(hardware_concurrency
)で作成します.
thread::hardware_concurrency
[2]static unsigned int hardware_concurrency() noexcept;
- 概要
処理系によりサポートされるスレッド並行数を取得する.
- 戻り値
サポートされるスレッド並行数. その処理系において値を取得できない場合は0を返す.
ThreadPoolExecutor(const ui32& thread_count = std::thread::hardware_concurrency()) : thread_count_{ thread_count ? thread_count : std::thread::hardware_concurrency() } { threads.reset(new std::thread[thread_count_]); for (ui32 i = 0; i < thread_count_; ++i) { threads[i] = std::thread(&ThreadPoolExecutor::worker, this); } }
&ThreadPoolExecutor::worker
は, 後に解説する, 実際にスレッドで実行される処理(タスクの取り出し, タスクの実行など)を記述します.
タスクの取り出しと実行
各スレッドでは, Task Queue からのタスク取り出しと実行を行います. タスクがない場合は, 追加されるまで待機します.
/** * @brief A worker function to be assigned to each thread in the pool. * * Continuously pops tasks out of the queue and executes them, as long as the atomic variable running is set to true. */ void worker() { for (;;) { std::function<void()> task; { // Task Queue が空でない, もしくは, Thread Pool が終了するまで待機 std::unique_lock<std::mutex> lock(tasks_mutex); condition.wait(lock, [&] {return !tasks.empty() || !running; }); // Thread Pool が終了中で, Task Queue が空の時は, ループを抜ける. -> スレッドが終了する if (!running && tasks.empty()) { return; } // Task Queue から, Task を取り出す task = std::move(tasks.front()); tasks.pop(); } // Task を実行する task(); } }
condition
は, std::condition_variable
です. condition.wait()
で, Task Queue への追加があった場合などに外部からnotify()
されるまで待機します. 起床時, Task Queue が空でない, もしくは, Thread Pool が終了状態でない場合, 再び待機します.
std::condition_variable::wait
[3]void wait(unique_lock<mutex>& lock); // (1) template <class Predicate> void wait(unique_lock<mutex>& lock, Predicate pred); // (2)
- 概要
処理をするための準備ができたことを
notify_one()
/notify_all()
によって通知されるまでスレッドを待機する.第二引数
pred
を指定する場合,pred()
呼び出しがtrueになるまで待機を続行する.- 効果
- (1)
- アトミックに
lock.unlock()
し、*this
に対してブロッキングする notify_one()
/notify_all()
もしくはそれ以外の理由で通知があるまでブロッキングされる- この関数を抜ける際に
lock.lock()
する - この関数が例外送出によって終了する場合、関数を抜ける前に
lock.lock()
する
- アトミックに
- (2)
以下と等価の処理を行う
while (!pred()) { wait(lock); }
pred
の指定についてpred
の指定は, 冗長に見えますが,wait()
で待機しているはずが,notify()
を受ける前に起床してしまう現象, Spurious Wakeupへの対応に必要です[4].
Task Queue へのタスク追加
Task Queue へタスクを追加します.
まず, メンバ変数で Task Queue を用意します.
/** * @brief A queue of tasks to be executed by the threads. */ std::queue<std::function<void()>> tasks{};
タスク追加では, 待機中のスレッドを起床させるため, condition.notify_one()
を呼びます.
template <typename F> void push_task(const F& task) { { const std::lock_guard<std::mutex> lock(tasks_mutex); if (!running) { throw std::runtime_error("Cannot schedule new task after shutdown."); } // タスクの追加 tasks.push(std::function<void()>(task)); } // 待機スレッドを起床 condition.notify_one(); }
ただ, このままだと返り値がvoid
型のタスクしか受け付けないため, 任意の返り値に対応します. タスク実行時に呼ばれる関数または関数オブジェクトを引数にとり, 関数の返り値に応じたfuture
オブジェクトを返すsubmit()
を実装します.
#if ((defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) || __cplusplus >= 201703L) /** * @brief Submit a function with zero or more arguments and a return value into the task queue, * and get a future for its eventual returned value. */ // C++17 から推奨されている, 関数の返り値の型R を取得する方法 template <typename F, typename... Args, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> #else // C++14 で, 関数の返り値の型R を取得する方法 template <typename F, typename... Args, typename R = typename std::result_of<std::decay_t<F>(std::decay_t<Args>...)>::type> #endif std::future<R> submit(F&& func, const Args&&... args) { auto task = std::make_shared<std::packaged_task<R()>>([func, args...]() { return func(args...); }); auto future = task->get_future(); push_task([task]() { (*task)(); }); return future; }
タスク関数の返り値の型を取得するため, C++17 までの環境では, result_of
を使用し, C++17 からの環境では, invoke_result_t
を使用しています. result_of
は, C++17 で非推奨になり, C++20 で削除されました[5].
packaged_task
[6]namespace std { template <class> class packaged_task; // 宣言のみで定義なし template <class R, class... ArgTypes> class packaged_task<R(ArgTypes...)>; }
- 概要
packaged_task
は, 「別スレッドでの処理完了を待ち, その処理結果を取得する」といった非同期処理を実現するためのクラスであり,future
クラスと組み合わせて使用します.packaged_task
に登録した非同期実行する関数の戻り値をfuture
が読み取ります.
スレッドプールの終了
スレッドプールのデストラクト時は, 新規でタスクを追加できなくし, 待機中のタスクをすべて処理し, すべてのスレッドが終了するまで待機します.
/** * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all thread. */ ~ThreadPoolExecutor() { { // Lock task queue to prevent adding new task. std::lock_guard<std::mutex> lock(tasks_mutex); running = false; } // Wake up all threads so that they may exist condition.notify_all(); for (ui32 i = 0; i < thread_count_; ++i) { threads[i].join(); } }
コード
コード全文
#ifndef CONCURRENT__THREAD_POOL_EXECUTOR_HPP_ #define CONCURRENT__THREAD_POOL_EXECUTOR_HPP_ #include <thread> #include <cstdint> #include <atomic> #include <mutex> #include <queue> #include <functional> #include <memory> #include <future> #include <type_traits> namespace concurrent { // This code based on // * <https://github.com/bshoshany/thread-pool> // * <https://github.com/progschj/ThreadPool> // * <https://github.com/SandSnip3r/thread-pool> // Thank you! :) class ThreadPoolExecutor { using ui32 = std::uint_fast32_t; using ui64 = std::uint_fast64_t; public: ThreadPoolExecutor(const ui32& thread_count = std::thread::hardware_concurrency()) : thread_count_{ thread_count ? thread_count : std::thread::hardware_concurrency() } { threads.reset(new std::thread[thread_count_]); for (ui32 i = 0; i < thread_count_; ++i) { threads[i] = std::thread(&ThreadPoolExecutor::worker, this); } } /** * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all thread. * */ ~ThreadPoolExecutor() { { // Lock task queue to prevent adding new task. std::lock_guard<std::mutex> lock(tasks_mutex); running = false; } // Wake up all threads so that they may exist condition.notify_all(); for (ui32 i = 0; i < thread_count_; ++i) { threads[i].join(); } } ui32 thread_count() const { return thread_count_; } #if ((defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) || __cplusplus >= 201703L) /** * @brief Submit a function with zero or more arguments and a return value into the task queue, * and get a future for its eventual returned value. */ template <typename F, typename... Args, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> #else template <typename F, typename... Args, typename R = typename std::result_of<std::decay_t<F>(std::decay_t<Args>...)>::type> #endif std::future<R> submit(F&& func, const Args&&... args) { auto task = std::make_shared<std::packaged_task<R()>>([func, args...]() { return func(args...); }); auto future = task->get_future(); push_task([task]() { (*task)(); }); return future; } private: template <typename F> void push_task(const F& task) { { const std::lock_guard<std::mutex> lock(tasks_mutex); if (!running) { throw std::runtime_error("Cannot schedule new task after shutdown."); } tasks.push(std::function<void()>(task)); } condition.notify_one(); } /** * @brief A worker function to be assigned to each thread in the pool. * * Continuously pops tasks out of the queue and executes them, as long as the atomic variable running is set to true. */ void worker() { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(tasks_mutex); condition.wait(lock, [&] {return !tasks.empty() || !running; }); if (!running && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); } } private: /** * @brief A mutex to synchronize access to the task queue by different threads. */ mutable std::mutex tasks_mutex{}; /** * @brief An atomic variable indicating to the workers to keep running. * * When set to false, the workers permanently stop working. */ std::atomic<bool> running{ true }; /** * @brief A queue of tasks to be executed by the threads. */ std::queue<std::function<void()>> tasks{}; /** * @brief The number of threads in the pool. */ const ui32 thread_count_; /** * @brief A smart pointer to manage the memory allocated for the threads. */ std::unique_ptr<std::thread[]> threads; /** * @brief A condition variable used to notify worker threads of state changes. */ std::condition_variable condition; }; } #endif
本コードは, nodecプロジェクト([Welcome!/nodec framework(プラットフォーム開発のためのフレームワーク)])で利用しており, メンテナンスは, このプロジェクト下で行われています.
最新コードは以下を参照してください.
ライセンスは, nodecプロジェクトと同じです.
使用例
class HelloWorld { public: std::string say_hello(int number) { std::cout << "[say_hello (" << std::this_thread::get_id() << ")] >>> Start" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // 何か重い処理 std::ostringstream oss; oss << "[say_hello (" << std::this_thread::get_id() << ")] >>> Hello! number: " << number; std::cout << "[say_hello (" << std::this_thread::get_id() << ")] >>> End" << std::endl; return oss.str(); } }; std::string say_ok(int number) { std::cout << "[say_ok (" << std::this_thread::get_id() << ")] >>> Start" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(3000)); // 何か重い処理 std::ostringstream oss; oss << "[say_ok (" << std::this_thread::get_id() << ")] >>> OK! number: " << number; std::cout << "[say_ok (" << std::this_thread::get_id() << ")] >>> End" << std::endl; return oss.str(); } int main() { // スレッドプールの作成 concurrent::ThreadPoolExecutor executor; // スレッド数の確認 std::cout << "thread_count: " << executor.thread_count() << std::endl; // グローバル関数の非同期実行 auto ok_future = executor.submit(say_ok, 100); // メンバ関数の非同期実行 HelloWorld hello; auto hello_future = executor.submit([&](auto number){ return hello.say_hello(number); }, 999); // 結果の取得 std::cout << ok_future.get() << std::endl; std::cout << hello_future.get() << std::endl; return 0; }
- 出力
thread_count: 8 [say_ok (20792)] >>> Start [say_hello (18596)] >>> Start [say_hello (18596)] >>> End [say_ok (20792)] >>> End [say_ok (20792)] >>> OK! number: 100 [say_hello (18596)] >>> Hello! number: 999
注釈
- ^ この名前は, python で同じような機能を提供するクラスThreadPoolExecutor からとりました. APIも似せています (
submit()
しかありませんが).
参考文献
- ^ a b c "Thread pool". wikipedia. accessed at 2022-01-15
- ^ "thread::hardware_concurrency". cpprefjp. accessed at 2022-01-16
- ^ "condition_variable::wait". cpprefjp. accessed at 2022-01-16
- ^ "条件変数の利用方法". cpprefjp. accessed at 2022-01-16
- ^ "result_of". cpprefjp. accessed at 2022-01-16
- ^ "packaged_task". cpprefjp. accessed at 2022-01-16