Merge branch 'main' of ssh://144.34.185.108:5282/osc-group/osc

This commit is contained in:
yuxin-pc 2026-01-20 16:43:17 +08:00
commit f7a210473a
9 changed files with 304 additions and 88 deletions

View File

@ -89,85 +89,6 @@ class TwitterSpider(scrapy.Spider):
login_users=login_users, login_users=login_users,
response=response response=response
) )
# driver = response.request.meta['driver']
# driver.maximize_window()
# # 1. 打开第一个标签页
# driver.get('https://x.com/i/flow/login')
# user_list = []
# for u in login_users:
# user_list.append(json.loads(u.decode()))
#
# login_user = random.choice(user_list)
# wait = WebDriverWait(driver, 15)
# # 2. 通过 JS 打开第二个标签页(新 Tab
# time.sleep(5)
# driver.execute_script("window.open('');")
# driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');")
#
# # 3. 获取所有标签页句柄
# handles = driver.window_handles # [handle1, handle2]
#
# # 4. 切换到第二个标签页(可选)
# driver.switch_to.window(handles[1])
#
# logger.info(f"login as user {login_user['uid']}")
# # time.sleep(random.uniform(1.5, 3.0))
# # driver.find_element_by_xpath("//input").send_keys(login_user['uid'])
# # 等待并定位用户名输入框
# username_input = wait.until(
# EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]'))
# )
#
# # 模拟真人逐字输入(带随机延迟)
# username = login_user['uid']
# for char in username:
# username_input.send_keys(char)
# time.sleep(random.uniform(0.05, 0.2)) # 每个字符间隔 50~200ms
#
# time.sleep(random.uniform(0.3, 0.8)) # 输入完后稍作停顿
#
# # 尝试点击 "Next" 按钮(主逻辑:带文本的按钮)
# try:
# next_button = wait.until(
# EC.element_to_be_clickable(
# (By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]")
# )
# )
# body = driver.find_element(By.TAG_NAME, "body")
# ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform()
# time.sleep(0.5)
# # 模拟鼠标移动到按钮并点击
# actions = ActionChains(driver)
# actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
#
# except Exception as e:
# logger.info("主 Next 按钮未找到,尝试备用定位方式")
# try:
# # 备用:通过 role 定位第二个 button
# next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]")
# actions = ActionChains(driver)
# actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
# except Exception as e2:
# logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}")
# raise
# time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
# try:
# logger.info("输入手机号验证...")
# driver.find_element_by_xpath("//input[@name='text']").send_keys("+8619962025641")
# # driver.find_element_by_xpath("//button[@data-testid='ocfEnterTextNextButton']").click()
# driver.find_element_by_xpath(driver.find_element_by_xpath("//button[.//span[text()='下一步']]")).click()
# time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
# except Exception:
# logger.info("无需输入手机号验证")
# driver.find_element_by_xpath("//input[@name='password']").send_keys(login_user['pwd'])
# driver.find_element_by_xpath("//button[@data-testid='LoginForm_Login_Button']").click()
# time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
# try:
# driver.find_element_by_xpath("//button[@data-testid='confirmationSheetConfirm']").click()
# time.sleep(random.uniform(1.5, 10.0)) # 等待页面加载
# except:
# time.sleep(5)
cookies = driver.get_cookies() cookies = driver.get_cookies()
# 取cookie中的ct0为x-csrf-token取gt为x-guest-token # 取cookie中的ct0为x-csrf-token取gt为x-guest-token
self.cookie_dict = {} self.cookie_dict = {}
@ -190,10 +111,8 @@ class TwitterSpider(scrapy.Spider):
'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6', 'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'content-type': 'application/json', 'content-type': 'application/json',
'authorization': self.authorization, 'authorization': self.authorization,
# 'x-twitter-active-user': 'yes',
'Origin': 'https://twitter.com', 'Origin': 'https://twitter.com',
'Cookie': cookie_string, 'Cookie': cookie_string,
# 'Connection': 'keep-alive',
'X-Csrf-Token': ct0 'X-Csrf-Token': ct0
} }
self.filter_key = self.settings['TWITTER_FILTER_KEY'] self.filter_key = self.settings['TWITTER_FILTER_KEY']

View File

@ -2,10 +2,12 @@
import json import json
import time import time
import redis
import scrapy import scrapy
from redisbloom.client import Client from redisbloom.client import Client
from MediaSpiders.items import MediaspidersItem from MediaSpiders.items import MediaspidersItem
from MediaSpiders.spiders.TwitterUserSpider import form_cookie_dict
from MediaSpiders.utils.http_utils import http_post from MediaSpiders.utils.http_utils import http_post
from MediaSpiders.utils.string_utils import find_text from MediaSpiders.utils.string_utils import find_text
from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp
@ -59,6 +61,11 @@ class WeiboSpider(scrapy.Spider):
account_rsp = json.loads( account_rsp = json.loads(
http_post(account_query_api, json.dumps(post_data), headers={"Content-Type": "application/json"}).text) http_post(account_query_api, json.dumps(post_data), headers={"Content-Type": "application/json"}).text)
self.simhash_filter_key = self.settings['WEIBO_SIMHASH_FILTER_KEY'] self.simhash_filter_key = self.settings['WEIBO_SIMHASH_FILTER_KEY']
# 从 redis 中 获取 微博所需的 cookie
cookie_string = redis.Redis(host=self.settings['REDIS_HOST'], port=self.settings['REDIS_PORT'],
password=self.settings['REDIS_PWD']).get("MediaSpiders:Weibo_Cookies").decode()
self.cookie_dict = form_cookie_dict(cookie_string)
all_user_info = [] all_user_info = []
if account_rsp['code'] == 200: if account_rsp['code'] == 200:
all_user_info = account_rsp['content'] all_user_info = account_rsp['content']
@ -67,7 +74,10 @@ class WeiboSpider(scrapy.Spider):
if uid[:6] != '107603': if uid[:6] != '107603':
uid = f'107603{uid}' uid = f'107603{uid}'
yield scrapy.Request('https://m.weibo.cn/api/container/getIndex?containerid=%s' % uid, yield scrapy.Request('https://m.weibo.cn/api/container/getIndex?containerid=%s' % uid,
callback=self.parse, meta={'currentCount': 0, 'uid': uid}) callback=self.parse,
meta={'currentCount': 0, 'uid': uid},
cookies=self.cookie_dict
)
def parse(self, response): def parse(self, response):
rsp = json.loads(response.text) rsp = json.loads(response.text)

View File

@ -19,4 +19,4 @@ dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath) sys.path.append(dirpath)
# 等效于scrapy crawl FacebookUserSpider -a params="{}" # 等效于scrapy crawl FacebookUserSpider -a params="{}"
execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}']) execute(['scrapy', 'crawl', 'WeiboUserSpider', '-a', 'params={}'])

View File

@ -0,0 +1,175 @@
import time
from typing import List, Tuple, Optional
import pymysql
import requests
# ================== Configuration ==================
# Database connection settings.
# NOTE(review): credentials are hard-coded in source — move them to
# environment variables or a config file kept out of version control.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
    'autocommit': False  # transactions are committed manually in main()
}
# Translation API endpoint (replace with your own server IP or domain).
TRANSLATE_API_URL = "http://47.113.231.200:28081/translate"
# Only rows whose es_loadtime is after this instant are processed
# (format: YYYY-MM-DD HH:MM:SS).
LOADTIME_AFTER = "2026-01-16 10:40:00"
# Source sites (es_srcname values) whose records should be translated.
TARGET_SRCNAMES = [
    'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA==',
    'http://www.kcna.kp/kp/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf',
    'https://energynow.com/category/press_releases/',
    'https://www.fao.org/newsroom/en'  # add your site here
]
# Delay between consecutive API requests (seconds), to avoid rate limiting.
REQUEST_DELAY = 1
# Maximum text length per request (must stay in sync with the API's limit).
MAX_TEXT_LENGTH = 5000
def normalize_newlines(text: str) -> str:
    """Convert CRLF and bare CR line endings to a single LF.

    Empty/None input is returned unchanged.
    """
    if text:
        # Order matters: collapse "\r\n" first so the lone-"\r" pass
        # does not double up newlines.
        for ending in ('\r\n', '\r'):
            text = text.replace(ending, '\n')
    return text
def translate_single(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Optional[str]:
    """Translate a single chunk of text through the HTTP translation API.

    Returns the translated string, "" for blank/empty input, or None when
    the request fails for any reason (network error, bad status, bad JSON).
    """
    if not text or not text.strip():
        return ""
    request_body = {
        # Truncate to the API's maximum accepted length.
        "text": text[:MAX_TEXT_LENGTH],
        "source_lang": source_lang,
        "target_lang": target_lang
    }
    try:
        resp = requests.post(TRANSLATE_API_URL, json=request_body, timeout=10)
        resp.raise_for_status()
        return resp.json().get("translated_text")
    except Exception as e:
        print(f"⚠️ 翻译失败: {e}")
        return None
def translate_content_with_paragraphs(content: str) -> str:
    """Translate content one paragraph at a time, tolerating failures.

    - A paragraph whose translation fails is replaced by an empty line, so
      the remaining paragraphs are still produced in order.
    - Blank lines are preserved as-is to keep the original layout.
    Returns the re-joined translated text ("" for empty input).
    """
    if not content:
        return ""
    pieces = []
    for paragraph in normalize_newlines(content).split('\n'):
        if not paragraph.strip():
            # Preserve blank lines without calling the API.
            pieces.append("")
            continue
        translated = translate_single(paragraph)
        if translated is None:
            # Failed paragraph: keep its slot but drop its text.
            print(f" ⚠️ 段落翻译失败,跳过: {paragraph[:30]}...")
            pieces.append("")
        else:
            pieces.append(translated)
        # Throttle between API calls to avoid rate limiting.
        time.sleep(REQUEST_DELAY)
    return '\n'.join(pieces)
# ================== Database operations ==================
def update_record(cursor, es_sid: int, new_title: str, new_content: str) -> None:
    """Write the translated title and content back to the indeximos row.

    Uses parameterized placeholders (never string interpolation) so values
    are escaped by the driver. The caller owns the transaction (autocommit
    is off), so no commit happens here.
    """
    # Fixed: placeholders were spelled "% s" (with a space). Python's
    # %-formatting happens to tolerate the space flag, but it is not a valid
    # DB-API "format" paramstyle token and breaks under stricter drivers.
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (new_title, new_content, es_sid))
# ================== Main logic ==================
def main():
    """Fetch untranslated rows, translate them, and write the results back.

    Selects rows newer than LOADTIME_AFTER from the configured sites whose
    es_title is still empty, translates title and content via the API, and
    commits all updates in one transaction (rolled back on any error).
    """
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        # One %s placeholder per target site for the IN (...) clause.
        in_clause = ','.join(['%s'] * len(TARGET_SRCNAMES))
        query = f"""
        SELECT es_sid, es_urltitle, es_urlcontent
        FROM indeximos
        WHERE es_loadtime > %s
        AND (es_title IS NULL OR TRIM(es_title) = '')
        AND es_srcname IN ({in_clause})
        AND LENGTH(es_video) > 5
        """
        cursor.execute(query, [LOADTIME_AFTER] + TARGET_SRCNAMES)
        records: List[Tuple] = cursor.fetchall()
        total = len(records)
        print(f"✅ 共找到 {total} 条待翻译记录")
        if total == 0:
            return
        success_count = 0
        for idx, (es_sid, urltitle, urlcontent) in enumerate(records, 1):
            print(f"\n[{idx}/{total}] 处理 es_sid={es_sid}")
            started = time.time()
            # Title first: if it cannot be translated, skip the whole row.
            title_trans = translate_single(urltitle) if urltitle else ""
            if title_trans is None:
                print(" → 标题翻译失败,跳过整条")
                continue
            # Content is translated paragraph-by-paragraph with fault tolerance.
            content_trans = translate_content_with_paragraphs(urlcontent)
            update_record(cursor, es_sid, title_trans, content_trans)
            success_count += 1
            elapsed = time.time() - started
            print(f" ✅ 翻译成功 | 耗时: {elapsed:.2f}s | 标题: {title_trans[:30]}...")
        conn.commit()
        print(f"\n🎉 完成!成功翻译 {success_count} / {total} 条记录")
    except Exception as e:
        # Any failure rolls back every pending update before re-raising.
        conn.rollback()
        print(f"❌ 发生错误: {e}")
        raise
    finally:
        cursor.close()
        conn.close()


if __name__ == "__main__":
    main()

View File

@ -19,6 +19,7 @@ class WebsiteSpiderItem(scrapy.Item):
es_extname = scrapy.Field() es_extname = scrapy.Field()
es_channel = scrapy.Field() es_channel = scrapy.Field()
es_groupname = scrapy.Field() es_groupname = scrapy.Field()
es_title = scrapy.Field()
es_urltitle = scrapy.Field() es_urltitle = scrapy.Field()
es_urltopic = scrapy.Field() es_urltopic = scrapy.Field()
es_lasttime = scrapy.Field() es_lasttime = scrapy.Field()

View File

@ -26,8 +26,10 @@ PER_BATCH_IP_USE_TIMES = 5 # 代理中间件每次从ip池获取一批ip
# REDIS_HOST = '38.54.94.107' # REDIS_HOST = '38.54.94.107'
# REDIS_PORT = '28097' # REDIS_PORT = '28097'
REDIS_HOST = '10.55.13.3' # REDIS_HOST = '10.55.13.3'
REDIS_PORT = '7379' # REDIS_PORT = '7379'
REDIS_HOST = '107.182.191.3'
REDIS_PORT = 7379
REDIS_PWD = 'jlkj-841-2-redis' REDIS_PWD = 'jlkj-841-2-redis'
REDIS_PARAMS = { REDIS_PARAMS = {
'password': 'jlkj-841-2-redis', 'password': 'jlkj-841-2-redis',
@ -164,3 +166,10 @@ ITEM_PIPELINES = {
'scrapy.pipelines.images.ImagesPipeline': 2, 'scrapy.pipelines.images.ImagesPipeline': 2,
'WebsiteSpider.pipelines.ProtobufSavePipeline': 300, 'WebsiteSpider.pipelines.ProtobufSavePipeline': 300,
} }
############################## 翻译
MAX_TEXT_LENGTH = 100
# 翻译 API 地址(替换为你的服务器 IP 或域名)
TRANSLATE_API_URL = "http://47.113.231.200:28081/translate"
# 单次请求间隔(秒),避免 API 被限流
REQUEST_DELAY = 1

View File

@ -5,7 +5,9 @@ import re
import scrapy import scrapy
import validators import validators
from scrapy_redis.spiders import RedisSpider from scrapy_redis.spiders import RedisSpider
import redis
from WebsiteSpider.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD
from WebsiteSpider.scrapy_selenium import SeleniumRequest from WebsiteSpider.scrapy_selenium import SeleniumRequest
from WebsiteSpider.utils.http_utils import build_url from WebsiteSpider.utils.http_utils import build_url
from WebsiteSpider.utils.parser_utils import parse_item_from_response from WebsiteSpider.utils.parser_utils import parse_item_from_response
@ -18,7 +20,8 @@ class WebsiteInfoCommonSpider(RedisSpider):
super(WebsiteInfoCommonSpider, self).__init__(*args, **kwargs) super(WebsiteInfoCommonSpider, self).__init__(*args, **kwargs)
json_params = json.loads(params) json_params = json.loads(params)
self.name = 'WebSite_' + json_params['clusterName'] self.name = 'WebSite_' + json_params['clusterName']
self.redis_client = None self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT,
password=REDIS_PWD)
if 'job_id' in json_params: if 'job_id' in json_params:
self.job_id = json_params['job_id'] self.job_id = json_params['job_id']
@ -36,7 +39,7 @@ class WebsiteInfoCommonSpider(RedisSpider):
# 根据url特征判断是否为内容页若是则解析文本内容 # 根据url特征判断是否为内容页若是则解析文本内容
detail_page_reg = parse_rule['detailPageReg'] detail_page_reg = parse_rule['detailPageReg']
if detail_page_reg == "" or re.search(detail_page_reg, response.url) is not None: if detail_page_reg == "" or re.search(detail_page_reg, response.url) is not None:
yield_flag, webpage_item = parse_item_from_response(response, parse_rule) yield_flag, webpage_item = parse_item_from_response(response, parse_rule, self.redis_client)
if yield_flag: if yield_flag:
yield webpage_item yield webpage_item

View File

@ -9,9 +9,10 @@ from scrapy.loader import ItemLoader
from WebsiteSpider.items import WebsiteSpiderItem from WebsiteSpider.items import WebsiteSpiderItem
from WebsiteSpider.utils.date_utils import transfer_time_zone, get_time_stamp from WebsiteSpider.utils.date_utils import transfer_time_zone, get_time_stamp
from WebsiteSpider.utils.http_utils import filter_html_tags, build_url from WebsiteSpider.utils.http_utils import filter_html_tags, build_url
from WebsiteSpider.utils.traslate_utils import translate_single, translate_content_with_paragraphs, update_record
def parse_item_from_response(response, parse_rule): def parse_item_from_response(response, parse_rule, redis_client):
current_url = response.url current_url = response.url
allowed_domains = parse_rule['allowDomain'].split(';') allowed_domains = parse_rule['allowDomain'].split(';')
mapping = parse_rule['fieldMappings'] mapping = parse_rule['fieldMappings']
@ -116,6 +117,7 @@ def parse_item_from_response(response, parse_rule):
logger.info("urltime: %s" % webpage_item['es_urltime']) logger.info("urltime: %s" % webpage_item['es_urltime'])
except KeyError: except KeyError:
logger.info('时间解析失败当前页面url: %s' % response.url) logger.info('时间解析失败当前页面url: %s' % response.url)
time_parse_rule = None time_parse_rule = None
if 'dateReg' in mapping: if 'dateReg' in mapping:
time_parse_rule = { time_parse_rule = {
@ -155,4 +157,26 @@ def parse_item_from_response(response, parse_rule):
logger.info('时间无法解析,解析规则是:' + mapping['es_urltime']) logger.info('时间无法解析,解析规则是:' + mapping['es_urltime'])
if filter_VIP_content: if filter_VIP_content:
logger.info('当前内容是VIP文章并不完整已经过滤。') logger.info('当前内容是VIP文章并不完整已经过滤。')
if yield_flag:
try:
# 1. 从 Redis 获取原始数据
raw_urls = redis_client.lrange('WebsiteSpider:translate_sites', 0, -1)
translate_list = [
url_bytes.decode('utf-8').strip()
for url_bytes in raw_urls
if url_bytes and url_bytes.decode('utf-8').strip()
]
if webpage_item['es_srcname'] in translate_list:
# 翻译标题
webpage_item['es_title'] = translate_single(webpage_item['es_urltitle'])
if webpage_item['es_title'] is None:
logger.warning(" → 标题翻译失败,跳过整条")
else:
logger.info(f"翻译成功,标题译文长度:{len(webpage_item['es_title'])}")
# 翻译内容(按段落,容错)
webpage_item['es_content'] = translate_content_with_paragraphs(webpage_item['es_urlcontent'])
logger.info(f"翻译成功,内容译文长度:{len(webpage_item['es_content'])}")
except Exception as e:
logger.error(repr(e))
return yield_flag, webpage_item return yield_flag, webpage_item

View File

@ -0,0 +1,75 @@
from WebsiteSpider.settings import MAX_TEXT_LENGTH, TRANSLATE_API_URL, REQUEST_DELAY
import requests
import time
from typing import List, Tuple, Optional
def normalize_newlines(text: str) -> str:
    """Normalize line endings: both "\\r\\n" and "\\r" become "\\n"."""
    if not text:
        return text
    # Replace CRLF before lone CR so no newline is counted twice.
    return text.replace('\r\n', '\n').replace('\r', '\n')
def translate_single(text: str, source_lang: str = "auto", target_lang: str = "zh") -> Optional[str]:
    """Translate one text fragment via the configured translation API.

    Returns "" for blank input, the translated string on success, and None
    on any failure (connection error, non-2xx status, malformed JSON).

    NOTE(review): the settings currently define MAX_TEXT_LENGTH = 100, so
    anything longer is silently truncated before translation — confirm that
    limit is intentional.
    """
    if not text or not text.strip():
        return ""
    body = {
        "text": text[:MAX_TEXT_LENGTH],
        "source_lang": source_lang,
        "target_lang": target_lang
    }
    try:
        resp = requests.post(TRANSLATE_API_URL, json=body, timeout=10)
        resp.raise_for_status()
        return resp.json().get("translated_text")
    except Exception as e:
        print(f"⚠️ 翻译失败: {e}")
        return None
def translate_content_with_paragraphs(content: str) -> str:
    """Paragraph-wise translation with per-paragraph fault tolerance.

    Blank lines are kept so the document layout survives; a paragraph whose
    translation fails becomes an empty line instead of aborting the whole
    document. Returns the re-joined result ("" for empty input).
    """
    if not content:
        return ""
    normalized = normalize_newlines(content)
    out = []
    for para in normalized.split('\n'):
        if not para.strip():
            out.append("")
            continue
        result = translate_single(para)
        if result is None:
            # Skip the failed paragraph but keep its position.
            print(f" ⚠️ 段落翻译失败,跳过: {para[:30]}...")
            out.append("")
        else:
            out.append(result)
        # Pace requests so the API is not rate-limited.
        time.sleep(REQUEST_DELAY)
    return '\n'.join(out)
# ================== Database operations ==================
def update_record(cursor, es_sid: int, new_title: str, new_content: str) -> None:
    """Persist a translated title/content pair for one indeximos row.

    Values are passed as driver-escaped parameters; committing is left to
    the caller's transaction management.
    """
    # Fixed: "% s" (space flag) replaced by the standard "%s" placeholder.
    # PyMySQL's %-based interpolation tolerated the space, but it is not a
    # legal DB-API "format" paramstyle token and is a latent portability bug.
    update_query = """
        UPDATE indeximos
        SET es_title = %s, es_content = %s
        WHERE es_sid = %s
    """
    cursor.execute(update_query, (new_title, new_content, es_sid))