✍️ 개발 기록

[👻 잼잼, 어디가] 비동기로 성능 개선하기 (asyncio + aiohttp)

ming412 2024. 8. 5. 10:57

모임장소 찾기 플랫폼에서 지하철 역 주변 상권정보 수집 및 활용 역할을 맡음

카카오 맵 API를 사용.

 

스케줄러를 이용해 매주 월요일 새벽 3시에 상권 정보를 갱신한다.

전국 약 800여개 역 주변 상권 정보를 갱신하다보니 시간이 굉장히 오래 걸렸다. (1137초)

 

비동기를 적용하여 이를 개선한 내용을 기록해두려 한다.

 

비동기 적용 전
비동기 적용 후

 

🧚 용어 정리

1) 비동기

비동기 함수는 일반 동기 함수를 호출하듯 호출할 수 없으며 `await` 키워드와 함께 비동기 함수를 호출해야 한다.

파이썬에서 일반 `def`으로 함수를 정의하면 그건 무조건 동기 함수이다.

 

비동기 처리는 정확한 순서가 보장되지 않는다. 대신 cpu를 놀리지 않고 불필요한 지연 없이 여러 요청을 동시에 실행할 수 있다.

 

기존 blocking 방식
non-blocking 방식

 

2) 코루틴(Coroutine)

코루틴은 함수의 실행을 중단했다가 다시 시작할 수 있는 구조를 말한다. 쉽게 말해서, 파이썬에서는 비동기 함수를 코루틴이라고 부른다.

 

`async` 키워드로 정의되며, `await` 키워드를 사용하여 다른 코루틴이 완료될 때까지 기다리지 않도록 명시할 수 있다.

 

싱글스레드 방식
코루틴

 

참고

https://www.youtube.com/watch?v=yOtXl8cW-ag

https://tech.buzzvil.com/blog/asyncio-no-1-coroutine-and-eventloop/

 

3) 이벤트 루프(Eventloop)

Eventloop는 코루틴을 실행하고 관리하는 루프이다. 파이썬의 Eventloop는 단일 스레드에서 동작한다.

 

Eventloop는 실행 가능한 모든 코루틴이 완료될 때까지 계속 돌아가며, `await` 키워드가 있는 곳에서 작업을 멈추고 다른 작업을 실행해 효율적인 동시 처리를 가능하게 한다.

 

4) Future 객체, Task 객체 

계층 구조를 확인해보면 아래와 같다.

`await` 키워드를 붙일 수 있는 최소 조건이 Awaitable이다. 그래서 3가지 모두 await으로 실행이 끝나길 기다릴 수 있다. 하지만 코루틴은 Eventloop에 등록되지 않으면 실행되지 않기 때문에, `await`을 붙이거나 Future나 Task로 감싸야 한다.

 

🧚 Asyncio? Aiohttp?

1) Asyncio

단일 스레드에서 여러 요청을 동시에 처리할 수 있도록 해주는 파이썬의 비동기 라이브러리이다. 이벤트 루프를 사용하여 단일 스레드에서 비동기적으로 작업을 처리한다. `await` 키워드를 사용해 특정 작업이 완료될 때까지 기다리지 않고, 다른 작업을 처리하도록 하여 전체적인 응답 속도를 개선할 수 있다.

 전통적으로 동시 프로그래밍(concurrent programming)은 여러 개의 스레드를 활용하여 이루어졌는데, 여러 개의 스레드를 사용하는 방식은 몇 가지 측면에서 까다로울 수 있다. 우선, thread safe한 프로그램을 작성하는 것은 생각보다 쉽지 않다. 또, 싱글 코어 프로세서에서 이러한 프로그램을 돌리면 동시 처리 성능 향상이 크지 않으며, 오히려 성능이 저하될 가능성도 있다.

 

2) Aiohttp

`aiohttp`는 `asyncio`를 위한 http 서버/클라이언트 요청을 수행하는 라이브러리이다. 파이썬 `request`가 동기로 요청을 처리하기 때문에 비동기로 http 요청을 처리하기 위해서는 `aiohttp`가 필요하다.

 

🧚 Asyncio 문법

1) `asyncio.gather()`

`asyncio.gather()` 함수를 사용하면 파이썬의 이벤트 루프가 세 개의 함수를 알아서 스케줄하여 비동기로 호출해준다.

 

`asyncio.gather(*task)`와 같이 리스트를 언패킹해서 넣어준다. `asyncio.gather()`도 코루틴이므로 `await`로 실행한 뒤 결과를 가져온다.

 

2) `async with`

클래스나 함수를 비동기로 처리한 뒤 결과를 반환하는 문법이다.

 

`with` 다음에 클래스의 인스턴스를 지정하고, `as` 뒤에 결과를 지정할 변수를 지정한다. 파이썬 3.5 이상부터 사용 가능하다.

 

3) `async for`

비동기로 반복하는 문법이다.

 

🧚 코드 예시

상권 정보 수집 메인 함수

`crawling_places()` 함수는 각 지하철역과 키워드에 대해 비동기적으로 상권 정보를 수집한다. 큰 흐름은 아래와 같다.

1. 비동기 처리를 위한 세션(session) 생성
2. session을 이용해 task 생성
3. task로 작업을 async로 실행

 

`asyncio.create_task()`를 사용하여 각 지하철역과 키워드 조합에 대한 `process_station()` 작업을 비동기적으로 실행하는 태스크를 만든다. `asyncio.gather(*task)`로 모든 태스크를 동시에 실행한다.  

async def create_session():
    return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))
    
async def crawling_places():
    keywords = ['스터디룸', '식당', '카페', '호프', '헬스클럽', '도서관', '공원', '미술관', '애견카페', '셀프사진']
    all_places = []

    session = await create_session()  # 비동기 세션 생성

    tasks = []
    for keyword in keywords:
        for station_name in df['C1']:
            center_lon, center_lat = get_station_info(station_name)

            if center_lon is None or center_lat is None:
                print(f"Coordinates for {station_name} not found.")
                continue

            task = asyncio.create_task(process_station(session, keyword, center_lat, center_lon, station_name, all_places))
            tasks.append(task)

    await asyncio.gather(*tasks)  # 모든 태스크를 동시에 실행
    await session.close()  # 세션 닫기

    return all_places

 

API 요청을 보내고 응답을 파싱

Semaphore는 동시 요청 수를 제한하는 비동기 동기화 메커니즘이다. 여기서는 `sem`을 5로 설정하여 카카오 API의 QPM(Query Per Minute) 제한을 초과하지 않도록 한다.

sem = asyncio.Semaphore(5)  # 동시 요청 수를 5로 제한하여 QPM 제한을 방지
...
# 각 지하철역에 대해 API 요청을 처리하는 비동기 작업
async def process_station(session, keyword, center_lat, center_lon, station_name, all_places):
    page = 1
    while True:
        async with sem:  # 세마포어를 사용하여 동시 요청 수 제한
            data = await fetch_places_from_api(session, keyword, center_lat, center_lon, page)
        if not data or not data['documents']:
            break

        places = parse_places(data, station_name, keyword)
        all_places.extend(places)

        if data.get('meta', {}).get('is_end'):
            break

        page += 1
        await asyncio.sleep(1)  # 요청 사이에 대기 시간 추가

 

🧚 전체 코드

import os
import pandas as pd
from config import KAKAO_REST_API_KEY, KAKAO_SEARCH_KEYWORD_URL
import aiohttp
import asyncio

file_path = os.path.join(os.path.dirname(__file__), '..', '..', 'data', 'subwayinfo.csv')
df = pd.read_csv(file_path, header=None, names=['C1', 'C2', 'C3', 'C4', 'C5'])

# 전역 세마포어 생성 (동시 요청 수 제한)
# 동시 요청 수를 5로 제한
sem = asyncio.Semaphore(5)   # 이 이상 설정하면 카카오톡 QPM(query per minute) 제한 걸림

# 전역에서 세션을 생성
async def create_session():
    # aiohttp ClientSession 생성 및 타임아웃 설정
    return aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=10))

# 모든 지하철역에 대해 상권정보 요청 및 파싱
async def crawling_places():
    keywords = ['스터디룸', '식당', '카페', '호프', '헬스클럽', '도서관', '공원', '미술관', '애견카페', '셀프사진']
    all_places = []

    session = await create_session()

    tasks = []
    for keyword in keywords:
        for station_name in df['C1']:
            center_lon, center_lat = get_station_info(station_name)

            if center_lon is None or center_lat is None:
                print(f"Coordinates for {station_name} not found.")
                continue

            # 각 요청을 비동기적으로 처리
            task = asyncio.create_task(process_station(session, keyword, center_lat, center_lon, station_name, all_places))
            tasks.append(task)

    await asyncio.gather(*tasks)
    await session.close()

    return all_places


# 각 지하철역에 대해 API 요청을 처리하는 비동기 작업
async def process_station(session, keyword, center_lat, center_lon, station_name, all_places):
    page = 1
    while True:
        async with sem:  # 세마포어를 사용하여 동시 요청 수 제한
            data = await fetch_places_from_api(session, keyword, center_lat, center_lon, page)
        if not data or not data['documents']:
            break

        places = parse_places(data, station_name, keyword)
        all_places.extend(places)

        if data.get('meta', {}).get('is_end'):
            break

        page += 1
        await asyncio.sleep(1)  # 요청 사이에 대기 시간 추가


# 지하철 이름으로 위/경도 반환
def get_station_info(station_name):
    try:
        # 첫번째로 만나는 역 정보 반환
        station_row = df[df['C1'] == station_name].iloc[0]
        return station_row['C3'], station_row['C4']
    except IndexError:
        return None, None


# 카카오 API로 특정 지하철역에 대한 상권정보 요청
async def fetch_places_from_api(session, keyword, center_lat, center_lon, page):
    # 요청 헤더
    headers = {
        'Authorization': f'KakaoAK {KAKAO_REST_API_KEY}'
    }
    params = {
        'query': keyword,
        'x': center_lat,
        'y': center_lon,
        'radius': 1000,
        'size': 15,
        'page': page
    }
    max_retries = 5
    for attempt in range(max_retries): # 재시도 로직
        try:
            async with session.get(KAKAO_SEARCH_KEYWORD_URL, headers=headers, params=params) as response:
                if response.status == 200:
                    return await response.json()
                elif response.status == 429:
                    print(f'Rate limit exceeded, retrying in {2 ** attempt} seconds...')
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                else:
                    print(f'Failed with status {response.status}')
                    return None
        except (aiohttp.ClientError, asyncio.TimeoutError, ConnectionResetError) as e:
            print(f'Network error: {e}, retrying in {2 ** attempt} seconds...')
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

    return None

# 카카오 API 응답을 파싱하여 장소 정보 추출
def parse_places(data, station_name, keyword):
    places = []
    for place in data['documents']:
        if not place.get('id'):  # id가 비어있으면 건너뛰기
            continue
        place_info = {
            'station_name': station_name,
            'name': place['place_name'],
            'id': place['id'],
            'category': keyword,
            'road_address': place['road_address_name'],
            'address': place['address_name'],
            'phone': place.get('phone', '정보 없음'),
            'latitude': place['y'],
            'longitude': place['x']
        }
        places.append(place_info)
    return places


if __name__ == "__main__":
    asyncio.run(crawling_places())