読者です 読者をやめる 読者になる 読者になる

Python で並行処理(マルチコア、マルチスレッド)を実装する

Python のマルチコア、マルチスレッドの実装を勉強する機会がありましたので、メモ書きします。

簡単に違いを説明すると、

  • マルチコア
    • 複数のプロセスを立ち上げることで複数のCPUコアを利用することができる
    • 得意なもの:CPUバウンドな並列処理も複数のCPUコアを利用することで効率化できる
    • 苦手なもの:並列処理間でプロセスが異なり、データの受け渡しが難しい
  • マルチスレッド
    • 一つのプロセスを立ち上げ、一つのCPUコアを時分割で利用することができる
    • 得意なもの:並列処理間も同じメモリ空間を利用することができる
    • 苦手なもの:CPUバウンドな並列処理はCPUがボトルネックになり、並列処理の効果が出ない

Python による並列処理

Python はマルチコア、マルチスレッドを実装するパッケージを標準で持っています。マルチコアにはmultiprocssingパッケージ、マルチスレッドにはconcurrentパッケージを利用します。マルチスレッドはthreadingパッケージもありますが、3.2系から追加されたconcurrentパッケージが簡単だったので、今回はconcurrentパッケージで説明します。

マルチコア

まずはマルチコアの実装方法です。

同時実行数制御

マルチコアの同時実行数制御はmultiprocessing.Poolクラスの引数でプロセス数を設定するだけです。

from multiprocessing import Pool
pool = Pool(4)

こちらは 4プロセスを起動します。

並列処理実行

Pool から別プロセスで処理を実行する場合、apply_asyncメソッドを利用します。

from multiprocessing import Pool
import time

def f():
    time.sleep(1)
    print(1)

pool = Pool(4)
pool.apply_async(f)
print(2)
time.sleep(2)

こちらは実行プロセスとは別のプロセスでf関数が実行されます。普通にf関数を呼び出すだけであれば、1 -> 2 の順に表示されますが、apply_asyncで別プロセスで処理を開始しているため、2 -> 1 の順に表示されます。

python ./multi_core_test.py
2
1

非同期処理に引数を渡す

apply_asyncメソッドの第二引数にタプル形式で渡すだけです。引数が一つでも、二つでもタプルに並べるだけです。

from multiprocessing import Pool
import time

def f(num):
    time.sleep(num)
    print(num)

pool = Pool(4)
pool.apply_async(f, (1, ))
print(2)
time.sleep(2)
python ./multi_core_test.py
2
1

非同期処理の終了を待機

先ほどのサンプルで最後に 2秒のスリープを入れています。これには理由があり、別プロセスは実行プロセスの子プロセスとして起動します。そのため、実行したプロセスは処理を終えると、別プロセスで処理が実行中であろうと、プロセスを終了します。ただ今回は 1秒のスリープと print による標準出力だけなので、スリープに必要な秒数が分かりますが、複雑な処理の場合、適切なスリープの設定は難しいでしょう。そこでmultiprocessingパッケージは並列処理と同期を取ることができます。

from multiprocessing import Pool
import time

def f():
    time.sleep(1)
    print(1)

pool = Pool(4)
result = pool.apply_async(f)
print(2)
result.wait()
print(3)

apply_asyncが返すApplyResultwaitメソッドにより、並列処理の実行の終了を待つことができます。

python ./multi_core_test.py
2
1
3

非同期処理の戻り値を受け取る

先ほどのサンプルでは戻り値を返していませんでしたが、今度はf関数で返した値を取得してみましょう。

from multiprocessing import Pool
import time

def f():
    time.sleep(1)
    print(1)
    return 4

pool = Pool(4)
result = pool.apply_async(f)
print(2)
print(result.get())
print(3)

apply_asyncが返すApplyResultgetメソッドにより、並列処理の実行の終了を待ち、戻り値を取得することができます。

python ./multi_core_test.py
2
1
4
3

マルチスレッド

まずはマルチスレッドの実装方法です。

同時実行数制御

マルチスレッドの同時実行数制御はconcurrent.futures.ThreadPoolExecutorクラスの引数でプロセス数を設定するだけです。

from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor(4)

こちらは 4プロセスを起動します。

並列処理実行

ThreadPoolExecutor から別スレッドで処理を実行する場合、submitメソッドを利用します。第一引数に並列実行したい関数を渡すだけです。

from concurrent.futures import ThreadPoolExecutor
import time

def f():
    time.sleep(1)
    print(1)

pool = ThreadPoolExecutor(4)
pool.submit(f)
print(2)
time.sleep(2)

こちらは別スレッドでf関数が実行されます。

python ./multi_thread_test.py
2
1

非同期処理に引数を渡す

submitメソッドの第二引数以降に引数となる値を渡すだけです。引数が一つの場合は第二引数に、引数が二つの場合は第二・第三引数に渡します。

from concurrent.futures import ThreadPoolExecutor
import time

def f(num):
    time.sleep(num)
    print(num)

pool = ThreadPoolExecutor(4)
pool.submit(f, 1)
print(2)
time.sleep(2)
python ./multi_thread_test.py
2
1

非同期処理の終了を待機

concurrent.futuresパッケージは並列処理と同期を取ることができます。as_completedを利用します。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def f(num):
    time.sleep(num)
    print(num)

pool = ThreadPoolExecutor(4)
result = pool.submit(f, 1)
print(2)
as_completed([result]).next()
print(3)
python ./multi_thread_test.py
2
1
3

非同期処理の戻り値を受け取る

先ほどのサンプルでは戻り値を返していませんでしたが、今度はf関数で返した値を取得してみましょう。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def f(num):
    time.sleep(num)
    print(num)
    return 4

pool = ThreadPoolExecutor(4)
result = pool.submit(f, 1)
print(2)
print(as_completed([result]).next().result())
print(3)

as_completedresultメソッドで戻り値を取得することができます。

python ./multi_thread_test.py
2
1
4
3

まとめ

簡単な紹介となりましたが、標準パッケージでも様々な機能を備えています。また公式ドキュメントに分かりやすく色々な機能を紹介されていますので是非そちらもご参考ください。

http://docs.python.jp/3/library/concurrent.futures.html#module-concurrent.futures http://docs.python.jp/3.5/library/multiprocessing.html