Django + Celery + Redis로 비동기 작업 처리하기 (실전 예제 포함)
안녕하세요! 오늘은 Django에서 Celery와 Redis를 사용하여 비동기 작업을 처리하는 방법에 대해 알아보겠습니다. 이 조합은 이메일 발송, 데이터 처리, 파일 변환 등 시간이 오래 걸리는 작업을 비동기적으로 처리할 때 매우 유용합니다.
왜 비동기 처리가 필요할까요?
웹 애플리케이션에서 다음과 같은 작업들은 사용자 경험을 해칠 수 있습니다:
- 대용량 파일 처리
- 이메일 발송
- 외부 API 호출
- 복잡한 데이터 분석
- 이미지 처리
이러한 작업들을 동기적으로 처리하면 사용자는 작업이 완료될 때까지 기다려야 합니다. 비동기 처리를 통해 이러한 문제를 해결할 수 있습니다.
프로젝트 설정
먼저 필요한 패키지들을 설치합니다:
pip install celery redis django-celery-results
Redis 설치 (Windows)
Windows에서는 Redis를 설치하기 위해 WSL2를 사용하거나, Docker를 사용하는 것을 추천드립니다:
docker run -d -p 6379:6379 redis
Django 설정
settings.py에 Celery 설정을 추가합니다:
# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'
# Celery Beat 설정 (주기적 작업)
CELERY_BEAT_SCHEDULE = {
'check-expired-items': {
'task': 'your_app.tasks.check_expired_items',
'schedule': 3600.0, # 1시간마다 실행
},
}
Celery 설정 파일 생성
프로젝트 루트에 celery.py 파일을 생성합니다:
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')
app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
실전 예제: 이메일 발송 시스템
1. 기본 이메일 발송 태스크
# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings
@shared_task
def send_welcome_email(user_email, username):
subject = '환영합니다!'
message = f'안녕하세요 {username}님! 가입을 환영합니다.'
from_email = settings.DEFAULT_FROM_EMAIL
send_mail(
subject,
message,
from_email,
[user_email],
fail_silently=False,
)
return f'이메일이 {user_email}로 발송되었습니다.'
2. 대용량 파일 처리 태스크
# tasks.py
import pandas as pd
from celery import shared_task
from django.core.files.storage import default_storage
@shared_task
def process_large_csv(file_path):
try:
# 파일 읽기
with default_storage.open(file_path, 'rb') as f:
df = pd.read_csv(f)
# 데이터 처리
processed_data = df.groupby('category').sum()
# 결과 저장
output_path = f'processed_{file_path}'
with default_storage.open(output_path, 'w') as f:
processed_data.to_csv(f)
return f'파일 처리 완료: {output_path}'
except Exception as e:
return f'에러 발생: {str(e)}'
3. 외부 API 호출 태스크
# tasks.py
import requests
from celery import shared_task
from django.core.cache import cache
@shared_task
def fetch_weather_data(city):
api_key = settings.WEATHER_API_KEY
url = f'https://api.weatherapi.com/v1/current.json?key={api_key}&q={city}'
try:
response = requests.get(url)
data = response.json()
# 결과 캐싱
cache_key = f'weather_{city}'
cache.set(cache_key, data, timeout=3600) # 1시간 캐시
return data
except Exception as e:
return f'API 호출 실패: {str(e)}'
태스크 실행 방법
1. 뷰에서 태스크 호출
# views.py
from .tasks import send_welcome_email
def register_user(request):
if request.method == 'POST':
# 사용자 등록 로직...
# 비동기 이메일 발송
send_welcome_email.delay(user.email, user.username)
return HttpResponse('등록 완료! 이메일이 발송됩니다.')
2. 관리자 명령어에서 태스크 호출
# management/commands/process_data.py
from django.core.management.base import BaseCommand
from your_app.tasks import process_large_csv
class Command(BaseCommand):
def handle(self, *args, **options):
result = process_large_csv.delay('data.csv')
self.stdout.write(f'태스크 ID: {result.id}')
모니터링과 디버깅
1. Flower 설치
pip install flower
2. Flower 실행
celery -A your_project flower
3. 태스크 상태 확인
from your_app.tasks import send_welcome_email
# 태스크 실행
result = send_welcome_email.delay('user@example.com', 'John')
# 상태 확인
print(result.state) # PENDING, STARTED, SUCCESS, FAILURE
print(result.get()) # 결과 확인
프로덕션 환경 설정
1. Supervisor 설정
[program:celery]
command=/path/to/venv/bin/celery -A your_project worker -l INFO
directory=/path/to/your/project
user=your_user
autostart=true
autorestart=true
stderr_logfile=/var/log/celery/err.log
stdout_logfile=/var/log/celery/out.log
[program:celerybeat]
command=/path/to/venv/bin/celery -A your_project beat -l INFO
directory=/path/to/your/project
user=your_user
autostart=true
autorestart=true
stderr_logfile=/var/log/celerybeat/err.log
stdout_logfile=/var/log/celerybeat/out.log
주의사항
- 에러 처리: 모든 태스크에는 적절한 예외 처리가 필요합니다.
- 리소스 관리: 메모리 사용량을 모니터링하고 제한하세요.
- 타임아웃 설정: 장시간 실행되는 태스크에는 타임아웃을 설정하세요.
- 재시도 정책: 실패한 태스크에 대한 재시도 정책을 설정하세요.
결론
Django + Celery + Redis 조합은 강력한 비동기 작업 처리 시스템을 제공합니다. 이 글에서 소개한 예제들을 기반으로 자신의 프로젝트에 맞는 비동기 작업을 구현해보세요. 추가적인 질문이나 궁금한 점이 있으시면 댓글로 남겨주세요!

댓글
댓글 쓰기