たんたんめん日記

ソシャゲ関連のなんでもやさん備忘録

【Celery】Celery 結果をmysqlに格納する編

ソース一式はコチラ https://github.com/dnond/celery_scraping

下準備

CeleryはSQLAlchemyをサポートしているので、インストールします。

# pip install SQLAlchemy

あと、mysql本体もインストールしておく必要があります。

RESULT_BACKENDの設定

参考:http://docs.celeryproject.org/en/latest/configuration.html#database-backend-settings

config.pyのCELERY_RESULT_BACKENDの設定を変更します。

CELERY_RESULT_BACKEND = "database"
CELERY_RESULT_DBURI = "mysql://user:password@host/dbname"

# echo enables verbose logging from SQLAlchemy.
CELERY_RESULT_ENGINE_OPTIONS = {"echo": True}

CELERY_RESULT_ENGINE_OPTIONS = {"echo": True} は動作確認のために入れてみました。
パフォーマンスが悪くなるので、本番では外したほうが良さそうです。
詳細はコチラ


確認

タスクを追加して動かしてみます。

mysqlの該当databaseを覗いてみると、自動的にceleryのテーブルが作成されています。

f:id:dnond:20130731144709p:plain

tableの中身を見ると、ちゃんと結果が格納されているようです。

f:id:dnond:20130731144714p:plain

タスクの追加時に返されるtask_idをどこかのテーブルに格納しておき、この結果テーブルとjoinすることで各種の処理も行うことが出来そうです。


BROKERにmysql

Resultではなく、BROKERにmysqlを使うのは、 https://groups.google.com/d/msg/celery-users/WAxM-mTKhMM/1Uz3sEDLF38J
上の記事を見るとあまりオススメではないようです。


【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編

【Celery】Celery Task Status編

Task Status

参考:http://docs.celeryproject.org/en/latest/userguide/tasks.html#states

Celeryには、Taskの状態を示すStatusが定義されています。
成功時(success)・失敗時(failure)・再実行時(retry)のStatusには、Statusが変わった時に発火する処理を定義出来ます。


デフォルトのStatus

http://docs.celeryproject.org/en/latest/userguide/tasks.html#built-in-states

  • PENDING :実行待ち

  • STARTED :実行開始した(デフォルトでは報告されない) CELERY_TRACK_STARTED=Trueで、報告されるようになるようです。参考

  • SUCCESS :成功した

    発火する処理 : Task.on_success(retval, task_id, args, kwargs)

      retval – The return value of the task.
      task_id – Unique id of the executed task.
      args – Original arguments for the executed task.
      kwargs – Original keyword arguments for the executed task.
    
  • FAILURE :失敗した

    発火する処理 :Task.on_failure(exc, task_id, args, kwargs, einfo)

      exc – The exception raised by the task.
      task_id – Unique id of the failed task.
      args – Original arguments for the task that failed.
      kwargs – Original keyword arguments for the task that failed.
      einfo – ExceptionInfo instance, containing the traceback.
    
  • RETRY :再実行された

    発火する処理 :Task. on_retry(exc, task_id, args, kwargs, einfo)

      exc – The exception sent to retry().
      task_id – Unique id of the retried task.
      args – Original arguments for the retried task.
      kwargs – Original keyword arguments for the retried task.
      einfo – ExceptionInfo instance, containing the traceback.
    
  • REVOKED :無効


retry

http://docs.celeryproject.org/en/latest/userguide/tasks.html#retrying

リトライ回数等、さくっと定義出来るみたいです。
今回は未実装です。


独自の Statusの作成

http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-states

独自のStatusも作成できるようです。

また、Statusはupdate_stateを呼ぶことで、変更することが出来るようです。

from celery import current_task

current_task.update_state(state=HOGE)

【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編

【Celery】Celery タスクの追加とflower編

ソース一式はコチラ https://github.com/dnond/celery_scraping

タスクの実行確認

タスクを追加するコードです。

task_scrape.apply_async()でタスクを追加しています。

# -*- coding: utf-8 -*-
import sys
from MyApp import MyApp

## デフォルト値
target_url = 'http://www.yahoo.co.jp/'
countdown = 60

## 引数の取得
argvs = sys.argv
argc = len(argvs)
           
if (argc == 3):
    target_url = argvs[1]
    if argvs[2].isdigit():
        countdown = int( argvs[2] )
        
######
## 実行
if __name__ == '__main__':
    cMyApp = MyApp()
    cAsyncResult = cMyApp.task_scrape.apply_async( (target_url,), countdown=60)
    print "task_id:%s" % (cAsyncResult.id)

タスクを追加します。 追加した結果は、AsyncResultとして返されます。

$ python ./add_task_2_myapp.py 
task_id:aac9a6f1-8d54-4ac9-972f-4cfe37864a19

60秒後、MyApp.task_scrapeが実行され、www.yahoo.co.jpからtitleタグを拾ってきて/tmp/scrape_resultに追記されるはず、です。


flower

http://docs.celeryproject.org/en/latest/userguide/monitoring.html#flower-real-time-celery-web-monitor

かなり高機能なTask管理ツールです。
こいつがあるという理由でCeleryを使っても良いかもしれません。

インストール

# pip install flower

こんだけ。

起動

今回はRedisをBROKERに使っているので、起動時に指定しておきます。

# celery flower --broker=redis://localhost:6379/0  --port=5555

詳しくはコチラ

表示

ブラウザから http://インストールしたサーバーのIP:5555/ にアクセスします。

f:id:dnond:20130731143614p:plain

なぜかメニューバーが2つ表示されていますが‥

/etc/sysconfig/celerydで3つのworkerの起動を指定 したので、それが反映されています。
どのworkerがいくつのタスクを処理したのかもGUIで分かりやすく表示してくれます。

f:id:dnond:20130731143618p:plain

タスクを追加すると、flowerのタスク管理ページで確認出来ます。

各タスクを選択すると、実行日時や、引数、実行結果、どのworkerで実行されたのか、等の詳細を得ることが出来ます。


【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編