【Celery】Celery 結果をmysqlに格納する編
ソース一式はコチラ https://github.com/dnond/celery_scraping
下準備
CeleryはSQLAlchemyをサポートしているので、インストールします。
# pip install SQLAlchemy
あと、mysql本体もインストールしておく必要があります。
RESULT_BACKENDの設定
参考:http://docs.celeryproject.org/en/latest/configuration.html#database-backend-settings
config.pyのCELERY_RESULT_BACKENDの設定を変更します。
CELERY_RESULT_BACKEND = "database" CELERY_RESULT_DBURI = "mysql://user:password@host/dbname" # echo enables verbose logging from SQLAlchemy. CELERY_RESULT_ENGINE_OPTIONS = {"echo": True}
CELERY_RESULT_ENGINE_OPTIONS = {"echo": True} は動作確認のために入れてみました。
パフォーマンスが悪くなるので、本番では外したほうが良さそうです。
詳細はコチラ
確認
タスクを追加して動かしてみます。
mysqlの該当databaseを覗いてみると、自動的にceleryのテーブルが作成されています。
tableの中身を見ると、ちゃんと結果が格納されているようです。
タスクの追加時に返されるtask_idをどこかのテーブルに格納しておき、この結果テーブルとjoinすることで各種の処理も行うことが出来そうです。
BROKERにmysql
Resultではなく、BROKERにmysqlを使うのは、
https://groups.google.com/d/msg/celery-users/WAxM-mTKhMM/1Uz3sEDLF38J
上の記事を見るとあまりオススメではないようです。
【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編
【Celery】Celery Task Status編
Task Status
参考:http://docs.celeryproject.org/en/latest/userguide/tasks.html#states
Celeryには、Taskの状態を示すStatusが定義されています。
成功時(success)・失敗時(failure)・再実行時(retry)のStatusには、Statusが変わった時に発火する処理を定義出来ます。
デフォルトのStatus
http://docs.celeryproject.org/en/latest/userguide/tasks.html#built-in-states
PENDING :実行待ち
STARTED :実行開始した(デフォルトでは報告されない) CELERY_TRACK_STARTED=Trueで、報告されるようになるようです。参考
SUCCESS :成功した
発火する処理 : Task.on_success(retval, task_id, args, kwargs)
retval – The return value of the task. task_id – Unique id of the executed task. args – Original arguments for the executed task. kwargs – Original keyword arguments for the executed task.
FAILURE :失敗した
発火する処理 :Task.on_failure(exc, task_id, args, kwargs, einfo)
exc – The exception raised by the task. task_id – Unique id of the failed task. args – Original arguments for the task that failed. kwargs – Original keyword arguments for the task that failed. einfo – ExceptionInfo instance, containing the traceback.
RETRY :再実行された
発火する処理 :Task. on_retry(exc, task_id, args, kwargs, einfo)
exc – The exception sent to retry(). task_id – Unique id of the retried task. args – Original arguments for the retried task. kwargs – Original keyword arguments for the retried task. einfo – ExceptionInfo instance, containing the traceback.
REVOKED :無効
retry
http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying
リトライ回数等、さくっと定義出来るみたいです。
今回は未実装です。
独自の Statusの作成
http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-states
独自のStatusも作成できるようです。
また、Statusはupdate_stateを呼ぶことで、変更することが出来るようです。
from celery import current_task current_task.update_state(state=HOGE)
【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編
【Celery】Celery タスクの追加とflower編
ソース一式はコチラ https://github.com/dnond/celery_scraping
タスクの実行確認
タスクを追加するコードです。
task_scrape.apply_async()でタスクを追加しています。
# -*- coding: utf-8 -*- import sys from MyApp import MyApp ## デフォルト値 target_url = 'http://www.yahoo.co.jp/' countdown = 60 ## 引数の取得 argvs = sys.argv argc = len(argvs) if (argc == 3): target_url = argvs[1] if argvs[2].isdigit(): countdown = int( argvs[2] ) ###### ## 実行 if __name__ == '__main__': cMyApp = MyApp() cAsyncResult = cMyApp.task_scrape.apply_async( (target_url,), countdown=60) print "task_id:%s" % (cAsyncResult.id)
タスクを追加します。 追加した結果は、AsyncResultとして返されます。
$ python ./add_task_2_myapp.py task_id:aac9a6f1-8d54-4ac9-972f-4cfe37864a19
60秒後、MyApp.task_scrapeが実行され、www.yahoo.co.jpからtitleタグを拾ってきて/tmp/scrape_resultに追記されるはず、です。
flower
かなり高機能なTask管理ツールです。
こいつがあるという理由でCeleryを使っても良いかもしれません。
インストール
# pip install flower
こんだけ。
起動
今回はRedisをBROKERに使っているので、起動時に指定しておきます。
# celery flower --broker=redis://localhost:6379/0 --port=5555
詳しくはコチラ
表示
ブラウザから http://インストールしたサーバーのIP:5555/ にアクセスします。
なぜかメニューバーが2つ表示されていますが‥
/etc/sysconfig/celerydで3つのworkerの起動を指定
したので、それが反映されています。
どのworkerがいくつのタスクを処理したのかもGUIで分かりやすく表示してくれます。
タスクを追加すると、flowerのタスク管理ページで確認出来ます。
各タスクを選択すると、実行日時や、引数、実行結果、どのworkerで実行されたのか、等の詳細を得ることが出来ます。
【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編