Python API開発のためのRedisキューとRedisキュー・ダッシュボード

インメモリデータストアであるRedisは、データベース、キャッシュ層、ジョブキューの管理などとして開発者に広く利用されています。

メモリ負荷の高いジョブをバックグラウンドで実行したり、ページの訪問回数をカウントしたり、大量のメールキャンペーンを送信したりといったタスクを処理するジョブキューメカニズムでAPIを構築する際に、Redisは重宝されるでしょう。

PythonでAPIを構築する場合、Redis Queue(RQ)モジュールは、ワーカーを配置してこれらのジョブをキューに入れ、スケジュールし、処理する機能を提供します。

この記事では、ジョブキューとワーカーをセットアップするRQモジュールと、それらを可視化するRQ-Dashboardモジュールについて説明します。

Redisのセットアップ

もしAPIやRedis RQを適用できるコードがまだない場合は、サンプルコードが入ったGitHubリポジトリをクローンしてください。

それでは、レポジトリをクローンして、依存関係をインストールしましょう。

$ git clone git@github.com:StackAbuse/redis-queues-redis-queue-dashboards.git
$ cd redis-queues-redis-queue-dashboards
$ python -m venv env
$ . env/bin/activate
$ pip install -r requirements.txt


Redisもインストールしなければなりませんが、これはLinuxベースのOSを使用していない場合、少し厄介なことになります。

Linux以外のOSにインストールする一番簡単な方法は、Docker Composeを使うことです。

$ docker-compose up -d


私たちの docker-compose ファイルはRedisイメージをダウンロードするように設定されており、このコマンドはバックグラウンドでそれを実行します。

また、Redisをローカルにインストールすることもできます。

Ubuntuの場合、そのインストールは以下のようになります。

$ sudo apt-get install redis
$ sudo service redis-server start
Starting redis-server: redis-server.
$ redis-cli -v       
redis-cli 4.0.9


さて、環境ができたので、Redis Queueについて見ていきましょう。

Redis Queue (RQ)

Redis Queue (RQ) は Redis 上で動作するキューイング・モジュールです。

キューにジョブを投入するプロデューサとして動作します。

また、キューから投入されたジョブを非同期に処理するコンシューマとして動作するワーカーも付属しています。

ジョブとは何でしょうか?
ジョブとは、キューにプッシュされるPythonの関数への参照です。

ジョブを処理するキューは複数存在することができ、これらのキューには好きな名前を付けることができます。

キューに投入されたジョブは、そのジョブIDを使って監視することができます。

例えば、test.py で Redis Queue にジョブを投入する簡単なスクリプトを書いてみましょう。

# Imported to assign redis as the backend to rq
from redis import Redis
# Imported to initialize the queue object
from rq import Queue
# Functions from the __main__ module can't be processed by workers
# Hence, we have a separate Python file containing the function
from test_job import i_am_a_job


# Create the queue object by passing in the redis object
q = Queue(connection=Redis())

# Run the job asynchronously
job = q.enqueue(i_am_a_job, 1)
# Return the function output


ジョブをキューに入れる関数は、別の Python ファイルからインポートする必要があります。

ここでは、test_job.pyからインポートしています。

# A Function (or) a job
def i_am_a_job(arg1):
    # Perform some function
    return arg1


これでRedis Queueと関数が設定されましたので、Pythonスクリプトを実行してみましょう。

$ python test.py


このコマンドを実行すると、ジョブが作成されますが、何も出力されません。

このコマンドを実行すると、ジョブが作成されますが、何も出力されません。

エラーが発生した場合は、続行する前にもう一度セットアップ手順を確認してください。

すべてうまくいったなら、RQ-Dashboard を使ってジョブを管理してみましょう。

RQ-DashboardによるRedisキュージョブの管理

Redis Queueのジョブの状態は、RQ-DashboardというRedis Queueを監視するための軽量なFlaskアプリを使うことで検査することができます。

RQ-Dashboardを起動して、先ほど作成したジョブを監視してみましょう。

別のターミナルで、レポをクローンしたフォルダーに移動します。

そこで、RQ-Dashboard をスピンアップします。

$ . env/bin/activate
$ rq-dashboard
RQ Dashboard version 0.5.2
 * Serving Flask app 'rq_dashboard.cli' (lazy loading)
 * Environment: production
   WARNING: This is a development server. Do not use it in a production deployment.
   Use a production WSGI server instead.
 * Debug mode: off
 * Running on all addresses.
   WARNING: This is a development server. Do not use it in a production deployment.
 * Running on http://192.168.1.10:9181/ (Press CTRL+C to quit)
192.168.1.10 - - [11/Jun/2021 15:30:12] "GET / HTTP/1.1" 200 -


RQ-Dashboard は http://localhost:9181 からアクセスできます。

このリンクを開くと、あなたの仕事はまだキューの中にあり、ワーカーもまだ割り当てられていないことがわかります。

Redisキューワーカー

Workerはキューからジョブを選んで実行します。

別のTerminal(最初のTerminalでも可)で、Workerを作成しましょう。

$ . env/bin/activate # For new Terminals when you don't install the dependencies locally
$ rq worker --with-scheduler
15:42:38 Worker rq:worker:a33eb6277eda4969921cc8e3f1e857c0: started, version 1.8.1
15:42:38 Subscribing to channel rq:pubsub:a33eb6277eda4969921cc8e3f1e857c0
15:42:38 *** Listening on default...
15:42:38 Trying to acquire locks for default
15:42:38 Cleaning registries for queue: default
15:42:38 Scheduler for default started with PID 1093
15:42:38 default: test_job.i_am_a_job(1) (b92bf928-48dd-4fb9-a551-427866c46a38)
15:42:38 default: Job OK (b92bf928-48dd-4fb9-a551-427866c46a38)
15:42:38 Result is kept for 500 seconds


投入したジョブが実行され、その結果が500秒間Redisに保持されます。

ジョブは即時実行の他に、CRONジョブのように将来の時刻に実行するようにスケジュールすることもできます。

enqueue文は、以下の方法でスケジュールされたものとして記述することができます。

job = queue.enqueue_at(datetime(2021, 7, 7, 13, 15), i_am_a_job, 1)


これらの操作は、Redis Queueの利用、監視、Workerの割り当ての基本になります。

では、実際にページの訪問回数をカウントするアプリケーションを書いてみましょう。

Redis Queueデモアプリケーション – サイト訪問数のカウント

先ほどダウンロードしたリポジトリのコードには、Flaskアプリケーションが含まれています。

from flask import Flask
from redis import Redis
from rq import Queue
from counter import visit


app = Flask(__name__)
q = Queue(connection=Redis())


@app.route('/visit')
def count_visit():
    count = q.enqueue(visit)
    return "Visit has been registered"


@app.route('/')
def return_visit_count():
    count = Redis().get('count').decode('utf-8') if Redis().get('count') else '0'
    return (f'<h1 Congrats! Your are the visitor no.: {count} </h1')


ターミナルで、このFlaskアプリを実行してみましょう。

$ . env/bin/activate # Unless it's already running
$ flask run


これで app.py にあるFlaskアプリが起動します。

このアプリには2つのルートが含まれています。

このアプリには、//visit の2つのルートがあります。

http://localhost:5000/visit のエンドポイントがヒットするたびに、Redis の count キーが 1 つずつ増加し、以下のような Web ページが返されます。

このインクリメント関数はジョブとしてキューに入れられる。

訪問回数はエンドポイント: http://localhost:5000 で確認できます。

エンドポイント http://localhost:5000/visit に 3 回アクセスしてみましょう。

これでジョブは 3 回投入されます。

次に、RQ-Dashboard でジョブの状態を確認してみましょう。

http://localhost:9181 にアクセスすると、以下のような Web ページが表示され、ジョブは正常に投入されていますが、ワーカーが処理をしていないことがわかります。

Redis Queue のワーカーとスケジューラーを起動するために、別のターミナルを開いてワーカーを起動するコマンドを入力します。

投入されたジョブが次々と実行されていることを確認してください。

$ . env/bin/activate # For new Terminals when you don't install the dependencies locally
$ rq worker --with-scheduler
23:40:06 Worker rq:worker:f5a178b0931b42859699ce57696ed402: started, version 1.8.1
23:40:06 Subscribing to channel rq:pubsub:f5a178b0931b42859699ce57696ed402
23:40:06 *** Listening on default...
23:40:06 Trying to acquire locks for default
23:40:06 Cleaning registries for queue: default
23:40:06 Scheduler for default started with PID 2889
23:40:06 default: counter.visit() (d23c4df8-d638-476b-b70a-dbb4b6f091f2)
23:40:06 default: Job OK (d23c4df8-d638-476b-b70a-dbb4b6f091f2)
23:40:06 Result is kept for 500 seconds
23:40:06 default: counter.visit() (f4ca10c4-16f2-4578-b1b7-67dfce3cee5a)
23:40:06 default: Job OK (f4ca10c4-16f2-4578-b1b7-67dfce3cee5a)
23:40:06 Result is kept for 500 seconds
23:40:06 default: counter.visit() (956b7b39-0b82-4ac6-b29e-fe3f0706431e)
23:40:06 default: Job OK (956b7b39-0b82-4ac6-b29e-fe3f0706431e)
23:40:06 Result is kept for 500 seconds


再度ダッシュボードを確認すると、ジョブが実行されていることがわかります。

これは、ブラウザで http://localhost:9181 の URL を指すことで確認できます。

ワーカーが起動し、ジョブが正常に処理されていることに注意してください。

http://localhost:5000 でアプリを開いたり更新したりして、訪問回数を確認してみましょう。

ほら! ページ訪問カウンターは 3 増えています。

アクセス数の多いWebサイトで、サイト訪問数やページ訪問数を監視したい場合を考えてみましょう。

この場合、このAPIの複数のインスタンスはロードバランサーの下で提供され、カウントは非同期にキューに提出されたジョブを基に行われます。

結論

この記事では、ジョブキューの重要性と、RQ と RQ-Dashboards が Web アプリの最小限のジョブキュースタックとしてどのように機能するかを探りました。

この実用例は、他の実世界のアプリケーションに拡張することができ、その可能性は無限大です。

タイトルとURLをコピーしました