From 52a06b163d2d2c10cb5108c6c1ddb927d6e4096f Mon Sep 17 00:00:00 2001 From: hyeonggil <> Date: Sat, 28 Mar 2026 10:52:10 +0900 Subject: [PATCH] =?UTF-8?q?feat(lotto=5Fbuyer):=20RSA=20=EC=95=94=ED=98=B8?= =?UTF-8?q?=ED=99=94=EB=A5=BC=20=ED=86=B5=ED=95=9C=20=EB=A1=9C=EA=B7=B8?= =?UTF-8?q?=EC=9D=B8=20=EB=A1=9C=EC=A7=81=20=EA=B0=9C=EC=84=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/lotto_buyer.py | 185 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 139 insertions(+), 46 deletions(-) diff --git a/src/lotto_buyer.py b/src/lotto_buyer.py index 33eddc4..b44a41a 100644 --- a/src/lotto_buyer.py +++ b/src/lotto_buyer.py @@ -1,19 +1,27 @@ +import binascii import json import logging import re from dataclasses import dataclass import requests +from Crypto.Cipher import PKCS1_v1_5 +from Crypto.PublicKey import RSA logger = logging.getLogger(__name__) -BASE_URL = "https://dhlottery.co.kr" -LOGIN_URL = f"{BASE_URL}/userSsl.do?method=login" -BUY_URL = f"{BASE_URL}/gameStore.do?method=saveBuy" -CHECK_BALANCE_URL = f"{BASE_URL}/userSsl.do?method=myPage" -GAME_INFO_URL = f"{BASE_URL}/common.do?method=getGameInfo" +BASE_URL = "https://www.dhlottery.co.kr" +GAME_URL = "https://ol.dhlottery.co.kr" -AUTO_MODE_CODE = "2" +LOGIN_PAGE_URL = f"{BASE_URL}/login" +RSA_API_URL = f"{BASE_URL}/login/selectRsaModulus.do" +LOGIN_URL = f"{BASE_URL}/login/securityLoginCheck.do" +GAME_645_URL = f"{GAME_URL}/olotto/game/game645.do" +BUY_URL = f"{GAME_URL}/olotto/game/execBuy.do" +READY_SOCKET_URL = f"{GAME_URL}/olotto/game/egovUserReadySocket.json" +SERVER_PROP_URL = f"{BASE_URL}/sy/getServerPropInfo.do" + +AUTO_MODE_CODE = "0" @dataclass @@ -38,49 +46,101 @@ class LottoBuyer: "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/120.0.0.0 Safari/537.36" ), - "Referer": BASE_URL + "/", } ) + def _rsa_encrypt(self, rsa_data: dict, text: str) -> str: + """RSA 공개키로 텍스트를 암호화합니다.""" + modulus = int(rsa_data["rsaModulus"], 16) + exponent = int(rsa_data["publicExponent"], 16) + pub_key = RSA.construct((modulus, exponent)) + cipher = PKCS1_v1_5.new(pub_key) + encrypted = cipher.encrypt(text.encode("utf-8")) + return binascii.hexlify(encrypted).decode("ascii") + def login(self) -> bool: - self.session.get(BASE_URL, timeout=30) + # 1. 로그인 페이지 GET (쿠키 세팅) + self.session.get(LOGIN_PAGE_URL, timeout=30) - payload = { - "userId": self.username, - "password": self.password, - "checkSave": "Y", - "newsEventYn": "", - } - response = self.session.post(LOGIN_URL, data=payload, timeout=30) + # 2. RSA 공개키 API 호출 + self.session.headers.update({"Referer": LOGIN_PAGE_URL}) + r_rsa = self.session.get(RSA_API_URL, timeout=30) + rsa_data = r_rsa.json()["data"] - if "JSESSIONID" not in self.session.cookies: - logger.error("로그인 실패: 세션 쿠키 없음") + # 3. 아이디/비밀번호 RSA 암호화 + enc_id = self._rsa_encrypt(rsa_data, self.username) + enc_pw = self._rsa_encrypt(rsa_data, self.password) + + # 4. 로그인 POST (Origin/Referer 헤더 필수) + response = self.session.post( + LOGIN_URL, + data={"userId": enc_id, "userPswdEncn": enc_pw}, + timeout=30, + headers={ + "Origin": BASE_URL, + "Referer": LOGIN_PAGE_URL, + }, + ) + + if "userId" not in self.session.cookies: + logger.error("로그인 실패: userId 쿠키 없음") return False - if "login" in response.url.lower(): - logger.error("로그인 실패: 로그인 페이지로 리다이렉트") + if "loginsuccess" not in response.url.lower() and ( + "login" in response.url.lower() or "error" in response.url.lower() + ): + logger.error("로그인 실패: %s 로 리다이렉트", response.url) return False logger.info("로그인 성공") return True - def get_current_round(self) -> int: - response = self.session.get(GAME_INFO_URL, timeout=30) + def _get_game_page_info(self) -> dict: + """game645.do 페이지에서 회차, 추첨일 등 정보를 파싱합니다.""" + # 게임 팝업 진입 경로를 통해 Referer 체인을 올바르게 설정 + el_game_url = "https://el.dhlottery.co.kr/game/TotalGame.jsp?LottoId=LO40" + self.session.headers.update({"Referer": f"{BASE_URL}/main"}) + self.session.get(el_game_url, timeout=30) + + self.session.headers.update({"Referer": el_game_url}) + response = self.session.get(GAME_645_URL, timeout=30) response.raise_for_status() - data = response.json() - return int(data.get("curDrwNo", 0)) + html = response.text + + round_match = re.search(r'id="curRound">(\d+)<', html) + draw_date_match = re.search( + r'id="ROUND_DRAW_DATE"[^>]+value="([^"]+)"', html + ) + pay_limit_match = re.search( + r'id="WAMT_PAY_TLMT_END_DT"[^>]+value="([^"]+)"', html + ) + + return { + "round": int(round_match.group(1)) if round_match else 0, + "ROUND_DRAW_DATE": draw_date_match.group(1) if draw_date_match else "", + "WAMT_PAY_TLMT_END_DT": pay_limit_match.group(1) if pay_limit_match else "", + } def get_balance(self) -> int: - response = self.session.get(CHECK_BALANCE_URL, timeout=30) - response.raise_for_status() - match = re.search(r"보유금액.*?([\d,]+)원", response.text) - if match: - return int(match.group(1).replace(",", "")) - return 0 + """예치금 잔액을 조회합니다.""" + try: + self.session.headers.update({"Referer": f"{BASE_URL}/mypage/home"}) + response = self.session.get( + f"{BASE_URL}/mypage/getMndpInfo.do", timeout=30 + ) + data = response.json() + mndp = data.get("data", {}) + # 예치금 잔액 = 입금액 합계 - 출금액 합계 + deposit = int(mndp.get("csblDpstAmt", 0) or 0) + withdraw = int(mndp.get("csblTkmnyAmt", 0) or 0) + return max(0, deposit - withdraw) + except Exception: + return 0 def buy(self, game_count: int) -> PurchaseResult: try: - current_round = self.get_current_round() + game_info = self._get_game_page_info() + current_round = game_info["round"] lotto_num_list = [] for i in range(game_count): @@ -94,17 +154,42 @@ class LottoBuyer: payload = { "round": current_round, - "direct": "172.17.20.52", + "direct": "", "nBuyAmount": str(1000 * game_count), "param": json.dumps(lotto_num_list, ensure_ascii=False), + "ROUND_DRAW_DATE": game_info["ROUND_DRAW_DATE"], + "WAMT_PAY_TLMT_END_DT": game_info["WAMT_PAY_TLMT_END_DT"], "gameCnt": game_count, + "saleMdaDcd": "10", } + # 대기열 체크 및 direct IP 취득 (구매 직전 필수 호출) + self.session.headers.update({ + "Referer": GAME_645_URL, + "Content-Type": "application/json; charset=UTF-8", + }) + r_ready = self.session.post(READY_SOCKET_URL, data="", timeout=30) + r_ready.raise_for_status() + ready_data = r_ready.json() + payload["direct"] = ready_data.get("ready_ip", "") + self.session.headers.pop("Content-Type", None) + response = self.session.post(BUY_URL, data=payload, timeout=60) response.raise_for_status() result_data = response.json() - if result_data.get("result", {}).get("resultMsg") == "SUCCESS": + if result_data.get("loginYn") == "N": + return PurchaseResult( + success=False, + round_number=current_round, + games=[], + total_price=0, + balance_after=0, + error_message="세션 만료 - 재로그인 필요", + ) + + result = result_data.get("result", {}) + if result.get("resultMsg") == "SUCCESS": games = self._parse_bought_numbers(result_data) balance_after = self.get_balance() return PurchaseResult( @@ -115,7 +200,7 @@ class LottoBuyer: balance_after=balance_after, ) - error_msg = result_data.get("result", {}).get("resultMsg", "알 수 없는 오류") + error_msg = result.get("resultMsg", "알 수 없는 오류") return PurchaseResult( success=False, round_number=current_round, @@ -137,18 +222,26 @@ class LottoBuyer: ) def _parse_bought_numbers(self, response_data: dict) -> list[list[int]]: + """새 API 응답 형식(arrGameChoiceNum) 파싱. + 예: ["A|05|07|08|11|12|253"] → [[5,7,8,11,12,25]] + 마지막 토큰은 두 자리 번호 + 단일 코드 숫자로 구성될 수 있음. + """ games = [] - lotto_list = response_data.get("result", {}).get("ary645Lotto", []) - for item in lotto_list: - numbers = sorted( - [ - int(item.get("drwtNo1", 0)), - int(item.get("drwtNo2", 0)), - int(item.get("drwtNo3", 0)), - int(item.get("drwtNo4", 0)), - int(item.get("drwtNo5", 0)), - int(item.get("drwtNo6", 0)), - ] - ) - games.append(numbers) + result = response_data.get("result", {}) + game_choice_list = result.get("arrGameChoiceNum", []) + for game_str in game_choice_list: + parts = game_str.split("|") + # 첫 요소는 게임 레이블 (A~E), 나머지가 번호 + raw_numbers = parts[1:] + numbers = [] + for token in raw_numbers: + # 마지막 토큰에 코드가 붙을 수 있으므로 2자리씩 파싱 + while len(token) >= 2: + numbers.append(int(token[:2])) + token = token[2:] + if len(token) == 1: + break # 남은 1자리는 코드로 무시 + numbers = sorted(set(n for n in numbers if 1 <= n <= 45))[:6] + if len(numbers) == 6: + games.append(numbers) return games