DB/기본 실습

[Celery] Celery 비동기 작업 큐 예제 (by. FastAPI)

yubi5050 2022. 11. 29. 06:48

 

Celery 란?

Celery 란 비동기 방식의 작업 큐로, Broker에게 받은 메시지 작업(Task)을 수행하는 Worker 이다.

 

 

👉 Celery를 사용하는 이유?

  • 동기적으로 처리할 경우 오래 걸리는 메일 전송 / 대용량 파일 업로드 등의 Task를 비동기 처리하기 위함
  • Worker들을 병렬적으로 실행 하여 동시에 처리도 가능

일반적으로 비동기 이벤트 기반 처리의 구조는 Producer (Server Framework) => Broker (RabbitMQ, Redis 등) => Consumer (Celery) 로 이루어지는데, Celery는 Consumer 역할을 수행한다.

 

👉 Broker

Broker로는 일반적으로 RabbitMQ(Message Queue)를 많이 사용하고, Redis와 같은 Pub/Sub 형태도 사용 가능하다. MQ 관련 내용은 해당 링크에서 정리한 적이 있다.

https://yubi5050.tistory.com/224

 

[MQ] 메시지 지향 미들웨어 (by. RabbitMQ, Redis)

메시지 지향 미들웨어(Message Oriented Middleware, MOM) 란? 메시지 지향 미들웨어(Message Oriented Middleware, MOM)이란, 응용 프로세스 간 비동기 방식의 데이터 통신을 통해, 메시지를 전달 해주는 시스템을

yubi5050.tistory.com

 

Celery 사용 예제 (by. FastAPI)

👉 1. Settings

Celery를 설치하고, 만약 환경이 Window라면 gevnt도 설치해준다.

$ pip install Celery
$ pip install Redis # if use redis
$ pip install gevent # if use window

 

👉 2. Broker(Redis, RabbitMQ) up

# use redis docker
$ docker run --name my-redis -d redis

# use rabbitmq docker
$ docker run --name rabbitmq-container -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:management

 

👉 3. app.py

celery_worker.py의 add_func 함수를 celery로 적용하겠다는 로직이다. 

result.ready()는 현재 응답이 왔는지를 확인하는 것이고, celery로 복수의 인자를 넘겨줄 때는 tuple이나 list 형식으로 넘겨 주어야 한다.

from fastapi import FastAPI
import celery_worker
import uvicorn
import time

app = FastAPI()

@app.get("/worker/add")
async def worker_start(task_id: str, num_1 : int, num_2: int):
    result = celery_worker.add_func.apply_async([num_1, num_2], task_id=task_id)
    print(result.ready()) # False
    time.sleep(3)
    print(result.ready()) # True
    return {"message": "celery start"}

if __name__ == '__main__':
    uvicorn.run('app:app', reload=True)

 

👉 4. celery_worker.py

해당 예제에서는 rabbitmq나 redis 둘다 Broker로 사용 가능하도록 로직을 구성했다. 

아래 add_func이 celery의 task로 등록되어 있는 것을 확인 할 수 있다.

import time
from celery import Celery

mode = 'rabbitmq'
# mode = 'redis'
if mode == 'rabbitmq':
    celery_task = Celery(
        'celery_app',
        broker="pyamqp://guest@127.0.0.1:5672",
        backend="rpc://",
        # include=['celery_worker']
    )
     
else: # redis
    celery_task = Celery(
        'celery_app',
        broker="redis://localhost:6379",
        backend="redis://localhost:6379",
        # include=['celery_worker'] # celery_worker.py
    )

@celery_task.task
def add_func(x, y):
    time.sleep(2)
    return x + y

 

👉  5. FastAPI 서버 실행 & Celery 실행

$ uvicorn app:app --reload
$ celery -A celery_app worker -l info -P gevent

 

👉  6. API Test 및 결과

# GET Method
$ http://127.0.0.1:8000/worker/add?task_id=smkim&num_1=4&num_2=2

 

아래와 같이 Fast API 서버에 GET 요청이 도착하고

 

 

Celery가 실행되면서 Broker(RabbitMQ)의 Port인 5672로 Connection을 완료하고, 전달 받은 Task를 수행한다.

 

 

참고 문헌