feat(lotto_buyer): RSA 암호화를 통한 로그인 로직 개선

This commit is contained in:
hyeonggil
2026-03-28 10:52:10 +09:00
parent 1d3c4850bf
commit 52a06b163d

View File

@@ -1,19 +1,27 @@
import binascii
import json
import logging
import re
from dataclasses import dataclass
import requests
from Crypto.Cipher import PKCS1_v1_5
from Crypto.PublicKey import RSA
logger = logging.getLogger(__name__)
BASE_URL = "https://dhlottery.co.kr"
LOGIN_URL = f"{BASE_URL}/userSsl.do?method=login"
BUY_URL = f"{BASE_URL}/gameStore.do?method=saveBuy"
CHECK_BALANCE_URL = f"{BASE_URL}/userSsl.do?method=myPage"
GAME_INFO_URL = f"{BASE_URL}/common.do?method=getGameInfo"
BASE_URL = "https://www.dhlottery.co.kr"
GAME_URL = "https://ol.dhlottery.co.kr"
AUTO_MODE_CODE = "2"
LOGIN_PAGE_URL = f"{BASE_URL}/login"
RSA_API_URL = f"{BASE_URL}/login/selectRsaModulus.do"
LOGIN_URL = f"{BASE_URL}/login/securityLoginCheck.do"
GAME_645_URL = f"{GAME_URL}/olotto/game/game645.do"
BUY_URL = f"{GAME_URL}/olotto/game/execBuy.do"
READY_SOCKET_URL = f"{GAME_URL}/olotto/game/egovUserReadySocket.json"
SERVER_PROP_URL = f"{BASE_URL}/sy/getServerPropInfo.do"
AUTO_MODE_CODE = "0"
@dataclass
@@ -38,49 +46,101 @@ class LottoBuyer:
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/120.0.0.0 Safari/537.36"
),
"Referer": BASE_URL + "/",
}
)
def _rsa_encrypt(self, rsa_data: dict, text: str) -> str:
"""RSA 공개키로 텍스트를 암호화합니다."""
modulus = int(rsa_data["rsaModulus"], 16)
exponent = int(rsa_data["publicExponent"], 16)
pub_key = RSA.construct((modulus, exponent))
cipher = PKCS1_v1_5.new(pub_key)
encrypted = cipher.encrypt(text.encode("utf-8"))
return binascii.hexlify(encrypted).decode("ascii")
def login(self) -> bool:
self.session.get(BASE_URL, timeout=30)
# 1. 로그인 페이지 GET (쿠키 세팅)
self.session.get(LOGIN_PAGE_URL, timeout=30)
payload = {
"userId": self.username,
"password": self.password,
"checkSave": "Y",
"newsEventYn": "",
}
response = self.session.post(LOGIN_URL, data=payload, timeout=30)
# 2. RSA 공개키 API 호출
self.session.headers.update({"Referer": LOGIN_PAGE_URL})
r_rsa = self.session.get(RSA_API_URL, timeout=30)
rsa_data = r_rsa.json()["data"]
if "JSESSIONID" not in self.session.cookies:
logger.error("로그인 실패: 세션 쿠키 없음")
# 3. 아이디/비밀번호 RSA 암호화
enc_id = self._rsa_encrypt(rsa_data, self.username)
enc_pw = self._rsa_encrypt(rsa_data, self.password)
# 4. 로그인 POST (Origin/Referer 헤더 필수)
response = self.session.post(
LOGIN_URL,
data={"userId": enc_id, "userPswdEncn": enc_pw},
timeout=30,
headers={
"Origin": BASE_URL,
"Referer": LOGIN_PAGE_URL,
},
)
if "userId" not in self.session.cookies:
logger.error("로그인 실패: userId 쿠키 없음")
return False
if "login" in response.url.lower():
logger.error("로그인 실패: 로그인 페이지로 리다이렉트")
if "loginsuccess" not in response.url.lower() and (
"login" in response.url.lower() or "error" in response.url.lower()
):
logger.error("로그인 실패: %s 로 리다이렉트", response.url)
return False
logger.info("로그인 성공")
return True
def get_current_round(self) -> int:
response = self.session.get(GAME_INFO_URL, timeout=30)
def _get_game_page_info(self) -> dict:
"""game645.do 페이지에서 회차, 추첨일 등 정보를 파싱합니다."""
# 게임 팝업 진입 경로를 통해 Referer 체인을 올바르게 설정
el_game_url = "https://el.dhlottery.co.kr/game/TotalGame.jsp?LottoId=LO40"
self.session.headers.update({"Referer": f"{BASE_URL}/main"})
self.session.get(el_game_url, timeout=30)
self.session.headers.update({"Referer": el_game_url})
response = self.session.get(GAME_645_URL, timeout=30)
response.raise_for_status()
data = response.json()
return int(data.get("curDrwNo", 0))
html = response.text
round_match = re.search(r'id="curRound">(\d+)<', html)
draw_date_match = re.search(
r'id="ROUND_DRAW_DATE"[^>]+value="([^"]+)"', html
)
pay_limit_match = re.search(
r'id="WAMT_PAY_TLMT_END_DT"[^>]+value="([^"]+)"', html
)
return {
"round": int(round_match.group(1)) if round_match else 0,
"ROUND_DRAW_DATE": draw_date_match.group(1) if draw_date_match else "",
"WAMT_PAY_TLMT_END_DT": pay_limit_match.group(1) if pay_limit_match else "",
}
def get_balance(self) -> int:
response = self.session.get(CHECK_BALANCE_URL, timeout=30)
response.raise_for_status()
match = re.search(r"보유금액.*?([\d,]+)원", response.text)
if match:
return int(match.group(1).replace(",", ""))
return 0
"""예치금 잔액을 조회합니다."""
try:
self.session.headers.update({"Referer": f"{BASE_URL}/mypage/home"})
response = self.session.get(
f"{BASE_URL}/mypage/getMndpInfo.do", timeout=30
)
data = response.json()
mndp = data.get("data", {})
# 예치금 잔액 = 입금액 합계 - 출금액 합계
deposit = int(mndp.get("csblDpstAmt", 0) or 0)
withdraw = int(mndp.get("csblTkmnyAmt", 0) or 0)
return max(0, deposit - withdraw)
except Exception:
return 0
def buy(self, game_count: int) -> PurchaseResult:
try:
current_round = self.get_current_round()
game_info = self._get_game_page_info()
current_round = game_info["round"]
lotto_num_list = []
for i in range(game_count):
@@ -94,17 +154,42 @@ class LottoBuyer:
payload = {
"round": current_round,
"direct": "172.17.20.52",
"direct": "",
"nBuyAmount": str(1000 * game_count),
"param": json.dumps(lotto_num_list, ensure_ascii=False),
"ROUND_DRAW_DATE": game_info["ROUND_DRAW_DATE"],
"WAMT_PAY_TLMT_END_DT": game_info["WAMT_PAY_TLMT_END_DT"],
"gameCnt": game_count,
"saleMdaDcd": "10",
}
# 대기열 체크 및 direct IP 취득 (구매 직전 필수 호출)
self.session.headers.update({
"Referer": GAME_645_URL,
"Content-Type": "application/json; charset=UTF-8",
})
r_ready = self.session.post(READY_SOCKET_URL, data="", timeout=30)
r_ready.raise_for_status()
ready_data = r_ready.json()
payload["direct"] = ready_data.get("ready_ip", "")
self.session.headers.pop("Content-Type", None)
response = self.session.post(BUY_URL, data=payload, timeout=60)
response.raise_for_status()
result_data = response.json()
if result_data.get("result", {}).get("resultMsg") == "SUCCESS":
if result_data.get("loginYn") == "N":
return PurchaseResult(
success=False,
round_number=current_round,
games=[],
total_price=0,
balance_after=0,
error_message="세션 만료 - 재로그인 필요",
)
result = result_data.get("result", {})
if result.get("resultMsg") == "SUCCESS":
games = self._parse_bought_numbers(result_data)
balance_after = self.get_balance()
return PurchaseResult(
@@ -115,7 +200,7 @@ class LottoBuyer:
balance_after=balance_after,
)
error_msg = result_data.get("result", {}).get("resultMsg", "알 수 없는 오류")
error_msg = result.get("resultMsg", "알 수 없는 오류")
return PurchaseResult(
success=False,
round_number=current_round,
@@ -137,18 +222,26 @@ class LottoBuyer:
)
def _parse_bought_numbers(self, response_data: dict) -> list[list[int]]:
"""새 API 응답 형식(arrGameChoiceNum) 파싱.
예: ["A|05|07|08|11|12|253"] → [[5,7,8,11,12,25]]
마지막 토큰은 두 자리 번호 + 단일 코드 숫자로 구성될 수 있음.
"""
games = []
lotto_list = response_data.get("result", {}).get("ary645Lotto", [])
for item in lotto_list:
numbers = sorted(
[
int(item.get("drwtNo1", 0)),
int(item.get("drwtNo2", 0)),
int(item.get("drwtNo3", 0)),
int(item.get("drwtNo4", 0)),
int(item.get("drwtNo5", 0)),
int(item.get("drwtNo6", 0)),
]
)
games.append(numbers)
result = response_data.get("result", {})
game_choice_list = result.get("arrGameChoiceNum", [])
for game_str in game_choice_list:
parts = game_str.split("|")
# 첫 요소는 게임 레이블 (A~E), 나머지가 번호
raw_numbers = parts[1:]
numbers = []
for token in raw_numbers:
# 마지막 토큰에 코드가 붙을 수 있으므로 2자리씩 파싱
while len(token) >= 2:
numbers.append(int(token[:2]))
token = token[2:]
if len(token) == 1:
break # 남은 1자리는 코드로 무시
numbers = sorted(set(n for n in numbers if 1 <= n <= 45))[:6]
if len(numbers) == 6:
games.append(numbers)
return games