【Celery】Celery アプリ・タスクの定義編
テストで作成したアプリは、スクレイピングを行い、titleタグの文字列を取得して来るものです。
PyQueryが必要なのでインストールしておきます。
# yum install libxml2-devel libxslt-devel # pip install pyquery
ソース一式はコチラ https://github.com/dnond/celery_scraping
Taskクラス
Taskを継承したMyScrapingTaskを定義しています。
で、その中のon_failureとon_successで、タスクが失敗・成功イベント時に行う処理を定義してみました。
また、Celeryのインスタンスを作成していますが、これは「celery」という名前で定義する必要があるようです。 (これでかなり悩まされました‥)
##### logger from celery.utils.log import get_task_logger logger = get_task_logger(__name__) now_str = datetime.datetime.fromtimestamp(time.time()).strftime('%Y/%m/%d/ %H:%M:%S') ##### celery # インスタンスは「celery」で! 違う変数名だと怒られる。 celery = Celery('MyApp') celery.config_from_object(config) ##### Task Task = celery.create_task_cls() class MyScrapingTask(Task): def on_failure(self, exc, task_id, args, kwargs, einfo): #タスクを再登録してみる cMyApp = MyApp() cMyApp.task_scrape.apply_async( (args[1],), countdown=900) def on_success(self, retval, task_id, args, kwargs): logger.debug("on_success %s\t\t%s\n" % ( now_str, 'SUCCESS' ))
で、MyAppクラスのtask定義では「base=MyScrapingTask」とし、先ほど定義したTaskクラスを使うように指示します。
##### MyAppクラス class MyApp(object): #taskメソッド @celery.task(filter=task_method, base=MyScrapingTask) def task_scrape(self, url): #titleタグ取得の関数 def getTitle_cl(cPyQuery): title = cPyQuery('title').text() return title try: cScraping = Scraping() title = cScraping.scrape('http://www.yahoo.co.jp', getTitle_cl) if title == None or title == '': raise Exception #失敗させたいときは、例外を強制的に飛ばす #raise Exception except Exception as exc: raise exc #on_failure
タスクをわざと失敗したい場合は、raise Exception
して例外を飛ばしてみます。
すると、on_failureの処理が実行されるはずです。
loggerのログの出力先ですが、CELERY_REDIRECT_STDOUTS_LEVEL='DEBUG'
としているので、ログレベルがDEBUG以上のログは、workerのログに出力されます。
workerのログは、/var/log/celery/以下にworker毎で出力されています。(これは/etc/init.d/celerydで定義されています。)
どのTaskがどのWorkerで処理されたかは、flowerから確認するのが手っ取り早いかと思います。
設定値
http://docs.celeryproject.org/en/latest/configuration.html
設定値はかなり多いのですが、とりあえず目についたものを設定してみました。
# -*- coding: utf-8 -*- # サーバーのTimeZoneを使用する CELERY_ENABLE_UTC = False ################ ## Broker settings BROKER_URL = 'redis://localhost:6379/0' ################ ## Redis設定 # 結果受け取り接続設定 CELERY_RESULT_BACKEND = 'redis' # REDISのコネクションプール数。(結果の受け渡しも込み) CELERY_REDIS_MAX_CONNECTIONS = 10 ################ ## BROKER接続設定 # BROKERの接続タイムアウト。Default: 4 BROKER_CONNECTION_TIMEOUT = 1 # BROKERの再接続回数。Default: 100 BROKER_CONNECTION_MAX_RETRIES = 3 ##################### ## エラーメール設定 # エラー時にADMINSにメールを送るか。Default: Disabled CELERY_SEND_TASK_ERROR_EMAILS = True # エラーメールの受信者。('名前','メルアド')タプルのリスト ADMINS = [('me', 'huga@hogehoge.com'), ] # エラーメールの送信先が複数の場合 # ADMINS = [('me', 'huga@hogehoge.com'), ('you', 'hugahuga@hogehoge.com')] # エラーメールの送信元アドレス SERVER_EMAIL = 'admin@celery.localnet' # エラーメールの送信サーバー。Default : “localhost”. EMAIL_HOST = 'localhost' # エラーメールの送信サーバーのユーザー名 #EMAIL_HOST_USER = 'me' # エラーメールの送信サーバーのパスワード #EMAIL_HOST_PASSWORD = 'password' # エラーメールの送信サーバーのポート番号。Default : 25. EMAIL_PORT = 25 #EMAIL_USE_SSL #EMAIL_USE_TLS
BROKERの接続設定は、実際に稼働させながらの調整になるかと思います。
また、今回はTaskが失敗した時、管理者にメールを送信する要件なので、メール送信の設定も行なっています。
localhostで送信させていますが、メール送信サーバーも設定することも出来るようです。
【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編