trio実装

パッケージtrio実装

Posted by Whale Fall on November 26, 2019

この記事ではhttps://qiita.com/yura/items/689d065aba00fe14fbba メモーです

非同期処理を実行

非同期なメソッドを実行する

import trio


async def func1(t):
    await trio.sleep(t)
    print("func1 finished")


async def func2(t1, t2):
    await trio.sleep(t1 + t2)
    print("func2 finished")


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(func1, 1.0)
        nursery.start_soon(func2, 0.5, 1.0)
    print("all methods finished")


trio.run(main)

結果

func2 finished
func1 finished
all methods finished

非同期なメソッドを複数実行し、どれか1つが完了したら他の処理を中断

import trio


async def cond1():
    await trio.sleep(1.0)
    print("cond1 satisfied")


async def cond2():
    await trio.sleep(0.5)
    print("cond2 satisfied")


async def wrapper(func, nursery):
    await func()
    nursery.cancel_scope.cancel()


async def main():
    async with trio.open_nursery() as nursery:
        nursery.start_soon(wrapper, cond1, nursery)
        nursery.start_soon(wrapper, cond2, nursery)
    print("one condition satisfied")


trio.run(main)

実行結果。

cond2 satisfied
one condition satisfied

cond2 が満たされた時点で処理が終了して、 cond1 は最後まで実行されなかったことがわかります。

ある処理が特定の条件を満たしてから他の処理を開始

これは、例えばデータベースとの接続を待ってデータの読み書きを開始したり、ある計算の完了を待って別の計算を行ったりするようなパターンで使います。

import trio


async def connect_db(*, task_status=trio.TASK_STATUS_IGNORED):
    await trio.sleep(1.0)
    print("connected to DB")
    # nursery にタスクが開始できたことを通知する
    task_status.started()
    try:
        await trio.sleep_forever()
    finally:
        print("disconnected from DB")


async def write_data(data):
    await trio.sleep(0.5)
    print(f"write data to DB: {data}")


async def read_data():
    await trio.sleep(1.0)
    print("read data from DB")
    return "data"


async def main():
    async with trio.open_nursery() as nursery:
        # nursery.start を使うと、タスクが開始された通知が来るまで待つ
        await nursery.start(connect_db)
        # DB への接続を待ってデータの読み書きを開始
        await write_data("foo")
        _ = await read_data()
        nursery.cancel_scope.cancel()
    print("transaction finished")


trio.run(main)

実行結果。

connected to DB
write data to DB: foo
read data from DB
disconnected from DB
transaction finished

きちんと DB への接続を待って、 write_data, read_data が実行されていることが分かります。

なお、非同期なメソッド内で try: ~ finally:finally 句に終了処理を書くことで、 nursery.cancel_scope.cancelKeyboardInterrupt などで中断された場合にも、終了処理を行うことができます。上記の例では全体が終了する前に、 connect_dbfinally が実行されています。

一定時間で処理をキャンセル

非同期な処理にタイムアウトを設定したい場合のパターンです。

import trio


async def func(name, t):
    try:
        await trio.sleep(t)
        print(f"{name} finished")
    except trio.Cancelled:
        print(f"{name} canceled")


async def main():
    # 2.5秒でタイムアウト
    with trio.move_on_after(2.5):
        async with trio.open_nursery() as nursery:
            nursery.start_soon(func, "func1", 1.0)
            nursery.start_soon(func, "func2", 2.0)
            nursery.start_soon(func, "func3", 3.0)
    print("timeout")


trio.run(main)

実行結果。

func1 finished
func2 finished
func3 canceled
timeout

2.5秒以内に完了する func1, func2 は終了していますが、 3秒かかる func3 は中断されていることが分かります。

sleep 時間をスキップしてテスト時間を短縮

pytest を使ってテストをするときに、 pytest-trio の autojump_clock という機能を使うと、テストの実行時間を短縮することができます。

最初におまじないとして pytest.ini に以下の設定をしておきます。

[pytest]
trio_mode = true

以下のようなテストを実行してみましょう。

import time

import trio


async def test1():
    v0, t0 = trio.current_time(), time.time()
    await trio.sleep(1)
    v1, t1 = trio.current_time(), time.time()
    print(f"test1: virtual time={v1 - v0}sec, real time={t1 - t0}sec")
    assert 1 + 1 == 2


async def test2(autojump_clock):
    v0, t0 = trio.current_time(), time.time()
    await trio.sleep(1)
    v1, t1 = trio.current_time(), time.time()
    print(f"test2: virtual time={v1 - v0}sec, real time={t1 - t0}sec")
    assert 1 + 1 == 2

実行結果。

test1: virtual time=1.0040155599999707sec, real time=1.0039751529693604sec
test2: virtual time=1.0sec, real time=0.0003070831298828125sec

普通に書いた test1 は trio の経過時間も実際の実行時間も約1秒であることが分かります。一方で autojump_clock を用いた test2 では trio の経過時間がピッタリ1秒で、実際の実行時間は1ミリ秒以下となっています。

autojump_clock を利用すると、テスト内ではバーチャルな時計が利用され、他に処理が行われていない trio.sleep の時間を飛ばして次の処理まで進めてくれるので、テスト内で trio.sleep を使っても実際にはほぼ時間をかけずテストを行うことができます。通常では sleep をテスト内で多用するとテスト自体が遅くなったり、それを回避するために細かい sleep を while で回す必要があったりするのですが、 autojump_clock を使うことで格段にテストが書きやすく、しかもスピーディになります。