From becee60b6c3752b94d90d094981bee7f04b2cc0b Mon Sep 17 00:00:00 2001 From: yuxin-pc Date: Mon, 19 Jan 2026 09:17:10 +0800 Subject: [PATCH] =?UTF-8?q?PDF=E4=B8=8B=E8=BD=BD=E5=B7=A5=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../decode-url-for-rodong-news.py | 171 ++++++++++++++++++ .../pdf_downloader/save-page-with-selenium.py | 124 ++++++++----- research/pdf_downloader/save_page_as_pdf.py | 3 +- research/pdf_downloader/set_raw_title_kcna.py | 118 ++++++++++++ .../pdf_downloader/set_raw_title_rodong.py | 158 ++++++++++++++++ 5 files changed, 524 insertions(+), 50 deletions(-) create mode 100644 research/pdf_downloader/decode-url-for-rodong-news.py create mode 100644 research/pdf_downloader/set_raw_title_kcna.py create mode 100644 research/pdf_downloader/set_raw_title_rodong.py diff --git a/research/pdf_downloader/decode-url-for-rodong-news.py b/research/pdf_downloader/decode-url-for-rodong-news.py new file mode 100644 index 0000000..cd2fee4 --- /dev/null +++ b/research/pdf_downloader/decode-url-for-rodong-news.py @@ -0,0 +1,171 @@ +import mysql.connector +import base64 +import urllib.parse +import re + +# === 数据库配置 === +DB_CONFIG = { + 'host': '47.113.231.200', + 'port': 28089, + 'user': 'root', + 'password': 'passok123A', + 'database': 'dsp', + 'charset': 'utf8mb4', +} + + +def decode_rodong_url(url): + """ + 从朝鲜劳动新闻URL中提取并Base64解码参数部分 + 示例输入: http://www.rodong.rep.kp/cn/index.php?MTJAMjAyNi0wMS0wNS0wMDJAMUAxQEAwQDNA== + 输出: '12@2026-01-05-002@1@1@@0@37@' 或 None(若无法解析) + """ + if not url or 'index.php?' not in url: + return None + + try: + # 方法1:使用 urllib.parse 解析 + parsed = urllib.parse.urlparse(url) + query = parsed.query + + # 如果 query 为空,尝试用正则兜底(应对非常规URL) + if not query: + match = re.search(r'index\.php\?([A-Za-z0-9+/=]+)', url) + if match: + query = match.group(1) + else: + return None + + # Base64 解码 + decoded_bytes = base64.b64decode(query) + decoded_str = decoded_bytes.decode('utf-8') + return decoded_str + + except Exception as e: + # 记录错误但不中断整体流程 + print(f" 解码失败 (URL: {url[:60]}...): {e}") + return None + + +def main(): + try: + # 连接数据库 + conn = mysql.connector.connect(**DB_CONFIG) + cursor = conn.cursor(buffered=True) + + # 查询所有需要处理的记录(只处理包含 index.php? 的 URL) + print("正在查询待处理的新闻记录...") + cursor.execute(""" + SELECT es_sid, es_urlname + FROM indeximos + WHERE es_sitename = '劳动新闻' + AND (es_tags IS NULL OR es_tags = '') + """) + records = cursor.fetchall() + + if not records: + print("没有找到需要处理的记录。") + return + + print(f"共找到 {len(records)} 条待处理记录。") + + updated_count = 0 + for i, (es_sid, es_urlname) in enumerate(records, 1): + print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ") + + decoded = decode_rodong_url(es_urlname) + if decoded is not None: + # 更新 es_tags 字段 + update_query = "UPDATE indeximos SET es_tags = %s WHERE es_sid = %s" + cursor.execute(update_query, (decoded, es_sid)) + conn.commit() + updated_count += 1 + print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}") + else: + print("跳过(无法解码)") + + print(f"\n✅ 完成!共更新 {updated_count} 条记录。") + + except mysql.connector.Error as db_err: + print(f"❌ 数据库错误: {db_err}") + except Exception as e: + print(f"❌ 脚本执行出错: {e}") + finally: + if 'cursor' in locals(): + cursor.close() + if 'conn' in locals() and conn.is_connected(): + conn.close() + print("数据库连接已关闭。") + + +if __name__ == "__main__": + + # 动态替换 SQL 中的表名(注意:表名不能用参数化,需手动拼接,但确保安全) + # 为安全起见,可加校验 + if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', 'indeximos'): + raise ValueError("表名包含非法字符!") + + # 临时替换函数中的表名(更优雅的方式是传参,此处为简洁) + import sys + + module = sys.modules[__name__] + + + # 修改 main 函数中的 SQL(通过字符串替换) + # 实际建议:将表名作为全局变量或参数传递 + + # 更简单做法:在 main() 上方定义 TABLE_NAME,然后在 SQL 中直接引用 + # 我们重写 main 函数内部逻辑以支持变量表名 + + # 重新定义带表名参数的主逻辑 + def main_with_table(table_name): + try: + conn = mysql.connector.connect(**DB_CONFIG) + cursor = conn.cursor(buffered=True) + + # 查询 + query_sql = f""" + SELECT es_sid, es_urlname + FROM `{table_name}` + WHERE es_urlname LIKE '%index.php?%' + AND (es_tags IS NULL OR es_tags = '') + """ + cursor.execute(query_sql) + records = cursor.fetchall() + + if not records: + print("没有找到需要处理的记录。") + return + + print(f"共找到 {len(records)} 条待处理记录。") + + updated_count = 0 + for i, (es_sid, es_urlname) in enumerate(records, 1): + print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ") + + decoded = decode_rodong_url(es_urlname) + if decoded is not None: + update_sql = f"UPDATE `{table_name}` SET es_tags = %s WHERE es_sid = %s" + cursor.execute(update_sql, (decoded, es_sid)) + conn.commit() + updated_count += 1 + print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}") + else: + print("跳过(无法解码)") + + print(f"\n✅ 完成!共更新 {updated_count} 条记录。") + + except mysql.connector.Error as db_err: + print(f"❌ 数据库错误: {db_err}") + except Exception as e: + print(f"❌ 脚本执行出错: {e}") + finally: + if 'cursor' in locals(): + cursor.close() + if 'conn' in locals() and conn.is_connected(): + conn.close() + print("数据库连接已关闭。") + + + # 执行 + main_with_table('indeximos') diff --git a/research/pdf_downloader/save-page-with-selenium.py b/research/pdf_downloader/save-page-with-selenium.py index 48bc560..eea8e43 100644 --- a/research/pdf_downloader/save-page-with-selenium.py +++ b/research/pdf_downloader/save-page-with-selenium.py @@ -4,6 +4,7 @@ import queue import threading import time from datetime import datetime +import random import pymysql from tqdm import tqdm @@ -43,23 +44,26 @@ BATCH_SIZE = 500 MAX_WORKERS = 1 TIMEOUT = 10 PDF_OUTPUT_DIR = 'D:/data/output/pdf' -MIN_PDF_SIZE = 80 * 1024 # 80KB +MIN_PDF_SIZE = 5 * 1024 # 80KB MHTML_OUTPUT_DIR = 'D:/data/output/mhtml' os.makedirs(PDF_OUTPUT_DIR, exist_ok=True) running = True -running_interval_seconds = 15 +running_interval_seconds = 10 -remote_host_name = [ +skip_host_name = [ 'epochtimes.com', - # 'secretchina.com' + 'secretchina.com', + # 'rodong.rep.kp', + # 'kcna.kp' ] class PDFDownloader: def __init__(self): self.db_lock = threading.Lock() + self.db_connection = None self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3) self.processed_count = 0 self.success_count = 0 @@ -74,7 +78,7 @@ class PDFDownloader: # 替换 MYSQL_CONFIG 中的连接方式 def get_db_connection(self): - return pymysql.connect( + self.db_connection = pymysql.connect( host=MYSQL_CONFIG['host'], port=MYSQL_CONFIG['port'], user=MYSQL_CONFIG['user'], @@ -86,27 +90,29 @@ class PDFDownloader: def get_total_rows(self): """获取总记录数""" - with self.get_db_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT COUNT(*) FROM indeximos " - "WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) " - "AND es_loadtime > %s", self.last_loadtime - ) - return cursor.fetchone()[0] + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "SELECT COUNT(*) FROM indeximos " + "WHERE (es_video IS NULL OR es_video IN ('-1')) " + "AND es_loadtime > %s", self.last_loadtime + ) + return cursor.fetchone()[0] def get_last_loadtime(self): """获取上次导出数据的时间""" - with self.get_db_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT config_value FROM config " - "WHERE config_name = 'last_loadtime' " - ) - return cursor.fetchone()[0] + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "SELECT config_value FROM config " + "WHERE config_name = 'last_loadtime' " + ) + return cursor.fetchone()[0] def use_remote_selenium(self, url): - for host in remote_host_name: + for host in skip_host_name: if host in url: return True return False @@ -159,27 +165,30 @@ class PDFDownloader: def fetch_data_batch(self, offset): """分页获取数据""" - with self.get_db_connection() as conn: - cursor = conn.cursor() - cursor.execute( - "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos " - "WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) " - "AND es_loadtime > %s " - "ORDER BY es_urltime LIMIT %s OFFSET %s", - (self.last_loadtime, BATCH_SIZE, offset) - ) - return cursor.fetchall() + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() + cursor.execute( + "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos " + "WHERE (es_video IS NULL OR es_video IN ('-1')) " + "AND es_loadtime > %s " + "ORDER BY es_urltime LIMIT %s OFFSET %s", + (self.last_loadtime, BATCH_SIZE, offset) + ) + return cursor.fetchall() def update_file_status(self, es_sid, status, retry=3): """更新数据库状态""" for attempt in range(retry): try: - with self.db_lock, self.get_db_connection() as conn: - cursor = conn.cursor() + with self.db_lock: + if self.db_connection is None: + self.get_db_connection() + cursor = self.db_connection.cursor() cursor.execute( "UPDATE indeximos SET es_video = %s WHERE es_sid = %s", (status, es_sid)) - conn.commit() + self.db_connection.commit() return True except Exception as e: if attempt == retry - 1: @@ -216,20 +225,24 @@ class PDFDownloader: # 调用下载函数 if self.use_remote_selenium(url): - if self.remote_handler is None: - self.remote_handler = RemotePDFSaver() - success = self.remote_handler.save_as_pdf( - url=url, - output_path=output_file, - timeout=TIMEOUT - ) + self.processed_count += 1 + self.task_queue.task_done() + continue + # if self.remote_handler is None: + # self.remote_handler = RemotePDFSaver() + # success = self.remote_handler.save_as_pdf( + # url=url, + # output_path=output_file, + # timeout=TIMEOUT + # ) else: if self.local_handler is None: - self.local_handler = PDFSaver() + self.local_handler = PDFSaver(headless=False) success = self.local_handler.save_as_pdf( url=url, output_path=output_file, - timeout=TIMEOUT + timeout=TIMEOUT, + wait_time=5 ) # 验证下载结果 @@ -285,7 +298,9 @@ class PDFDownloader: batch = self.fetch_data_batch(offset) if not batch: break - + batch_list = list(batch) + random.shuffle(batch_list) + batch = tuple(batch_list) for row in batch: self.task_queue.put(row) @@ -312,11 +327,22 @@ class PDFDownloader: print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}条") print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒") + def terminate(self): + if self.local_handler: + self.local_handler.quit() + if self.remote_handler: + self.remote_handler.quit() + self.db_connection.close() + if __name__ == "__main__": while running: - print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}") - downloader = PDFDownloader() - downloader.run() - print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...") - time.sleep(running_interval_seconds) + try: + print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}") + downloader = PDFDownloader() + downloader.run() + print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...") + downloader.terminate() + time.sleep(running_interval_seconds) + except Exception as e: + print(repr(e)) diff --git a/research/pdf_downloader/save_page_as_pdf.py b/research/pdf_downloader/save_page_as_pdf.py index e54fb55..6a01fb9 100644 --- a/research/pdf_downloader/save_page_as_pdf.py +++ b/research/pdf_downloader/save_page_as_pdf.py @@ -23,7 +23,7 @@ logger = logging.getLogger(__name__) class PDFSaver: def __init__(self, headless=True): logger.info("正在初始化 Chrome WebDriver(自动匹配版本)...") - service = ChromeService(executable_path="C:/Program Files/Python38/chromedriver.exe") + service = ChromeService(executable_path="D:/chromedriver.exe") user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36 Edg/143.0.3650.75" # Chrome 选项 @@ -42,6 +42,7 @@ class PDFSaver: }) chrome_options.add_argument('--window-size=1920,1080') chrome_options.add_argument("--disable-blink-features=AutomationControlled") + chrome_options.page_load_strategy = 'eager' # 注意:PDF 打印不需要 --save-page-as-mhtml self.driver = webdriver.Chrome(service=service, options=chrome_options) diff --git a/research/pdf_downloader/set_raw_title_kcna.py b/research/pdf_downloader/set_raw_title_kcna.py new file mode 100644 index 0000000..2925987 --- /dev/null +++ b/research/pdf_downloader/set_raw_title_kcna.py @@ -0,0 +1,118 @@ +import pymysql +from typing import Dict, List, Tuple, Optional + +# ================== 配置区 ================== + +DB_CONFIG = { + 'host': '47.113.231.200', + 'port': 28089, + 'user': 'root', + 'password': 'passok123A', + 'database': 'dsp', + 'charset': 'utf8mb4', +} + +# 仅用于指定哪些 es_srcname 的记录需要处理(值可为空,因为不再做替换) +TARGET_SRCNAMES: List[str] = [ + "http://www.kcna.kp/cn/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf", + # 添加你需要处理的站点名 +] + + +# ================== 工具函数 ================== + +def get_suffix_32(url: str) -> Optional[str]: + """获取 URL 最后 32 个字符,不足则返回 None""" + if not url or len(url) < 32: + return None + return url[-32:] + + +def find_foreign_by_suffix(cursor, suffix: str, exclude_id: int) -> Optional[Tuple[str, str]]: + """ + 根据后缀查找外文记录(排除自身) + """ + query = """ + SELECT es_urltitle, es_urlcontent + FROM indeximos + WHERE + es_sid != %s + AND es_urlname IS NOT NULL + AND CHAR_LENGTH(es_urlname) >= 32 + AND RIGHT(es_urlname, 32) = %s + LIMIT 1 + """ + cursor.execute(query, (exclude_id, suffix)) + result = cursor.fetchone() + return result if result else None + + +def update_chinese_record(cursor, record_id: int, title: str, content: str): + """更新中文记录的 es_title 和 es_content""" + update_query = """ + UPDATE indeximos + SET es_title = %s, es_content = %s + WHERE es_sid = %s + """ + cursor.execute(update_query, (title, content, record_id)) + + +# ================== 主逻辑 ================== + +def main(): + if not TARGET_SRCNAMES: + print("⚠️ 未指定任何目标 es_srcname,程序退出。") + return + + conn = pymysql.connect(**DB_CONFIG) + cursor = conn.cursor() + + try: + # 获取所有目标站点的中文记录 + placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES)) + query = f""" + SELECT es_sid, es_srcname, es_urlname + FROM indeximos + WHERE es_srcname IN ({placeholders}) + AND es_urlname IS NOT NULL + AND es_urlname != '' + """ + cursor.execute(query, TARGET_SRCNAMES) + records = cursor.fetchall() + total = len(records) + print(f"共加载 {total} 条来自 {TARGET_SRCNAMES} 的记录用于匹配...") + + updated_count = 0 + skipped_short = 0 + + for idx, (record_id, es_srcname, es_urlname) in enumerate(records, 1): + suffix = get_suffix_32(es_urlname) + if suffix is None: + skipped_short += 1 + continue + + foreign_data = find_foreign_by_suffix(cursor, suffix, record_id) + if foreign_data: + title, content = foreign_data + update_chinese_record(cursor, record_id, title, content) + updated_count += 1 + print(f"[{idx}/{total}] ✅ 已更新 ID={record_id} | src={es_srcname}") + + conn.commit() + print("\n" + "=" * 50) + print(f"✅ 匹配完成!") + print(f" - 成功更新: {updated_count} 条") + print(f" - 因 URL 长度 <32 跳过: {skipped_short} 条") + print(f" - 总处理: {total} 条") + + except Exception as e: + conn.rollback() + print(f"❌ 发生错误,已回滚: {e}") + raise + finally: + cursor.close() + conn.close() + + +if __name__ == "__main__": + main() diff --git a/research/pdf_downloader/set_raw_title_rodong.py b/research/pdf_downloader/set_raw_title_rodong.py new file mode 100644 index 0000000..0aaba06 --- /dev/null +++ b/research/pdf_downloader/set_raw_title_rodong.py @@ -0,0 +1,158 @@ +import pymysql +import jieba +from collections import Counter +from typing import List, Tuple, Set + +# ================== 配置区 ================== + +DB_CONFIG = { + 'host': '47.113.231.200', + 'port': 28089, + 'user': 'root', + 'password': 'passok123A', + 'database': 'dsp', + 'charset': 'utf8mb4', +} + +# 指定需要处理的中文站点(es_srcname) +TARGET_SRCNAMES: List[str] = [ + "http://www.rodong.rep.kp/cn/index.php?MUBAMUAxQA==", + # 添加你的站点 +] + +FOREIGN_SRCNAME = 'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA==' + +# 相似度阈值(关键词重合率),建议 0.3 ~ 0.6 +SIMILARITY_THRESHOLD = 0.3 + + +# ================== 文本相似度函数 ================== + +def extract_keywords(text: str) -> Set[str]: + """提取中文关键词:分词 + 过滤单字、数字、标点""" + if not text: + return set() + words = jieba.lcut(text) + return {w for w in words if len(w) >= 2 and w.isalpha()} + + +def keyword_overlap_similarity(title1: str, title2: str) -> float: + """计算两个中文标题的关键词重合率""" + kw1 = extract_keywords(title1) + kw2 = extract_keywords(title2) + + if not kw1 and not kw2: + return 1.0 if title1 == title2 else 0.0 + if not kw1 or not kw2: + return 0.0 + + overlap = kw1 & kw2 + return len(overlap) / max(len(kw1), len(kw2)) + + +# ================== 数据库操作 ================== + +def get_chinese_records(cursor) -> List[Tuple]: + """获取待处理的中文记录""" + if not TARGET_SRCNAMES: + return [] + placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES)) + query = f""" + SELECT es_sid, es_srcname, es_urlname, es_urltitle, es_urltime + FROM indeximos + WHERE es_srcname IN ({placeholders}) + AND es_urltitle IS NOT NULL AND TRIM(es_urltitle) != '' + AND es_urltime IS NOT NULL + """ + cursor.execute(query, TARGET_SRCNAMES) + return cursor.fetchall() + + +def get_foreign_candidates_by_time(cursor, pub_time) -> List[Tuple]: + """ + 获取同一发布时间的所有外文候选记录(要求 es_abstract 不为空) + """ + query = """ + SELECT es_sid, es_abstract, es_urltitle, es_urlcontent + FROM indeximos + WHERE es_urltime = %s + AND es_abstract IS NOT NULL AND TRIM(es_abstract) != '' + AND es_urlcontent IS NOT NULL + """ + cursor.execute(query, (pub_time,)) + return cursor.fetchall() + + +def update_chinese_record(cursor, record_id: int, new_title: str, content: str): + """更新中文记录的标题和内容""" + update_query = """ + UPDATE indeximos + SET es_title = %s, es_content = %s + WHERE es_sid = %s + """ + cursor.execute(update_query, (new_title, content, record_id)) + + +# ================== 主逻辑 ================== + +def main(): + if not TARGET_SRCNAMES: + print("⚠️ 未指定目标站点,退出。") + return + + conn = pymysql.connect(**DB_CONFIG) + cursor = conn.cursor() + + try: + chinese_records = get_chinese_records(cursor) + total = len(chinese_records) + print(f"共加载 {total} 条中文记录用于匹配...") + + matched_count = 0 + + for idx, (cid, srcname, urlname, zh_title, pub_time) in enumerate(chinese_records, 1): + print(f"\n[{idx}/{total}] ID={cid}, 时间={pub_time}, 标题='{zh_title[:30]}...'") + + candidates = get_foreign_candidates_by_time(cursor, pub_time) + if not candidates: + print(" → 无同时间且有翻译标题的外文记录") + continue + + best_score = 0.0 + best_candidate = None + + for fid, trans_title, ori_title, content in candidates: + # 跳过自己(理论上不会发生,但安全起见) + if fid == cid: + continue + + score = keyword_overlap_similarity(zh_title, trans_title) + print(f" 候选ID={fid} | 翻译标题='{trans_title[:30]}...' | 重合度={score:.3f}") + + if score > best_score: + best_score = score + best_candidate = (ori_title, content) + + if best_candidate and best_score >= SIMILARITY_THRESHOLD: + final_title, final_content = best_candidate + update_chinese_record(cursor, cid, final_title, final_content) + matched_count += 1 + print(f" ✅ 匹配成功! 重合度={best_score:.3f}") + else: + print(f" ❌ 未达阈值(最高相似度={best_score:.3f})") + + conn.commit() + print("\n" + "=" * 50) + print(f"✅ 匹配完成!成功关联 {matched_count} / {total} 条记录。") + + except Exception as e: + conn.rollback() + print(f"❌ 发生错误,已回滚: {e}") + raise + finally: + cursor.close() + conn.close() + + +if __name__ == "__main__": + main()