메뉴 닫기

파이썬 requests SSE 스트리밍 예제와 레시피 정리

파이썬 requests SSE 스트리밍 예제와 레시피 정리

🧩 SSE를 안정적으로 읽는 법, iter_lines와 재연결까지 한번에 정리

브라우저 없이도 서버에서 흘러오는 이벤트를 실시간으로 읽어야 하는 순간이 잦습니다.
로그 모니터링처럼 줄 단위로 메시지가 이어지고, 지연 없이 바로 처리해야 할 때가 특히 그렇습니다.
파이썬에서 가장 간단한 길은 이미 익숙한 HTTP 클라이언트로 스트림을 다루는 것입니다.
별도 무거운 런타임을 두지 않고도 파일처럼 읽어 들일 수 있고, 네트워크 오류가 나더라도 적절히 재시도하면 안정적으로 운영 환경에 올릴 수 있습니다.
이 글은 그런 요구를 가진 분들이 실전에 바로 가져다 쓸 수 있도록, 핵심 패턴을 이해하기 쉬운 문장과 코드로 풀어 정리합니다.

핵심은 requests의 스트리밍 응답을 사용해 라인 단위로 파싱하고, 유효한 줄만 디코딩해 핸들러로 넘기는 흐름입니다.
특히 다음 레시피는 반드시 기억해둘 만한 기본형입니다.
문자 그대로 적용해도 동작하고, 상황에 맞춰 핸들러만 바꾸면 다양한 SSE 엔드포인트에 대응할 수 있습니다.
for line in s.get(u, stream=True).iter_lines(): if line: handle(line.decode()) 라는 단 한 줄의 패턴이 실시간 처리의 출발점이 됩니다.
여기에 타임아웃, 헤더, 재연결을 더해 운영 환경에서도 흔들림 없이 굴릴 수 있게 만드는 과정을 차근차근 담았습니다.



🔗 SSE란 무엇이고 왜 requests로 읽을까

SSE(Server-Sent Events)는 서버가 클라이언트로 일방향 스트림을 지속적으로 밀어주는 통신 방식입니다.
HTTP 연결을 장시간 유지하면서 텍스트 프레임을 이어 보내므로, 실시간 로그, 알림, 모델 추론 진행 상황 같은 이벤트를 가볍게 전달하는 데 적합합니다.
웹 브라우저에서는 EventSource로 흔히 접하지만, 백엔드나 배치 환경에서는 파이썬으로 받아 처리해야 할 때가 많습니다.
이때 의존성을 최소화하고 운영 환경에 바로 올리기 쉬운 선택지가 requests입니다.
표준 HTTP 위에서 스트림을 다루고, 라인 단위 반복자로 안전하게 읽을 수 있으며, 세션과 타임아웃, 재시도 같은 실무 옵션을 그대로 활용할 수 있습니다.

🧠 SSE의 데이터 포맷과 최소 요건

SSE는 텍스트 이벤트를 줄 단위로 전달합니다.
헤더는 보통 text/event-stream이며, 바디에는 data:로 시작하는 행이 이어집니다.
클라이언트는 빈 줄을 이벤트 경계로 인식하고 누적된 데이터를 한 묶음으로 처리합니다.
따라서 파이썬에서는 바이트 스트림을 행으로 쪼개고, 비어 있지 않은 줄을 디코딩해 파서나 핸들러로 넘기는 패턴이 핵심이 됩니다.

🧰 requests를 선택하는 실용적 이유

별도 런타임이 필요 없고, 익숙한 API로 스트리밍을 다룰 수 있다는 점이 큰 장점입니다.
세션을 열고 stream=True만 지정하면 서버가 보내는 바이트를 끊기지 않게 받아오고, iter_lines로 안전하게 버퍼링하면서 처리할 수 있습니다.
필요하다면 타임아웃과 헤더, 프록시, 인증을 그대로 붙여 확장하면 됩니다.
핵심 레시피는 아래 한 줄로 요약됩니다.
운영 환경에서는 이 기본형 위에 로깅과 재연결, 백오프를 얹어 안정성을 높입니다.

CODE BLOCK
# 핵심 레시피(기본형)
# s: requests.Session(), u: SSE 엔드포인트 URL, handle: 줄 처리 함수
for line in s.get(u, stream=True).iter_lines():
    if line:
        handle(line.decode())

상황 왜 requests가 유리한가
운영 서버에 빠른 적용 기존 HTTP 스택과 동일하게 배포하고 모니터링 가능
간단한 로그/알림 수신 iter_lines로 라인 경계가 보장되어 파싱이 단순
보안/프록시 요구 사항 존재 세션, 인증, 프록시 설정을 그대로 재사용

💡 TIP: 서버가 보내는 줄의 인코딩이 명확하지 않다면, response.iter_lines(decode_unicode=True)를 검토하거나, chardet/charset-normalizer로 감지 후 line.decode(encoding, errors=”ignore”) 패턴을 사용하세요.

⚠️ 주의: SSE는 장시간 연결이 전제입니다.
프록시나 로드밸런서의 유휴 타임아웃에 걸려 끊길 수 있으므로, 서버가 주기적으로 주석/하트비트 라인을 전송하도록 하고, 클라이언트는 타임아웃과 재연결 로직을 반드시 갖추세요.

  • 🔌SSE 엔드포인트의 Content-Type 확인
  • ⏱️클라이언트 timeout과 서버 keepalive 주기 조정
  • 🧪네트워크 단절/재연결 케이스를 로컬에서 리허설

💬 핵심 레시피는 간단하지만, 운영 환경에서는 재시도와 백오프, 로그, 메트릭으로 보완해야 합니다.
기본형을 정확히 이해하면 확장이 쉬워집니다.

🛠️ requests로 SSE 스트림 연결 준비하기

SSE를 안정적으로 처리하려면 연결 전에 몇 가지 필수 설정을 갖춰야 합니다.
서버는 지속적인 연결을 유지하므로, 클라이언트에서 세션을 열고 헤더를 명시적으로 지정해야 합니다.
특히 Accept: text/event-stream 헤더를 추가하면 서버가 스트리밍 모드로 응답을 전환합니다.
이때 일반적인 요청과 달리, 응답을 한 번에 받지 않고 계속해서 받아야 하므로 stream=True 옵션이 필수입니다.

⚙️ 기본 세션 구성과 헤더 설정

다음은 requests 세션을 준비하고, SSE 엔드포인트에 맞는 헤더를 설정하는 기본 형태입니다.
이 구성만으로도 대부분의 서버와 안정적으로 연결할 수 있습니다.

CODE BLOCK
import requests

s = requests.Session()
headers = {
    "Accept": "text/event-stream",
    "Cache-Control": "no-cache",
    "Connection": "keep-alive"
}
url = "https://example.com/stream"

# 스트림 모드로 요청
response = s.get(url, headers=headers, stream=True, timeout=(5, None))

위 코드에서 timeout=(5, None)은 연결 타임아웃만 지정하고, 읽기 타임아웃을 무제한으로 설정한 예시입니다.
이 방식은 서버가 지속적으로 데이터를 밀어주는 상황에서 연결이 끊기지 않도록 해 줍니다.
단, 방화벽이나 중간 네트워크 장비에서 일정 시간 이후 연결을 강제로 종료할 수 있으므로, 서버가 주기적으로 하트비트를 전송하도록 구성하는 것이 바람직합니다.

🔐 인증과 프록시 환경 대응

SSE를 사내 서버나 외부 API에서 사용할 때, 종종 인증 토큰이나 프록시를 통과해야 합니다.
requests는 이런 환경을 자연스럽게 지원합니다.
세션 단위로 인증 헤더를 추가하거나, proxies 매개변수를 이용하면 됩니다.

CODE BLOCK
s.headers.update({
    "Authorization": f"Bearer {ACCESS_TOKEN}"
})

proxies = {
    "http": "http://proxy.company.local:8080",
    "https": "http://proxy.company.local:8080"
}

r = s.get(url, stream=True, proxies=proxies)

특히 프록시를 거치는 환경에서는 Connection: keep-alive 헤더가 손실될 수 있으므로, 서버 로그에서 실제 요청 헤더를 확인하는 것이 좋습니다.
또한 토큰 만료나 연결 갱신 시점을 대비해, try-except 블록 내에서 재요청 로직을 준비해두면 장애 대응이 빨라집니다.

💎 핵심 포인트:
SSE는 단순히 연결을 유지하는 것이 아니라, 신뢰성 있게 이어지는 흐름이 중요합니다. 세션과 헤더를 미리 세밀하게 설계하면 나중의 오류를 크게 줄일 수 있습니다.



⚙️ iter_lines로 이벤트 읽고 디코딩 처리하기

SSE의 핵심은 스트림을 실시간으로 받아 라인 단위로 처리하는 데 있습니다.
requests에서는 iter_lines() 메서드를 통해 이를 간단히 구현할 수 있습니다.
이 메서드는 내부적으로 청크 단위 데이터를 누적해 개행 문자 기준으로 끊어주므로, 별도의 버퍼링 로직이 필요 없습니다.
바로 이 덕분에 다음과 같은 한 줄 레시피가 가능해집니다.

CODE BLOCK
for line in s.get(u, stream=True).iter_lines():
    if line:
        handle(line.decode())

이 구문은 SSE뿐 아니라 일반 텍스트 스트리밍 응답에도 유효합니다.
반복문은 서버가 데이터를 푸시할 때마다 새로운 라인을 yield하며, if line: 조건으로 빈 줄을 건너뜁니다.
그 후, line.decode()로 바이트를 문자열로 변환해 handle() 함수에서 후속 처리를 수행합니다.
이 handle은 로깅, JSON 파싱, 이벤트 타입 분기 등 원하는 형태로 정의할 수 있습니다.

🧩 디코딩과 에러 대응 방식

SSE 서버마다 인코딩 방식이 다를 수 있습니다.
UTF-8을 쓰는 경우가 대부분이지만, 일부에서는 ISO-8859-1이나 기타 인코딩을 반환하기도 합니다.
이럴 때는 decode() 인자로 명시적 인코딩을 지정하거나, 실패 시 무시 옵션을 사용하는 것이 좋습니다.

CODE BLOCK
def handle(line):
    try:
        text = line.decode("utf-8")
        print("event:", text)
    except UnicodeDecodeError:
        print("⚠️ 디코딩 실패, 원본 바이트:", line)

또는 iter_lines(decode_unicode=True)를 사용하면 requests가 내부적으로 디코딩을 처리합니다.
이 경우 line이 이미 문자열로 반환되므로, 별도의 decode() 호출이 필요 없습니다.

📦 JSON 파싱과 이벤트 분기

많은 SSE 서버가 data: 필드에 JSON 형태로 이벤트를 담습니다.
이 경우 단순한 문자열 출력 대신 구조화된 파싱이 유용합니다.
아래는 기본적인 JSON 이벤트 처리 예시입니다.

CODE BLOCK
import json

def handle(line):
    text = line.decode()
    if text.startswith("data:"):
        payload = text[5:].strip()
        try:
            event = json.loads(payload)
            print("이벤트 수신:", event)
        except json.JSONDecodeError:
            print("단순 메시지:", payload)

이 패턴은 단일 이벤트뿐 아니라, 연속 스트림을 처리하는 실시간 분석 서비스에도 적용할 수 있습니다.
특히 OpenAI API, GitHub Copilot, Firebase 등 다양한 SSE 기반 API들이 같은 구조를 사용합니다.
결국 iter_lines()handle()의 조합만으로 대부분의 실시간 피드를 안정적으로 소비할 수 있습니다.

💎 핵심 포인트:
iter_lines()는 단순 반복자가 아니라, 스트림 처리의 안전한 추상화입니다. 이를 정확히 이해하면 비동기 없이도 실시간 처리를 충분히 구현할 수 있습니다.

🔌 예제 코드 레시피와 공통 패턴

지금까지의 설명을 실제 코드로 통합하면, requests 기반 SSE 클라이언트는 단 몇 줄로 완성됩니다.
여기에 재연결, 예외 처리, 로그 출력을 추가하면 실무에서도 바로 활용할 수 있는 견고한 형태가 됩니다.
다음은 가장 널리 쓰이는 기본 구조 예제입니다.

CODE BLOCK
import requests, time, json

def handle(line):
    try:
        data = json.loads(line)
        print("[이벤트]", data)
    except json.JSONDecodeError:
        print("[메시지]", line)

def connect_sse(url):
    s = requests.Session()
    headers = {"Accept": "text/event-stream"}
    while True:
        try:
            with s.get(url, headers=headers, stream=True, timeout=(5, None)) as r:
                for line in r.iter_lines():
                    if line:
                        handle(line.decode())
        except Exception as e:
            print("⚠️ 연결 오류:", e)
            time.sleep(3)  # 백오프 후 재연결

이 코드는 세션을 열고 서버에서 오는 이벤트를 지속적으로 읽습니다.
연결이 끊어지면 예외를 잡고 일정 시간 대기한 뒤 다시 연결을 시도합니다.
이러한 구조는 대부분의 실시간 로그 스트리밍, AI 모델 출력 스트림, 주식 시세 피드 등에서 안정적으로 동작합니다.

🧱 모듈화된 구조로 리팩터링

실무에서는 스트림 데이터의 목적에 따라 handle()을 다양하게 바꿔서 사용합니다.
예를 들어, JSON 파싱 결과를 데이터베이스에 넣거나, 특정 이벤트만 필터링하는 로직을 추가할 수 있습니다.
또한, SSE 서버가 보낸 id:, event: 필드를 이용해 재연결 시 마지막 이벤트 ID를 전달하면, 중단된 위치에서 다시 이어받는 것도 가능합니다.

CODE BLOCK
last_event_id = None

def connect_with_resume(url):
    global last_event_id
    headers = {"Accept": "text/event-stream"}
    if last_event_id:
        headers["Last-Event-ID"] = last_event_id

    with requests.get(url, headers=headers, stream=True) as r:
        for line in r.iter_lines():
            if line:
                text = line.decode()
                if text.startswith("id:"):
                    last_event_id = text.split(":", 1)[1].strip()
                handle(text)

이처럼 SSE의 상태를 이어받는 기능을 구현하면, 네트워크 불안정 환경에서도 데이터 유실 없이 연속성을 유지할 수 있습니다.
이는 특히 AI 모델 출력 로그, 지속형 채팅 스트림, IoT 데이터 스트림 처리에 매우 유용합니다.

💡 TIP: 운영 환경에서는 반드시 로그 레벨을 조절하고, 예외별 재시도 정책을 구분해두세요. 일시적 네트워크 장애와 인증 오류를 구분해야 안정적입니다.

⚠️ 주의: 재연결 루프를 무한히 돌리면 서버에 과부하를 줄 수 있습니다. 백오프 시간을 점진적으로 늘리는 지수형 재시도 정책을 권장합니다.

  • stream=True 옵션 확인
  • 🔁연결 끊김 감지 후 재연결 루프 구성
  • 🧩핸들러 함수로 이벤트 구조화 및 필터링

💬 requests로 구현한 SSE 클라이언트는 단순하지만 강력합니다. 올바른 재연결 정책과 인코딩 처리를 결합하면, 고가용성 스트리밍 클라이언트로 발전시킬 수 있습니다.



💡 타임아웃 재연결 에러 처리 베스트프랙티스

SSE 연결은 이론상 무한히 유지되지만, 현실의 네트워크 환경에서는 다양한 이유로 끊깁니다.
방화벽, 게이트웨이, 서버 재시작, 클라우드 인스턴스 교체 등으로 예기치 않은 단절이 발생하죠.
따라서 requests를 이용할 때는 반드시 타임아웃과 예외 처리를 함께 구성해야 안정적으로 운용할 수 있습니다.

⏱️ 타임아웃과 keep-alive 전략

requests는 timeout을 연결 및 읽기 단계로 구분할 수 있습니다.
읽기 타임아웃을 None으로 두면 무제한 대기를 허용하지만, 서버가 오랫동안 아무 이벤트를 보내지 않으면 연결이 유휴 상태로 인식될 수 있습니다.
이를 방지하려면 서버가 주기적으로 “ping” 또는 주석 형태의 keep-alive 라인을 전송하도록 구성하거나, 클라이언트 측에서 read_timeout을 합리적인 값으로 설정해 재연결을 트리거해야 합니다.

CODE BLOCK
def connect_sse(url):
    s = requests.Session()
    headers = {"Accept": "text/event-stream"}
    while True:
        try:
            with s.get(url, headers=headers, stream=True, timeout=(5, 30)) as r:
                for line in r.iter_lines():
                    if line:
                        handle(line.decode())
        except requests.exceptions.ReadTimeout:
            print("⏳ 읽기 타임아웃 - 재연결 시도 중...")
        except requests.exceptions.ConnectionError:
            print("⚠️ 네트워크 연결 문제 발생, 재시도 중...")
        time.sleep(3)

위 코드에서 5초는 연결 타임아웃, 30초는 읽기 타임아웃입니다.
30초 동안 아무 이벤트가 없으면 ReadTimeout 예외가 발생하고, 루프를 통해 자동 재연결이 수행됩니다.
이 패턴은 스트림이 간헐적으로 끊기는 환경에서도 안정적으로 동작합니다.

🔁 재연결 루프와 백오프 설계

무한 루프를 통한 재연결은 필수적이지만, 연속 실패 시 서버나 네트워크에 부하를 줄 수 있습니다.
따라서 지수형 백오프(Exponential Backoff)를 적용하는 것이 좋습니다.
다음은 단순하면서도 실용적인 구현 예시입니다.

CODE BLOCK
delay = 1
while True:
    try:
        connect_sse("https://example.com/stream")
        delay = 1  # 성공 시 초기화
    except Exception as e:
        print(f"⚠️ 오류 발생: {e}")
        time.sleep(delay)
        delay = min(delay * 2, 60)  # 최대 60초까지 증가

이 로직은 실패할 때마다 대기 시간을 두 배로 늘리고, 일정 시간 후 정상화되면 다시 짧은 주기로 되돌립니다.
API 서버의 rate-limit을 고려하면서도 빠른 복구를 보장하는 균형 잡힌 방식입니다.

💎 핵심 포인트:
재연결은 단순 반복보다, 실패의 원인에 따른 백오프 전략과 로깅이 함께 있어야 합니다. 타임아웃·네트워크·인증 실패를 각각 다른 방식으로 다뤄야 장기적으로 안정적인 SSE 클라이언트를 구축할 수 있습니다.

  • ⏱️timeout은 (연결, 읽기) 두 단계로 분리 설정
  • 🔁끊김 시 즉시 재연결하지 말고 지수형 백오프 적용
  • 🧩예외별 로깅으로 원인 추적 용이하게 구성

💬 스트리밍은 안정성의 예술입니다. 타임아웃과 백오프를 제대로 설계한 클라이언트만이 장기 운영 환경에서 생존합니다.

자주 묻는 질문 (FAQ)

requests로 SSE를 읽을 때 asyncio를 꼭 써야 하나요?
아닙니다. requests의 iter_lines()는 동기 방식으로 충분히 안정적인 스트림 처리를 지원합니다.
단, 매우 많은 동시 연결이 필요하다면 aiohttp나 httpx와 같은 비동기 라이브러리로 확장하는 것이 좋습니다.
iter_lines() 대신 readline()을 써도 되나요?
readline()은 파일 객체에서 한 줄씩 읽을 때 적합하지만, SSE처럼 청크 스트리밍에서는 버퍼 경계가 깨질 수 있습니다.
iter_lines()는 내부 버퍼링을 자동 관리하므로 안전하고 효율적입니다.
decode()에서 에러가 날 때는 어떻게 처리하나요?
UTF-8 외의 인코딩이 사용될 가능성이 있다면 errors=”ignore” 또는
chardet을 통한 자동 인코딩 감지를 적용하세요.
데이터 손실보다는 로그 기록을 우선하는 것이 안정적입니다.
SSE 서버가 일정 시간 후 자동 종료됩니다. 어떻게 해야 하나요?
서버 측 keep-alive 정책이나 프록시 유휴 타임아웃 때문일 가능성이 높습니다.
주기적으로 하트비트 이벤트를 보내거나 클라이언트에서 재연결 로직을 넣어 대응하세요.
stream=True를 빼면 무슨 일이 생기나요?
응답 전체를 한 번에 메모리로 로드합니다.
즉, SSE처럼 무한히 이어지는 응답에서는 프로그램이 멈추거나 메모리 누수가 발생합니다.
반드시 stream=True를 유지해야 합니다.
재연결 시 이벤트가 중복되는 경우는 왜 그런가요?
SSE 서버가 재전송 기능을 지원할 때, 클라이언트가 마지막 id: 값을 전달하지 않으면 중복 수신이 발생할 수 있습니다.
Last-Event-ID 헤더를 사용해 이전 위치를 이어받도록 하세요.
OpenAI API의 스트리밍 응답도 같은 방식인가요?
맞습니다. OpenAI의 ChatCompletion 스트리밍은 SSE 형식을 기반으로 하며, data: 필드에 JSON 조각을 전송합니다.
동일한 iter_lines 패턴으로 읽을 수 있습니다.
장시간 실행 시 메모리 누수는 없나요?
iter_lines()는 스트림 버퍼를 순차적으로 비우기 때문에 일반적으로 누수가 발생하지 않습니다.
다만 handle() 내부에서 데이터를 누적하거나 전역 변수에 저장할 때는 주의해야 합니다.

🧠 requests로 구현하는 실시간 스트림 처리의 완성

파이썬의 requests는 단순한 HTTP 클라이언트로 알려져 있지만, stream=True 옵션과 iter_lines()를 적절히 활용하면 강력한 실시간 데이터 처리 도구로 변신합니다.
SSE(Server-Sent Events)처럼 서버에서 지속적으로 푸시되는 데이터를 안전하게 읽어내려면, 스트림을 안정적으로 유지하고, 예외가 발생해도 자연스럽게 복구할 수 있는 구조를 설계해야 합니다.
그 핵심이 바로 다음과 같습니다.

  • 🔹iter_lines()로 라인 단위 안정적 스트림 파싱
  • 🔹디코딩 오류 방지를 위한 decode()와 인코딩 감지
  • 🔹재연결 루프 및 지수형 백오프 기반의 오류 복구
  • 🔹이벤트 핸들러로 data: 필드 파싱 및 구조화
  • 🔹중단 지점 복구를 위한 Last-Event-ID 관리

이러한 원칙을 따르면 단순한 스트림 수신기를 넘어, 실제 서비스에서도 견딜 수 있는 실시간 데이터 파이프라인을 구축할 수 있습니다.
로그 스트리밍, AI 응답 스트림, 센서 데이터 피드, 웹훅 이벤트 소비 등 모든 경우에 공통적으로 적용됩니다.
결국 중요한 건 단순함을 유지하면서 복원력 있는 구조를 설계하는 것입니다.


🏷️ 관련 태그 : 파이썬requests, SSE, 스트리밍API, 실시간데이터, 서버이벤트, HTTP스트림, 재연결로직, 데이터파이프라인, 백오프, iter_lines