osc/research/pdf_downloader/save-page-with-selenium.py
2026-01-19 09:17:10 +08:00

349 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import os
import queue
import threading
import time
from datetime import datetime
import random
import pymysql
from tqdm import tqdm
from save_page_as_pdf import PDFSaver
from save_remote_as_mhtml import RemoteMHTMLSaver
from save_page_as_mhtml import MHTMLSaver
import tldextract
# 配置日志
from save_remote_as_pdf import RemotePDFSaver
# Logging setup: INFO and above goes to the console and to a local log file.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_console_handler = logging.StreamHandler()
_file_handler = logging.FileHandler('pdf_downloader.log')
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[_console_handler, _file_handler],
)
# Module-level logger used by the rest of this script.
logger = logging.getLogger(__name__)
# =============== MySQL configuration ===============
# SECURITY: the literal values below (including a root password) are kept only
# as backward-compatible defaults.  Set the PDF_DOWNLOADER_MYSQL_* environment
# variables to override them, so credentials do not have to live in the source.
MYSQL_CONFIG = {
    'host': os.environ.get('PDF_DOWNLOADER_MYSQL_HOST', '47.113.231.200'),
    'port': int(os.environ.get('PDF_DOWNLOADER_MYSQL_PORT', '28089')),
    'user': os.environ.get('PDF_DOWNLOADER_MYSQL_USER', 'root'),
    'password': os.environ.get('PDF_DOWNLOADER_MYSQL_PASSWORD', 'passok123A'),
    'database': os.environ.get('PDF_DOWNLOADER_MYSQL_DATABASE', 'dsp'),
    'charset': 'utf8mb4',
    'autocommit': False,  # transactions are committed manually
}
# ====================================================
# Runtime parameters
BATCH_SIZE = 500  # rows fetched per query; also the step by which the OFFSET advances
MAX_WORKERS = 1  # number of download worker threads; the task queue holds 3x this many rows
TIMEOUT = 10  # passed to the PDF saver as `timeout`; presumably seconds -- TODO confirm
PDF_OUTPUT_DIR = 'D:/data/output/pdf'  # directory the generated PDF files are written to
MIN_PDF_SIZE = 5 * 1024 # 5 KiB (5120 bytes); smaller PDFs are deleted and marked '-2'
MHTML_OUTPUT_DIR = 'D:/data/output/mhtml'  # intended directory for MHTML captures (not created here)
# Import-time side effect: make sure the PDF output directory exists.
os.makedirs(PDF_OUTPUT_DIR, exist_ok=True)
running = True  # the __main__ loop keeps starting new runs while this is true
running_interval_seconds = 10  # pause between two consecutive runs
# URLs containing any of these host names are treated as "remote selenium"
# URLs by PDFDownloader.use_remote_selenium.
skip_host_name = [
'epochtimes.com',
'secretchina.com',
# 'rodong.rep.kp',
# 'kcna.kp'
]
class PDFDownloader:
    """Batch-download pages listed in the ``indeximos`` table as PDF files.

    Rows whose ``es_video`` column is NULL or ``'-1'`` and whose
    ``es_loadtime`` is newer than the stored ``last_loadtime`` are fetched in
    batches, handed to worker threads through a bounded queue and rendered to
    PDF.  The outcome is written back to ``es_video``:

    * output file path -- PDF saved and at least ``MIN_PDF_SIZE`` bytes
    * ``'-2'``         -- PDF saved but too small (the file is deleted)
    * ``'0'``          -- the saver reported failure or produced no file
    * ``'-1'``         -- an exception was raised while processing the row

    Rows are tuples in the column order selected by :meth:`fetch_data_batch`:
    ``(es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors)``.
    """

    # Maps characters that are invalid in Windows file names (and ASCII
    # control characters) to '_' in a single pass.
    _FILENAME_TRANSLATION = str.maketrans(
        dict.fromkeys('<>:"/\\|?*' + ''.join(map(chr, range(32))), '_')
    )
    # Timestamp used in file names when the row has no parseable es_urltime.
    _FALLBACK_TIMESTAMP = '19700101_000000'

    def __init__(self):
        """Open the database connection and load the run-wide counters."""
        # Guards every use of the shared connection: the main thread reads
        # through it while worker threads write through it.
        self.db_lock = threading.Lock()
        self.db_connection = None
        self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3)
        self.processed_count = 0
        self.success_count = 0
        self.fail_count = 0
        self.small_file_count = 0  # rows skipped or rejected because of tiny PDFs
        self.skip_hosts = []  # domains that produced a too-small PDF in this run
        self.local_handler = None  # created lazily by the worker
        self.remote_handler = None
        self.last_loadtime = self.get_last_loadtime()
        self.total_rows = self.get_total_rows()
        self.start_time = time.time()

    # ------------------------------------------------------------------ DB
    def get_db_connection(self):
        """(Re)create ``self.db_connection`` from ``MYSQL_CONFIG``."""
        self.db_connection = pymysql.connect(
            host=MYSQL_CONFIG['host'],
            port=MYSQL_CONFIG['port'],
            user=MYSQL_CONFIG['user'],
            password=MYSQL_CONFIG['password'],
            database=MYSQL_CONFIG['database'],
            charset=MYSQL_CONFIG['charset'],
            autocommit=MYSQL_CONFIG['autocommit'],
        )

    def _discard_connection(self):
        """Drop the current connection so the next DB call reconnects.

        The caller must hold ``self.db_lock``.
        """
        connection, self.db_connection = self.db_connection, None
        if connection is not None:
            try:
                connection.close()
            except Exception:
                # The connection is already unusable; nothing left to clean up.
                pass

    def _select(self, sql, params=None, fetch_one=False):
        """Run a SELECT under ``db_lock`` and return one row or all rows."""
        with self.db_lock:
            if self.db_connection is None:
                self.get_db_connection()
            with self.db_connection.cursor() as cursor:
                cursor.execute(sql, params)
                if fetch_one:
                    return cursor.fetchone()
                return cursor.fetchall()

    def get_total_rows(self):
        """Return the number of rows that are pending for this run."""
        row = self._select(
            "SELECT COUNT(*) FROM indeximos "
            "WHERE (es_video IS NULL OR es_video IN ('-1')) "
            "AND es_loadtime > %s",
            (self.last_loadtime,),
            fetch_one=True,
        )
        return row[0]

    def get_last_loadtime(self):
        """Return the ``last_loadtime`` value stored in the ``config`` table.

        Raises:
            RuntimeError: if the ``config`` table has no ``last_loadtime`` row.
        """
        row = self._select(
            "SELECT config_value FROM config "
            "WHERE config_name = 'last_loadtime' ",
            fetch_one=True,
        )
        if row is None:
            raise RuntimeError("config 表中缺少 last_loadtime 配置项")
        return row[0]

    def fetch_data_batch(self, offset):
        """Return up to ``BATCH_SIZE`` pending rows starting at ``offset``.

        NOTE(review): workers change ``es_video`` while paging is in progress,
        which removes rows from this filtered result set; a growing OFFSET can
        therefore step over rows that were never fetched.  They appear to be
        picked up by a later run only -- confirm whether that is intended.
        """
        return self._select(
            "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos "
            "WHERE (es_video IS NULL OR es_video IN ('-1')) "
            "AND es_loadtime > %s "
            "ORDER BY es_urltime LIMIT %s OFFSET %s",
            (self.last_loadtime, BATCH_SIZE, offset),
        )

    def update_file_status(self, es_sid, status, retry=3):
        """Store ``status`` in ``es_video`` for the row ``es_sid``.

        Each failed attempt discards the connection so that the next attempt
        reconnects instead of reusing a broken connection.

        Returns:
            True when the update was committed, False after ``retry`` failures.
        """
        for attempt in range(retry):
            try:
                with self.db_lock:
                    try:
                        if self.db_connection is None:
                            self.get_db_connection()
                        with self.db_connection.cursor() as cursor:
                            cursor.execute(
                                "UPDATE indeximos SET es_video = %s WHERE es_sid = %s",
                                (status, es_sid))
                        self.db_connection.commit()
                    except Exception:
                        self._discard_connection()
                        raise
                return True
            except Exception as e:
                if attempt == retry - 1:
                    logger.error(f"更新数据库失败(es_sid={es_sid}): {e}")
                    return False
                time.sleep(1)  # brief back-off, outside the lock
        return False

    # ----------------------------------------------------------- URL helpers
    def use_remote_selenium(self, url):
        """Return True when ``url`` contains one of the ``skip_host_name`` hosts."""
        return any(host in url for host in skip_host_name)

    def extract_main_domain(self, url):
        """Return the registered domain of ``url`` as ``"<domain>.<suffix>"``."""
        extracted = tldextract.extract(url)
        return f"{extracted.domain}.{extracted.suffix}"

    # ------------------------------------------------------------ file names
    @classmethod
    def _clean_filename(cls, text):
        """Replace characters that are invalid in file names; cap at 100 chars."""
        if not text:
            return ''
        return str(text).translate(cls._FILENAME_TRANSLATION).strip()[:100]

    @classmethod
    def _format_timestamp(cls, value):
        """Format ``es_urltime`` as ``YYYYmmdd_HHMMSS`` (fallback: 1970 epoch)."""
        if isinstance(value, datetime):
            return value.strftime('%Y%m%d_%H%M%S')
        try:
            parsed = datetime.strptime(str(value), '%Y-%m-%d %H:%M:%S')
        except ValueError:
            return cls._FALLBACK_TIMESTAMP
        return parsed.strftime('%Y%m%d_%H%M%S')

    def _build_output_path(self, row, output_dir, extension):
        """Build ``<output_dir>/<title>_<timestamp>_<site><extension>`` for ``row``."""
        title = self._clean_filename(row[2]) or 'untitled'
        timestamp = self._format_timestamp(row[3])
        # The site name goes through the same sanitising as the title so it
        # cannot inject path separators or invalid characters.
        site = self._clean_filename(row[4]) or 'anonymous'
        return os.path.join(output_dir, f"{title}_{timestamp}_{site}{extension}")

    def format_pdf_filename(self, row):
        """Return the PDF output path for ``row`` inside ``PDF_OUTPUT_DIR``."""
        return self._build_output_path(row, PDF_OUTPUT_DIR, '.pdf')

    def format_mhtml_filename(self, row):
        """Return the MHTML output path for ``row`` inside ``MHTML_OUTPUT_DIR``."""
        return self._build_output_path(row, MHTML_OUTPUT_DIR, '.mhtml')

    # ---------------------------------------------------------------- worker
    def _remove_file(self, path):
        """Delete ``path`` if it exists; log instead of raising on failure."""
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"删除文件失败: {path}: {e}")

    def _process_row(self, row):
        """Render one row to PDF and record the outcome in the database."""
        url = row[1]
        domain = self.extract_main_domain(url)
        if domain in self.skip_hosts:
            # This domain already produced a too-small PDF during this run.
            self.small_file_count += 1
            print(f"小文件规避暂时跳过URL{url}")
            return
        if self.use_remote_selenium(url):
            # Remote capture is currently disabled, so these URLs are skipped
            # without touching their database status.
            return

        output_file = self.format_pdf_filename(row)
        os.makedirs(os.path.dirname(output_file), exist_ok=True)
        if self.local_handler is None:
            self.local_handler = PDFSaver(headless=False)
        success = self.local_handler.save_as_pdf(
            url=url,
            output_path=output_file,
            timeout=TIMEOUT,
            wait_time=5
        )

        if not (success and os.path.exists(output_file)):
            # Download failed: mark the row and remove any partial file.
            self.update_file_status(row[0], '0')
            self.fail_count += 1
            self._remove_file(output_file)
            return

        file_size = os.path.getsize(output_file)
        if file_size >= MIN_PDF_SIZE:
            self.update_file_status(row[0], output_file)
            self.success_count += 1
            return

        # File too small: reject it and avoid this domain for the rest of the run.
        self.update_file_status(row[0], '-2')
        self.small_file_count += 1
        logger.warning(f"文件过小({file_size}字节): {output_file}")
        self._remove_file(output_file)
        if domain not in self.skip_hosts:
            self.skip_hosts.append(domain)

    def download_worker(self):
        """Worker-thread loop: process queued rows until a ``None`` sentinel.

        ``task_done()`` is called in a ``finally`` clause so that an
        unexpected error in one row can never leave ``task_queue.join()``
        waiting forever.
        """
        while True:
            try:
                row = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            if row is None:
                self.task_queue.task_done()
                break
            try:
                self._process_row(row)
            except Exception as e:
                logger.error(f"下载出现异常(es_sid={row[0]}, url={row[1]}): {str(e)}")
                self.update_file_status(row[0], '-1')
                self.fail_count += 1
            finally:
                self.processed_count += 1
                self.task_queue.task_done()

    # ------------------------------------------------------------------- run
    def run(self):
        """Start the workers, feed them every pending row and print a summary."""
        threads = []
        for _ in range(MAX_WORKERS):
            t = threading.Thread(target=self.download_worker)
            t.start()
            threads.append(t)

        try:
            with tqdm(total=self.total_rows, desc="处理进度", unit="") as pbar:
                offset = 0
                while True:
                    batch = self.fetch_data_batch(offset)
                    if not batch:
                        break
                    # Shuffle so consecutive requests are spread over sites.
                    rows = list(batch)
                    random.shuffle(rows)
                    for row in rows:
                        self.task_queue.put(row)  # blocks while the queue is full
                    pbar.update(len(rows))
                    elapsed = max(time.time() - self.start_time, 1e-9)
                    pbar.set_postfix({
                        '成功': self.success_count,
                        '失败': self.fail_count,
                        '小文件': self.small_file_count,
                        '速度': f"{self.processed_count / elapsed:.1f}条/秒"
                    })
                    offset += BATCH_SIZE
            self.task_queue.join()
        finally:
            # Always stop the workers, even when fetching raised, so that no
            # threads are left running in the background.
            for _ in range(MAX_WORKERS):
                self.task_queue.put(None)
            for t in threads:
                t.join()

        total_time = max(time.time() - self.start_time, 1e-9)
        print(f"\n处理完成! 总计: {self.total_rows}")
        print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}")
        print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒")

    def terminate(self):
        """Quit the browser handlers and close the database connection.

        Safe to call more than once and when nothing was ever opened.
        """
        for attribute in ('local_handler', 'remote_handler'):
            handler = getattr(self, attribute)
            if handler:
                try:
                    handler.quit()
                except Exception as e:
                    # A failing browser shutdown must not prevent DB cleanup.
                    logger.warning(f"关闭浏览器失败: {e}")
            setattr(self, attribute, None)
        if self.db_connection is not None:
            try:
                self.db_connection.close()
            finally:
                self.db_connection = None
if __name__ == "__main__":
    # Run the downloader repeatedly until `running` is cleared.
    while running:
        downloader = None
        try:
            # Reuse the single instance for the row count instead of building
            # a throwaway PDFDownloader whose DB connection is never closed.
            downloader = PDFDownloader()
            print(f"开始处理,总记录数: {downloader.total_rows}")
            downloader.run()
            print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...")
        except Exception as e:
            print(repr(e))
        finally:
            # Release the browser and the DB connection even when the run failed.
            if downloader is not None:
                try:
                    downloader.terminate()
                except Exception as e:
                    print(repr(e))
        # Pause after failures as well, so a persistent error (e.g. the
        # database being unreachable) does not turn into a tight retry loop.
        time.sleep(running_interval_seconds)