Celery

Why use it

Celery executes work outside the HTTP request/response cycle: Django pushes tasks to a broker
(Redis here) and one or more worker processes pick them up and run them asynchronously. This is
useful for deferred or long-running jobs, such as the file-cleanup task shown in step 4.

Install celery

  1. Install the celery package and at least one broker

    requirements.txt:

     celery[redis]~=5.2
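
    With the requirement declared, installation is the usual pip workflow (the redis extra
    pulls in the Python redis client used by the broker and result backend):

     pip install -r requirements.txt
     # or directly:
     pip install "celery[redis]~=5.2"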
    
  2. Configure the broker + celery

    docker-compose.yml:

       # Task broker + celery worker
       redis:
         image: redis:6.0.10
         restart: unless-stopped
         expose:
           - "6379"
         healthcheck:
           test: ["CMD", "redis-cli", "ping"]
           interval: 1s
           timeout: 3s
           retries: 30
    
       celery:
         image: api:latest
         restart: unless-stopped
         command: >
           celery -A core worker
           --concurrency=1
           --max-tasks-per-child 10
           --max-memory-per-child 15000
           --time-limit 3600
           -l info
           -Q celery
         env_file:
           - ./back.env
         depends_on:
           db:
             condition: service_healthy
           redis:
             condition: service_healthy
         healthcheck:
           disable: true
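
    The worker flags above keep the process under control: a single process (--concurrency=1)
    that is recycled after 10 tasks or ~15 MB of resident memory, a hard limit of 3600 s per
    task, and consumption from the default "celery" queue (-Q celery). Once the stack is up,
    the easiest sanity checks are the worker logs and a broker ping:

     docker compose logs -f celery
     docker compose exec redis redis-cli ping   # should answer PONG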
    

    dev.env:

     REDIS_HOST=redis
     REDIS_PORT=6379
    

    settings/defaults.py:

     import os
     import sys
    
     # Celery configuration
     # see https://docs.celeryq.dev/en/stable/userguide/configuration.html
    
     CELERY_BROKER_URL = f"redis://{os.getenv('REDIS_HOST', '')}:{os.getenv('REDIS_PORT', '')}/0"
     CELERY_RESULT_BACKEND = f"redis://{os.getenv('REDIS_HOST', '')}:{os.getenv('REDIS_PORT', '')}/0"
     CELERY_RESULT_EXPIRES = 30  # how long (in seconds) results are kept in redis (1 day by default)
     CELERY_MAX_CACHED_RESULTS = 100  # max task results cached in memory by the result backend client (100 by default)
    
     CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
     CELERY_TASK_DEFAULT_RATE_LIMIT = '10/s'  # with namespace='CELERY', the new-style name (task_default_rate_limit) must be used
     CELERY_ENABLE_UTC = False
     CELERY_TIMEZONE = TIME_ZONE
     CELERY_TASK_RESULT_EXPIRES = CELERY_RESULT_EXPIRES
     CELERY_TASK_EAGER_PROPAGATES = True
     CELERY_WORKER_PREFETCH_MULTIPLIER = 4  # how many messages each worker process prefetches from the broker (4 by default)
    
     # The worker is a long-running process: force DEBUG off when these settings are loaded
     # by the `celery` command (with DEBUG=True Django keeps every SQL query in memory).
     if 'celery' in sys.argv[0]:
         DEBUG = False
    

    settings/tests.py:

     CELERY_BROKER_URL = 'memory://'
     BROKER_BACKEND = 'memory'
     CELERY_TASK_ALWAYS_EAGER = True
     CELERY_TASK_EAGER_PROPAGATES = True
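
    With these settings a task called during tests runs synchronously in the test process and
    re-raises its exceptions, so neither a worker nor Redis is needed. A minimal sketch
    (pytest-style; the core/tasks.py location of the task from step 4 is an assumption):

     # tests/test_tasks.py
     from core.tasks import delete_public_file  # assumed module path

     def test_delete_public_file_runs_eagerly():
         # With CELERY_TASK_ALWAYS_EAGER, .delay() executes the task body immediately
         # and returns an EagerResult instead of publishing a message to the broker.
         result = delete_public_file.delay('tmp/does-not-exist.txt')
         assert result.successful()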
    
  3. Wire celery into Django

    core/__init__.py:

     # This will make sure the app is always imported when
     # Django starts so that shared_task will use this app.
     from .celery import application as celery_app  # noqa
    
     default_app_config = 'core.apps.CoreConfig'
    

    core/celery.py:

     import os
     from celery import Celery
     from django.conf import settings
    
     # Set the default Django settings module for the 'celery' program.
     os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
    
     application = Celery('core')
    
     # Using a string here means the worker doesn't have to serialize
     # the configuration object to child processes.
     # - namespace='CELERY' means all celery-related configuration keys
     #   should have a `CELERY_` prefix.
     application.config_from_object('django.conf:settings', namespace='CELERY')
    
     # Load task modules from all registered Django app configs.
     application.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
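
     The `-A core` option used in docker-compose.yml points at this package: celery imports
     core and picks up the Celery instance it exposes (celery_app / application). For local
     development the same worker can be started by hand:

      celery -A core worker -l info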
    
  4. Create a celery task (in a tasks.py module of an installed app, so autodiscover_tasks finds it)

     from celery import shared_task
     from celery.utils.log import get_task_logger
     from core.storages import public_storage
    
    
     def delete_public_file_delay(filepath, seconds=0, minutes=0, hours=0, days=0):
         delay = seconds + (minutes * 60) + (hours * 60**2) + (days * 24 * 60**2)
         delete_public_file.apply_async((filepath,), countdown=delay)
    
    
     @shared_task
     def delete_public_file(filepath):
         task_logger = get_task_logger('core')
         task_logger.info(f'[+] Delete file {filepath}')
         try:
             public_storage.delete(filepath)
         except Exception as e:
             task_logger.error(e)
    
         task_logger.info('\t... done')
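
     A task defined this way is queued from regular Django code (views, signals, management
     commands); the module path and file name below are only illustrative:

      from core.tasks import delete_public_file, delete_public_file_delay  # assumed location

      # Run as soon as a worker picks it up:
      delete_public_file.delay('exports/report.csv')

      # Or keep the file around for one hour before deleting it:
      delete_public_file_delay('exports/report.csv', hours=1)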
    

Stamped headers


Useful commands

Celery
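
  A few commands that are handy against the worker defined above (run them from the celery
  container or any environment where the Django settings are available):

     celery -A core status                # ping every worker
     celery -A core inspect active        # tasks currently being executed
     celery -A core inspect registered    # tasks known to the worker
     celery -A core inspect scheduled     # tasks waiting on an eta/countdown
     celery -A core purge                 # drop every message still waiting in the queue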

Redis
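
  The broker can be inspected directly with redis-cli (the default Celery queue is a Redis
  list named "celery", and results are stored under celery-task-meta-* keys):

     docker compose exec redis redis-cli
     > PING                        # PONG if the broker is alive
     > LLEN celery                 # number of messages waiting in the default queue
     > KEYS celery-task-meta-*     # task results kept by the result backend
     > FLUSHDB                     # wipe the current database (queue + results)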