Outline

Revision as of 2022-05-01 17:37

 
<Header> <Parent> ../Cpp <Title> C++ で, スレッドプールを実装する <CreatedAt> 2022-01-15 <Tags> Cpp, 非同期処理, デザインパターン, スレッドプール <Summary> 本稿では, スレッドプールの仕組みを理解して, C++を用いて, スレッドプールを自身で実装できることを目指します. 対応環境は, C++14 からを想定しています. </Summary> </Header> # はじめに スレッドプールは, ソフトウェアのデザインパターンの一つで, コンピュータプログラムにおいて非同期処理を実現するためのものです^[wiki-threadpool]. スレッドプールは, 同時で実行されるタスクの割り当てを待つ複数のスレッドを持ちます^[wiki-threadpool]. スレッドプールを維持することにより, すぐに終わるタスクのためにスレッドを頻繁に生成と破棄することによる オーバーヘッドを回避することができます^[wiki-threadpool]. C++ では, C++11 ~ C++20 において, 標準でスレッドプールが用意されておらず, スレッドプールを使いたい場合は, 外部のライブラリを使うか自前で実装する必要があります. スレッドプールを実現するため, 外部ライブラリをそのまま使うこともできますが, 自身で作れた方が, スレッドプールの仕組みを知ることができ, カスタマイズできることや, C++のように標準でサポートされていない多くの言語へ対応できるため, より良い方法といえます. 本稿では, スレッドプールの仕組みを理解して, C++を用いて, スレッドプールを自身で実装できることを目指します. 対応環境は, C++14 からを想定しています. # スレッドプールの仕組み スレッドプールには, タスクの割り当てを待つ複数のスレッドがあります. 各スレッドは, 待ち行列 (Task Queue) に積まれているタスクを取り出し, 他スレッドと並列に処理を行います. 待ち状態のタスクがない場合は, 新しいタスクが割り当てられるまで待機します. ![スレッドプール. \ 新しいタスク(青い丸)は, Thread Queue の最後尾に追加される. \ 各スレッド(緑色の箱)は, Task Queue から一つタスクを取り出し処理する. \ ](CURRENT_DIR/images/fig-overview.jpg) # 各機能ごとの実装 スレッドプールは大きく分けて以下の4つの機能を持ちます. * スレッドプールの作成 * タスクの取り出しと実行 * Task Queue へのタスク追加 * スレッドプールの終了 各機能ごとに, 実装していきます. 実装において, 以下の実装を参考にしました. * ["bshoshany/thread-pool"](https://github.com/bshoshany/thread-pool). \ github. accessed at 2022-01-15 * ["progschj/ThreadPool"](https://github.com/progschj/ThreadPool). \ github. accessed at 2022-01-15 クラス名は`ThreadPoolExecutor`とします^[注.name-thread-pool-executor]. # スレッドプールの作成 初期化処理で, スレッドプールを構成する各スレッドを作成します. まずメンバ変数で, スレッドを保持する配列を用意しましょう. ```cpp /** * @brief A smart pointer to manage the memory allocated for the threads. */ std::unique_ptr<std::thread[]> threads; ``` 次に, コンストラクタで指定された数でスレッドを作成します. 指定されていない場合は, 処理系によりサポートされるスレッド並行数(`hardware_concurrency`)で作成します. [`thread::hardware_concurrency`^[cpprefjp-thread-hardware_concurrency]] ===== ```cpp static unsigned int hardware_concurrency() noexcept; ``` 概要: 処理系によりサポートされるスレッド並行数を取得する. 戻り値: サポートされるスレッド並行数. その処理系において値を取得できない場合は0を返す. ===== ```cpp ThreadPoolExecutor(const ui32& thread_count = std::thread::hardware_concurrency()) : thread_count_{ thread_count ? thread_count : std::thread::hardware_concurrency() } { threads.reset(new std::thread[thread_count_]); for (ui32 i = 0; i < thread_count_; ++i) { threads[i] = std::thread(&ThreadPoolExecutor::worker, this); } } ``` `&ThreadPoolExecutor::worker`は, 後に解説する, 実際にスレッドで実行される処理(タスクの取り出し, タスクの実行など)を記述します. # タスクの取り出しと実行 各スレッドでは, Task Queue からのタスク取り出しと実行を行います. タスクがない場合は, 追加されるまで待機します. ![タスクの取り出しと実行. スレッドは, Task Queue からタスクを取り出し実行する.](CURRENT_DIR/images/fig-worker-pop-execute.jpg) ```cpp /** * @brief A worker function to be assigned to each thread in the pool. * * Continuously pops tasks out of the queue and executes them, as long as the atomic variable running is set to true. */ void worker() { for (;;) { std::function<void()> task; { // Task Queue が空でない, もしくは, Thread Pool が終了するまで待機 std::unique_lock<std::mutex> lock(tasks_mutex); condition.wait(lock, [&] {return !tasks.empty() || !running; }); // Thread Pool が終了中で, Task Queue が空の時は, ループを抜ける. -> スレッドが終了する if (!running && tasks.empty()) { return; } // Task Queue から, Task を取り出す task = std::move(tasks.front()); tasks.pop(); } // Task を実行する task(); } } ``` `condition` は, `std::condition_variable`です. `condition.wait()`で, Task Queue への追加があった場合などに外部から`notify()`されるまで待機します. 起床時, Task Queue が空でない, もしくは, Thread Pool が終了状態でない場合, 再び待機します. ![スレッドの起床. タスクがない場合は待機する. タスクが追加された場合に外部から`notify`され, 起床する.](CURRENT_DIR/images/fig-worker-wakeup.jpg) [`std::condition_variable::wait`^[cpprefjp-condition_variable-wait]] === ```cpp void wait(unique_lock<mutex>& lock); // (1) template <class Predicate> void wait(unique_lock<mutex>& lock, Predicate pred); // (2) ``` 概要: 処理をするための準備ができたことを`notify_one()`/`notify_all()`によって通知されるまでスレッドを待機する. 第二引数`pred`を指定する場合, `pred()`呼び出しがtrueになるまで待機を続行する. 効果: (1): 1. アトミックに`lock.unlock()`し、`*this`に対してブロッキングする 2. `notify_one()`/`notify_all()`もしくはそれ以外の理由で通知があるまでブロッキングされる 3. この関数を抜ける際に`lock.lock()`する 4. この関数が例外送出によって終了する場合、関数を抜ける前に`lock.lock()`する (2): 以下と等価の処理を行う ```cpp while (!pred()) { wait(lock); } ``` `pred`の指定について: `pred`の指定は, 冗長に見えますが, `wait()`で待機しているはずが, `notify()`を受ける前に起床してしまう現象, //Spurious Wakeup//への対応に必要です^[cpprefjp-how_to_use_cv]. === # Task Queue へのタスク追加 Task Queue へタスクを追加します. まず, メンバ変数で Task Queue を用意します. ```cpp /** * @brief A queue of tasks to be executed by the threads. */ std::queue<std::function<void()>> tasks{}; ``` タスク追加では, 待機中のスレッドを起床させるため, `condition.notify_one()`を呼びます. ```cpp template <typename F> void push_task(const F& task) { { const std::lock_guard<std::mutex> lock(tasks_mutex); if (!running) { throw std::runtime_error("Cannot schedule new task after shutdown."); } // タスクの追加 tasks.push(std::function<void()>(task)); } // 待機スレッドを起床 condition.notify_one(); } ``` ただ, このままだと返り値が`void`型のタスクしか受け付けないため, 任意の返り値に対応します. タスク実行時に呼ばれる関数または関数オブジェクトを引数にとり, 関数の返り値に応じた`future`オブジェクトを返す`submit()`を実装します. ![タスクの追加.\ 呼び出し元は, `func`を`submit()`し, 処理結果を得るための`future`を取得する.\ Thread がタスクを実行し, 結果の取得が準備完了になると,\ 呼び出し元は, `future`を通じて結果を取得できるようになる.\ ](CURRENT_DIR/images/fig-submit.jpg) ```cpp #if ((defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) || __cplusplus >= 201703L) /** * @brief Submit a function with zero or more arguments and a return value into the task queue, * and get a future for its eventual returned value. */ // C++17 から推奨されている, 関数の返り値の型R を取得する方法 template <typename F, typename... Args, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> #else // C++14 で, 関数の返り値の型R を取得する方法 template <typename F, typename... Args, typename R = typename std::result_of<std::decay_t<F>(std::decay_t<Args>...)>::type> #endif std::future<R> submit(F&& func, const Args&&... args) { auto task = std::make_shared<std::packaged_task<R()>>([func, args...]() { return func(args...); }); auto future = task->get_future(); push_task([task]() { (*task)(); }); return future; } ``` タスク関数の返り値の型を取得するため, C++17 までの環境では, `result_of`を使用し, C++17 からの環境では, `invoke_result_t`を使用しています. `result_of`は, C++17 で非推奨になり, C++20 で削除されました^[cpprefjp-result_of]. [`packaged_task`^[cpprefjp-packaged_task]] === ```cpp namespace std { template <class> class packaged_task; // 宣言のみで定義なし template <class R, class... ArgTypes> class packaged_task<R(ArgTypes...)>; } ``` 概要: `packaged_task`は, 「別スレッドでの処理完了を待ち, その処理結果を取得する」といった非同期処理を実現するためのクラスであり, `future`クラスと組み合わせて使用します. `packaged_task`に登録した非同期実行する関数の戻り値を`future`が読み取ります. === # スレッドプールの終了 スレッドプールのデストラクト時は, 新規でタスクを追加できなくし, 待機中のタスクをすべて処理し, すべてのスレッドが終了するまで待機します. ```cpp /** * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all thread. */ ~ThreadPoolExecutor() { { // Lock task queue to prevent adding new task. std::lock_guard<std::mutex> lock(tasks_mutex); running = false; } // Wake up all threads so that they may exist condition.notify_all(); for (ui32 i = 0; i < thread_count_; ++i) { threads[i].join(); } } ``` # コード コード全文 ```cpp #ifndef CONCURRENT__THREAD_POOL_EXECUTOR_HPP_ #define CONCURRENT__THREAD_POOL_EXECUTOR_HPP_ #include <thread> #include <cstdint> #include <atomic> #include <mutex> #include <queue> #include <functional> #include <memory> #include <future> #include <type_traits> namespace concurrent { // This code based on // * <https://github.com/bshoshany/thread-pool> // * <https://github.com/progschj/ThreadPool> // * <https://github.com/SandSnip3r/thread-pool> // Thank you! :) class ThreadPoolExecutor { using ui32 = std::uint_fast32_t; using ui64 = std::uint_fast64_t; public: ThreadPoolExecutor(const ui32& thread_count = std::thread::hardware_concurrency()) : thread_count_{ thread_count ? thread_count : std::thread::hardware_concurrency() } { threads.reset(new std::thread[thread_count_]); for (ui32 i = 0; i < thread_count_; ++i) { threads[i] = std::thread(&ThreadPoolExecutor::worker, this); } } /** * @brief Destruct the thread pool. Waits for all tasks to complete, then destroys all thread. * */ ~ThreadPoolExecutor() { { // Lock task queue to prevent adding new task. std::lock_guard<std::mutex> lock(tasks_mutex); running = false; } // Wake up all threads so that they may exist condition.notify_all(); for (ui32 i = 0; i < thread_count_; ++i) { threads[i].join(); } } ui32 thread_count() const { return thread_count_; } #if ((defined(_MSVC_LANG) && _MSVC_LANG >= 201703L) || __cplusplus >= 201703L) /** * @brief Submit a function with zero or more arguments and a return value into the task queue, * and get a future for its eventual returned value. */ template <typename F, typename... Args, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>> #else template <typename F, typename... Args, typename R = typename std::result_of<std::decay_t<F>(std::decay_t<Args>...)>::type> #endif std::future<R> submit(F&& func, const Args&&... args) { auto task = std::make_shared<std::packaged_task<R()>>([func, args...]() { return func(args...); }); auto future = task->get_future(); push_task([task]() { (*task)(); }); return future; } private: template <typename F> void push_task(const F& task) { { const std::lock_guard<std::mutex> lock(tasks_mutex); if (!running) { throw std::runtime_error("Cannot schedule new task after shutdown."); } tasks.push(std::function<void()>(task)); } condition.notify_one(); } /** * @brief A worker function to be assigned to each thread in the pool. * * Continuously pops tasks out of the queue and executes them, as long as the atomic variable running is set to true. */ void worker() { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(tasks_mutex); condition.wait(lock, [&] {return !tasks.empty() || !running; }); if (!running && tasks.empty()) { return; } task = std::move(tasks.front()); tasks.pop(); } task(); } } private: /** * @brief A mutex to synchronize access to the task queue by different threads. */ mutable std::mutex tasks_mutex{}; /** * @brief An atomic variable indicating to the workers to keep running. * * When set to false, the workers permanently stop working. */ std::atomic<bool> running{ true }; /** * @brief A queue of tasks to be executed by the threads. */ std::queue<std::function<void()>> tasks{}; /** * @brief The number of threads in the pool. */ const ui32 thread_count_; /** * @brief A smart pointer to manage the memory allocated for the threads. */ std::unique_ptr<std::thread[]> threads; /** * @brief A condition variable used to notify worker threads of state changes. */ std::condition_variable condition; }; } #endif ``` [::Note] ======= 本コードは, nodecプロジェクト([/nodec/nodec])で利用しており, メンテナンスは, このプロジェクト下で行われています. 最新コードは以下を参照してください. * [`ContentsViewer/nodec/concurrent/thread_pool_executor.hpp`](https://github.com/ContentsViewer/nodec/blob/main/nodec/include/nodec/concurrent/thread_pool_executor.hpp) ライセンスは, [nodecプロジェクト](https://github.com/ContentsViewer/nodec)と同じです. ======= # 使用例 ```cpp class HelloWorld { public: std::string say_hello(int number) { std::cout << "[say_hello (" << std::this_thread::get_id() << ")] >>> Start" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(2000)); // 何か重い処理 std::ostringstream oss; oss << "[say_hello (" << std::this_thread::get_id() << ")] >>> Hello! number: " << number; std::cout << "[say_hello (" << std::this_thread::get_id() << ")] >>> End" << std::endl; return oss.str(); } }; std::string say_ok(int number) { std::cout << "[say_ok (" << std::this_thread::get_id() << ")] >>> Start" << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(3000)); // 何か重い処理 std::ostringstream oss; oss << "[say_ok (" << std::this_thread::get_id() << ")] >>> OK! number: " << number; std::cout << "[say_ok (" << std::this_thread::get_id() << ")] >>> End" << std::endl; return oss.str(); } int main() { // スレッドプールの作成 concurrent::ThreadPoolExecutor executor; // スレッド数の確認 std::cout << "thread_count: " << executor.thread_count() << std::endl; // グローバル関数の非同期実行 auto ok_future = executor.submit(say_ok, 100); // メンバ関数の非同期実行 HelloWorld hello; auto hello_future = executor.submit([&](auto number){ return hello.say_hello(number); }, 999); // 結果の取得 std::cout << ok_future.get() << std::endl; std::cout << hello_future.get() << std::endl; return 0; } ``` 出力: ``` thread_count: 8 [say_ok (20792)] >>> Start [say_hello (18596)] >>> Start [say_hello (18596)] >>> End [say_ok (20792)] >>> End [say_ok (20792)] >>> OK! number: 100 [say_hello (18596)] >>> Hello! number: 999 ``` # 注釈 [注.name-thread-pool-executor]: この名前は, python で同じような機能を提供するクラス[ThreadPoolExecutor](https://docs.python.org/ja/3/library/concurrent.futures.html#threadpoolexecutor) \ からとりました. APIも似せています (`submit()`しかありませんが). # 参考文献 [wiki-threadpool]: ["Thread pool"](https://en.wikipedia.org/wiki/Thread_pool). \ wikipedia. accessed at 2022-01-15 [cpprefjp-thread-hardware_concurrency]: ["thread::hardware_concurrency"](https://cpprefjp.github.io/reference/thread/thread/hardware_concurrency.html). \ cpprefjp. accessed at 2022-01-16 [cpprefjp-condition_variable-wait]: ["condition_variable::wait"](https://cpprefjp.github.io/reference/condition_variable/condition_variable/wait.html). \ cpprefjp. accessed at 2022-01-16 [cpprefjp-how_to_use_cv]: ["条件変数の利用方法"](https://cpprefjp.github.io/article/lib/how_to_use_cv.html). \ cpprefjp. accessed at 2022-01-16 [cpprefjp-result_of]: ["result_of"](https://cpprefjp.github.io/reference/type_traits/result_of.html). \ cpprefjp. accessed at 2022-01-16 [cpprefjp-packaged_task]: ["packaged_task"](https://cpprefjp.github.io/reference/future/packaged_task.html). \ cpprefjp. accessed at 2022-01-16
Retrieved from "https://contentsviewer.work/Master/Cpp/how-to-implement-a-thread-pool/article?cmd=history&rev=1651394267"