osc/research/pdf_downloader/translate-news.py
yuxin-pc 910794aff7 Create translate-news.py
执行翻译
2026-01-20 11:08:23 +08:00

176 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import time
from typing import List, Tuple, Optional
import pymysql
import requests
# ================== Configuration ==================
# MySQL connection settings; main() unpacks this mapping into pymysql.connect().
# NOTE(review): the host and credentials are hard-coded in the source file --
# consider loading them from the environment or a secrets store instead.
DB_CONFIG = dict(
    host='47.113.231.200',
    port=28089,
    user='root',
    password='passok123A',
    database='dsp',
    charset='utf8mb4',
    autocommit=False,  # transactions are committed / rolled back by hand
)

# Translation API endpoint (replace with your own server IP or domain).
TRANSLATE_API_URL = "http://47.113.231.200:28081/translate"

# Only rows with es_loadtime later than this are selected.
# Format: YYYY-MM-DD HH:MM:SS
LOADTIME_AFTER = "2026-01-16 10:40:00"

# Source sites (matched against es_srcname) whose rows get translated.
TARGET_SRCNAMES = [
    'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA==',
    'http://www.kcna.kp/kp/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf',
    'https://energynow.com/category/press_releases/',
    'https://www.fao.org/newsroom/en',  # add your own sites here
]

# Pause after each translation request, in seconds, to avoid API rate limiting.
REQUEST_DELAY = 1

# Maximum number of characters sent per request (kept in line with the API).
MAX_TEXT_LENGTH = 5000
def normalize_newlines(text: str) -> str:
    r"""Rewrite ``\r\n`` and lone ``\r`` line endings as ``\n``.

    Falsy input (``None`` or an empty string) is handed back unchanged.
    """
    if text:
        # Collapse the two-character sequence first so that its "\r" is not
        # turned into a second newline by the following replacement.
        without_crlf = text.replace('\r\n', '\n')
        return without_crlf.replace('\r', '\n')
    return text
def translate_single(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Optional[str]:
    """Translate one piece of text through the HTTP translation API.

    Empty or whitespace-only input returns ``""`` without issuing a request.
    The text is cut to ``MAX_TEXT_LENGTH`` characters before it is sent.

    Returns:
        The ``translated_text`` field of the JSON reply, or ``None`` when the
        request or decoding raised (the error is printed) or the reply has no
        such field.
    """
    if not (text and text.strip()):
        return ""

    request_body = dict(
        text=text[:MAX_TEXT_LENGTH],
        source_lang=source_lang,
        target_lang=target_lang,
    )
    try:
        reply = requests.post(TRANSLATE_API_URL, json=request_body, timeout=10)
        reply.raise_for_status()
        return reply.json().get("translated_text")
    except Exception as exc:
        # Best effort: report the problem and let the caller decide what to do.
        print(f"⚠️ 翻译失败: {exc}")
        return None
def translate_content_with_paragraphs(content: str) -> str:
    r"""Translate a multi-line text one line at a time, tolerating failures.

    Line endings are normalised and the text is split on ``\n``. Blank lines
    are kept as empty strings without calling the API. A line whose
    translation fails (``translate_single`` returned ``None``) is reported and
    replaced by an empty string. The pieces are joined back with ``\n``.

    Falsy input returns ``""``.
    """
    if not content:
        return ""

    pieces = []
    for line in normalize_newlines(content).split('\n'):
        if line.strip():
            outcome = translate_single(line)
            if outcome is not None:
                pieces.append(outcome)
            else:
                print(f" ⚠️ 段落翻译失败,跳过: {line[:30]}...")
                # Keep the line slot but drop the text
                # (append ``line`` instead to keep the source text).
                pieces.append("")
            # Throttle after every API call, successful or not.
            time.sleep(REQUEST_DELAY)
        else:
            # Preserve blank lines so paragraph spacing survives.
            pieces.append("")
    return '\n'.join(pieces)
# ================== Database operations ==================
def update_record(cursor, es_sid: int, new_title: str, new_content: str):
    """Write a translated title and body to one ``indeximos`` row.

    Args:
        cursor: open database cursor (pymysql in this script) on which the
            UPDATE is executed.
        es_sid: ``es_sid`` value identifying the row to update.
        new_title: text stored in ``es_title``.
        new_content: text stored in ``es_content``.

    The statement is only executed here; committing is left to the caller.
    """
    # Placeholders are plain ``%s`` (DB-API "format" paramstyle). The earlier
    # ``% s`` spelling only worked because printf-style formatting ignores
    # the space flag for ``%s``.
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (new_title, new_content, es_sid))
# ================== Main logic ==================
def main():
    """Translate pending ``indeximos`` rows and write the results back.

    Selects rows whose ``es_loadtime`` is later than ``LOADTIME_AFTER``, whose
    ``es_srcname`` is one of ``TARGET_SRCNAMES``, whose ``es_title`` is NULL or
    blank and for which ``LENGTH(es_video) > 5``. For each row the title
    (``es_urltitle``) and body (``es_urlcontent``) are translated and stored
    via ``update_record``. A row whose title translation fails is skipped.

    Every row is committed right after its UPDATE, so an error or an
    interruption later in the run discards only the row in flight instead of
    all work done so far.

    Raises:
        Exception: any error raised inside the run is printed, the open
            transaction is rolled back, and the error is re-raised.
    """
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        # One %s placeholder per source name for the IN (...) list.
        placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES))
        query = f"""
            SELECT es_sid, es_urltitle, es_urlcontent
            FROM indeximos
            WHERE es_loadtime > %s
            AND (es_title IS NULL OR TRIM(es_title) = '')
            AND es_srcname IN ({placeholders})
            AND LENGTH(es_video) > 5
        """
        params = [LOADTIME_AFTER] + TARGET_SRCNAMES
        cursor.execute(query, params)
        records: List[Tuple] = cursor.fetchall()
        total = len(records)
        print(f"✅ 共找到 {total} 条待翻译记录")
        if not records:
            return

        success_count = 0
        for idx, (es_sid, urltitle, urlcontent) in enumerate(records, 1):
            print(f"\n[{idx}/{total}] 处理 es_sid={es_sid}")
            start_time = time.time()

            # Translate the title; a missing title is stored as an empty string.
            title_trans = translate_single(urltitle) if urltitle else ""
            if title_trans is None:
                print(" → 标题翻译失败,跳过整条")
                continue

            # Translate the body line by line (failed lines are tolerated).
            content_trans = translate_content_with_paragraphs(urlcontent)

            # Persist this row immediately so finished work survives a later
            # failure or a Ctrl-C during this long, throttled run.
            update_record(cursor, es_sid, title_trans, content_trans)
            conn.commit()
            success_count += 1

            elapsed = time.time() - start_time
            print(f" ✅ 翻译成功 | 耗时: {elapsed:.2f}s | 标题: {title_trans[:30]}...")

        print(f"\n🎉 完成!成功翻译 {success_count} / {total} 条记录")
    except Exception as e:
        # Only the uncommitted (in-flight) row is undone here.
        conn.rollback()
        print(f"❌ 发生错误: {e}")
        raise
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    main()