"""Batch-translate untranslated rows of the ``indeximos`` table.

Selects rows loaded after ``LOADTIME_AFTER`` from the configured source
sites whose ``es_title`` is still empty, translates their title and content
through the HTTP translation API, and writes the results back to
``es_title`` / ``es_content``.
"""

import time
from typing import List, Tuple, Optional

import pymysql
import requests

# ================== Configuration ==================
# Database connection settings.
# NOTE(review): credentials are hard-coded in source control; consider
# loading them from the environment or a secrets store instead.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
    'autocommit': False,  # transactions are committed explicitly in main()
}

# Translation API endpoint (replace with your own server IP or domain).
TRANSLATE_API_URL = "http://47.113.231.200:28081/translate"

# Only rows with es_loadtime later than this are processed
# (format: YYYY-MM-DD HH:MM:SS).
LOADTIME_AFTER = "2026-01-16 10:40:00"

# Source sites (es_srcname values) whose rows should be translated.
TARGET_SRCNAMES = [
    'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA==',
    'http://www.kcna.kp/kp/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf',
    'https://energynow.com/category/press_releases/',
    'https://www.fao.org/newsroom/en',
    # add your own sites here
]

# Pause between paragraph requests (seconds) to avoid API rate limiting.
REQUEST_DELAY = 1

# Maximum text length sent per request (kept in sync with the API limit).
MAX_TEXT_LENGTH = 5000


def normalize_newlines(text: str) -> str:
    """Convert CRLF and lone CR line endings to LF.

    Falsy input (``None`` or ``""``) is returned unchanged.
    """
    if not text:
        return text
    return text.replace('\r\n', '\n').replace('\r', '\n')


def translate_single(text: str, source_lang: str = "auto",
                     target_lang: str = "zh") -> Optional[str]:
    """Translate one piece of text through the translation API.

    Args:
        text: Text to translate. Only the first ``MAX_TEXT_LENGTH``
            characters are sent; anything beyond that is dropped.
        source_lang: Source language code passed to the API.
        target_lang: Target language code passed to the API.

    Returns:
        The translated string, ``""`` when ``text`` is empty or only
        whitespace, or ``None`` when the request fails or the response does
        not carry a string under ``"translated_text"``.
    """
    if not text or not text.strip():
        return ""

    payload = {
        "text": text[:MAX_TEXT_LENGTH],
        "source_lang": source_lang,
        "target_lang": target_lang,
    }
    try:
        response = requests.post(TRANSLATE_API_URL, json=payload, timeout=10)
        response.raise_for_status()
        result = response.json()
    except Exception as e:
        # Deliberate best-effort boundary: any failure of a single request
        # is reported and signalled with None so the batch can continue.
        print(f"⚠️ 翻译失败: {e}")
        return None

    # Treat an unexpected response shape as a failed translation so callers
    # never receive a non-string value they would later slice or join.
    translated = result.get("translated_text") if isinstance(result, dict) else None
    if not isinstance(translated, str):
        return None
    return translated


def translate_content_with_paragraphs(content: str) -> str:
    """Translate ``content`` one line-separated paragraph at a time.

    Line endings are normalised to LF first. Blank lines are preserved
    without calling the API. A paragraph whose translation fails is replaced
    by an empty line so one bad paragraph does not lose the whole document.

    Returns:
        The translated paragraphs joined with LF, or ``""`` for empty input.
    """
    if not content:
        return ""

    paragraphs = normalize_newlines(content).split('\n')
    translated_paragraphs = []
    for para in paragraphs:
        if not para.strip():
            # Keep blank lines so the paragraph layout survives translation.
            translated_paragraphs.append("")
            continue

        trans = translate_single(para)
        if trans is None:
            # Failed paragraph: leave an empty line in its place
            # (alternatively append `para` to keep the original text).
            print(f" ⚠️ 段落翻译失败,跳过: {para[:30]}...")
            translated_paragraphs.append("")
        else:
            translated_paragraphs.append(trans)
        # Throttle after every API call, successful or not.
        time.sleep(REQUEST_DELAY)

    return '\n'.join(translated_paragraphs)


# ================== Database operations ==================
def update_record(cursor, es_sid: int, new_title: str, new_content: str):
    """Write the translated title and content for the row ``es_sid``.

    The statement is only executed here; committing is left to the caller.
    """
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (new_title, new_content, es_sid))


def _fetch_pending_records(cursor) -> List[Tuple]:
    """Return (es_sid, es_urltitle, es_urlcontent) rows awaiting translation."""
    placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES))
    query = f"""
        SELECT es_sid, es_urltitle, es_urlcontent
        FROM indeximos
        WHERE es_loadtime > %s
          AND (es_title IS NULL OR TRIM(es_title) = '')
          AND es_srcname IN ({placeholders})
          AND LENGTH(es_video) > 5
    """
    params = [LOADTIME_AFTER] + TARGET_SRCNAMES
    cursor.execute(query, params)
    return cursor.fetchall()


# ================== Main logic ==================
def main():
    """Translate every pending row and store the result.

    Each row is committed as soon as it has been updated, so an error late
    in a long run only rolls back the row in progress instead of discarding
    all translations completed so far. Rows whose title cannot be translated
    are skipped. Any unexpected error is reported and re-raised after
    rolling back the open transaction.
    """
    if not TARGET_SRCNAMES:
        # An empty list would render "IN ()", which is a SQL syntax error.
        print("⚠️ 未配置目标站点,无需处理")
        return

    conn = pymysql.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cursor:
            records = _fetch_pending_records(cursor)
            total = len(records)
            print(f"✅ 共找到 {total} 条待翻译记录")
            if total == 0:
                return

            success_count = 0
            for idx, (es_sid, urltitle, urlcontent) in enumerate(records, 1):
                print(f"\n[{idx}/{total}] 处理 es_sid={es_sid}")
                start_time = time.time()

                # Title first: without a title the whole row is skipped.
                title_trans = translate_single(urltitle) if urltitle else ""
                if title_trans is None:
                    print(" → 标题翻译失败,跳过整条")
                    continue

                # Content is translated paragraph by paragraph (fault tolerant).
                content_trans = translate_content_with_paragraphs(urlcontent)

                update_record(cursor, es_sid, title_trans, content_trans)
                # Commit per row so finished work survives a later failure.
                conn.commit()
                success_count += 1

                elapsed = time.time() - start_time
                print(f" ✅ 翻译成功 | 耗时: {elapsed:.2f}s | 标题: {title_trans[:30]}...")

            print(f"\n🎉 完成!成功翻译 {success_count} / {total} 条记录")
    except Exception as e:
        conn.rollback()
        print(f"❌ 发生错误: {e}")
        raise
    finally:
        conn.close()


if __name__ == "__main__":
    main()