Compare commits

...

3 Commits

Author SHA1 Message Date
yuxin-pc
89df3771e7 同步近期采集更改 2026-01-19 09:17:26 +08:00
yuxin-pc
becee60b6c PDF下载工具 2026-01-19 09:17:10 +08:00
yuxin-pc
ee958357b0 针对6S采集的针对性改造 2026-01-19 09:16:46 +08:00
16 changed files with 572 additions and 80 deletions

View File

@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject;
import com.jsc.dsp.model.ReturnT; import com.jsc.dsp.model.ReturnT;
import com.jsc.dsp.utils.AutoExportAndUpload; import com.jsc.dsp.utils.AutoExportAndUpload;
import com.jsc.dsp.utils.DatabaseConnector; import com.jsc.dsp.utils.DatabaseConnector;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMapping;
@ -13,6 +14,7 @@ import javax.annotation.Resource;
@RestController @RestController
@RequestMapping("/export") @RequestMapping("/export")
@ConditionalOnProperty(name = "switch.auto-export-and-upload", havingValue = "true", matchIfMissing = true)
public class ExportController { public class ExportController {
@Resource @Resource

View File

@ -24,10 +24,12 @@ public class EsDataNewsView {
String esSitename; String esSitename;
String esSrcname; String esSrcname;
String esUrlcontent; String esUrlcontent;
String esUrlcontentRaw;
String esUrlimage; String esUrlimage;
String esUrlname; String esUrlname;
String esUrltime; String esUrltime;
String esUrltitle; String esUrltitle;
String esUrltitleRaw;
String esAbstract; String esAbstract;
String esKeywords; String esKeywords;
String file; String file;

View File

@ -4,6 +4,7 @@ import com.jsc.dsp.service.ConfigService;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -28,6 +29,7 @@ import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream; import java.util.zip.ZipOutputStream;
@Component @Component
@ConditionalOnProperty(name = "switch.auto-export-and-upload", havingValue = "true", matchIfMissing = true)
public class AutoExportAndUpload { public class AutoExportAndUpload {
@Resource @Resource

View File

@ -116,18 +116,24 @@ public class DatabaseConnector {
row.createCell(5).setCellValue(item.getEsDoclength()); row.createCell(5).setCellValue(item.getEsDoclength());
row.createCell(6).setCellValue(item.getEsLang()); row.createCell(6).setCellValue(item.getEsLang());
row.createCell(7).setCellValue(item.getEsLasttime()); row.createCell(7).setCellValue(item.getEsLasttime());
if (item.getEsLinks().length() > 10000) {
row.createCell(8).setCellValue(item.getEsLinks().substring(0, 10000));
} else {
row.createCell(8).setCellValue(item.getEsLinks()); row.createCell(8).setCellValue(item.getEsLinks());
}
row.createCell(9).setCellValue(item.getEsLoadtime()); row.createCell(9).setCellValue(item.getEsLoadtime());
row.createCell(10).setCellValue(item.getEsSitename()); row.createCell(10).setCellValue(item.getEsSitename());
row.createCell(11).setCellValue(item.getEsSrcname()); row.createCell(11).setCellValue(item.getEsSrcname());
row.createCell(12).setCellValue(item.getEsUrlcontent()); row.createCell(12).setCellValue(item.getEsUrlcontent());
row.createCell(13).setCellValue(item.getEsUrlimage()); row.createCell(13).setCellValue(item.getEsUrlcontentRaw());
row.createCell(14).setCellValue(item.getEsUrlname()); row.createCell(14).setCellValue(item.getEsUrlimage());
row.createCell(15).setCellValue(item.getEsUrltime()); row.createCell(15).setCellValue(item.getEsUrlname());
row.createCell(16).setCellValue(item.getEsUrltitle()); row.createCell(16).setCellValue(item.getEsUrltime());
row.createCell(17).setCellValue(item.getEsAbstract()); row.createCell(17).setCellValue(item.getEsUrltitle());
row.createCell(18).setCellValue(item.getEsKeywords()); row.createCell(18).setCellValue(item.getEsUrltitleRaw());
row.createCell(19).setCellValue(item.getFile()); row.createCell(19).setCellValue(item.getEsAbstract());
row.createCell(20).setCellValue(item.getEsKeywords());
row.createCell(21).setCellValue(item.getFile());
} }
logger.info("完成excel数据写入" + rowNum + ""); logger.info("完成excel数据写入" + rowNum + "");

View File

@ -63,6 +63,7 @@ switch:
enable-storage-service: false enable-storage-service: false
enable-file-dl-service: false enable-file-dl-service: false
enable-protobuf-service: false enable-protobuf-service: false
auto-export-and-upload: true
ftp: ftp:
host: 144.34.185.108 host: 144.34.185.108

View File

@ -0,0 +1,171 @@
import mysql.connector
import base64
import urllib.parse
import re
# === Database configuration ===
# NOTE(review): credentials are hard-coded in source control -- move them to
# environment variables or an external config file before sharing/deploying.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
}
def decode_rodong_url(url):
    """Extract and Base64-decode the query parameter of a rodong.rep.kp URL.

    Example input:
        http://www.rodong.rep.kp/cn/index.php?MTJAMjAyNi0wMS0wNS0wMDJAMUAxQEAwQDNA==
    Example output:
        '12@2026-01-05-002@1@1@@0@3@'
    Returns None when the URL carries no index.php query or cannot be decoded.
    """
    if not url or 'index.php?' not in url:
        return None
    try:
        # Preferred path: let urllib split off the query string.
        parsed = urllib.parse.urlparse(url)
        query = parsed.query
        # Fallback regex for malformed URLs where urlparse yields no query.
        if not query:
            match = re.search(r'index\.php\?([A-Za-z0-9+/=]+)', url)
            if match:
                query = match.group(1)
            else:
                return None
        # Fix: normalize Base64 padding. These URLs often carry truncated or
        # surplus '=' padding (the docstring example above is 38 chars, not a
        # multiple of 4), which made b64decode raise even though the payload
        # is recoverable. Strip all padding, then re-pad to a multiple of 4.
        query = query.strip().rstrip('=')
        query += '=' * (-len(query) % 4)
        decoded_bytes = base64.b64decode(query)
        return decoded_bytes.decode('utf-8')
    except Exception as e:
        # Log and continue; one bad URL must not abort the whole batch.
        print(f" 解码失败 (URL: {url[:60]}...): {e}")
        return None
def main():
    """Decode index.php parameters into es_tags for the hard-coded table.

    NOTE(review): this function is never invoked -- the __main__ block below
    defines and runs main_with_table() instead, with a slightly different
    WHERE clause. Kept unchanged here; consider removing one of the two.
    """
    try:
        # Connect to MySQL using the module-level configuration.
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor(buffered=True)
        # Select rows from the target site that still lack decoded tags.
        print("正在查询待处理的新闻记录...")
        cursor.execute("""
            SELECT es_sid, es_urlname
            FROM indeximos
            WHERE es_sitename = '劳动新闻'
            AND (es_tags IS NULL OR es_tags = '')
        """)
        records = cursor.fetchall()
        if not records:
            print("没有找到需要处理的记录。")
            return
        print(f"共找到 {len(records)} 条待处理记录。")
        updated_count = 0
        for i, (es_sid, es_urlname) in enumerate(records, 1):
            print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ")
            decoded = decode_rodong_url(es_urlname)
            if decoded is not None:
                # Persist the decoded parameter string into es_tags.
                update_query = "UPDATE indeximos SET es_tags = %s WHERE es_sid = %s"
                cursor.execute(update_query, (decoded, es_sid))
                # Per-row commit: an interrupted run keeps completed work.
                conn.commit()
                updated_count += 1
                print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}")
            else:
                print("跳过(无法解码)")
        print(f"\n✅ 完成!共更新 {updated_count} 条记录。")
    except mysql.connector.Error as db_err:
        print(f"❌ 数据库错误: {db_err}")
    except Exception as e:
        print(f"❌ 脚本执行出错: {e}")
    finally:
        # cursor/conn may be unbound if connect() itself failed.
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            print("数据库连接已关闭。")
if __name__ == "__main__":
    # Table names cannot be bound as SQL parameters, so the name is
    # interpolated into the query text below; validate it first so the
    # f-string interpolation stays safe.
    TABLE_NAME = 'indeximos'
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', TABLE_NAME):
        raise ValueError("表名包含非法字符!")

    def main_with_table(table_name):
        """Decode Base64 index.php parameters into es_tags for `table_name`.

        Processes only rows whose URL contains an index.php query and whose
        es_tags is still empty; commits after each updated row so an
        interrupted run can resume without redoing finished work.
        """
        try:
            conn = mysql.connector.connect(**DB_CONFIG)
            cursor = conn.cursor(buffered=True)
            query_sql = f"""
                SELECT es_sid, es_urlname
                FROM `{table_name}`
                WHERE es_urlname LIKE '%index.php?%'
                AND (es_tags IS NULL OR es_tags = '')
            """
            cursor.execute(query_sql)
            records = cursor.fetchall()
            if not records:
                print("没有找到需要处理的记录。")
                return
            print(f"共找到 {len(records)} 条待处理记录。")
            updated_count = 0
            for i, (es_sid, es_urlname) in enumerate(records, 1):
                print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ")
                decoded = decode_rodong_url(es_urlname)
                if decoded is not None:
                    update_sql = f"UPDATE `{table_name}` SET es_tags = %s WHERE es_sid = %s"
                    cursor.execute(update_sql, (decoded, es_sid))
                    conn.commit()  # per-row commit: keep completed work on crash
                    updated_count += 1
                    print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}")
                else:
                    print("跳过(无法解码)")
            print(f"\n✅ 完成!共更新 {updated_count} 条记录。")
        except mysql.connector.Error as db_err:
            print(f"❌ 数据库错误: {db_err}")
        except Exception as e:
            print(f"❌ 脚本执行出错: {e}")
        finally:
            # cursor/conn may be unbound if connect() itself failed.
            if 'cursor' in locals():
                cursor.close()
            if 'conn' in locals() and conn.is_connected():
                conn.close()
                print("数据库连接已关闭。")

    # Fix: the original block imported sys and grabbed sys.modules[__name__]
    # but never used them, and carried several paragraphs of planning comments
    # describing code that was never written -- all removed as dead code.
    main_with_table(TABLE_NAME)

View File

@ -4,6 +4,7 @@ import queue
import threading import threading
import time import time
from datetime import datetime from datetime import datetime
import random
import pymysql import pymysql
from tqdm import tqdm from tqdm import tqdm
@ -43,23 +44,26 @@ BATCH_SIZE = 500
MAX_WORKERS = 1 MAX_WORKERS = 1
TIMEOUT = 10 TIMEOUT = 10
PDF_OUTPUT_DIR = 'D:/data/output/pdf' PDF_OUTPUT_DIR = 'D:/data/output/pdf'
MIN_PDF_SIZE = 80 * 1024 # 80KB MIN_PDF_SIZE = 5 * 1024 # 5KB
MHTML_OUTPUT_DIR = 'D:/data/output/mhtml' MHTML_OUTPUT_DIR = 'D:/data/output/mhtml'
os.makedirs(PDF_OUTPUT_DIR, exist_ok=True) os.makedirs(PDF_OUTPUT_DIR, exist_ok=True)
running = True running = True
running_interval_seconds = 15 running_interval_seconds = 10
remote_host_name = [ skip_host_name = [
'epochtimes.com', 'epochtimes.com',
# 'secretchina.com' 'secretchina.com',
# 'rodong.rep.kp',
# 'kcna.kp'
] ]
class PDFDownloader: class PDFDownloader:
def __init__(self): def __init__(self):
self.db_lock = threading.Lock() self.db_lock = threading.Lock()
self.db_connection = None
self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3) self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3)
self.processed_count = 0 self.processed_count = 0
self.success_count = 0 self.success_count = 0
@ -74,7 +78,7 @@ class PDFDownloader:
# 替换 MYSQL_CONFIG 中的连接方式 # 替换 MYSQL_CONFIG 中的连接方式
def get_db_connection(self): def get_db_connection(self):
return pymysql.connect( self.db_connection = pymysql.connect(
host=MYSQL_CONFIG['host'], host=MYSQL_CONFIG['host'],
port=MYSQL_CONFIG['port'], port=MYSQL_CONFIG['port'],
user=MYSQL_CONFIG['user'], user=MYSQL_CONFIG['user'],
@ -86,19 +90,21 @@ class PDFDownloader:
def get_total_rows(self): def get_total_rows(self):
"""获取总记录数""" """获取总记录数"""
with self.get_db_connection() as conn: if self.db_connection is None:
cursor = conn.cursor() self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute( cursor.execute(
"SELECT COUNT(*) FROM indeximos " "SELECT COUNT(*) FROM indeximos "
"WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) " "WHERE (es_video IS NULL OR es_video IN ('-1')) "
"AND es_loadtime > %s", self.last_loadtime "AND es_loadtime > %s", self.last_loadtime
) )
return cursor.fetchone()[0] return cursor.fetchone()[0]
def get_last_loadtime(self): def get_last_loadtime(self):
"""获取上次导出数据的时间""" """获取上次导出数据的时间"""
with self.get_db_connection() as conn: if self.db_connection is None:
cursor = conn.cursor() self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute( cursor.execute(
"SELECT config_value FROM config " "SELECT config_value FROM config "
"WHERE config_name = 'last_loadtime' " "WHERE config_name = 'last_loadtime' "
@ -106,7 +112,7 @@ class PDFDownloader:
return cursor.fetchone()[0] return cursor.fetchone()[0]
def use_remote_selenium(self, url): def use_remote_selenium(self, url):
for host in remote_host_name: for host in skip_host_name:
if host in url: if host in url:
return True return True
return False return False
@ -159,11 +165,12 @@ class PDFDownloader:
def fetch_data_batch(self, offset): def fetch_data_batch(self, offset):
"""分页获取数据""" """分页获取数据"""
with self.get_db_connection() as conn: if self.db_connection is None:
cursor = conn.cursor() self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute( cursor.execute(
"SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos " "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos "
"WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) " "WHERE (es_video IS NULL OR es_video IN ('-1')) "
"AND es_loadtime > %s " "AND es_loadtime > %s "
"ORDER BY es_urltime LIMIT %s OFFSET %s", "ORDER BY es_urltime LIMIT %s OFFSET %s",
(self.last_loadtime, BATCH_SIZE, offset) (self.last_loadtime, BATCH_SIZE, offset)
@ -174,12 +181,14 @@ class PDFDownloader:
"""更新数据库状态""" """更新数据库状态"""
for attempt in range(retry): for attempt in range(retry):
try: try:
with self.db_lock, self.get_db_connection() as conn: with self.db_lock:
cursor = conn.cursor() if self.db_connection is None:
self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute( cursor.execute(
"UPDATE indeximos SET es_video = %s WHERE es_sid = %s", "UPDATE indeximos SET es_video = %s WHERE es_sid = %s",
(status, es_sid)) (status, es_sid))
conn.commit() self.db_connection.commit()
return True return True
except Exception as e: except Exception as e:
if attempt == retry - 1: if attempt == retry - 1:
@ -216,20 +225,24 @@ class PDFDownloader:
# 调用下载函数 # 调用下载函数
if self.use_remote_selenium(url): if self.use_remote_selenium(url):
if self.remote_handler is None: self.processed_count += 1
self.remote_handler = RemotePDFSaver() self.task_queue.task_done()
success = self.remote_handler.save_as_pdf( continue
url=url, # if self.remote_handler is None:
output_path=output_file, # self.remote_handler = RemotePDFSaver()
timeout=TIMEOUT # success = self.remote_handler.save_as_pdf(
) # url=url,
# output_path=output_file,
# timeout=TIMEOUT
# )
else: else:
if self.local_handler is None: if self.local_handler is None:
self.local_handler = PDFSaver() self.local_handler = PDFSaver(headless=False)
success = self.local_handler.save_as_pdf( success = self.local_handler.save_as_pdf(
url=url, url=url,
output_path=output_file, output_path=output_file,
timeout=TIMEOUT timeout=TIMEOUT,
wait_time=5
) )
# 验证下载结果 # 验证下载结果
@ -285,7 +298,9 @@ class PDFDownloader:
batch = self.fetch_data_batch(offset) batch = self.fetch_data_batch(offset)
if not batch: if not batch:
break break
batch_list = list(batch)
random.shuffle(batch_list)
batch = tuple(batch_list)
for row in batch: for row in batch:
self.task_queue.put(row) self.task_queue.put(row)
@ -312,11 +327,22 @@ class PDFDownloader:
print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}") print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}")
print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒") print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒")
def terminate(self):
if self.local_handler:
self.local_handler.quit()
if self.remote_handler:
self.remote_handler.quit()
self.db_connection.close()
if __name__ == "__main__": if __name__ == "__main__":
while running: while running:
try:
print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}") print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}")
downloader = PDFDownloader() downloader = PDFDownloader()
downloader.run() downloader.run()
print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...") print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...")
downloader.terminate()
time.sleep(running_interval_seconds) time.sleep(running_interval_seconds)
except Exception as e:
print(repr(e))

View File

@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
class PDFSaver: class PDFSaver:
def __init__(self, headless=True): def __init__(self, headless=True):
logger.info("正在初始化 Chrome WebDriver自动匹配版本...") logger.info("正在初始化 Chrome WebDriver自动匹配版本...")
service = ChromeService(executable_path="C:/Program Files/Python38/chromedriver.exe") service = ChromeService(executable_path="D:/chromedriver.exe")
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36 Edg/143.0.3650.75" user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36 Edg/143.0.3650.75"
# Chrome 选项 # Chrome 选项
@ -42,6 +42,7 @@ class PDFSaver:
}) })
chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument("--disable-blink-features=AutomationControlled") chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.page_load_strategy = 'eager'
# 注意PDF 打印不需要 --save-page-as-mhtml # 注意PDF 打印不需要 --save-page-as-mhtml
self.driver = webdriver.Chrome(service=service, options=chrome_options) self.driver = webdriver.Chrome(service=service, options=chrome_options)

View File

@ -0,0 +1,118 @@
import pymysql
from typing import Dict, List, Tuple, Optional
# ================== Configuration ==================
# NOTE(review): credentials are hard-coded in source control -- externalize
# them (env vars / config file) before sharing or deploying.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
}

# Only records whose es_srcname is listed here are processed (the value is
# used for filtering only; no substitution is performed on it).
TARGET_SRCNAMES: List[str] = [
    "http://www.kcna.kp/cn/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf",
    # Add the source names you need to process here.
]
# ================== Utility functions ==================
def get_suffix_32(url: str) -> Optional[str]:
    """Return the last 32 characters of `url`; None when empty or shorter than 32."""
    if url and len(url) >= 32:
        return url[-32:]
    return None
def find_foreign_by_suffix(cursor, suffix: str, exclude_id: int) -> Optional[Tuple[str, str]]:
    """Find one row whose es_urlname ends with `suffix`, skipping `exclude_id`.

    The exclusion prevents a record from matching itself. Returns the pair
    (es_urltitle, es_urlcontent) of the first match, or None when no row
    qualifies.
    """
    sql = """
        SELECT es_urltitle, es_urlcontent
        FROM indeximos
        WHERE
            es_sid != %s
            AND es_urlname IS NOT NULL
            AND CHAR_LENGTH(es_urlname) >= 32
            AND RIGHT(es_urlname, 32) = %s
        LIMIT 1
    """
    cursor.execute(sql, (exclude_id, suffix))
    row = cursor.fetchone()
    return row if row else None
def update_chinese_record(cursor, record_id: int, title: str, content: str):
    """Write `title`/`content` into the row identified by `record_id`.

    NOTE(review): this writes columns es_title/es_content, while the SELECTs
    in this file read es_urltitle/es_urlcontent -- confirm the target columns
    exist and are the intended destination.
    """
    update_query = """
    UPDATE indeximos
    SET es_title = %s, es_content = %s
    WHERE es_sid = %s
    """
    cursor.execute(update_query, (title, content, record_id))
# ================== Main logic ==================
def main():
    """Copy title/content from foreign-language rows onto Chinese rows.

    Matching key: the last 32 characters of es_urlname (the article hash in
    kcna.kp URLs). All updates happen in one transaction -- a single commit
    at the end, rollback on any error.
    """
    if not TARGET_SRCNAMES:
        print("⚠️ 未指定任何目标 es_srcname程序退出。")
        return
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        # Load every candidate row for the configured source sites.
        placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES))
        query = f"""
        SELECT es_sid, es_srcname, es_urlname
        FROM indeximos
        WHERE es_srcname IN ({placeholders})
        AND es_urlname IS NOT NULL
        AND es_urlname != ''
        """
        cursor.execute(query, TARGET_SRCNAMES)
        records = cursor.fetchall()
        total = len(records)
        print(f"共加载 {total} 条来自 {TARGET_SRCNAMES} 的记录用于匹配...")
        updated_count = 0
        skipped_short = 0
        for idx, (record_id, es_srcname, es_urlname) in enumerate(records, 1):
            suffix = get_suffix_32(es_urlname)
            if suffix is None:
                # URLs shorter than 32 chars cannot carry the hash suffix.
                skipped_short += 1
                continue
            foreign_data = find_foreign_by_suffix(cursor, suffix, record_id)
            if foreign_data:
                title, content = foreign_data
                update_chinese_record(cursor, record_id, title, content)
                updated_count += 1
                print(f"[{idx}/{total}] ✅ 已更新 ID={record_id} | src={es_srcname}")
        # Single commit: all-or-nothing semantics for the whole run.
        conn.commit()
        print("\n" + "=" * 50)
        print(f"✅ 匹配完成!")
        print(f" - 成功更新: {updated_count}")
        print(f" - 因 URL 长度 <32 跳过: {skipped_short}")
        print(f" - 总处理: {total}")
    except Exception as e:
        conn.rollback()
        print(f"❌ 发生错误,已回滚: {e}")
        raise
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()

View File

@ -0,0 +1,158 @@
import pymysql
import jieba
from collections import Counter
from typing import List, Tuple, Set
# ================== Configuration ==================
# NOTE(review): credentials are hard-coded in source control -- externalize
# them before sharing or deploying.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
}

# Chinese sites (es_srcname) whose records should be processed.
TARGET_SRCNAMES: List[str] = [
    "http://www.rodong.rep.kp/cn/index.php?MUBAMUAxQA==",
    # Add your sites here.
]

# NOTE(review): defined but never referenced anywhere in this script.
FOREIGN_SRCNAME = 'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA=='

# Keyword-overlap threshold for accepting a match; 0.3-0.6 recommended.
SIMILARITY_THRESHOLD = 0.3
# ================== Text-similarity helpers ==================
def extract_keywords(text: str) -> Set[str]:
    """Segment Chinese text with jieba; keep alphabetic tokens of length >= 2.

    Single characters, digits and punctuation are dropped so that only
    meaningful words contribute to similarity scoring.
    """
    if not text:
        return set()
    return {tok for tok in jieba.lcut(text) if len(tok) >= 2 and tok.isalpha()}
def keyword_overlap_similarity(title1: str, title2: str) -> float:
    """Keyword-overlap ratio of two titles: |kw1 ∩ kw2| / max(|kw1|, |kw2|)."""
    kw1 = extract_keywords(title1)
    kw2 = extract_keywords(title2)
    if not kw1 and not kw2:
        # Neither title yields keywords: only exact string equality counts.
        return 1.0 if title1 == title2 else 0.0
    if not kw1 or not kw2:
        return 0.0
    shared = kw1 & kw2
    return len(shared) / max(len(kw1), len(kw2))
# ================== Database operations ==================
def get_chinese_records(cursor) -> List[Tuple]:
    """Fetch (es_sid, es_srcname, es_urlname, es_urltitle, es_urltime) rows
    for the configured Chinese sites that have a non-blank title and a
    publication time. Returns an empty list when no sites are configured.
    """
    if not TARGET_SRCNAMES:
        return []
    marks = ','.join(['%s'] * len(TARGET_SRCNAMES))
    sql = f"""
        SELECT es_sid, es_srcname, es_urlname, es_urltitle, es_urltime
        FROM indeximos
        WHERE es_srcname IN ({marks})
        AND es_urltitle IS NOT NULL AND TRIM(es_urltitle) != ''
        AND es_urltime IS NOT NULL
    """
    cursor.execute(sql, TARGET_SRCNAMES)
    return cursor.fetchall()
def get_foreign_candidates_by_time(cursor, pub_time) -> List[Tuple]:
    """All rows published at `pub_time` that carry both es_abstract and
    es_urlcontent (es_abstract must be non-blank). Returns a list of
    (es_sid, es_abstract, es_urltitle, es_urlcontent) tuples.
    """
    sql = """
        SELECT es_sid, es_abstract, es_urltitle, es_urlcontent
        FROM indeximos
        WHERE es_urltime = %s
        AND es_abstract IS NOT NULL AND TRIM(es_abstract) != ''
        AND es_urlcontent IS NOT NULL
    """
    cursor.execute(sql, (pub_time,))
    return cursor.fetchall()
def update_chinese_record(cursor, record_id: int, new_title: str, content: str):
    """Overwrite es_title/es_content of the row identified by `record_id`.

    NOTE(review): writes es_title/es_content while the SELECTs in this file
    read es_urltitle/es_urlcontent -- confirm these are the intended columns.
    """
    sql = """
    UPDATE indeximos
    SET es_title = %s, es_content = %s
    WHERE es_sid = %s
    """
    cursor.execute(sql, (new_title, content, record_id))
# ================== Main logic ==================
def main():
    """Match Chinese rows to foreign rows by publication time + title overlap.

    For each Chinese record, every candidate sharing its es_urltime is scored
    by keyword overlap between the Chinese title and the candidate's
    es_abstract (its translated title); the best candidate at or above
    SIMILARITY_THRESHOLD wins and its original title/content are copied over.
    One commit at the end; any error rolls everything back.
    """
    if not TARGET_SRCNAMES:
        print("⚠️ 未指定目标站点,退出。")
        return
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        chinese_records = get_chinese_records(cursor)
        total = len(chinese_records)
        print(f"共加载 {total} 条中文记录用于匹配...")
        matched_count = 0
        for idx, (cid, srcname, urlname, zh_title, pub_time) in enumerate(chinese_records, 1):
            print(f"\n[{idx}/{total}] ID={cid}, 时间={pub_time}, 标题='{zh_title[:30]}...'")
            candidates = get_foreign_candidates_by_time(cursor, pub_time)
            if not candidates:
                print(" → 无同时间且有翻译标题的外文记录")
                continue
            best_score = 0.0
            best_candidate = None
            for fid, trans_title, ori_title, content in candidates:
                # Skip self-matches (defensive; should not occur in practice).
                if fid == cid:
                    continue
                score = keyword_overlap_similarity(zh_title, trans_title)
                print(f" 候选ID={fid} | 翻译标题='{trans_title[:30]}...' | 重合度={score:.3f}")
                if score > best_score:
                    best_score = score
                    best_candidate = (ori_title, content)
            if best_candidate and best_score >= SIMILARITY_THRESHOLD:
                final_title, final_content = best_candidate
                update_chinese_record(cursor, cid, final_title, final_content)
                matched_count += 1
                print(f" ✅ 匹配成功! 重合度={best_score:.3f}")
            else:
                print(f" ❌ 未达阈值(最高相似度={best_score:.3f}")
        # Single commit after all rows: all-or-nothing semantics.
        conn.commit()
        print("\n" + "=" * 50)
        print(f"✅ 匹配完成!成功关联 {matched_count} / {total} 条记录。")
    except Exception as e:
        conn.rollback()
        print(f"❌ 发生错误,已回滚: {e}")
        raise
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()

View File

@ -59,7 +59,14 @@ class SeleniumMiddleware:
# Edge in headless mode # Edge in headless mode
edge_options = EdgeOptions() edge_options = EdgeOptions()
edge_options.use_chromium = True edge_options.use_chromium = True
self.driver = Edge(executable_path='MicrosoftWebDriver.exe', options=edge_options) self.driver = Edge(executable_path='msedgedriver.exe', options=edge_options)
self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
"""
})
@classmethod @classmethod
def from_crawler(cls, crawler): def from_crawler(cls, crawler):

View File

@ -74,26 +74,20 @@ TWITTER_PID_KEY = ''
KAFKA_PROCESS_QUEUE = ['stream-protobuf', 'stream-db'] KAFKA_PROCESS_QUEUE = ['stream-protobuf', 'stream-db']
CUSTOM_USER_AGENT = [ CUSTOM_USER_AGENT = [
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:77.0) Gecko/20100101 Firefox/77.0',
'Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.18363',
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/534.50 (KHTML, like Gecko) Version/5.1 Safari/534.50',
'Opera/9.80 (Windows NT 6.1; U; zh-cn) Presto/2.9.168 Version/11.50',
'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E; SE 2.X MetaSr 1.0',
'Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; Media Center PC 6.0; InfoPath.3; .NET4.0C; .NET4.0E',
'Mozilla/5.0 (Windows NT 6.1) AppleWebKit/535.1 (KHTML, like Gecko) Chrome/13.0.782.41 Safari/535.1 QQBrowser/6.9.11079.201'
] ]
# 部署在外网采集fb时使用selenium_chrome # 部署在外网采集fb时使用selenium_chrome
SELENIUM_DRIVER_NAME = 'chrome' SELENIUM_DRIVER_NAME = 'chrome'
SELENIUM_DRIVER_EXECUTABLE_PATH = 'http://144.34.185.108:28098' SELENIUM_DRIVER_EXECUTABLE_PATH = 'local'
# SELENIUM_DRIVER_EXECUTABLE_PATH = 'http://144.34.185.108:28098'
SELENIUM_DRIVER_ARGUMENTS = [ SELENIUM_DRIVER_ARGUMENTS = [
'--headless', '--headless',
'--no-sandbox', '--no-sandbox',
'--disable-dev-shm-usage', '--disable-dev-shm-usage',
'--disable-gpu', '--disable-gpu',
'--window-size=1920,1080' '--window-size=1920,1080',
'--disable-blink-features=AutomationControlled'
] ]
# 本地调试用 # 本地调试用

View File

@ -65,7 +65,7 @@ class TwitterSpider(scrapy.Spider):
logger.info("login twitter") logger.info("login twitter")
driver = response.request.meta['driver'] driver = response.request.meta['driver']
driver.maximize_window() driver.maximize_window()
driver.get('https://twitter.com/i/flow/login') driver.get('https://x.com/i/flow/login')
time.sleep(5) time.sleep(5)
# 获取采集登录账号并登录 # 获取采集登录账号并登录
login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts') login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts')

View File

@ -87,7 +87,9 @@ def get_format_time(pattern, time_str):
date = result.group(1) date = result.group(1)
time_t = result.group(2) time_t = result.group(2)
date = date.replace('/', '-').replace(".", "-").replace( date = date.replace('/', '-').replace(".", "-").replace(
",", "-").replace("", "-").replace("", "-").replace("", "").replace(' ', '-').replace('--', '-') ",", "-").replace("", "-").replace("", "-").replace("", "").replace(
"", "-").replace("", "-").replace("", "").replace(
' ', '-').replace('--', '-')
date_array = date.split('-') date_array = date.split('-')
for i in range(len(date_array)): for i in range(len(date_array)):
if (date_array[i].endswith('st') or if (date_array[i].endswith('st') or
@ -128,7 +130,7 @@ def get_format_time(pattern, time_str):
if __name__ == '__main__': if __name__ == '__main__':
# a = [' 令和4年6月9日', 'www.kcna.kp (主体111.6.6.)', '民國111年06月09日 ', 'Jun. 9, 2022', '111年 06月 21日'] # a = [' 令和4年6月9日', 'www.kcna.kp (主体111.6.6.)', '民國111年06月09日 ', 'Jun. 9, 2022', '111年 06月 21日']
a = ['06.10.2023 03:24'] a = ['2026년 1월 6일 화요일 1면 [사진있음]']
for _ in a: for _ in a:
print(get_time_stamp(_)) # print(get_time_stamp(_))
# print(get_time_stamp(_, {r"(\d{4}年\d{1,2}月\d{2}日)\D*(\d{2}:\d{2}:\d{2})*\D*": ['%Y-%m-%d %H:%M:%S']})) print(get_time_stamp(_, {r"(\d{4}\d{1,2}월 \d{1,2}일)\D*(\d{2}:\d{2}:\d{2})*\D*": ['%Y-%m-%d %H:%M:%S']}))

View File

@ -7,4 +7,4 @@ from scrapy.cmdline import execute
dirpath = os.path.dirname(os.path.abspath(__file__)) dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath) sys.path.append(dirpath)
execute(['scrapy', 'crawl', 'FacebookUserSpider', '-a', 'params={}']) execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}'])

View File

@ -89,7 +89,9 @@ def get_format_time(pattern, time_str):
date = result.group(1) date = result.group(1)
time_t = result.group(2) time_t = result.group(2)
date = date.replace('/', '-').replace(".", "-").replace( date = date.replace('/', '-').replace(".", "-").replace(
",", "-").replace("", "-").replace("", "-").replace("", "").replace(' ', '-').replace('--', '-') ",", "-").replace("", "-").replace("", "-").replace("", "").replace(
"", "-").replace("", "-").replace("", "").replace(
' ', '-').replace('--', '-')
date_array = date.split('-') date_array = date.split('-')
for i in range(len(date_array)): for i in range(len(date_array)):
if (date_array[i].endswith('st') or if (date_array[i].endswith('st') or
@ -135,7 +137,7 @@ def get_format_time(pattern, time_str):
if __name__ == '__main__': if __name__ == '__main__':
# a = [' 令和4年6月9日', 'www.kcna.kp (主体111.6.6.)', '民國111年06月09日 ', 'Jun. 9, 2022', '111年 06月 21日'] # a = [' 令和4年6月9日', 'www.kcna.kp (主体111.6.6.)', '民國111年06月09日 ', 'Jun. 9, 2022', '111年 06月 21日']
a = ['July 26, 2024 12:53 PM'] a = ['2026년 1월 6일 화요일 1면 [사진있음]']
for _ in a: for _ in a:
print(get_time_stamp(_)) # print(get_time_stamp(_))
# print(get_time_stamp(_, {r"(\w+ \d+, \d{4})\D*(\d+:\d+)\D*": ['%B-%d-%Y %H:%M:%S']})) print(get_time_stamp(_, {r"(\d{4}\d{1,2}월 \d{1,2}일)\D*(\d{2}:\d{2}:\d{2})*\D*": ['%Y-%m-%d %H:%M:%S']}))