Flask、Redis、Celeryによる非同期タスクの実現

Web アプリケーションが進化し、その利用が増加するにつれて、ユースケースも多様化しています。

私たちは現在、以前よりも複雑なタスクのためにウェブサイトを構築し、使用しています。

これらのタスクの中には、即座に処理してユーザーにフィードバックできるものもあれば、後でさらに処理して結果を伝える必要のあるものもあります。

インターネットへのアクセスやインターネット接続可能なデバイスの普及に伴い、エンドユーザーのトラフィックは増加しています。

トラフィックの増加や機能の複雑化に対応するために、作業を先延ばしにして、結果を後日伝えるという方法をとることもあります。

このようにすれば、Webアプリケーション上でユーザーを未知の時間待たせることなく、結果を後日送信することができます。

このためには、トラフィックが少ないときに作業を処理するバックグラウンドタスクや、作業をまとめて処理するバックグラウンドタスクが有効です。

そのために使えるソリューションの一つがCeleryです。

Celeryは、複雑な作業を分解して別のマシンに実行させることで、1台のマシンの負荷を軽減したり、完了までの時間を短縮したりするのに役立つ。

この記事では、FlaskアプリケーションのバックグラウンドタスクをスケジュールするためにCeleryを使用し、リソース集約的なタスクをオフロードし、エンドユーザへの対応を優先させる方法を探ります。

タスクキューとは?

タスクキューは、ほとんどの Web ベースのアプリケーションのリクエストとレスポンスのサイクルを妨げることなく実行できる、小さな作業単位やタスクを配布するメカニズムです。

タスクキューは、応答を待っている間にアプリケーションを遅くしてしまうような作業を委任するのに便利です。

また、メインマシンやプロセスがユーザーとやりとりしている間に、リソース集約的なタスクを処理するために使用することもできます。

このようにして、ユーザーとの対話は一貫してタイムリーであり、作業負荷に影響されません。

セロリとは?

Celeryは、分散メッセージパッシングに基づく非同期タスクキューで、マシンやスレッドに作業負荷を分散させることができます。

Celery システムは、クライアント、ブローカー、および複数のワーカーで構成されます。

これらのワーカーは、キューに入れられたタスクや仕事の断片を実行し、その結果を中継する役割を担っています。

Celeryでは、ローカルワーカーとリモートワーカーの両方を持つことができます。

つまり、インターネットを介して、より能力の高い別のマシンに仕事を委ね、結果をクライアントにリレーバックすることができます。

こうすることで、メインマシンの負荷が軽減され、より多くのリソースがユーザーからのリクエストに対応できるようになる。

Celeryのセットアップでは、クライアントがワーカーにジョブを発行し、メッセージブローカーを使ってワーカーと通信する役割を担う。

ブローカーは、メッセージキューを通じてクライアントとワーカー間の通信を促進し、メッセージがキューに追加されると、ブローカーはそれをクライアントに配信する。

このようなメッセージブローカーの例としては、RedisやRabbitMQがあります。

なぜセロリを使うのか?

バックグラウンドタスクにCeleryを採用する理由は様々です。

まず、スケーラブルであり、負荷やトラフィックの増加に対応するためにオンデマンドでワーカーを追加することができます。

また、Celeryは現在も活発に開発が行われており、簡潔なドキュメントや活発なユーザーコミュニティとともに、サポートされているプロジェクトであることを意味する。

また、Celeryは複数のWebフレームワークへの統合が容易であり、ほとんどのフレームワークが統合を容易にするライブラリを持っていることも利点である。

また、ライブラリがない場合でも、Webhooksを利用して他のWebアプリケーションと連携することができる。

また、Celeryは様々なメッセージブローカーを使用することができ、柔軟性を提供してくれる。

RabbitMQを推奨しているが、RedisやBeanstalkもサポートできる。

デモアプリケーション

ユーザーが設定した時刻にメールに配信されるリマインダーを設定できるFlaskアプリケーションを構築します。

また、メッセージやリマインダーが呼び出され、ユーザーにメッセージが送信されるまでの時間をカスタマイズする機能も提供します。

セットアップ

他のプロジェクトと同様に、Pipenvツールを使って作成・管理する仮想環境上で作業を行います。

$ pipenv install --three
$ pipenv shell


このプロジェクトでは、FlaskとCeleryのパッケージをインストールする必要があります。

$ pipenv install flask celery


Flaskアプリケーションのファイル構成はこのようになります。

.
├── Pipfile                    # manage our environment
├── Pipfile.lock
├── README.md
├── __init__.py
├── app.py                     # main Flask application implementation
├── config.py                  # to host the configuration
├── requirements.txt           # store our requirements
└── templates
    └── index.html             # the landing page


1 directory, 8 files


Celeryベースのプロジェクトでは、メッセージブローカーとしてRedisを使用します。

実装

まず、将来送信するメッセージの詳細を入力するフォームをレンダリングする Flask アプリケーションを作成しましょう。

以下のコードを app.py ファイルに追加します。

from flask import Flask, flash, render_template, request, redirect, url_for


app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')


elif request.method == 'POST':
        email = request.form['email']
        first_name = request.form['first_name']
        last_name = request.form['last_name']
        message = request.form['message']
        duration = request.form['duration']
        duration_unit = request.form['duration_unit']


flash(“Message scheduled”)
        return redirect(url_for('index'))


if __name__ == '__main__':
    app.run(debug=True)


これは本当にシンプルなアプリで、フォームへの GETPOST リクエストを処理するルートを1つだけ持っています。

フォームの詳細が送信されると、ジョブをスケジューリングする関数にデータを渡すことができます。

メインアプリケーションのファイルを整理するために、設定変数を別の config.py ファイルに置き、そのファイルから設定をロードします。

app.config.from_object("config")


この config.py ファイルは app.py ファイルと同じフォルダにあり、いくつかの基本的な設定を含んでいます。

SECRET_KEY = 'very_very_secure_and_secret'
# more config


とりあえず、ランディングページを index.html として実装してみます。

{% for message in get_flashed_messages() %}
  <p style="color: red;"{{ message }}</p
{% endfor %}


<form method="POST"
    First Name: <input id="first_name" name="first_name" type="text"/
    Last Name: <input id="last_name" name="last_name" type="text"/
    Email: <input id="email" name="email" type="email"/
    Message: <textarea id="textarea" name="message"</textarea
    Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text"/
<select name="duration_unit"
<option disabled="" selected="" value=""Choose the duration</option
<option value="1"Minutes</option
<option value="2"Hours</option
<option value="3"Days</option
</select
<button name="action" type="submit"Submit </button
</form


スタイリングとフォーマットは簡潔にするために切り捨てています、あなたの好きなようにHTMLをフォーマット/スタイリングしてください。

これで、アプリケーションを開始することができます。

Flask-Mailを使ったメール送信方法

Flaskアプリケーションからメールを送信するために、以下のようにFlask-Mailライブラリをプロジェクトに追加して使用します。

$ pipenv install flask-mail


Flaskアプリケーションとフォームを用意したら、app.pyにFlask-Mailを組み込むことができます。

from flask_mail import Mail, Message


app = Flask(__name__)
app.config.from_object("config")
app.secret_key = app.config['SECRET_KEY']


# set up Flask-Mail Integration
mail = Mail(app)


def send_mail(data):
    """ Function to send emails.
    """
    with app.app_context():
        msg = Message("Ping!",
                    sender="admin.ping",
                    recipients=[data['email']])
        msg.body = data['message']
        mail.send(msg)


関数 send_main(data) は、送信するメッセージとメールの受信者を受け取り、指定した時間が経過した後に呼び出され、ユーザーにメールを送信します。

また、Flask-Mail を動作させるために、以下の変数を config.py に追加する必要があります。

# Flask-Mail
MAIL_SERVER = 'smtp.googlemail.com'
MAIL_PORT = 587
MAIL_USE_TLS = True
MAIL_USERNAME = 'mail-username'
MAIL_PASSWORD = 'mail-password'


セロリ統合

Flaskアプリケーションの準備が整い、メール送信機能が搭載されたので、今度はCeleryを統合して、後日メールを送信するようにスケジュールします。

また、app.pyを修正します。

# Existing imports are maintained
from celery import Celery


# Flask app and flask-mail configuration truncated


# Set up celery client
client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
client.conf.update(app.config)


# Add this decorator to our send_mail function
@client.task
def send_mail(data):
    # Function remains the same


@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'GET':
        return render_template('index.html')


elif request.method == 'POST':
        data = {}
        data['email'] = request.form['email']
        data['first_name'] = request.form['first_name']
        data['last_name'] = request.form['last_name']
        data['message'] = request.form['message']
        duration = int(request.form['duration'])
        duration_unit = request.form['duration_unit']


if duration_unit == 'minutes':
            duration *= 60
        elif duration_unit == 'hours':
            duration *= 3600
        elif duration_unit == 'days':
            duration *= 86400


send_mail.apply_async(args=[data], countdown=duration)
        flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")


return redirect(url_for('index'))


Celleryをインポートし、メッセージングブローカーの URL を添付して Flask アプリケーションの Celery クライアントを初期化します。

今回はRedisをブローカーとして使うので、以下の内容をconfig.py` に追加します。

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'


send_mail()関数をバックグラウンドタスクとして実行させるために、@client.task` デコレーターを追加して、Celery クライアントがそれを認識するようにします。

Celeryクライアントをセットアップした後、フォームの入力を処理するmain関数を修正します。

まず、send_mail()関数の入力データを辞書に格納します。

次に、Celery Task Calling API を使って apply_async 関数を呼び出すと、この関数が必要とする引数を受け取ります。

オプションで countdown パラメータが設定され、コードを実行してからタスクを実行するまでの遅延時間が定義されます。

この期間は秒単位で、ユーザーが選択した時間の単位に応じて、渡された期間を秒に変換する理由です。

ユーザーがフォームを送信した後、受信を確認し、メッセージが送信されるときにバナーメッセージで通知します。

すべてをひとつにする

プロジェクトを実行するには、Flaskアプリケーションを起動するターミナルと、バックグラウンドでメッセージを送信するCeleryワーカーを起動するターミナルの2つが必要です。

最初の端末でFlaskアプリを起動します。

$ python app.py


2つ目の端末では、仮想環境を起動し、Celery Workerを起動します。

# start the virtualenv
$ pipenv shell
$ celery worker -A app.client --loglevel=info


うまくいくと、Celeryクライアントを実行しているターミナルに以下のようなフィードバックが表示されます。

では、http://localhost:5000に移動して、送信から2分後に到着するメールをスケジュールする詳細を入力してみましょう。

フォームの上に、メールを受信するアドレスとメールが送信されるまでの時間を示すメッセージが表示されます。

また、Celeryターミナルでは、メールがスケジュールされたことを示すログエントリを見ることができるようになります。

[2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]


このエントリの ETA セクションは、いつ send_email() 関数が呼び出され、いつメールが送信されるかを示しています。

ここまでは順調です。

メールはスケジュールされ、指定された時間に送信されますが、一つ足りないものがあります。

タスクの実行前や実行後を見ることができず、メールが実際に送信されたかどうかを知る方法がないのです。

そこで、バックグラウンドタスクの監視ソリューションを導入し、タスクを確認できるようにするとともに、万が一、何か問題が発生してタスクが計画通りに実行されなかった場合に備えて、注意喚起を行うことにしましょう。

Flowerを用いたCeleryクラスタの監視

FlowerはCeleryのセットアップを可視化するWebベースのツールで、タスクの進捗、履歴、詳細、成功率や失敗率などの統計情報を表示する機能を提供します。

また、クラスタ内のすべてのワーカーと、彼らが現在処理しているタスクを監視することができます。

Flower`のインストールは以下のように簡単です。

$ pipenv install flower


先ほど、Celeryクライアントの詳細を app.py ファイルで指定しました。

そのクライアントを監視するために、Flowerに渡す必要があります。

そのためには、3つ目のターミナルウィンドウを開き、仮想環境に飛び込んで、モニタリングツールを起動する必要があります。

$ pipenv shell
$ flower -A app.client --port=5555


Flowerを起動する際、アプリケーション(-A)引数でCeleryクライアントを指定し、--port引数で使用するポートを指定します。

モニタリングが完了したので、ダッシュボードでメールを送信するスケジュールを組み、http://localhost:5555にアクセスすると、以下のように表示されます。

このページでは、Celeryクラスタのワーカーのリストが表示されます。

今スケジュールしたメールを見るには、ダッシュボードの左上にある Tasks ボタンをクリックすると、スケジュールされたタスクを見ることができるページに移動します。

このセクションでは、2つのメールがスケジュールされ、1つはスケジュールされた時間に送信されたことがわかります。

テスト用に、それぞれ1分後と5分後にメールが送信されるようにスケジュールされています。

また、このセクションでは、テキストを受信した時刻と実行された時刻を確認できます。

モニターセクションには、バックグラウンドタスクの成功率と失敗率を表示するグラフがあります。

メッセージを好きなだけスケジュールすることができますが、それは同時に、タスクが実行されるはずの時間にワーカーがオンラインであり、機能していなければならないことを意味します。

結論

Celeryクラスタのセットアップに成功し、ユーザが将来のある時間以降にメールを送信するようスケジュールできるFlaskアプリケーションに統合しました。

メール送信機能はバックグラウンドタスクに委ねられ、ローカルのCeleryクラスタのワーカーによって選択・実行されるキューに配置されました。

このプロジェクトのソースコードは、いつものようにGithubで公開されています。

タイトルとURLをコピーしました