PDF下载工具

This commit is contained in:
yuxin-pc 2026-01-19 09:17:10 +08:00
parent ee958357b0
commit becee60b6c
5 changed files with 524 additions and 50 deletions

View File

@ -0,0 +1,171 @@
import mysql.connector
import base64
import urllib.parse
import re
# === Database configuration ===
# NOTE(review): credentials are hard-coded; consider moving them to
# environment variables before sharing or committing this script.
DB_CONFIG = {
    'host': '47.113.231.200',   # MySQL server address
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',          # target schema
    'charset': 'utf8mb4',       # full Unicode, required for CJK text
}
def decode_rodong_url(url):
    """Extract and Base64-decode the query part of a Rodong Sinmun URL.

    Example input:
        http://www.rodong.rep.kp/cn/index.php?MTJAMjAyNi0wMS0wNS0wMDJAMUAxQEAwQDNA==
    Output:
        '12@2026-01-05-002@1@1@@0@3@'  (None if the URL cannot be parsed)

    The site frequently appends spurious '=' padding (as in the example,
    whose payload is already a multiple of 4 characters), which makes
    base64.b64decode raise "Invalid padding".  We therefore strip any
    trailing '=' and re-pad to the nearest multiple of 4 before decoding.
    """
    if not url or 'index.php?' not in url:
        return None
    try:
        # Primary path: let urllib.parse pull out the query string.
        parsed = urllib.parse.urlparse(url)
        query = parsed.query
        # Fallback for malformed URLs where urlparse yields no query.
        if not query:
            match = re.search(r'index\.php\?([A-Za-z0-9+/=]+)', url)
            if match:
                query = match.group(1)
            else:
                return None
        # Normalize Base64 padding (see docstring) before decoding.
        query = query.rstrip('=')
        query += '=' * (-len(query) % 4)
        decoded_bytes = base64.b64decode(query)
        decoded_str = decoded_bytes.decode('utf-8')
        return decoded_str
    except Exception as e:
        # Log the failure but keep the batch run going.
        print(f" 解码失败 (URL: {url[:60]}...): {e}")
        return None
def main():
    """Decode the Base64 query of each 劳动新闻 URL and store it in es_tags.

    Reads rows from `indeximos` whose es_tags is still empty, decodes each
    URL via decode_rodong_url(), and writes the result back one row at a
    time (committing per row, so progress survives a crash mid-run).
    """
    try:
        # Connect to the database.
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor(buffered=True)
        # Query all records still to be processed; only URLs containing
        # index.php? are decodable — decode_rodong_url() filters the rest.
        print("正在查询待处理的新闻记录...")
        cursor.execute("""
            SELECT es_sid, es_urlname
            FROM indeximos
            WHERE es_sitename = '劳动新闻'
            AND (es_tags IS NULL OR es_tags = '')
        """)
        records = cursor.fetchall()
        if not records:
            print("没有找到需要处理的记录。")
            return
        print(f"共找到 {len(records)} 条待处理记录。")
        updated_count = 0
        for i, (es_sid, es_urlname) in enumerate(records, 1):
            print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ")
            decoded = decode_rodong_url(es_urlname)
            if decoded is not None:
                # Persist the decoded value into es_tags.
                update_query = "UPDATE indeximos SET es_tags = %s WHERE es_sid = %s"
                cursor.execute(update_query, (decoded, es_sid))
                conn.commit()  # per-row commit: slower, but crash-safe
                updated_count += 1
                print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}")
            else:
                print("跳过(无法解码)")
        print(f"\n✅ 完成!共更新 {updated_count} 条记录。")
    except mysql.connector.Error as db_err:
        print(f"❌ 数据库错误: {db_err}")
    except Exception as e:
        print(f"❌ 脚本执行出错: {e}")
    finally:
        # Close resources only if they were actually created.
        if 'cursor' in locals():
            cursor.close()
        if 'conn' in locals() and conn.is_connected():
            conn.close()
            print("数据库连接已关闭。")
if __name__ == "__main__":
    # A table name cannot be a bound SQL parameter, so it must be
    # interpolated into the query text — validate it first so only a
    # plain identifier ever reaches the SQL string.
    TABLE_NAME = 'indeximos'
    if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', TABLE_NAME):
        raise ValueError("表名包含非法字符!")

    def main_with_table(table_name):
        """Decode rodong URLs for *table_name* and store results in es_tags.

        Same per-row commit strategy as main(), but the table name is a
        parameter (validated above) instead of being hard-coded.
        """
        try:
            conn = mysql.connector.connect(**DB_CONFIG)
            cursor = conn.cursor(buffered=True)
            # Only rows with a decodable URL and an empty es_tags.
            query_sql = f"""
                SELECT es_sid, es_urlname
                FROM `{table_name}`
                WHERE es_urlname LIKE '%index.php?%'
                AND (es_tags IS NULL OR es_tags = '')
            """
            cursor.execute(query_sql)
            records = cursor.fetchall()
            if not records:
                print("没有找到需要处理的记录。")
                return
            print(f"共找到 {len(records)} 条待处理记录。")
            updated_count = 0
            for i, (es_sid, es_urlname) in enumerate(records, 1):
                print(f"[{i}/{len(records)}] 处理 ID={es_sid} ...", end=" ")
                decoded = decode_rodong_url(es_urlname)
                if decoded is not None:
                    update_sql = f"UPDATE `{table_name}` SET es_tags = %s WHERE es_sid = %s"
                    cursor.execute(update_sql, (decoded, es_sid))
                    conn.commit()  # per-row commit: crash-safe
                    updated_count += 1
                    print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}")
                else:
                    print("跳过(无法解码)")
            print(f"\n✅ 完成!共更新 {updated_count} 条记录。")
        except mysql.connector.Error as db_err:
            print(f"❌ 数据库错误: {db_err}")
        except Exception as e:
            print(f"❌ 脚本执行出错: {e}")
        finally:
            # Close resources only if they were actually created.
            if 'cursor' in locals():
                cursor.close()
            if 'conn' in locals() and conn.is_connected():
                conn.close()
                print("数据库连接已关闭。")

    # Removed the dead `import sys` / `sys.modules[__name__]` lookup that
    # was never used; the table name now flows in as a plain argument.
    main_with_table(TABLE_NAME)

View File

@ -4,6 +4,7 @@ import queue
import threading
import time
from datetime import datetime
import random
import pymysql
from tqdm import tqdm
@ -43,23 +44,26 @@ BATCH_SIZE = 500
MAX_WORKERS = 1
TIMEOUT = 10
PDF_OUTPUT_DIR = 'D:/data/output/pdf'
MIN_PDF_SIZE = 80 * 1024 # 80KB
MIN_PDF_SIZE = 5 * 1024  # 5 KB — minimum size for a downloaded PDF to count as valid (lowered from 80 KB)
MHTML_OUTPUT_DIR = 'D:/data/output/mhtml'
os.makedirs(PDF_OUTPUT_DIR, exist_ok=True)
running = True
running_interval_seconds = 15
running_interval_seconds = 10
remote_host_name = [
skip_host_name = [
'epochtimes.com',
# 'secretchina.com'
'secretchina.com',
# 'rodong.rep.kp',
# 'kcna.kp'
]
class PDFDownloader:
def __init__(self):
self.db_lock = threading.Lock()
self.db_connection = None
self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3)
self.processed_count = 0
self.success_count = 0
@ -74,7 +78,7 @@ class PDFDownloader:
# 替换 MYSQL_CONFIG 中的连接方式
def get_db_connection(self):
return pymysql.connect(
self.db_connection = pymysql.connect(
host=MYSQL_CONFIG['host'],
port=MYSQL_CONFIG['port'],
user=MYSQL_CONFIG['user'],
@ -86,19 +90,21 @@ class PDFDownloader:
def get_total_rows(self):
"""获取总记录数"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
if self.db_connection is None:
self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute(
"SELECT COUNT(*) FROM indeximos "
"WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) "
"WHERE (es_video IS NULL OR es_video IN ('-1')) "
"AND es_loadtime > %s", self.last_loadtime
)
return cursor.fetchone()[0]
def get_last_loadtime(self):
"""获取上次导出数据的时间"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
if self.db_connection is None:
self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute(
"SELECT config_value FROM config "
"WHERE config_name = 'last_loadtime' "
@ -106,7 +112,7 @@ class PDFDownloader:
return cursor.fetchone()[0]
def use_remote_selenium(self, url):
for host in remote_host_name:
for host in skip_host_name:
if host in url:
return True
return False
@ -159,11 +165,12 @@ class PDFDownloader:
def fetch_data_batch(self, offset):
"""分页获取数据"""
with self.get_db_connection() as conn:
cursor = conn.cursor()
if self.db_connection is None:
self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute(
"SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos "
"WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) "
"WHERE (es_video IS NULL OR es_video IN ('-1')) "
"AND es_loadtime > %s "
"ORDER BY es_urltime LIMIT %s OFFSET %s",
(self.last_loadtime, BATCH_SIZE, offset)
@ -174,12 +181,14 @@ class PDFDownloader:
"""更新数据库状态"""
for attempt in range(retry):
try:
with self.db_lock, self.get_db_connection() as conn:
cursor = conn.cursor()
with self.db_lock:
if self.db_connection is None:
self.get_db_connection()
cursor = self.db_connection.cursor()
cursor.execute(
"UPDATE indeximos SET es_video = %s WHERE es_sid = %s",
(status, es_sid))
conn.commit()
self.db_connection.commit()
return True
except Exception as e:
if attempt == retry - 1:
@ -216,20 +225,24 @@ class PDFDownloader:
# 调用下载函数
if self.use_remote_selenium(url):
if self.remote_handler is None:
self.remote_handler = RemotePDFSaver()
success = self.remote_handler.save_as_pdf(
url=url,
output_path=output_file,
timeout=TIMEOUT
)
self.processed_count += 1
self.task_queue.task_done()
continue
# if self.remote_handler is None:
# self.remote_handler = RemotePDFSaver()
# success = self.remote_handler.save_as_pdf(
# url=url,
# output_path=output_file,
# timeout=TIMEOUT
# )
else:
if self.local_handler is None:
self.local_handler = PDFSaver()
self.local_handler = PDFSaver(headless=False)
success = self.local_handler.save_as_pdf(
url=url,
output_path=output_file,
timeout=TIMEOUT
timeout=TIMEOUT,
wait_time=5
)
# 验证下载结果
@ -285,7 +298,9 @@ class PDFDownloader:
batch = self.fetch_data_batch(offset)
if not batch:
break
batch_list = list(batch)
random.shuffle(batch_list)
batch = tuple(batch_list)
for row in batch:
self.task_queue.put(row)
@ -312,11 +327,22 @@ class PDFDownloader:
print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}")
print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒")
def terminate(self):
    """Release browser handlers and the shared DB connection, if created."""
    if self.local_handler:
        self.local_handler.quit()
    if self.remote_handler:
        self.remote_handler.quit()
    # db_connection is created lazily (starts as None in __init__); guard
    # against a run where no query ever opened it, which previously raised
    # AttributeError on None.close().
    if self.db_connection is not None:
        self.db_connection.close()
if __name__ == "__main__":
while running:
try:
print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}")
downloader = PDFDownloader()
downloader.run()
print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...")
downloader.terminate()
time.sleep(running_interval_seconds)
except Exception as e:
print(repr(e))

View File

@ -23,7 +23,7 @@ logger = logging.getLogger(__name__)
class PDFSaver:
def __init__(self, headless=True):
logger.info("正在初始化 Chrome WebDriver自动匹配版本...")
service = ChromeService(executable_path="C:/Program Files/Python38/chromedriver.exe")
service = ChromeService(executable_path="D:/chromedriver.exe")
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.3650.75 Safari/537.36 Edg/143.0.3650.75"
# Chrome 选项
@ -42,6 +42,7 @@ class PDFSaver:
})
chrome_options.add_argument('--window-size=1920,1080')
chrome_options.add_argument("--disable-blink-features=AutomationControlled")
chrome_options.page_load_strategy = 'eager'
# 注意PDF 打印不需要 --save-page-as-mhtml
self.driver = webdriver.Chrome(service=service, options=chrome_options)

View File

@ -0,0 +1,118 @@
import pymysql
from typing import Dict, List, Tuple, Optional
# ================== Configuration ==================
# NOTE(review): credentials are hard-coded; move to env vars before sharing.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',   # full Unicode, required for CJK content
}
# Only rows whose es_srcname appears here are processed (the list is a
# pure filter — no value replacement is performed any more).
TARGET_SRCNAMES: List[str] = [
    "http://www.kcna.kp/cn/category/articles/q/5394b80bdae203fadef02522cfb578c0.kcmsf",
    # add the site names you need to process here
]
# ================== Helper functions ==================
def get_suffix_32(url: str) -> Optional[str]:
    """Return the trailing 32 characters of *url*, or None when it is
    empty/None or shorter than 32 characters."""
    long_enough = bool(url) and len(url) >= 32
    return url[-32:] if long_enough else None
def find_foreign_by_suffix(cursor, suffix: str, exclude_id: int) -> Optional[Tuple[str, str]]:
    """Fetch one (es_urltitle, es_urlcontent) pair whose es_urlname ends
    with *suffix*, excluding the row *exclude_id* itself; None if absent."""
    sql = (
        "SELECT es_urltitle, es_urlcontent "
        "FROM indeximos "
        "WHERE es_sid != %s "
        "AND es_urlname IS NOT NULL "
        "AND CHAR_LENGTH(es_urlname) >= 32 "
        "AND RIGHT(es_urlname, 32) = %s "
        "LIMIT 1"
    )
    cursor.execute(sql, (exclude_id, suffix))
    row = cursor.fetchone()
    # Normalize any falsy fetch result (None or empty tuple) to None.
    return row or None
def update_chinese_record(cursor, record_id: int, title: str, content: str):
    """Write *title* and *content* into es_title/es_content of row *record_id*."""
    sql = (
        "UPDATE indeximos "
        "SET es_title = %s, es_content = %s "
        "WHERE es_sid = %s"
    )
    cursor.execute(sql, (title, content, record_id))
# ================== Main logic ==================
def main():
    """Copy title/content from foreign-language rows onto Chinese rows.

    For every row of the target sites, take the last 32 characters of its
    URL and look for another row whose URL shares that suffix; when found,
    copy that row's es_urltitle/es_urlcontent into es_title/es_content.
    Commits once at the end; rolls back (and re-raises) on any error.
    """
    if not TARGET_SRCNAMES:
        print("⚠️ 未指定任何目标 es_srcname程序退出。")
        return
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        # Load all Chinese rows of the target sites that have a usable URL.
        placeholders = ','.join(['%s'] * len(TARGET_SRCNAMES))
        query = f"""
            SELECT es_sid, es_srcname, es_urlname
            FROM indeximos
            WHERE es_srcname IN ({placeholders})
            AND es_urlname IS NOT NULL
            AND es_urlname != ''
        """
        cursor.execute(query, TARGET_SRCNAMES)
        records = cursor.fetchall()
        total = len(records)
        print(f"共加载 {total} 条来自 {TARGET_SRCNAMES} 的记录用于匹配...")
        updated_count = 0
        skipped_short = 0
        for idx, (record_id, es_srcname, es_urlname) in enumerate(records, 1):
            suffix = get_suffix_32(es_urlname)
            if suffix is None:
                # URL shorter than 32 chars — no suffix key can be built.
                skipped_short += 1
                continue
            foreign_data = find_foreign_by_suffix(cursor, suffix, record_id)
            if foreign_data:
                title, content = foreign_data
                update_chinese_record(cursor, record_id, title, content)
                updated_count += 1
                print(f"[{idx}/{total}] ✅ 已更新 ID={record_id} | src={es_srcname}")
        conn.commit()  # single commit keeps the whole run atomic
        print("\n" + "=" * 50)
        print(f"✅ 匹配完成!")
        print(f" - 成功更新: {updated_count}")
        print(f" - 因 URL 长度 <32 跳过: {skipped_short}")
        print(f" - 总处理: {total}")
    except Exception as e:
        conn.rollback()
        print(f"❌ 发生错误,已回滚: {e}")
        raise
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()

View File

@ -0,0 +1,158 @@
import pymysql
import jieba
from collections import Counter
from typing import List, Tuple, Set
# ================== Configuration ==================
# NOTE(review): credentials are hard-coded; move to env vars before sharing.
DB_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
}
# Chinese sites (es_srcname values) whose rows should be processed.
TARGET_SRCNAMES: List[str] = [
    "http://www.rodong.rep.kp/cn/index.php?MUBAMUAxQA==",
    # add your sites here
]
# NOTE(review): not referenced in the visible code — confirm it is used elsewhere.
FOREIGN_SRCNAME = 'http://www.rodong.rep.kp/ko/index.php?MUBAMUAxQA=='
# Similarity threshold (keyword-overlap ratio); 0.3–0.6 recommended.
SIMILARITY_THRESHOLD = 0.3
# ================== Text similarity functions ==================
def extract_keywords(text: str) -> Set[str]:
    """Tokenize *text* with jieba and keep alphabetic tokens of length >= 2
    (drops single characters, digits, and punctuation)."""
    if not text:
        return set()
    keywords = set()
    for token in jieba.lcut(text):
        if len(token) >= 2 and token.isalpha():
            keywords.add(token)
    return keywords
def keyword_overlap_similarity(title1: str, title2: str) -> float:
    """Return the keyword-overlap ratio (0.0–1.0) between two Chinese titles.

    The ratio is |shared keywords| / max(|kw1|, |kw2|); when neither title
    yields keywords, it falls back to exact string equality.
    """
    kws_a = extract_keywords(title1)
    kws_b = extract_keywords(title2)
    if not kws_a and not kws_b:
        # Neither title tokenized to keywords: compare the raw strings.
        return 1.0 if title1 == title2 else 0.0
    if not kws_a or not kws_b:
        return 0.0
    shared = kws_a.intersection(kws_b)
    return len(shared) / max(len(kws_a), len(kws_b))
# ================== Database operations ==================
def get_chinese_records(cursor) -> List[Tuple]:
    """Fetch (sid, srcname, url, title, time) for every target-site row
    that has a non-blank title and a publication time."""
    if not TARGET_SRCNAMES:
        return []
    in_clause = ','.join(['%s'] * len(TARGET_SRCNAMES))
    sql = (
        "SELECT es_sid, es_srcname, es_urlname, es_urltitle, es_urltime "
        "FROM indeximos "
        f"WHERE es_srcname IN ({in_clause}) "
        "AND es_urltitle IS NOT NULL AND TRIM(es_urltitle) != '' "
        "AND es_urltime IS NOT NULL"
    )
    cursor.execute(sql, TARGET_SRCNAMES)
    return cursor.fetchall()
def get_foreign_candidates_by_time(cursor, pub_time) -> List[Tuple]:
    """Return all rows published exactly at *pub_time* whose es_abstract
    and es_urlcontent are both present (es_abstract presumably holds the
    translated title — confirm against the caller)."""
    sql = (
        "SELECT es_sid, es_abstract, es_urltitle, es_urlcontent "
        "FROM indeximos "
        "WHERE es_urltime = %s "
        "AND es_abstract IS NOT NULL AND TRIM(es_abstract) != '' "
        "AND es_urlcontent IS NOT NULL"
    )
    cursor.execute(sql, (pub_time,))
    return cursor.fetchall()
def update_chinese_record(cursor, record_id: int, new_title: str, content: str):
    """Overwrite es_title/es_content of row *record_id* with the matched values."""
    params = (new_title, content, record_id)
    cursor.execute(
        "UPDATE indeximos SET es_title = %s, es_content = %s WHERE es_sid = %s",
        params,
    )
# ================== Main logic ==================
def main():
    """Match Chinese titles to same-time foreign rows by keyword overlap.

    For each Chinese row: gather candidate rows published at the same
    es_urltime, score the keyword overlap between the Chinese title and
    each candidate's translated title (es_abstract), and when the best
    score reaches SIMILARITY_THRESHOLD copy that candidate's original
    title/content into the Chinese row.  Commits once at the end; rolls
    back (and re-raises) on any error.
    """
    if not TARGET_SRCNAMES:
        print("⚠️ 未指定目标站点,退出。")
        return
    conn = pymysql.connect(**DB_CONFIG)
    cursor = conn.cursor()
    try:
        chinese_records = get_chinese_records(cursor)
        total = len(chinese_records)
        print(f"共加载 {total} 条中文记录用于匹配...")
        matched_count = 0
        for idx, (cid, srcname, urlname, zh_title, pub_time) in enumerate(chinese_records, 1):
            print(f"\n[{idx}/{total}] ID={cid}, 时间={pub_time}, 标题='{zh_title[:30]}...'")
            candidates = get_foreign_candidates_by_time(cursor, pub_time)
            if not candidates:
                print(" → 无同时间且有翻译标题的外文记录")
                continue
            # Track the best-scoring candidate across all same-time rows.
            best_score = 0.0
            best_candidate = None
            for fid, trans_title, ori_title, content in candidates:
                # Skip self-match (should not happen, but safe).
                if fid == cid:
                    continue
                score = keyword_overlap_similarity(zh_title, trans_title)
                print(f" 候选ID={fid} | 翻译标题='{trans_title[:30]}...' | 重合度={score:.3f}")
                if score > best_score:
                    best_score = score
                    best_candidate = (ori_title, content)
            if best_candidate and best_score >= SIMILARITY_THRESHOLD:
                final_title, final_content = best_candidate
                update_chinese_record(cursor, cid, final_title, final_content)
                matched_count += 1
                print(f" ✅ 匹配成功! 重合度={best_score:.3f}")
            else:
                print(f" ❌ 未达阈值(最高相似度={best_score:.3f}")
        conn.commit()  # single commit keeps the whole run atomic
        print("\n" + "=" * 50)
        print(f"✅ 匹配完成!成功关联 {matched_count} / {total} 条记录。")
    except Exception as e:
        conn.rollback()
        print(f"❌ 发生错误,已回滚: {e}")
        raise
    finally:
        cursor.close()
        conn.close()

if __name__ == "__main__":
    main()