모임장소 찾기 플랫폼에서 지하철 역 주변 상권정보 수집 및 활용 역할을 맡음
카카오 맵 API를 사용.
스케줄러를 이용해 매주 월요일 새벽 3시에 상권 정보를 갱신한다.
전국 약 800여개 역 주변 상권 정보를 갱신하다보니 시간이 굉장히 오래 걸렸다. (1137초)
비동기를 적용하여 이를 개선한 내용을 기록해두려 한다.


🧚 용어 정리
1) 비동기
비동기 함수는 일반 동기 함수를 호출하듯 호출할 수 없으며 `await` 키워드와 함께 비동기 함수를 호출해야 한다.
파이썬에서 일반 `def`으로 함수를 정의하면 그건 무조건 동기 함수이다.
비동기 처리는 정확한 순서가 보장되지 않는다. 대신 cpu를 놀리지 않고 불필요한 지연 없이 여러 요청을 동시에 실행할 수 있다.


2) 코루틴(Coroutine)
코루틴은 함수의 실행을 중단했다가 다시 시작할 수 있는 구조를 말한다. 쉽게 말해서, 파이썬에서는 비동기 함수를 코루틴이라고 부른다.
`async` 키워드로 정의되며, `await` 키워드를 사용하여 다른 코루틴이 완료될 때까지 기다리지 않도록 명시할 수 있다.


참고
https://www.youtube.com/watch?v=yOtXl8cW-ag
https://tech.buzzvil.com/blog/asyncio-no-1-coroutine-and-eventloop/
3) 이벤트 루프(Eventloop)
Eventloop는 코루틴을 실행하고 관리하는 루프이다. 파이썬의 Eventloop는 단일 스레드에서 동작한다.
Eventloop는 실행 가능한 모든 코루틴이 완료될 때까지 계속 돌아가며, `await` 키워드가 있는 곳에서 작업을 멈추고 다른 작업을 실행해 효율적인 동시 처리를 가능하게 한다.
4) Future 객체, Task 객체
계층 구조를 확인해보면 아래와 같다.

`await` 키워드를 붙일 수 있는 최소 조건이 Awaitable이다. 그래서 3가지 모두 await으로 실행이 끝나길 기다릴 수 있다. 하지만 코루틴은 Eventloop에 등록되지 않으면 실행되지 않기 때문에, `await`을 붙이거나 Future나 Task로 감싸야 한다.
🧚 Asyncio? Aiohttp?
1) Asyncio
단일 스레드에서 여러 요청을 동시에 처리할 수 있도록 해주는 파이썬의 비동기 라이브러리이다. 이벤트 루프를 사용하여 단일 스레드에서 비동기적으로 작업을 처리한다. `await` 키워드를 사용해 특정 작업이 완료될 때까지 기다리지 않고, 다른 작업을 처리하도록 하여 전체적인 응답 속도를 개선할 수 있다.
전통적으로 동시 프로그래밍(concurrent programming)은 여러 개의 스레드를 활용하여 이루어졌는데, 여러 개의 스레드를 사용하는 방식은 몇 가지 측면에서 까다로울 수 있다. 우선, thread safe한 프로그램을 작성하는 것은 생각보다 쉽지 않다. 또, 싱글 코어 프로세서에서 이러한 프로그램을 돌리면 동시 처리 성능 향상이 크지 않으며, 오히려 성능이 저하될 가능성도 있다.
2) Aiohttp
`aiohttp`는 `asyncio`를 위한 http 서버/클라이언트 요청을 수행하는 라이브러리이다. 파이썬 `request`가 동기로 요청을 처리하기 때문에 비동기로 http 요청을 처리하기 위해서는 `aiohttp`가 필요하다.
🧚 Asyncio 문법
1) `asyncio.gather()`
`asyncio.gather()` 함수를 사용하면 파이썬의 이벤트 루프가 세 개의 함수를 알아서 스케줄하여 비동기로 호출해준다.
`asyncio.gather(*task)`와 같이 리스트를 언패킹해서 넣어준다. `asyncio.gather()`도 코루틴이므로 `await`로 실행한 뒤 결과를 가져온다.
2) `async with`
클래스나 함수를 비동기로 처리한 뒤 결과를 반환하는 문법이다.
`with` 다음에 클래스의 인스턴스를 지정하고, `as` 뒤에 결과를 지정할 변수를 지정한다. 파이썬 3.5 이상부터 사용 가능하다.
3) `async for`
비동기로 반복하는 문법이다.
🧚 코드 예시
상권 정보 수집 메인 함수
`crawling_places()` 함수는 각 지하철역과 키워드에 대해 비동기적으로 상권 정보를 수집한다. 큰 흐름은 아래와 같다.
1. 비동기 처리를 위한 세션(session) 생성
2. session을 이용해 task 생성
3. task로 작업을 async로 실행
`asyncio.create_task()`를 사용하여 각 지하철역과 키워드 조합에 대한 `process_station()` 작업을 비동기적으로 실행하는 태스크를 만든다. `asyncio.gather(*task)`로 모든 태스크를 동시에 실행한다.
async def create_session():
return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
async def crawling_places():
keywords = ['스터디룸', '식당', '카페', '호프', '헬스클럽', '도서관', '공원', '미술관', '애견카페', '셀프사진']
all_places = []
session = await create_session() # 비동기 세션 생성
tasks = []
for keyword in keywords:
for station_name in df['C1']:
center_lon, center_lat = get_station_info(station_name)
if center_lon is None or center_lat is None:
print(f"Coordinates for {station_name} not found.")
continue
task = asyncio.create_task(process_station(session, keyword, center_lat, center_lon, station_name, all_places))
tasks.append(task)
await asyncio.gather(*tasks) # 모든 태스크를 동시에 실행
await session.close() # 세션 닫기
return all_places
API 요청을 보내고 응답을 파싱
Semaphore는 동시 요청 수를 제한하는 비동기 동기화 메커니즘이다. 여기서는 `sem`을 5로 설정하여 카카오 API의 QPM(Query Per Minute) 제한을 초과하지 않도록 한다.
sem = asyncio.Semaphore(5) # 동시 요청 수를 5로 제한하여 QPM 제한을 방지
...
# 각 지하철역에 대해 API 요청을 처리하는 비동기 작업
async def process_station(session, keyword, center_lat, center_lon, station_name, all_places):
page = 1
while True:
async with sem: # 세마포어를 사용하여 동시 요청 수 제한
data = await fetch_places_from_api(session, keyword, center_lat, center_lon, page)
if not data or not data['documents']:
break
places = parse_places(data, station_name, keyword)
all_places.extend(places)
if data.get('meta', {}).get('is_end'):
break
page += 1
await asyncio.sleep(1) # 요청 사이에 대기 시간 추가
🧚 전체 코드
import os
import pandas as pd
from config import KAKAO_REST_API_KEY, KAKAO_SEARCH_KEYWORD_URL
import aiohttp
import asyncio
file_path = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'subwayinfo.csv')
df = pd.read_csv(file_path, header=None, names=['C1', 'C2', 'C3', 'C4', 'C5'])
# 전역 세마포어 생성 (동시 요청 수 제한)
# 동시 요청 수를 5로 제한
sem = asyncio.Semaphore(5) # 이 이상 설정하면 카카오톡 QPM(query per minute) 제한 걸림
# 전역에서 세션을 생성
async def create_session():
# aiohttp ClientSession 생성 및 타임아웃 설정
return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
# 모든 지하철역에 대해 상권정보 요청 및 파싱
async def crawling_places():
keywords = ['스터디룸', '식당', '카페', '호프', '헬스클럽', '도서관', '공원', '미술관', '애견카페', '셀프사진']
all_places = []
session = await create_session()
tasks = []
for keyword in keywords:
for station_name in df['C1']:
center_lon, center_lat = get_station_info(station_name)
if center_lon is None or center_lat is None:
print(f"Coordinates for {station_name} not found.")
continue
# 각 요청을 비동기적으로 처리
task = asyncio.create_task(process_station(session, keyword, center_lat, center_lon, station_name, all_places))
tasks.append(task)
await asyncio.gather(*tasks)
await session.close()
return all_places
# 각 지하철역에 대해 API 요청을 처리하는 비동기 작업
async def process_station(session, keyword, center_lat, center_lon, station_name, all_places):
page = 1
while True:
async with sem: # 세마포어를 사용하여 동시 요청 수 제한
data = await fetch_places_from_api(session, keyword, center_lat, center_lon, page)
if not data or not data['documents']:
break
places = parse_places(data, station_name, keyword)
all_places.extend(places)
if data.get('meta', {}).get('is_end'):
break
page += 1
await asyncio.sleep(1) # 요청 사이에 대기 시간 추가
# 지하철 이름으로 위/경도 반환
def get_station_info(station_name):
try:
# 첫번째로 만나는 역 정보 반환
station_row = df[df['C1'] == station_name].iloc[0]
return station_row['C3'], station_row['C4']
except IndexError:
return None, None
# 카카오 API로 특정 지하철역에 대한 상권정보 요청
async def fetch_places_from_api(session, keyword, center_lat, center_lon, page):
# 요청 헤더
headers = {
'Authorization': f'KakaoAK {KAKAO_REST_API_KEY}'
}
params = {
'query': keyword,
'x': center_lat,
'y': center_lon,
'radius': 1000,
'size': 15,
'page': page
}
max_retries = 5
for attempt in range(max_retries): # 재시도 로직
try:
async with session.get(KAKAO_SEARCH_KEYWORD_URL, headers=headers, params=params) as response:
if response.status == 200:
return await response.json()
elif response.status == 429:
print(f'Rate limit exceeded, retrying in {2 ** attempt} seconds...')
await asyncio.sleep(2 ** attempt) # Exponential backoff
else:
print(f'Failed with status {response.status}')
return None
except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionResetError) as e:
print(f'Network error: {e}, retrying in {2 ** attempt} seconds...')
await asyncio.sleep(2 ** attempt) # Exponential backoff
return None
# 카카오 API 응답을 파싱하여 장소 정보 추출
def parse_places(data, station_name, keyword):
places = []
for place in data['documents']:
if not place.get('id'): # id가 비어있으면 건너뛰기
continue
place_info = {
'station_name': station_name,
'name': place['place_name'],
'id': place['id'],
'category': keyword,
'road_address': place['road_address_name'],
'address': place['address_name'],
'phone': place.get('phone', '정보 없음'),
'latitude': place['y'],
'longitude': place['x']
}
places.append(place_info)
return places
if __name__ == "__main__":
asyncio.run(crawling_places())
'✍️ 개발 기록' 카테고리의 다른 글
| [👀 Owing] Framer motion 애니메이션 적용기 (0) | 2024.09.27 |
|---|---|
| [트러블슈팅] 504 Gateway Time-out 에러 해결 (1) | 2024.08.05 |
| Access Token과 Refresh Token을 어떻게 전달하고 클라이언트는 어디에 저장할까? (0) | 2024.06.21 |
| authenticate에서 발생하는 '자격 증명에 실패하였습니다' 문제 해결 (0) | 2024.06.19 |
| JPA Auditing으로 생성일/수정일 자동 갱신 (+date format이 동작하지 않는 문제 해결) (0) | 2024.06.14 |