개요
서비스에서 하나의 로직을 작성할 때면, 비동기로 작업을 수행해야 할 때가 있다.
일반적으로는 파이썬의 비동기 처리 방법에서 자주쓰이는 Celery
가 생각이 들 것이다.
하지만 이번에도 제약조건이 몇가지 있다.
첫번째, celery를 사용할 수 없다.
- celery worker의 추가 관리 소요에 의해, 당장 사용할 수 없었다.
두번째, 메시지 브로커는 Redis를 사용한다.
- 메시지 관리에는 RabbitMQ, AWS SQS 같은 좋은 메시지 브로커가 있지만 사용할 수 없었다.
두번째, 외부 서비스의 웹훅을 처리한다.
- 서비스 내부적으로 동작하는 기능이 아니라, 외부 서비스의 웹훅으로 들어온 데이터를 처리한다.
- 웹훅 정보 대부분이 최소한의 정보를 가지고 있으므로, 추가 API 호출이 필요하다.
추가 API
는 고객의 토큰을 기반으로 동작한다.동일한 토큰 & 동일한 엔드포인트
를 호출 시
1초에 2회라는 제한이 존재한다.
세번째, 웹훅은 최대한 선입선출로 처리해야 한다.
- 데이터의 일관성을 위해, 선입선출로 처리해야 한다.
- 서로 다른 고객의 웹훅이라면 병렬로 처리되어도 된다.
- 같은 고객의 웹훅이라도, 기능이 다르다면 병렬로 처리되어도 된다.
위와 같은 제한을 고려했을 때, 어떻게 하면 작업을 빠르게 수행할 수 있을지 생각해보았다.
웹훅 데이터를 받고 저장하는 인터페이스
API 작성
외부 서비스에 등록할 수 있는 엔드포인트는 하나였다.
대신 위에서 설명한 것과 같이 이벤트 코드
가 있기에, 이를 이용하여야 한다.
외부 서비스에서 설명하는 웹훅의 구조는 다음과 같다.
1
2
3
4
5
6
{
"code" : "post-create",
"data": {
...
}
}
여기서 웹훅에서 준 “data” 구조는 이벤트 코드마다 다르다.
그렇기에 이벤트 코드에 따라 다른 처리를 진행해야 한다.
이러한 점에서 웹훅의 처리는 가벼운 함수형 함수형 뷰(FBV: Function Based View)
가 적합하다고 판단하였다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
WEBHOOK_LISTENER = {
"post-create": post_create, # 게시글 저장 listener
"post-update": post_update, # 게시글 수정 listener
"post-delete": post_delete, # 게시글 삭제 listener
"comment-update": comment_create, # 댓글 저장 listener
"comment-update": comment_update, # 댓글 수정 listener
"comment-update": comment_delete, # 댓글 삭제 listener
}
@api_view(http_method_names=['POST'])
def webhook_main_view(request):
body_data = request.data
if 'code' not in body_data:
...
if 'data' not in body_data:
...
code = body_data.get('code', None)
if code not in WEBHOOK_LISTENER:
...
data = body_data.get('data')
return WEBHOOK_LISTENER[code](data)
상단의 WEBHOOK_LISTENER
는 이벤트 코드를 실질적인 저장 함수로 매핑하는 기능을 한다.
메인 함수에서는 데이터에 대해 예외처리를 해 두고, 매핑을 통해 필요한 저장 함수를 호출하도록 하였다.
저장 함수 구현
개요에서 밝힌 제약사항과 웹훅 정보를 볼 때, 각 이벤트를 독립적인 키로 구분하면, 다음과 같이 구분할 수 있다.
1
<고객ID>:<이벤트 코드>
그리고 위 정보를 키값으로 해서, 처리 데이터를 저장할 메인 큐를 이렇게 구성할 수 있다.
1
2
3
4
5
6
7
8
9
10
11
12
13
메인 큐
[
"user_1:post-create": <data1> ,
"user_1:post-create": <data2> ,
"user_3:post-create": <data3> ,
"user_2:post-create": <data4> ,
"user_1:post-create": <data5> ,
"user_1:post-create": <data6> ,
"user_2:post-create": <data7> ,
"user_3:post-create": <data8> ,
...
...
]
위 형태는 하나의 프로세스가 하나씩 처리한다면 가정하에, 적절한 저장구조일 것이다.
하지만 여기서 끝내는 것이 아니라 병렬 처리를 위한 고민이 더 필요하다.
쌓인 웹훅 데이터를 처리하는 프로세스는 다음과 같은 조건으로 작동해야 한다.
- 같은
키 값
에 대한 데이터는 선입선출로 처리되어야 한다. - 최대한 여러 스레드로 동작하여야 한다.
병럴 처리를 위한 고민
병렬 처리를 위해서는, 여러개의 데이터를 가져와서 동시에 여러개를 동작시키면 된다.
여기서는 편의상 멀티스레드를 사용하기로 했다.
위 예시 데이터를 전부 가져와서, 최대 4개의 스레드로 처리한다고 생각해보자.
같은 키 값`에 대해 선입선출을 해야한다는 점을 고려해야 한다.
그렇다면 실질적으로 처리하는 구조는 다음과 같이 될 것이다.
8개의 데이터 중 중복되지 않는 키값은 3개로, 4개의 스레드 중 3개가 켜진다.
하나의 스레드 놀고 있지만, 주어진 작업을 최대한 분배한 모습니다.
하지만 위의 예시는 이상적인 케이스이고, 추가 문제를 몇가지 고려해야 한다.
작업량 쏠림 문제
키 값
이 균등하게 분배되었다면 동시 작업이 원활하게 처리되었을 것이다. 그리고 다음 N개의 데이터를 불러와 다음 처리를 진행하게 된다.
하지만 실제로 들어오는 웹훅 데이터는 균등하게 들어올 것을 보장하지 않는다.
고객마다 활동률이 다르고, 특정 작업(예를 들어 post-create
)이 몰려있을 수도 있다.
예를 들면 이런 극단적인 경우도 나올 수 있다.
특정 작업의 과다 점유 문제
위에서 설명한 문제를 해결하기 위해, 스레드가 너무 비어있다면 다음 N개의 데이터를 불러올 수 있을 것이다.
하지만 다음으로 불러온 데이터도 user_1:post-create
50개라면 어떨까?
극단적인 케이스 중 하나는, user_1:post-create
수만개가 대기 중인 케이스였다.
스레드가 늘어날 때 까지 데이터를 가져왔다면 user_1:post-create
수만개를 가져오다가 메모리가 터졌을 것이다.
문제점을 고려한 저장함수 구현
위 문제점을 고려해서, 큐 구조를 다음과 같이 변경했다.
메인 큐는 각 작업의 키값만 존재한다.
그리고 각각의 키값에 대해, 처리해야할 데이터를 저장하는 큐가 별도로 존재한다.
추후 처리 함수에서 처리해야 하는 프로세스를 숫자형으로 관리하게 되면, 메모리 문제도 해결할 수 있는 셈이다.