Celery 란?
Celery 란 비동기 방식의 작업 큐로, Broker에게 받은 메시지 작업(Task)을 수행하는 Worker 이다.
👉 Celery를 사용하는 이유?
- 동기적으로 처리할 경우 오래 걸리는 메일 전송 / 대용량 파일 업로드 등의 Task를 비동기 처리하기 위함
- Worker들을 병렬적으로 실행 하여 동시에 처리도 가능
일반적으로 비동기 이벤트 기반 처리의 구조는 Producer (Server Framework) => Broker (RabbitMQ, Redis 등) => Consumer (Celery) 로 이루어지는데, Celery는 Consumer 역할을 수행한다.
👉 Broker
Broker로는 일반적으로 RabbitMQ(Message Queue)를 많이 사용하고, Redis와 같은 Pub/Sub 형태도 사용 가능하다. MQ 관련 내용은 해당 링크에서 정리한 적이 있다.
https://yubi5050.tistory.com/224
Celery 사용 예제 (by. FastAPI)
👉 1. Settings
Celery를 설치하고, 만약 환경이 Window라면 gevnt도 설치해준다.
$ pip install Celery
$ pip install Redis # if use redis
$ pip install gevent # if use window
👉 2. Broker(Redis, RabbitMQ) up
# use redis docker
$ docker run --name my-redis -d redis
# use rabbitmq docker
$ docker run --name rabbitmq-container -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:management
👉 3. app.py
celery_worker.py의 add_func 함수를 celery로 적용하겠다는 로직이다.
result.ready()는 현재 응답이 왔는지를 확인하는 것이고, celery로 복수의 인자를 넘겨줄 때는 tuple이나 list 형식으로 넘겨 주어야 한다.
from fastapi import FastAPI
import celery_worker
import uvicorn
import time
app = FastAPI()
@app.get("/worker/add")
async def worker_start(task_id: str, num_1 : int, num_2: int):
result = celery_worker.add_func.apply_async([num_1, num_2], task_id=task_id)
print(result.ready()) # False
time.sleep(3)
print(result.ready()) # True
return {"message": "celery start"}
if __name__ == '__main__':
uvicorn.run('app:app', reload=True)
👉 4. celery_worker.py
해당 예제에서는 rabbitmq나 redis 둘다 Broker로 사용 가능하도록 로직을 구성했다.
아래 add_func이 celery의 task로 등록되어 있는 것을 확인 할 수 있다.
import time
from celery import Celery
mode = 'rabbitmq'
# mode = 'redis'
if mode == 'rabbitmq':
celery_task = Celery(
'celery_app',
broker="pyamqp://guest@127.0.0.1:5672",
backend="rpc://",
# include=['celery_worker']
)
else: # redis
celery_task = Celery(
'celery_app',
broker="redis://localhost:6379",
backend="redis://localhost:6379",
# include=['celery_worker'] # celery_worker.py
)
@celery_task.task
def add_func(x, y):
time.sleep(2)
return x + y
👉 5. FastAPI 서버 실행 & Celery 실행
$ uvicorn app:app --reload
$ celery -A celery_app worker -l info -P gevent
👉 6. API Test 및 결과
# GET Method
$ http://127.0.0.1:8000/worker/add?task_id=smkim&num_1=4&num_2=2
아래와 같이 Fast API 서버에 GET 요청이 도착하고
Celery가 실행되면서 Broker(RabbitMQ)의 Port인 5672로 Connection을 완료하고, 전달 받은 Task를 수행한다.
참고 문헌
- https://ordilog.tistory.com/10 // [celery-django] 1. 순수 Celery 예제
- https://velog.io/@jaeyoung0509/celery-rabbitmq // celery + rabbitmq
- https://velog.io/@ssssujini99/Django-Celery-RabbitMQ-%EB%B6%84%EC%82%B0-%EB%B9%84%EB%8F%99%EA%B8%B0-%EC%9E%91%EC%97%85-%EC%88%98%ED%96%89%EC%9D%84-%EC%9C%84%ED%95%9C-Celery // [Django + Celery + RabbitMQ] 분산 비동기 작업 수행을 위한 Celery
- https://stackoverflow.com/questions/39815771/how-to-combine-celery-with-asyncio // How to combine Celery with asyncio?
'DB > 기본 실습' 카테고리의 다른 글
[DB] MongoDB Docker + Robo3T 셋팅 (0) | 2022.11.13 |
---|---|
[PostgreSQL, Django] PostgreSQL 셋팅 (Local, Docker) (0) | 2022.07.14 |
[DB] AWS EC2에서 Sqlite3 DB 확인 (0) | 2022.06.13 |
[DB] Django 연동 DB 관련 Tools (Connector, DBMS) (0) | 2022.05.30 |
팀 공용 MongoDB-Atlas-Robo 3T 셋팅 (0) | 2022.05.04 |