기본 콘텐츠로 건너뛰기

Django + Celery + Redis로 비동기 작업 처리하기 (실전 예제 포함)

Django + Celery + Redis로 비동기 작업 처리하기 (실전 예제 포함)

안녕하세요! 오늘은 Django에서 Celery와 Redis를 사용하여 비동기 작업을 처리하는 방법에 대해 알아보겠습니다. 이 조합은 이메일 발송, 데이터 처리, 파일 변환 등 시간이 오래 걸리는 작업을 비동기적으로 처리할 때 매우 유용합니다.

왜 비동기 처리가 필요할까요?

웹 애플리케이션에서 다음과 같은 작업들은 사용자 경험을 해칠 수 있습니다:

  1. 대용량 파일 처리
  2. 이메일 발송
  3. 외부 API 호출
  4. 복잡한 데이터 분석
  5. 이미지 처리

이러한 작업들을 동기적으로 처리하면 사용자는 작업이 완료될 때까지 기다려야 합니다. 비동기 처리를 통해 이러한 문제를 해결할 수 있습니다.

프로젝트 설정

먼저 필요한 패키지들을 설치합니다:

pip install celery redis django-celery-results

Redis 설치 (Windows)

Windows에서는 Redis를 설치하기 위해 WSL2를 사용하거나, Docker를 사용하는 것을 추천드립니다:

docker run -d -p 6379:6379 redis

Django 설정

settings.py에 Celery 설정을 추가합니다:

# Celery 설정
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Seoul'

# Celery Beat 설정 (주기적 작업)
CELERY_BEAT_SCHEDULE = {
    'check-expired-items': {
        'task': 'your_app.tasks.check_expired_items',
        'schedule': 3600.0,  # 1시간마다 실행
    },
}

Celery 설정 파일 생성

프로젝트 루트에 celery.py 파일을 생성합니다:

import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project.settings')

app = Celery('your_project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

실전 예제: 이메일 발송 시스템

1. 기본 이메일 발송 태스크

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.conf import settings

@shared_task
def send_welcome_email(user_email, username):
    subject = '환영합니다!'
    message = f'안녕하세요 {username}님! 가입을 환영합니다.'
    from_email = settings.DEFAULT_FROM_EMAIL

    send_mail(
        subject,
        message,
        from_email,
        [user_email],
        fail_silently=False,
    )
    return f'이메일이 {user_email}로 발송되었습니다.'

2. 대용량 파일 처리 태스크

# tasks.py
import pandas as pd
from celery import shared_task
from django.core.files.storage import default_storage

@shared_task
def process_large_csv(file_path):
    try:
        # 파일 읽기
        with default_storage.open(file_path, 'rb') as f:
            df = pd.read_csv(f)

        # 데이터 처리
        processed_data = df.groupby('category').sum()

        # 결과 저장
        output_path = f'processed_{file_path}'
        with default_storage.open(output_path, 'w') as f:
            processed_data.to_csv(f)

        return f'파일 처리 완료: {output_path}'
    except Exception as e:
        return f'에러 발생: {str(e)}'

3. 외부 API 호출 태스크

# tasks.py
import requests
from celery import shared_task
from django.core.cache import cache

@shared_task
def fetch_weather_data(city):
    api_key = settings.WEATHER_API_KEY
    url = f'https://api.weatherapi.com/v1/current.json?key={api_key}&q={city}'

    try:
        response = requests.get(url)
        data = response.json()

        # 결과 캐싱
        cache_key = f'weather_{city}'
        cache.set(cache_key, data, timeout=3600)  # 1시간 캐시

        return data
    except Exception as e:
        return f'API 호출 실패: {str(e)}'

태스크 실행 방법

1. 뷰에서 태스크 호출

# views.py
from .tasks import send_welcome_email

def register_user(request):
    if request.method == 'POST':
        # 사용자 등록 로직...

        # 비동기 이메일 발송
        send_welcome_email.delay(user.email, user.username)

        return HttpResponse('등록 완료! 이메일이 발송됩니다.')

2. 관리자 명령어에서 태스크 호출

# management/commands/process_data.py
from django.core.management.base import BaseCommand
from your_app.tasks import process_large_csv

class Command(BaseCommand):
    def handle(self, *args, **options):
        result = process_large_csv.delay('data.csv')
        self.stdout.write(f'태스크 ID: {result.id}')

모니터링과 디버깅

1. Flower 설치

pip install flower

2. Flower 실행

celery -A your_project flower

3. 태스크 상태 확인

from your_app.tasks import send_welcome_email

# 태스크 실행
result = send_welcome_email.delay('user@example.com', 'John')

# 상태 확인
print(result.state)  # PENDING, STARTED, SUCCESS, FAILURE
print(result.get())  # 결과 확인

프로덕션 환경 설정

1. Supervisor 설정

[program:celery]
command=/path/to/venv/bin/celery -A your_project worker -l INFO
directory=/path/to/your/project
user=your_user
autostart=true
autorestart=true
stderr_logfile=/var/log/celery/err.log
stdout_logfile=/var/log/celery/out.log

[program:celerybeat]
command=/path/to/venv/bin/celery -A your_project beat -l INFO
directory=/path/to/your/project
user=your_user
autostart=true
autorestart=true
stderr_logfile=/var/log/celerybeat/err.log
stdout_logfile=/var/log/celerybeat/out.log

주의사항

  1. 에러 처리: 모든 태스크에는 적절한 예외 처리가 필요합니다.
  2. 리소스 관리: 메모리 사용량을 모니터링하고 제한하세요.
  3. 타임아웃 설정: 장시간 실행되는 태스크에는 타임아웃을 설정하세요.
  4. 재시도 정책: 실패한 태스크에 대한 재시도 정책을 설정하세요.

결론

Django + Celery + Redis 조합은 강력한 비동기 작업 처리 시스템을 제공합니다. 이 글에서 소개한 예제들을 기반으로 자신의 프로젝트에 맞는 비동기 작업을 구현해보세요. 추가적인 질문이나 궁금한 점이 있으시면 댓글로 남겨주세요!

댓글

이 블로그의 인기 게시물

Django에서 트랜잭션 관리하기

Django에서 트랜잭션 관리하기 안녕하세요! 오늘은 Django에서 데이터베이스 트랜잭션을 효과적으로 관리하는 방법에 대해 알아보겠습니다. 1. 트랜잭션의 중요성 트랜잭션은 데이터베이스의 일관성과 무결성을 보장하는 중요한 개념입니다. Django에서는 여러 가지 방법으로 트랜잭션을 관리할 수 있습니다. 1.1 기본 개념 원자성(Atomicity) : 트랜잭션은 모두 실행되거나 모두 실행되지 않아야 합니다. 일관성(Consistency) : 트랜잭션 전후로 데이터베이스의 일관성이 유지되어야 합니다. 격리성(Isolation) : 동시에 실행되는 트랜잭션들이 서로 영향을 주지 않아야 합니다. 지속성(Durability) : 완료된 트랜잭션의 결과는 영구적으로 저장되어야 합니다. 2. Django의 트랜잭션 관리 2.1 기본 설정 # settings.py DATABASES = { 'default': { 'ENGINE': 'django.db.backends.postgresql', 'NAME': 'mydatabase', 'USER': 'myuser', 'PASSWORD': 'mypassword', 'HOST': 'localhost', 'PORT': '5432', 'ATOMIC_REQUESTS': True, # 모든 뷰를 트랜잭션으로 래핑 } } 2.2 데코레이터 사용 from django.db import transaction @transaction.atomic def create_order(user, items): order = Order.objects.create(user=...

AWS S3 + CloudFront로 정적 파일 서빙 완전 가이드

AWS S3 + CloudFront로 정적 파일 서빙 완전 가이드 안녕하세요! 오늘은 AWS S3와 CloudFront를 사용하여 정적 파일을 효율적으로 서빙하는 방법에 대해 알아보겠습니다. 왜 S3와 CloudFront를 사용할까요? 높은 가용성 : AWS의 글로벌 인프라를 활용 빠른 전송 속도 : CloudFront의 CDN 기능으로 전 세계 사용자에게 빠른 전송 비용 효율성 : 사용한 만큼만 지불 보안 : AWS의 보안 기능 활용 확장성 : 트래픽 증가에 자동 대응 1. S3 버킷 설정 1.1 버킷 생성 및 설정 import boto3 def create_s3_bucket(): s3 = boto3.client('s3') # 버킷 생성 bucket_name = 'your-static-files-bucket' s3.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': 'ap-northeast-2' } ) # 버킷 정책 설정 bucket_policy = { "Version": "2012-10-17", "Statement": [ { "Sid": "PublicReadGetObject", "Effect": "Allow", "Principal": "*", "Action": "s3:GetObje...

Python에서 asyncio 완전 정복 (await, async, gather 등)

어휴, 요즘 파이썬으로 비동기 프로그래밍 하는 재미에 푹 빠졌어요! 특히 asyncio 는 정말 마법 같더라고요. 처음엔 좀 낯설었는데, 익숙해지니까 속도 향상이 눈에 띄게 느껴져서 완전 반해버렸습니다. 이 글에선 제가 asyncio 를 배우면서 깨달은 점들을 풀어놓을게요. 혹시 비동기 프로그래밍이 뭔지 잘 모르시겠다면, 간단히 말해 여러 작업을 동시에 처리해서 프로그램 속도를 엄청나게 높이는 기술이라고 생각하시면 돼요. 마치 여러 요리사가 동시에 음식을 만들어서 손님에게 빨리 제공하는 것과 비슷하죠! 일단 async 와 await 라는 녀석들이 핵심인데요, async 는 함수 앞에 붙여서 "얘는 비동기 함수야!"라고 선언하는 거예요. 그리고 await 는 다른 비동기 함수가 끝날 때까지 기다리라고 지시하는 역할을 하죠. 예를 들어, 네트워크에서 데이터를 가져오는 함수가 있다면, await 를 사용해서 데이터가 다 가져올 때까지 기다렸다가 다음 작업을 진행할 수 있어요. 그 동안 다른 작업을 처리할 수 있으니, 마치 멀티태스킹을 하는 것처럼 느껴져요. 신기하지 않나요? 그리고 asyncio.gather 는 여러 비동기 함수를 동시에 실행하고 결과를 모아주는 아주 유용한 친구입니다. 제가 웹사이트 여러 개에서 데이터를 동시에 가져와야 할 때 정말 요긴하게 썼어요. 하나씩 순서대로 가져오는 것보다 훨씬 빠르더라고요! 마치 여러 개의 탭을 동시에 열어놓고 작업하는 것과 같다고 생각하시면 될 것 같아요. 실제로 제가 썼던 코드를 보여드릴게요. 세 개의 웹사이트에서 데이터를 가져오는 예제인데요. (아래 코드 삽입) 이 코드를 보시면, fetch_data 함수가 각 웹사이트에서 데이터를 가져오는 역할을 하고, asyncio.gather 가 이 함수들을 동시에 실행하도록 도와주는 것을 볼 수 있을 거예요. asyncio.sleep(2) 는 네트워크 지연을 시뮬레이션하기 위해 넣...