[通用翻译] 功能提交

This commit is contained in:
DELL 2026-01-20 16:13:05 +08:00
parent 910794aff7
commit 399165404e
7 changed files with 293 additions and 6 deletions

View File

@ -19,4 +19,4 @@ dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath)
# 等效于 scrapy crawl <SpiderName> -a params="{}"
execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}'])
execute(['scrapy', 'crawl', 'WeiboUserSpider', '-a', 'params={}'])

View File

@ -0,0 +1,175 @@
import time
from typing import List, Tuple, Optional
import pymysql
import requests

# ================== Configuration ==================
# MySQL connection settings.
# NOTE(review): credentials are hard-coded here — consider moving them to
# environment variables or a secrets store before sharing this script.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
    'autocommit': False  # transactions are committed/rolled back manually in main()
}
# Translation API endpoint (replace with your own server IP or domain).
TRANSLATE_API_URL = "http://47.113.231.200:28081/translate"
# Only records loaded after this timestamp are processed (YYYY-MM-DD HH:MM:SS).
LOADTIME_AFTER = "2026-01-16 10:40:00"
# Source sites whose records should be translated.
TARGET_SRCNAMES = [
    'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA==',
    'http://www.kcna.kp/kp/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf',
    'https://energynow.com/category/press_releases/',
    'https://www.fao.org/newsroom/en'  # add your own site here
]
# Delay between successive API requests (seconds) to avoid rate limiting.
REQUEST_DELAY = 1
# Maximum text length per request (must match the API's own limit).
MAX_TEXT_LENGTH = 5000
def normalize_newlines(text: str) -> str:
    """Return *text* with CRLF and lone CR line endings rewritten as LF."""
    if not text:
        return text
    for ending in ('\r\n', '\r'):
        text = text.replace(ending, '\n')
    return text
def translate_single(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Optional[str]:
    """Translate one text fragment via the HTTP API.

    Blank/empty input yields "" without a network call; any request or
    response-parsing error is reported and yields None.
    """
    if not text or not text.strip():
        return ""
    # Truncate to the API's maximum accepted length before sending.
    request_body = {
        "text": text[:MAX_TEXT_LENGTH],
        "source_lang": source_lang,
        "target_lang": target_lang,
    }
    try:
        response = requests.post(TRANSLATE_API_URL, json=request_body, timeout=10)
        response.raise_for_status()
        return response.json().get("translated_text")
    except Exception as e:
        print(f"⚠️ 翻译失败: {e}")
        return None
def translate_content_with_paragraphs(content: str) -> str:
    """Translate *content* one paragraph at a time, tolerating failures.

    A paragraph that fails to translate is replaced by an empty line so the
    overall paragraph structure of the document is preserved.
    """
    if not content:
        return ""
    # Normalize line endings (\r\n / \r -> \n) before splitting on \n.
    unified = content.replace('\r\n', '\n').replace('\r', '\n')
    translated = []
    for para in unified.split('\n'):
        if not para.strip():
            # Keep blank lines as-is.
            translated.append("")
            continue
        piece = translate_single(para)
        if piece is None:
            # Failed paragraph: keep an empty line in its place.
            print(f" ⚠️ 段落翻译失败,跳过: {para[:30]}...")
            translated.append("")
        else:
            translated.append(piece)
        # Throttle between API calls to avoid rate limiting.
        time.sleep(REQUEST_DELAY)
    return '\n'.join(translated)
# ================== 数据库操作 ==================
def update_record(cursor, es_sid: int, new_title: str, new_content: str) -> None:
    """Persist the translated title/content for one `indeximos` row.

    Fix: the placeholders were written ``% s`` (space between ``%`` and ``s``).
    That only works because CPython's %-formatting ignores the space flag for
    string conversions; any driver or tool that scans for the literal ``%s``
    paramstyle token would fail to bind the values. Normalized to ``%s``.
    The query stays fully parameterized (driver-side escaping, no SQL
    injection via the translated text).
    """
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (new_title, new_content, es_sid))
# ================== 主逻辑 ==================
def main():
    """Fetch untranslated records, translate them, and write results back."""
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        # One %s placeholder per target site for the IN clause.
        placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES))
        # Rows loaded after LOADTIME_AFTER, with no translated title yet,
        # from one of the target sites.
        # NOTE(review): the LENGTH(es_video) > 5 filter looks site-specific —
        # confirm its intent against the schema.
        query = f"""
            SELECT es_sid, es_urltitle, es_urlcontent
            FROM indeximos
            WHERE es_loadtime > %s
            AND (es_title IS NULL OR TRIM(es_title) = '')
            AND es_srcname IN ({placeholders})
            AND LENGTH(es_video) > 5
        """
        params = [LOADTIME_AFTER] + TARGET_SRCNAMES
        cursor.execute(query, params)
        records: List[Tuple] = cursor.fetchall()
        total = len(records)
        print(f"✅ 共找到 {total} 条待翻译记录")
        if total == 0:
            return
        success_count = 0
        for idx, (es_sid, urltitle, urlcontent) in enumerate(records, 1):
            print(f"\n[{idx}/{total}] 处理 es_sid={es_sid}")
            start_time = time.time()
            # Translate the title; a missing title is stored as "".
            title_trans = translate_single(urltitle) if urltitle else ""
            if title_trans is None:
                # Title translation failed: skip the whole record.
                print(" → 标题翻译失败,跳过整条")
                continue
            # Translate the body paragraph by paragraph (failure-tolerant).
            content_trans = translate_content_with_paragraphs(urlcontent)
            # Stage the update; everything is committed in one batch below.
            update_record(cursor, es_sid, title_trans, content_trans)
            success_count += 1
            elapsed = time.time() - start_time
            print(f" ✅ 翻译成功 | 耗时: {elapsed:.2f}s | 标题: {title_trans[:30]}...")
        # Single commit for the whole batch (autocommit is disabled in DB_CONFIG).
        conn.commit()
        print(f"\n🎉 完成!成功翻译 {success_count} / {total} 条记录")
    except Exception as e:
        # Any error rolls back the entire batch before re-raising.
        conn.rollback()
        print(f"❌ 发生错误: {e}")
        raise
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    main()

View File

@ -19,6 +19,7 @@ class WebsiteSpiderItem(scrapy.Item):
es_extname = scrapy.Field()
es_channel = scrapy.Field()
es_groupname = scrapy.Field()
es_title = scrapy.Field()
es_urltitle = scrapy.Field()
es_urltopic = scrapy.Field()
es_lasttime = scrapy.Field()

View File

@ -26,8 +26,10 @@ PER_BATCH_IP_USE_TIMES = 5 # 代理中间件每次从ip池获取一批ip
# REDIS_HOST = '38.54.94.107'
# REDIS_PORT = '28097'
REDIS_HOST = '10.55.13.3'
REDIS_PORT = '7379'
# REDIS_HOST = '10.55.13.3'
# REDIS_PORT = '7379'
REDIS_HOST = '107.182.191.3'
REDIS_PORT = 7379
REDIS_PWD = 'jlkj-841-2-redis'
REDIS_PARAMS = {
'password': 'jlkj-841-2-redis',
@ -164,3 +166,10 @@ ITEM_PIPELINES = {
'scrapy.pipelines.images.ImagesPipeline': 2,
'WebsiteSpider.pipelines.ProtobufSavePipeline': 300,
}
############################## 翻译
MAX_TEXT_LENGTH = 100
# 翻译 API 地址(替换为你的服务器 IP 或域名)
TRANSLATE_API_URL = "http://47.113.231.200:28081/translate"
# 单次请求间隔(秒),避免 API 被限流
REQUEST_DELAY = 1

View File

@ -5,7 +5,9 @@ import re
import scrapy
import validators
from scrapy_redis.spiders import RedisSpider
import redis
from WebsiteSpider.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD
from WebsiteSpider.scrapy_selenium import SeleniumRequest
from WebsiteSpider.utils.http_utils import build_url
from WebsiteSpider.utils.parser_utils import parse_item_from_response
@ -18,7 +20,8 @@ class WebsiteInfoCommonSpider(RedisSpider):
super(WebsiteInfoCommonSpider, self).__init__(*args, **kwargs)
json_params = json.loads(params)
self.name = 'WebSite_' + json_params['clusterName']
self.redis_client = None
self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT,
password=REDIS_PWD)
if 'job_id' in json_params:
self.job_id = json_params['job_id']
@ -36,7 +39,7 @@ class WebsiteInfoCommonSpider(RedisSpider):
# 根据url特征判断是否为内容页若是则解析文本内容
detail_page_reg = parse_rule['detailPageReg']
if detail_page_reg == "" or re.search(detail_page_reg, response.url) is not None:
yield_flag, webpage_item = parse_item_from_response(response, parse_rule)
yield_flag, webpage_item = parse_item_from_response(response, parse_rule, self.redis_client)
if yield_flag:
yield webpage_item

View File

@ -9,9 +9,10 @@ from scrapy.loader import ItemLoader
from WebsiteSpider.items import WebsiteSpiderItem
from WebsiteSpider.utils.date_utils import transfer_time_zone, get_time_stamp
from WebsiteSpider.utils.http_utils import filter_html_tags, build_url
from WebsiteSpider.utils.traslate_utils import translate_single, translate_content_with_paragraphs, update_record
def parse_item_from_response(response, parse_rule):
def parse_item_from_response(response, parse_rule, redis_client):
current_url = response.url
allowed_domains = parse_rule['allowDomain'].split(';')
mapping = parse_rule['fieldMappings']
@ -116,6 +117,7 @@ def parse_item_from_response(response, parse_rule):
logger.info("urltime: %s" % webpage_item['es_urltime'])
except KeyError:
logger.info('时间解析失败当前页面url: %s' % response.url)
time_parse_rule = None
if 'dateReg' in mapping:
time_parse_rule = {
@ -155,4 +157,26 @@ def parse_item_from_response(response, parse_rule):
logger.info('时间无法解析,解析规则是:' + mapping['es_urltime'])
if filter_VIP_content:
logger.info('当前内容是VIP文章并不完整已经过滤。')
if yield_flag:
try:
# 1. 从 Redis 获取原始数据
raw_urls = redis_client.lrange('WebsiteSpider:translate_sites', 0, -1)
translate_list = [
url_bytes.decode('utf-8').strip()
for url_bytes in raw_urls
if url_bytes and url_bytes.decode('utf-8').strip()
]
if webpage_item['es_srcname'] in translate_list:
# 翻译标题
webpage_item['es_title'] = translate_single(webpage_item['es_urltitle'])
if webpage_item['es_title'] is None:
logger.warning(" → 标题翻译失败,跳过整条")
else:
logger.info(f"翻译成功,标题译文长度:{len(webpage_item['es_title'])}")
# 翻译内容(按段落,容错)
webpage_item['es_content'] = translate_content_with_paragraphs(webpage_item['es_urlcontent'])
logger.info(f"翻译成功,内容译文长度:{len(webpage_item['es_content'])}")
except Exception as e:
logger.error(repr(e))
return yield_flag, webpage_item

View File

@ -0,0 +1,75 @@
from WebsiteSpider.settings import MAX_TEXT_LENGTH, TRANSLATE_API_URL, REQUEST_DELAY
import requests
import time
from typing import List, Tuple, Optional
def normalize_newlines(text: str) -> str:
    """Unify line endings: both \r\n and bare \r become \n."""
    if text:
        text = text.replace('\r\n', '\n')
        text = text.replace('\r', '\n')
    return text
def translate_single(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Optional[str]:
    """Send one chunk of text to the translation service.

    Returns the translation, "" for blank input (no network call), or
    None when the request fails for any reason.
    """
    if not text or not text.strip():
        return ""
    # Respect the service's maximum accepted text length.
    truncated = text[:MAX_TEXT_LENGTH]
    payload = {"text": truncated, "source_lang": source_lang, "target_lang": target_lang}
    try:
        reply = requests.post(TRANSLATE_API_URL, json=payload, timeout=10)
        reply.raise_for_status()
        return reply.json().get("translated_text")
    except Exception as e:
        print(f"⚠️ 翻译失败: {e}")
        return None
def translate_content_with_paragraphs(content: str) -> str:
    """Paragraph-wise translation with per-paragraph error tolerance.

    A failed paragraph is replaced by an empty line; blank lines are kept,
    so the translated text keeps the source's paragraph layout.
    """
    if not content:
        return ""
    # Fold \r\n and \r into \n so splitting on \n sees every paragraph.
    normalized = content.replace('\r\n', '\n').replace('\r', '\n')
    lines_out = []
    for paragraph in normalized.split('\n'):
        if not paragraph.strip():
            lines_out.append("")
            continue
        result = translate_single(paragraph)
        if result is not None:
            lines_out.append(result)
        else:
            # Skip the failed paragraph but keep its slot.
            print(f" ⚠️ 段落翻译失败,跳过: {paragraph[:30]}...")
            lines_out.append("")
        # Pause between API calls to stay under the rate limit.
        time.sleep(REQUEST_DELAY)
    return '\n'.join(lines_out)
# ================== 数据库操作 ==================
def update_record(cursor, es_sid: int, new_title: str, new_content: str) -> None:
    """Persist the translated title/content for one `indeximos` row.

    Fix: the placeholders were written ``% s`` (space between ``%`` and ``s``).
    That only works because CPython's %-formatting ignores the space flag for
    string conversions; any driver or tool that scans for the literal ``%s``
    paramstyle token would fail to bind the values. Normalized to ``%s``.
    The query stays fully parameterized (driver-side escaping, no SQL
    injection via the translated text).
    """
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (new_title, new_content, es_sid))