1. 기본 개념
가. 동작 흐름 with Celery
1) Broker 실행
•
Broker로 redis 선택
→ redis 실행 명령: redis-server
•
Broker로 rabbitmq 선택
→ rabbitmq 실행 명령: rabbitmq-server
2) Consumer 실행
•
Consumer(셀러리) 실행에 따른 Broker, Consumer 연동
→ 셀러리와 장고의 연동 설정에 따라 이하 <project> 부분에 입력할 키워드가 달라짐
→ 셀러리와 장고의 연동 설정에서 Celery 클래스의 매개변수로 전달한 패키지(or Python Directory)에 ‘config’를 전달한다면 이하 <project>에 config 값으로 명령 실행
→ 설정: app = Celery('config')
→ 명령어: celery -A <project> worker --loglevel=info
ex) celery -A config worker --loglevel=info
or celery -A FA worker --loglevel=info
•
결과: Consumer(Celery)와 Broker(RabbitMQ) 연결 완료
3) Producer 실행
비동기 작업은 HTTP Request 또는 Cron Job을 통해서도 실행 가능함
•
스크립트에 따른 비동기 작업 실행 명령어: python -m <task script>
ex) python -m test_celery.run_tasks
•
결과: 비동기 작업이 Broker를 거쳐 Consumer에서 실행 완료
2. Celery 기본 조작
가. Why Celery
1) MultiProcessing
•
싱글 코어 멀티쓰레딩 제약(GIL)을 극복하여 멀티 CPU 자원을 효율적으로 사용 가능
2) Easy to Scale Up
•
CPU 코어의 개수를 기본값으로 celery worker를 동작시키므로 scale up에 유연하게 대응 가능
•
celery worker의 수를 임의로 조절 가능
→ CPU 코어의 수의 2배 이상의 worker는 오히려 성능에 불리하다는 말도 있음
3) Easy to monitor
•
flower 활용하여 실시간 분산 비동기 작업 모니터링 가능
4) Processing Tasks with Priority
•
분산 비동기 처리 작업에 우선순위 부여하여 실행 가능
나. 설정
1) 환경설정
•
일반적으로 ‘celery.py’ 이하에 설정
•
셀리러 인스턴스 설정
→ broker: Broker 연결 주소, broker 종류
→ backend: Result Backend 연결 주소, 메시지 포맷
→ include: 비동기 작업 목록
2) 비동기 작업 정의
•
•
비동기 작업 정의
3) 설정 고도화
•
Auto discover tasks 설정
→ celery.py에서 autodiscover_tasks() 메소드 활용하여 설정
→ celery task 파일에서 app.task → shared_task로 변경
→ 예시 코드
•
settings.py로 celery 기본 설정 이동
→ celery.py에서 셀러리 관련 설정을 settings.py가 있는 경로에서 찾도록 수정
→ namespace로 설정한 값으로 시작되는 설정값을 setting.py에서 찾아서 등록
→ 위의 작업은 config_from_object() 메소드 활용하여 설정
→ 예시 코드
다. 기본 조작
1) apply_async
•
apply_async(args[, kwargs[, …]]): 비동기 작업에 대한 기본 실행 메소드
•
비동기 작업 실행에 따른 결과값을 받기 위해 .get 메소드 호출 필요
response = update_car_name.apply_async(kwargs={"car_id": car_id, "customer_id": customer_id})
result = response.get()
print(result) # {result: "success"}
Python
복사
•
기타 사용 예시
Quick Cheat Sheet
T.apply_async(countdown=10)
executes in 10 seconds from now.
T.apply_async(eta=now + timedelta(seconds=10))
executes in 10 seconds from now, specified using eta
T.apply_async(countdown=60, expires=120)
executes in one minute from now, but expires after 2 minutes.
T.apply_async(expires=now + timedelta(days=2))
expires in 2 days, set using datetime.
Python
복사
from celery document, https://docs.celeryproject.org/en/stable/userguide/calling.html
2) delay
•
delay(*args, **kwargs): 비동기 작업 단축 실행 메소드
→ .delay(*args, **kwargs) calls .apply_async(args, kwargs)
•
실행에 따른 결과값을 반환받을 수 없음
→ delay 명령으로 실행한 비동기 작업에서 특정 값을 리턴한다면 다음과 같은 에러 발생
Object of type AsyncResult is not JSON serializable
Plain Text
복사
3) ready
•
ready(): 비동기 작업 완료여부 확인 메소드
라. celery worker 실행
1) 기본 실행
•
Consumer(Celery Worker)
celery -A backend worker --loglevel=info
2) prefix 붙여서 실행
•
prefix 붙여서 task를 실행하는 경우, worker 실행 명령에 -Q 옵션으로 prefix를 지정해야 함
•
Consumer 실행: celery -A backend worker --loglevel=info -Q manager-api
→ prefix를 manager-api로 지정한 경우, -Q 옵션 다음에 해당 prefix 지정
3. Periodic Task
가. 설정
1) Django 연동
•
settings.py 내 INSTALLED_APPS 내 추가
•
migrate 명령 실행
2) timezone
•
일정에 따른 비동기 동작이므로 시간 설정 중요
CELERY_TIMEZONE = "Asia/Seoul"
CELERY_ENABLE_UTC = False
Python
복사
나. 기본 조작
1) crontab shecules
•
crontab 활용하여 일정 주기로 작업 시작하도록 설정
from celery.schedules import crontab
app.conf.beat_schedule = {
"destroy-all-cars": {
"task": "cars.tasks.destroy_cars",
"schedule": crontab(),
"args": (),
},
}
Python
복사
2) 기본 실행
Producer, Broker, Consumer 모두 실행
•
Producer(Celery Beat) 실행
celery beat 시작 명령: celery -A <project_name> beat
ex)celery -A backend beat or celery -A backend beat --loglevel=info
•
Broker(Redis Server) 실행
redis-server
•
Consumer(Celery Worker)
celery consumer 동작 명령: celery -A <project name> worker --loglevel=info
(기본) celery -A backend worker --loglevel=info
(queue prefix 활용 시) celery -A FA worker --loglevel=info -Q manager-api
4. AWS SQS 활용
가. 사전 작업
1) 패키지 설치
•
•
celery 4.1.1 이상 버전 설치 필요
→ 다음과 같은 이슈 존재, Celery AttributeError: async error
•
pycurl 설치 이슈: pip가 아니라 pycurl을 직접 설치 필요
→ 다음의 에러 발생에 따라 아래와 같은 방식으로 직접 설치
DeprecationWarning: setup.py install is deprecated. Use build and pip and other standards-based tools.
Plain Text
복사
→ 다음의 포스팅 참고하여 설치: 링크
brew install openssl
pip3 uninstall pycurl
PYCURL_SSL_LIBRARY=openssl LDFLAGS="-L/usr/local/opt/openssl/lib" CPPFLAGS="-I/usr/local/opt/openssl/include" pip3 install --no-cache-dir pycurl
Shell
복사
script from https://developpaper.com/installing-pycurl-on-mac/
2) AWS SQS 서비스 등록
•
AWS IAM 계정 생성
•
IAM 계정의 Access Key와 Secret Key 생성
•
SQS 서비스 등록
나. 설정
1) broker url
•
redis나 rabbitmq 설정과 같은 방식으로 broker url 설정
→ 예시 코드
2) result backend 설정
•
•
다음의 포스팅을 참고하여 django_celery_results 설정
→ 참고 링크
Reference
•
Basic Tutorial 참고(Original, English), https://tests4geeks.com/blog/python-celery-rabbitmq-tutorial/
•
Basic Tutorial 참고(번역 포스팅, 한글), https://kimdoky.github.io/tech/2019/01/23/celery-rabbitmq-tuto/
•
Intermediate Tutorial 참고, https://dontrepeatyourself.org/post/asynchronous-tasks-in-django-with-celery-and-rabbitmq/
•
Django, Celery 연동, https://whatisthenext.tistory.com/127
•
Django - AWS SQS
→ package 설치, 4654