Python で並行処理(マルチコア、マルチスレッド)を実装する
Python のマルチコア、マルチスレッドの実装を勉強する機会がありましたので、メモ書きします。
簡単に違いを説明すると、
- マルチコア
- 複数のプロセスを立ち上げることで複数のCPUコアを利用することができる
- 得意なもの:CPUバウンドな並列処理も複数のCPUコアを利用することで効率化できる
- 苦手なもの:並列処理間でプロセスが異なり、データの受け渡しが難しい
- マルチスレッド
- 一つのプロセスを立ち上げ、一つのCPUコアを時分割で利用することができる
- 得意なもの:並列処理間も同じメモリ空間を利用することができる
- 苦手なもの:CPUバウンドな並列処理はCPUがボトルネックになり、並列処理の効果が出ない
Python による並列処理
Python はマルチコア、マルチスレッドを実装するパッケージを標準で持っています。マルチコアにはmultiprocssing
パッケージ、マルチスレッドにはconcurrent
パッケージを利用します。マルチスレッドはthreading
パッケージもありますが、3.2系から追加されたconcurrent
パッケージが簡単だったので、今回はconcurrent
パッケージで説明します。
マルチコア
まずはマルチコアの実装方法です。
同時実行数制御
マルチコアの同時実行数制御はmultiprocessing.Pool
クラスの引数でプロセス数を設定するだけです。
from multiprocessing import Pool pool = Pool(4)
こちらは 4プロセスを起動します。
並列処理実行
Pool から別プロセスで処理を実行する場合、apply_async
メソッドを利用します。
from multiprocessing import Pool import time def f(): time.sleep(1) print(1) pool = Pool(4) pool.apply_async(f) print(2) time.sleep(2)
こちらは実行プロセスとは別のプロセスでf
関数が実行されます。普通にf
関数を呼び出すだけであれば、1 -> 2 の順に表示されますが、apply_async
で別プロセスで処理を開始しているため、2 -> 1 の順に表示されます。
python ./multi_core_test.py 2 1
非同期処理に引数を渡す
apply_async
メソッドの第二引数にタプル形式で渡すだけです。引数が一つでも、二つでもタプルに並べるだけです。
from multiprocessing import Pool import time def f(num): time.sleep(num) print(num) pool = Pool(4) pool.apply_async(f, (1, )) print(2) time.sleep(2)
python ./multi_core_test.py 2 1
非同期処理の終了を待機
先ほどのサンプルで最後に 2秒のスリープを入れています。これには理由があり、別プロセスは実行プロセスの子プロセスとして起動します。そのため、実行したプロセスは処理を終えると、別プロセスで処理が実行中であろうと、プロセスを終了します。ただ今回は 1秒のスリープと print による標準出力だけなので、スリープに必要な秒数が分かりますが、複雑な処理の場合、適切なスリープの設定は難しいでしょう。そこでmultiprocessing
パッケージは並列処理と同期を取ることができます。
from multiprocessing import Pool import time def f(): time.sleep(1) print(1) pool = Pool(4) result = pool.apply_async(f) print(2) result.wait() print(3)
apply_async
が返すApplyResult
のwait
メソッドにより、並列処理の実行の終了を待つことができます。
python ./multi_core_test.py 2 1 3
非同期処理の戻り値を受け取る
先ほどのサンプルでは戻り値を返していませんでしたが、今度はf
関数で返した値を取得してみましょう。
from multiprocessing import Pool import time def f(): time.sleep(1) print(1) return 4 pool = Pool(4) result = pool.apply_async(f) print(2) print(result.get()) print(3)
apply_async
が返すApplyResult
のget
メソッドにより、並列処理の実行の終了を待ち、戻り値を取得することができます。
python ./multi_core_test.py 2 1 4 3
マルチスレッド
まずはマルチスレッドの実装方法です。
同時実行数制御
マルチスレッドの同時実行数制御はconcurrent.futures.ThreadPoolExecutor
クラスの引数でプロセス数を設定するだけです。
from concurrent.futures import ThreadPoolExecutor pool = ThreadPoolExecutor(4)
こちらは 4プロセスを起動します。
並列処理実行
ThreadPoolExecutor から別スレッドで処理を実行する場合、submit
メソッドを利用します。第一引数に並列実行したい関数を渡すだけです。
from concurrent.futures import ThreadPoolExecutor import time def f(): time.sleep(1) print(1) pool = ThreadPoolExecutor(4) pool.submit(f) print(2) time.sleep(2)
こちらは別スレッドでf
関数が実行されます。
python ./multi_thread_test.py 2 1
非同期処理に引数を渡す
submit
メソッドの第二引数以降に引数となる値を渡すだけです。引数が一つの場合は第二引数に、引数が二つの場合は第二・第三引数に渡します。
from concurrent.futures import ThreadPoolExecutor import time def f(num): time.sleep(num) print(num) pool = ThreadPoolExecutor(4) pool.submit(f, 1) print(2) time.sleep(2)
python ./multi_thread_test.py 2 1
非同期処理の終了を待機
concurrent.futures
パッケージは並列処理と同期を取ることができます。as_completed
を利用します。
from concurrent.futures import ThreadPoolExecutor, as_completed import time def f(num): time.sleep(num) print(num) pool = ThreadPoolExecutor(4) result = pool.submit(f, 1) print(2) as_completed([result]).next() print(3)
python ./multi_thread_test.py 2 1 3
非同期処理の戻り値を受け取る
先ほどのサンプルでは戻り値を返していませんでしたが、今度はf
関数で返した値を取得してみましょう。
from concurrent.futures import ThreadPoolExecutor, as_completed import time def f(num): time.sleep(num) print(num) return 4 pool = ThreadPoolExecutor(4) result = pool.submit(f, 1) print(2) print(as_completed([result]).next().result()) print(3)
as_completed
のresult
メソッドで戻り値を取得することができます。
python ./multi_thread_test.py 2 1 4 3
まとめ
簡単な紹介となりましたが、標準パッケージでも様々な機能を備えています。また公式ドキュメントに分かりやすく色々な機能を紹介されていますので是非そちらもご参考ください。
http://docs.python.jp/3/library/concurrent.futures.html#module-concurrent.futures http://docs.python.jp/3.5/library/multiprocessing.html