Search

MQ & Django

1. 기본 개념

가. 동작 흐름 with Celery

1) Broker 실행

Broker로 redis 선택
→ redis 실행 명령: redis-server
Broker로 rabbitmq 선택
→ rabbitmq 실행 명령: rabbitmq-server

2) Consumer 실행

Consumer(셀러리) 실행에 따른 Broker, Consumer 연동
→ 셀러리와 장고의 연동 설정에 따라 이하 <project> 부분에 입력할 키워드가 달라짐
→ 셀러리와 장고의 연동 설정에서 Celery 클래스의 매개변수로 전달한 패키지(or Python Directory)에 ‘config’를 전달한다면 이하 <project>에 config 값으로 명령 실행
→ 설정: app = Celery('config')
→ 명령어: celery -A <project> worker --loglevel=info
ex) celery -A config worker --loglevel=info
or celery -A FA worker --loglevel=info
결과: Consumer(Celery)와 Broker(RabbitMQ) 연결 완료

3) Producer 실행

비동기 작업은 HTTP Request 또는 Cron Job을 통해서도 실행 가능함
스크립트에 따른 비동기 작업 실행 명령어: python -m <task script>
ex) python -m test_celery.run_tasks
결과: 비동기 작업이 Broker를 거쳐 Consumer에서 실행 완료

2. Celery 기본 조작

가. Why Celery

1) MultiProcessing

싱글 코어 멀티쓰레딩 제약(GIL)을 극복하여 멀티 CPU 자원을 효율적으로 사용 가능

2) Easy to Scale Up

CPU 코어의 개수를 기본값으로 celery worker를 동작시키므로 scale up에 유연하게 대응 가능
celery worker의 수를 임의로 조절 가능
→ CPU 코어의 수의 2배 이상의 worker는 오히려 성능에 불리하다는 말도 있음

3) Easy to monitor

flower 활용하여 실시간 분산 비동기 작업 모니터링 가능

4) Processing Tasks with Priority

분산 비동기 처리 작업에 우선순위 부여하여 실행 가능

나. 설정

1) 환경설정

일반적으로 ‘celery.py’ 이하에 설정
셀리러 인스턴스 설정
→ broker: Broker 연결 주소, broker 종류
→ backend: Result Backend 연결 주소, 메시지 포맷
→ include: 비동기 작업 목록

2) 비동기 작업 정의

일반적으로 tasks 디렉토리 또는 tasks.py 이하에 정의
비동기 작업 정의

3) 설정 고도화

Auto discover tasks 설정
→ celery.py에서 autodiscover_tasks() 메소드 활용하여 설정
→ celery task 파일에서 app.task → shared_task로 변경
settings.py로 celery 기본 설정 이동
→ celery.py에서 셀러리 관련 설정을 settings.py가 있는 경로에서 찾도록 수정
→ namespace로 설정한 값으로 시작되는 설정값을 setting.py에서 찾아서 등록
→ 위의 작업은 config_from_object() 메소드 활용하여 설정

다. 기본 조작

1) apply_async

apply_async(args[, kwargs[, …]]): 비동기 작업에 대한 기본 실행 메소드
비동기 작업 실행에 따른 결과값을 받기 위해 .get 메소드 호출 필요
response = update_car_name.apply_async(kwargs={"car_id": car_id, "customer_id": customer_id}) result = response.get() print(result) # {result: "success"}
Python
복사
기타 사용 예시
Quick Cheat Sheet T.apply_async(countdown=10) executes in 10 seconds from now. T.apply_async(eta=now + timedelta(seconds=10)) executes in 10 seconds from now, specified using eta T.apply_async(countdown=60, expires=120) executes in one minute from now, but expires after 2 minutes. T.apply_async(expires=now + timedelta(days=2)) expires in 2 days, set using datetime.
Python
복사
from celery document, https://docs.celeryproject.org/en/stable/userguide/calling.html

2) delay

delay(*args, **kwargs): 비동기 작업 단축 실행 메소드
→ .delay(*args, **kwargs) calls .apply_async(args, kwargs)
실행에 따른 결과값을 반환받을 수 없음
→ delay 명령으로 실행한 비동기 작업에서 특정 값을 리턴한다면 다음과 같은 에러 발생
Object of type AsyncResult is not JSON serializable
Plain Text
복사

3) ready

ready(): 비동기 작업 완료여부 확인 메소드

라. celery worker 실행

1) 기본 실행

Consumer(Celery Worker)
celery -A backend worker --loglevel=info

2) prefix 붙여서 실행

prefix 붙여서 task를 실행하는 경우, worker 실행 명령에 -Q 옵션으로 prefix를 지정해야 함
Consumer 실행: celery -A backend worker --loglevel=info -Q manager-api
→ prefix를 manager-api로 지정한 경우, -Q 옵션 다음에 해당 prefix 지정

3. Periodic Task

가. 설정

1) Django 연동

settings.py 내 INSTALLED_APPS 내 추가
migrate 명령 실행

2) timezone

일정에 따른 비동기 동작이므로 시간 설정 중요
CELERY_TIMEZONE = "Asia/Seoul" CELERY_ENABLE_UTC = False
Python
복사

나. 기본 조작

1) crontab shecules

crontab 활용하여 일정 주기로 작업 시작하도록 설정
from celery.schedules import crontab app.conf.beat_schedule = { "destroy-all-cars": { "task": "cars.tasks.destroy_cars", "schedule": crontab(), "args": (), }, }
Python
복사

2) 기본 실행

Producer, Broker, Consumer 모두 실행
Producer(Celery Beat) 실행
celery beat 시작 명령: celery -A <project_name> beat
ex)celery -A backend beat or celery -A backend beat --loglevel=info
Broker(Redis Server) 실행
redis-server
Consumer(Celery Worker)
celery consumer 동작 명령: celery -A <project name> worker --loglevel=info
(기본) celery -A backend worker --loglevel=info
(queue prefix 활용 시) celery -A FA worker --loglevel=info -Q manager-api

4. AWS SQS 활용

가. 사전 작업

1) 패키지 설치

celery 4.1.1 이상 버전 설치 필요
→ 다음과 같은 이슈 존재, Celery AttributeError: async error
pycurl 설치 이슈: pip가 아니라 pycurl을 직접 설치 필요
→ 다음의 에러 발생에 따라 아래와 같은 방식으로 직접 설치
DeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools.
Plain Text
복사
→ 다음의 포스팅 참고하여 설치: 링크
brew install openssl pip3 uninstall pycurl PYCURL_SSL_LIBRARY=openssl LDFLAGS="-L/usr/local/opt/openssl/lib" CPPFLAGS="-I/usr/local/opt/openssl/include" pip3 install --no-cache-dir pycurl
Shell
복사
script from https://developpaper.com/installing-pycurl-on-mac/

2) AWS SQS 서비스 등록

AWS IAM 계정 생성
AWS 실무 활용의 ‘IAM’ 참고
IAM 계정의 Access Key와 Secret Key 생성
SQS 서비스 등록

나. 설정

1) broker url

redis나 rabbitmq 설정과 같은 방식으로 broker url 설정

2) result backend 설정

Third party package 활용하여 result backend 설정
→ AWS SQS는 result backend를 지원하지 않음
다음의 포스팅을 참고하여 django_celery_results 설정

Reference

Basic Tutorial 참고(Original, English), https://tests4geeks.com/blog/python-celery-rabbitmq-tutorial/
Basic Tutorial 참고(번역 포스팅, 한글), https://kimdoky.github.io/tech/2019/01/23/celery-rabbitmq-tuto/