Python 3 の asyncio
モジュールは、Python で非同期 I/O を実装するための基本的なツールを提供します。
Python 3.4で導入され、その後のマイナーリリースごとに、このモジュールは大きく発展してきました。
このチュートリアルでは、非同期パラダイムの一般的な概要と、Python 3.7でどのように実装されているかを説明します。
ブロッキングI/OとノンブロッキングI/O
非同期が解決しようとする問題は、I/Oのブロッキングです。
デフォルトでは、プログラムがI/Oソースからデータにアクセスするとき、その操作が完了するのを待ってからプログラムの実行を続行します。
with open('myfile.txt', 'r') as file:
data = file.read()
# Until the data is read into memory, the program waits here
print(data)
物理デバイスにアクセスし、データを転送している間、プログラムの実行フローはブロックされます。
ネットワーク操作もブロッキングの一般的な原因である。
# pip install --user requests
import requests
req = requests.get('https://www.stackabuse.com/')
## Blocking occurs here, waiting for completion of an HTTPS request
#
print(req.text)
多くの場合、ブロッキングによる遅延はごくわずかである。
しかし、ブロッキングI/Oは非常にスケールが悪い。
1010個のファイル読み取りやネットワークトランザクションを待つ必要がある場合、性能は低下します。
マルチプロセッシング、スレッディング、非同期処理
ブロッキングI/Oの遅延を最小化する戦略は、マルチプロセシング、スレッディング、非同期の3つに大別される。
マルチプロセッシング
マルチプロセシングは並列コンピューティングの一種で、複数の物理プロセッサまたはコアで重複した時間枠で命令を実行します。
カーネルによって生成された各プロセスは、独立して割り当てられたメモリチャンク(ヒープ)を含むオーバーヘッドコストを発生させます。
Python は multiprocessing
モジュールを使って並列処理を実装しています。
以下は、4つの子プロセスを生成するPython 3プログラムの例です。
各プロセスはランダムで独立した遅延を示します。
出力は各子プロセスのプロセスID、各遅延の前後のシステム時間、各ステップでの現在とピーク時のメモリ割り当てを示しています。
from multiprocessing import Process
import os, time, datetime, random, tracemalloc
tracemalloc.start()
children = 4 # number of child processes to spawn
maxdelay = 6 # maximum delay in seconds
def status():
return ('Time: ' +
str(datetime.datetime.now().time()) +
' Malloc, Peak: ' +
str(tracemalloc.get_traced_memory()))
def child(num):
delay = random.randrange(maxdelay)
print(f"{status()} Process {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
time.sleep(delay)
print(f"{status()} Process {num}: Done.")
if __name__ == '__main__':
print(f"Parent PID: {os.getpid()}")
for i in range(children):
proc = Process(target=child, args=(i,))
proc.start()
出力
Parent PID: 16048
Time: 09:52:47.014906 Malloc, Peak: (228400, 240036) Process 0, PID: 16051, Delay: 1 seconds...
Time: 09:52:47.016517 Malloc, Peak: (231240, 240036) Process 1, PID: 16052, Delay: 4 seconds...
Time: 09:52:47.018786 Malloc, Peak: (231616, 240036) Process 2, PID: 16053, Delay: 3 seconds...
Time: 09:52:47.019398 Malloc, Peak: (232264, 240036) Process 3, PID: 16054, Delay: 2 seconds...
Time: 09:52:48.017104 Malloc, Peak: (228434, 240036) Process 0: Done.
Time: 09:52:49.021636 Malloc, Peak: (232298, 240036) Process 3: Done.
Time: 09:52:50.022087 Malloc, Peak: (231650, 240036) Process 2: Done.
Time: 09:52:51.020856 Malloc, Peak: (231274, 240036) Process 1: Done.
スレッド
スレッドはマルチプロセッシングの代替手段であり、利点と欠点があります。
スレッドは独立してスケジュールされ、その実行は重複する時間内に行われることがあります。
しかし、マルチプロセッシングとは異なり、スレッドは完全に1つのカーネルプロセスの中に存在し、1つの割り当てられたヒープを共有します。
Python のスレッドは並列です – 複数の機械語コードのシーケンスが、重なった時間枠で実行されます。
しかし、並列ではありません – 複数の物理コアで同時に実行されることはありません。
Pythonのスレッドの主な欠点は、メモリ安全性と競合状態です。
親プロセスのすべての子スレッドは、同じ共有メモリ空間で動作します。
追加の保護がなければ、あるスレッドが他のスレッドに気づかれることなく、メモリ内の共有値を上書きしてしまうかもしれません。
このようなデータの破損は悲惨なことになります。
スレッドセーフを強制するために、CPythonの実装ではグローバルインタープリターロック(GIL)を使用します。
GILはPythonのオブジェクト上で複数のスレッドが同時に実行されるのを防ぐミューテックス機構です。
事実上、これは任意の時間に1つのスレッドのみが実行されることを意味します。
ここに、前のセクションのマルチプロセシングの例のスレッド化されたバージョンがあります。
multiprocessing.Processは
threading.Thread` に置き換えられています。
出力に示されているように、すべてが1つのプロセスで行われ、メモリフットプリントはかなり小さくなっています。
from threading import Thread
import os, time, datetime, random, tracemalloc
tracemalloc.start()
children = 4 # number of child threads to spawn
maxdelay = 6 # maximum delay in seconds
def status():
return ('Time: ' +
str(datetime.datetime.now().time()) +
' Malloc, Peak: ' +
str(tracemalloc.get_traced_memory()))
def child(num):
delay = random.randrange(maxdelay)
print(f"{status()} Process {num}, PID: {os.getpid()}, Delay: {delay} seconds...")
time.sleep(delay)
print(f"{status()} Process {num}: Done.")
if __name__ == '__main__':
print(f"Parent PID: {os.getpid()}")
for i in range(children):
thr = Thread(target=child, args=(i,))
thr.start()
出力
Parent PID: 19770
Time: 10:44:40.942558 Malloc, Peak: (9150, 9264) Process 0, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.942937 Malloc, Peak: (13989, 14103) Process 1, PID: 19770, Delay: 5 seconds...
Time: 10:44:40.943298 Malloc, Peak: (18734, 18848) Process 2, PID: 19770, Delay: 3 seconds...
Time: 10:44:40.943746 Malloc, Peak: (23959, 24073) Process 3, PID: 19770, Delay: 2 seconds...
Time: 10:44:42.945896 Malloc, Peak: (26599, 26713) Process 3: Done.
Time: 10:44:43.945739 Malloc, Peak: (26741, 27223) Process 0: Done.
Time: 10:44:43.945942 Malloc, Peak: (26851, 27333) Process 2: Done.
Time: 10:44:45.948107 Malloc, Peak: (24639, 27475) Process 1: Done.
非同期
非同期は、並行アプリケーションを記述するためのスレッドに代わるものです。
非同期イベントは、1つのスレッド内で、互いに「同期しない」独立したスケジュールで発生します。
スレッドとは異なり、非同期プログラムでは、プログラマはいつ、どのように任意の先取りが発生するかを制御し、競合状態を容易に分離・回避することができます。
Python 3.7 asyncio モジュール入門
Python 3.7 では、非同期操作は asyncio
モジュールによって提供されます。
高水準のasyncio APIと低水準のasyncio API
asyncio のコンポーネントは、高レベルの API(プログラムを書くためのもの)と、低レベルの API(asyncio
をベースにしたライブラリやフレームワークを書くためのもの)に分けられる。
すべての asyncio
プログラムは高レベルの API だけを使って書くことができる。
フレームワークやライブラリを書かないのであれば、低レベルのものを触る必要はない。
ということで、コアとなる高レベルの API を見て、コアとなる概念について議論しましょう。
コルーチン
一般に、コルーチン (cooperative subroutine の略) は、自発的なプリエンプティブ・マルチタスクのために設計された関数で、カーネルによって強制的に先取りされるのではなく、他のルーチンやプロセスに積極的に譲るものです。
コルーチン “という用語は1958年にメルヴィン・コンウェイ(”コンウェイの法則 “で有名)によって作られ、システムの他の部分のニーズを積極的に促進するコードを記述するために使われました。
asyncioでは、この自発的な先取りをawaitingと呼びます。
待ち受け、非同期、待ち受け
待ち受け可能なオブジェクト(コルーチンによって自発的に先取りされるオブジェクト)は、awaitable と呼ばれます。
await` キーワードは、現在のコルーチンの実行を一時停止し、指定された awaitable を呼び出します。
Python 3.7 では、 coroutine
, task
, future
という 3 つの待ち行列オブジェクトが用意されています。
asyncio の coroutine
は、 async
キーワードを先頭に持つ任意の Python 関数です。
async def my_coro():
pass
asyncio の task
はコルーチンをラップしたオブジェクトであり、その実行を制御したり、状態を問い合わせるためのメソッドを提供します。
タスクは asyncio.create_task()
または asyncio.gather()
で生成することができる。
asyncio の future
は低レベルのオブジェクトであり、まだ計算や取得が行われていないデータのプレースホルダとして動作する。
これは後でデータを入れるための空の構造体と、データの準備ができたときに起動されるコールバック機構を提供することができます。
タスクは future
で利用可能な2つのメソッドを除くすべてのメソッドを継承しているので、 Python 3.7 では future
オブジェクトを直接生成する必要はありません。
イベントループ
asyncioでは、イベントループがawaitableオブジェクトのスケジューリングと通信を制御します。
イベントループはawaitableを使用するために必要です。
すべての asyncio プログラムは少なくとも1つのイベントループを持ちます。
複数のイベントループを持つことも可能ですが、 Python 3.7 では複数のイベントループは強く推奨されません。
現在実行中のループオブジェクトへの参照は、 asyncio.get_running_loop()
を呼び出して取得します。
ーーーーーーーーーーーーーーーーーーーー
asyncio.sleep(delay)コルーチンは
delay` 秒間ブロックを行います。
これは I/O のブロッキングをシミュレートするのに便利です。
import asyncio
async def main():
print("Sleep now.")
await asyncio.sleep(1.5)
print("OK, wake up!")
asyncio.run(main())
メインイベントループを開始する
asyncio プログラムの標準的な入り口は asyncio.run(main())
で、ここで main()
はトップレベルのコルーチンです。
import asyncio
async def my_coro(arg):
"A coroutine."
print(arg)
async def main():
"The top-level coroutine."
await my_coro(42)
asyncio.run(main())
asyncio.run()を呼び出すと、暗黙のうちにイベントループが作成され、実行されます。
loop オブジェクトは、ループの内部クロックによって計測された現在時刻を float で返すloop.time()` を含む多くの便利なメソッドを持っています。
注意: 既存のイベントループの中から asyncio.run()
関数を呼び出すことはできません。
したがって、Anaconda や Jupyter のような、それ自身のイベントループを実行している監視環境内でプログラムを実行している場合、エラーが表示される可能性があります。
このセクションと次のセクションのサンプルプログラムは、pythonファイルを実行してコマンドラインから直接実行する必要があります。
次のプログラムは、テキスト行を表示し、各行の後に1秒間ブロックして、最終行まで表示します。
import asyncio
async def my_coro(delay):
loop = asyncio.get_running_loop()
end_time = loop.time() + delay
while True:
print("Blocking...")
await asyncio.sleep(1)
if loop.time() > end_time:
print("Done.")
break
async def main():
await my_coro(3.0)
asyncio.run(main())
出力します。
Blocking...
Blocking...
Blocking...
Done.
タスク
タスクは、コルーチンをラップした待ち行列オブジェクトです。
タスクを作成し、すぐにスケジューリングするには、以下を呼び出します。
asyncio.create_task(coro(args...))
これはタスクオブジェクトを返します。
タスクの作成は、ループに対して “このコルーチンをできるだけ早く実行しなさい “と伝えるものです。
タスクを待機させると、そのタスクが完了するまで、現在のコルーチンの実行はブロックされます。
import asyncio
async def my_coro(n):
print(f"The answer is {n}.")
async def main():
# By creating the task, it's scheduled to run
# concurrently, at the event loop's discretion.
mytask = asyncio.create_task(my_coro(42))
# If we later await the task, execution stops there
# until the task is complete. If the task is already
# complete before it is awaited, nothing is awaited.
await mytask
asyncio.run(main())
出力する。
The answer is 42.
タスクは、ラップされたコルーチンを管理するためのいくつかの便利なメソッドを持っています。
特に、タスクの .cancel()
メソッドを呼び出すことで、タスクをキャンセルするように要求することができます。
タスクはイベントループの次のサイクルでキャンセルされるようにスケジューリングされます。
タスクはイベントループの次のサイクルでキャンセルされるようにスケジュールされます。
キャンセルは保証されていません:タスクはそのサイクルの前に完了するかもしれませんが、その場合はキャンセルは起こりません。
集まる待ちぼうけ
待ち行列は、組み込みのコルーチン asyncio.gather(awaitables)
のリスト引数として与えることで、グループとしてまとめることができる。
asyncio.gather()は集めた awaitables を表す awaitable を返すので、プレフィックスとして
await` を付ける必要があります。
awaitables のいずれかの要素がコルーチンである場合、それは直ちにタスクとしてスケジュールされます。
収集は、複数のコルーチンをタスクとして同時に実行させるのに便利な方法です。
また、収集されたタスクは、いくつかの便利な方法で関連付けられます。
- 収集されたタスクがすべて完了すると、その戻り値がリストとして返され、awaitablesリストの順序にしたがって並べられます。
- 集められたタスクは、他のタスクをキャンセルすることなく、キャンセルすることができます。
- 収集されたタスクをキャンセルしても、他のタスクはキャンセルされない。
例 aiohttp による非同期 Web リクエスト
次の例は、これらの高レベルの asyncio API がどのように実装されるかを示しています。
以下は、Scott Robinson氏の粋なasyncioの例をPython 3.7用にアップデートした修正版です。
彼のプログラムは aiohttp
モジュールを利用して Reddit のトップ投稿を取得し、コンソールに出力しています。
以下のスクリプトを実行する前に、aiohttp
モジュールがインストールされていることを確認してください。
モジュールは以下のpipコマンドでダウンロードできます。
$ pip install --user aiohttp
import sys
import asyncio
import aiohttp
import json
import datetime
async def get_json(client, url):
async with client.get(url) as response:
assert response.status == 200
return await response.read()
async def get_reddit_top(subreddit, client, numposts):
data = await get_json(client, 'https://www.reddit.com/r/' +
subreddit + '/top.json?sort=top&t=day&limit=' +
str(numposts))
print(f'
/r/{subreddit}:')
j = json.loads(data.decode('utf-8'))
for i in j['data']['children']:
score = i['data']['score']
title = i['data']['title']
link = i['data']['url']
print(' ' + str(score) + ': ' + title + '
(' + link + ')')
async def main():
print(datetime.datetime.now().strftime("%A, %B %d, %I:%M %p"))
print('---------------------------')
loop = asyncio.get_running_loop()
async with aiohttp.ClientSession(loop=loop) as client:
await asyncio.gather(
get_reddit_top('python', client, 3),
get_reddit_top('programming', client, 4),
get_reddit_top('asyncio', client, 2),
get_reddit_top('dailyprogrammer', client, 1)
)
asyncio.run(main())
このプログラムを複数回実行すると、出力の順番が変わるのがわかると思います。
これは、JSON リクエストを受信したときに表示されるためで、サーバーの応答時間や中間ネットワークのレイテンシーに依存するためです。
Linux システムでは、(例えば) watch -n 5
というプレフィックスをつけたスクリプトを実行すると、5 秒ごとに出力がリフレッシュされ、この動作を観察できます。
その他の高水準API
この概要で、asyncio をいつ、どのように、そしてなぜ使うのかの基礎が固まったと思う。
ここでカバーされていない他の高レベルの asyncio API は以下の通りです。
- stream, 非同期TCPイベントを管理するための高レベルなネットワーキングのプリミティブのセット。
- ロック、イベント、条件、スレッドモジュールで提供される同期プリミティブの非同期アナログ。
- サブプロセス、シェルコマンドのような非同期サブプロセスを実行するためのツール群。
- queue: queue モジュールの非同期アナログ。
- exception, 非同期コードで例外を処理するためのものです。
結論
パフォーマンス上の理由から非同期を必要としないプログラムであっても、非同期パラダイムで書くことを好むのであれば asyncio
を使うことができることを覚えておいてください。
この概要が asyncio
をどのように、いつ、そしてなぜ使い始めるかについての確かな理解を与えてくれることを願っています。