たんたんめん日記

ソシャゲ関連のなんでもやさん備忘録

【Celery】Celery アプリ・タスクの定義編

テストで作成したアプリは、スクレイピングを行い、titleタグの文字列を取得して来るものです。

PyQueryが必要なのでインストールしておきます。

# yum install libxml2-devel libxslt-devel
# pip install pyquery

ソース一式はコチラ https://github.com/dnond/celery_scraping

Taskクラス

celery.app.taskクラスのリファレンス

Taskを継承したMyScrapingTaskを定義しています。
で、その中のon_failureとon_successで、タスクが失敗・成功イベント時に行う処理を定義してみました。

また、Celeryインスタンスを作成していますが、これは「celery」という名前で定義する必要があるようです。 (これでかなり悩まされました‥)

##### logger
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
now_str = datetime.datetime.fromtimestamp(time.time()).strftime('%Y/%m/%d/ %H:%M:%S')

##### celery
# インスタンスは「celery」で! 違う変数名だと怒られる。
celery = Celery('MyApp')
celery.config_from_object(config)

##### Task
Task = celery.create_task_cls()
class MyScrapingTask(Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
    #タスクを再登録してみる
        cMyApp = MyApp()
        cMyApp.task_scrape.apply_async( (args[1],), countdown=900)

    def on_success(self, retval, task_id, args, kwargs):
        logger.debug("on_success %s\t\t%s\n" % ( now_str, 'SUCCESS' ))

で、MyAppクラスのtask定義では「base=MyScrapingTask」とし、先ほど定義したTaskクラスを使うように指示します。

##### MyAppクラス
class MyApp(object):
    
    #taskメソッド
    @celery.task(filter=task_method, base=MyScrapingTask)
    def task_scrape(self, url):
        #titleタグ取得の関数
        def getTitle_cl(cPyQuery):
            title = cPyQuery('title').text()
            return title
        
        try:
            cScraping = Scraping()
            title = cScraping.scrape('http://www.yahoo.co.jp', getTitle_cl)
            
            if title == None or title == '':
                raise Exception

             #失敗させたいときは、例外を強制的に飛ばす
             #raise  Exception
        except Exception as exc:
            raise exc #on_failure

タスクをわざと失敗したい場合は、raise Exceptionして例外を飛ばしてみます。 すると、on_failureの処理が実行されるはずです。

loggerのログの出力先ですが、CELERY_REDIRECT_STDOUTS_LEVEL='DEBUG'としているので、ログレベルがDEBUG以上のログは、workerのログに出力されます。

workerのログは、/var/log/celery/以下にworker毎で出力されています。(これは/etc/init.d/celerydで定義されています。)

どのTaskがどのWorkerで処理されたかは、flowerから確認するのが手っ取り早いかと思います。


設定値

http://docs.celeryproject.org/en/latest/configuration.html

設定値はかなり多いのですが、とりあえず目についたものを設定してみました。

# -*- coding: utf-8 -*-

# サーバーのTimeZoneを使用する
CELERY_ENABLE_UTC = False

################
## Broker settings
BROKER_URL = 'redis://localhost:6379/0'

################
## Redis設定

# 結果受け取り接続設定
CELERY_RESULT_BACKEND = 'redis'

#  REDISのコネクションプール数。(結果の受け渡しも込み)
CELERY_REDIS_MAX_CONNECTIONS = 10

################
## BROKER接続設定

# BROKERの接続タイムアウト。Default: 4
BROKER_CONNECTION_TIMEOUT = 1

# BROKERの再接続回数。Default: 100
BROKER_CONNECTION_MAX_RETRIES = 3


#####################
## エラーメール設定

# エラー時にADMINSにメールを送るか。Default: Disabled
CELERY_SEND_TASK_ERROR_EMAILS = True 

# エラーメールの受信者。('名前','メルアド')タプルのリスト
ADMINS = [('me', 'huga@hogehoge.com'), ]

# エラーメールの送信先が複数の場合
# ADMINS = [('me', 'huga@hogehoge.com'), ('you', 'hugahuga@hogehoge.com')]

# エラーメールの送信元アドレス
SERVER_EMAIL = 'admin@celery.localnet'

# エラーメールの送信サーバー。Default : “localhost”.
EMAIL_HOST = 'localhost'

# エラーメールの送信サーバーのユーザー名
#EMAIL_HOST_USER = 'me'

# エラーメールの送信サーバーのパスワード
#EMAIL_HOST_PASSWORD = 'password'

# エラーメールの送信サーバーのポート番号。Default : 25.
EMAIL_PORT = 25

#EMAIL_USE_SSL 
#EMAIL_USE_TLS

BROKERの接続設定は、実際に稼働させながらの調整になるかと思います。

また、今回はTaskが失敗した時、管理者にメールを送信する要件なので、メール送信の設定も行なっています。
localhostで送信させていますが、メール送信サーバーも設定することも出来るようです。


【Celery】Celery 要件と環境編
【Celery】Celery インストール、Worker立ち上げ編
【Celery】Celery アプリ・タスクの定義編
【Celery】Celery タスクの追加とflower編
【Celery】Celery Task Status編
【Celery】Celery 結果をmysqlに格納する編
【Celery】Celery 複数サーバーでworkerを立ち上げてみる編
【Celery】Celery 感想編