개요
이전 포스트에서는 비동기 처리 시 데이터의 저장 구조에 대해 고민해보았다.
이번에는 저장된 데이터를 어떻게 하면 적절히 처리할 수 있을 지 알아보려고 한다.
최종적으로 결정되었던 데이터 저장 구조는 다음과 같다.
위 구조를 기반으로, 데이터를 처리하는데 어떤 부분을 고려해야할지 생각해보았다.
데이터 처리 구조
처리 사이클
메인 큐에서 N개의 데이터를 가져와 처리를 시작하는 구조를 처리 사이클
이라고 하자.
간략한 구조는 다음과 같다.
1
2
3
4
5
6
7
8
9
10
# 처리 프로세스
qdata_list = 데이터_가져오기_및_키값별_데이터_갯수_최신화()
for new_키네임 in qdata_list.keys():
if new_키네임 in 현재_추가된_키네임:
continue
future = 스레드풀.스레드_추가(처리_프로세스, new_키네임)
현재_추가된_future[future] = new_키네임
현재_추가된_키네임.add(new_키네임)
future.콜백추가(커스텀_콜백)
line 2: 데이터를 가져오는 작업을 한다.
line 4: 가져온 데이터에 대한 처리 작업을 시작한다.
line 5-6: 이미 처리가 예약된 키 값
이라면, 스레드 추가 작업을 건너뛴다.
line 7: 스레드에 해당 키 값
에 대한 처리 프로세스를 추가한다.
line 8-9: 현재 실행된 스레드를 관리하는 set과 dict에, 정보를 추가한다.
완료 callback에서는 future를 패러미터로 넘겨준다.
실제 완료된 키 값
을 알기 위해서 set과 dict 두가지를 사용했다.
line 10: 콜백을 추가한다.
처리 프로세스
1
2
3
4
5
6
7
8
9
10
11
12
13
14
def 처리_프로세스(키네임):
이벤트코드_함수_매퍼 = {
...
}
고객ID, 이벤트코드 = 키네임.split(":")
현재_데이터_개수 = 키값별_데이터갯수.get(키네임,0)
while 현재_데이터_개수 != 0:
처리용_데이터 = 데이터저장큐에서_데이터_가져오기(키네임)
이벤트코드_함수_매퍼[이벤트코드]
데이터저장큐에서_완료된_데이터_삭제하기(키네임)
키값별_데이터갯수[키네임] -= 1
처리 프로세스는 간단하다.
같은 키값에 대한 선입선출 처리를 위해, 처리할 데이터가 쌓여있다면 계속 실행하게 된다.
처리 구조의 문제점
이상적인 실행환경이라면 위 프로세스는 잘 작동할 것이다.
하지만 선입선출을 위한 처리 때문에 몇가지 문제가 발생하는데, 이에 대한 추가 수정을 해야한다.
일정 간격으로 계속 같은 이벤트가 발생하는 경우
여기서 말하는 같은 이벤트란, 같은 고객이 발생한 같은 이벤트를 의미한다.
즉, 동일한 키 값
에 대해 주기적으로 이벤트가 발생하는 경우이다.
해당 키 값
처리프로세스는 끝나지 못하고, 영원히 스레드 한 칸을 점유하게 되는 셈이다.
이런 문제를 해결하기 위해서 다음과 같은 정책을 세웠다.
- 같은
키 값
에 대한 데이터가 일정량 쌓이면 데이터를 더 가져오지 못하도록 lock을 건다. - 쌓인 데이터가 모두 소모되면 lock을 푼다.
이에 맞는 수정을 진행해보자.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 처리 프로세스
qdata_list = 데이터_가져오기()
# 데이터 limit 확인 및 처리 시작(limit=100)
메인큐_되돌아갈_목록 = []
처리할_목록 = []
for new_키네임 in qdata_list:
if new_키네임.locked:
메인큐_되돌아갈_목록.append(new_키네임)
else:
처리할_목록.append(new_키네임)
for new_키네임 in 메인큐_되돌아갈_목록:
메인큐로_복귀(new_키네임)
qdata_list = 처리할_목록
# 데이터 limit 확인 및 처리 종료
...
위와 같이 데이터를 가져온 후, 되돌릴 데이터를 처리하는 프로세스를 하나 더 추가하였다.
물론, 이에 따라 현재 데이터 갯수를 산정하는 코드도 추가로 필요하며, 스레드 종료 콜백에서 lock을 해제하는 코드도 추가가 필요하다.
이 부분은 실제 작성된 로직에 따라 달라지기 때문에, 추가하지 않았다.
메인큐 순환 문제
위와 같은 메인큐로 돌아가는 로직을 작성 시에는 추가적인 문제가 발생한다.
메인큐의 끝을 알 수 없어, 방금 들어간 데이터를 다시 가져오게 된다는 문제가 있다.
예를 들어 user_1:post-create
라는 키 값
에 lock이 걸린 상황이고, 메인 큐에는 user_1:post-create
만 50개가 남아있다고 가정해보자
비동기 프로세스는 user_1:post-create
데이터 50개를 가져온 뒤, 전부 메인 큐로 되돌린다. 이 작업은 user_1:post-create
의 lock이 풀릴 때까지 반복된다.
AWS SQS를 사용했다면 이런 문제를 어느정도 해결할 수 있지만, Redis에서는 큐의 끝을 찾는 방법을 발견하지 못했다.
이 문제는 다음 코드를 추가하는 것으로 해결했다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
### 전역 변수 <되돌아간_횟수>
if 되돌아간_횟수 > 100:
delay(60)
# 처리 프로세스
qdata_list = 데이터_가져오기()
# 데이터 limit 확인 및 처리 시작(limit=100)
메인큐_되돌아갈_목록 = []
처리할_목록 = []
for new_키네임 in qdata_list:
if new_키네임.locked:
메인큐_되돌아갈_목록.append(new_키네임)
else:
처리할_목록.append(new_키네임)
for new_키네임 in 메인큐_되돌아갈_목록:
메인큐로_복귀(new_키네임)
# 처리 사이클에서 가져오는 데이터 갯수 50
## 되돌아가는 데이터(반환률)가 80% 이상이라면
## 되돌아간_횟수 카운터를 증가
if len(메인큐_되돌아갈_목록) >= 40:
되돌아간_횟수 += 1
qdata_list = 처리할_목록
# 데이터 limit 확인 및 처리 종료
...
코드와 같이, 반환률이 80%가 넘을 때 마다 카운터를 증가시키고,
카운터가 100이 넘으면 60초 동안 대기하는 것으로 해결했다.
이는 추후에 더 적절한 큐인 SQS로 마이그레이션 할 때 해결해야 겠다고 판단했다.