# osc/research/pdf_downloader/save-page-with-selenium.py
# NOTE(review): the lines below were repository-viewer metadata (323 lines,
# 11 KiB, Python, "Raw Normal View History", captured 2025-12-26 08:54:58
# +08:00) accidentally pasted into the source; commented out so the file
# parses as valid Python.
import logging
import os
import queue
import threading
import time
from datetime import datetime
import pymysql
from tqdm import tqdm
from save_page_as_pdf import PDFSaver
from save_remote_as_mhtml import RemoteMHTMLSaver
from save_page_as_mhtml import MHTMLSaver
import tldextract
# Logging configuration
from save_remote_as_pdf import RemotePDFSaver
# Log to both the console and a file. The file handler is pinned to UTF-8:
# log messages in this script contain Chinese text, and on Windows the
# default locale codepage (e.g. GBK/cp936) can mangle or reject them.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('pdf_downloader.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)
# =============== MySQL 配置 ===============
# SECURITY NOTE(review): database credentials are hard-coded in source;
# consider loading them from environment variables or an untracked config.
MYSQL_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
    'autocommit': False  # transactions are committed manually
}
# =========================================
# Tuning parameters
BATCH_SIZE = 500            # rows fetched per database page
MAX_WORKERS = 1             # number of download worker threads
TIMEOUT = 10                # per-page Selenium timeout (seconds)
PDF_OUTPUT_DIR = 'D:/data/output/pdf'
MIN_PDF_SIZE = 80 * 1024    # 80KB — smaller files are treated as failed renders
MHTML_OUTPUT_DIR = 'D:/data/output/mhtml'
os.makedirs(PDF_OUTPUT_DIR, exist_ok=True)
# Bug fix: MHTML_OUTPUT_DIR was declared but never created.
os.makedirs(MHTML_OUTPUT_DIR, exist_ok=True)
running = True              # main loop flag (never cleared — runs until killed)
running_interval_seconds = 15  # pause between polling runs (seconds)
# Sites that must be rendered through the remote Selenium instance.
remote_host_name = [
    'epochtimes.com',
    # 'secretchina.com'
]
class PDFDownloader:
    """Batch-download article pages as PDFs and record the outcome in MySQL.

    Pending rows are read from the ``indeximos`` table in pages, fed to a
    bounded queue, and consumed by worker threads driving a Selenium-based
    saver (local by default, remote for hosts in ``remote_host_name``).
    The ``es_video`` column doubles as the status field:

    * saved file path — success
    * ``'0'``  — download failed
    * ``'-1'`` — exception during download
    * ``'-2'`` — rendered file smaller than MIN_PDF_SIZE
    """

    def __init__(self):
        # Serializes database writes across worker threads.
        self.db_lock = threading.Lock()
        # Guards the shared counters below; workers mutate them concurrently
        # when MAX_WORKERS > 1 (bare += on int attributes is not atomic).
        self.stats_lock = threading.Lock()
        # Bounded so the producer cannot run unboundedly ahead of the workers.
        self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3)
        self.processed_count = 0
        self.success_count = 0
        self.fail_count = 0
        self.small_file_count = 0  # downloads rejected for being too small
        self.last_loadtime = self.get_last_loadtime()
        self.total_rows = self.get_total_rows()
        self.start_time = time.time()
        # Domains that recently produced undersized files; skipped for the
        # remainder of this run.
        self.skip_hosts = []
        # Selenium savers are created lazily on first use.
        self.local_handler = None
        self.remote_handler = None

    def get_db_connection(self):
        """Open a fresh pymysql connection straight from MYSQL_CONFIG."""
        # The config dict already carries charset and autocommit=False, so
        # expanding it keeps connection settings in exactly one place.
        return pymysql.connect(**MYSQL_CONFIG)

    def get_total_rows(self):
        """Count rows still pending (es_video unset or a retryable status)."""
        with self.get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT COUNT(*) FROM indeximos "
                    "WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) "
                    "AND es_loadtime > %s", (self.last_loadtime,)
                )
                return cursor.fetchone()[0]

    def get_last_loadtime(self):
        """Read the 'last_loadtime' watermark from the config table.

        NOTE(review): raises TypeError if the config row is missing —
        presumably it is guaranteed to exist; verify against the schema.
        """
        with self.get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT config_value FROM config "
                    "WHERE config_name = 'last_loadtime' "
                )
                return cursor.fetchone()[0]

    def use_remote_selenium(self, url):
        """Return True when *url* belongs to a remote-Selenium-only host."""
        return any(host in url for host in remote_host_name)

    @staticmethod
    def _clean_filename(text):
        """Replace characters Windows forbids in file names; cap at 100 chars."""
        if not text:
            return ''
        for char in '<>:"/\\|?*':
            text = text.replace(char, '_')
        return text.strip()[:100]

    def _build_filename(self, row, extension, output_dir):
        """Compose '<title>_<timestamp>_<sitename><extension>' under *output_dir*.

        Shared by the PDF and MHTML formatters (previously duplicated).
        Row layout: (es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, ...).
        """
        es_urltitle = row[2] or 'untitled'
        es_sitename = row[4] or 'anonymous'
        try:
            dt = datetime.strptime(str(row[3]), '%Y-%m-%d %H:%M:%S')
            es_urltime_fix = dt.strftime('%Y%m%d_%H%M%S')
        except (ValueError, TypeError):
            # Missing/odd timestamps (including None -> 'None') fall back to epoch.
            es_urltime_fix = '19700101_000000'
        filename = f"{self._clean_filename(es_urltitle)}_{es_urltime_fix}_{es_sitename}{extension}"
        return os.path.join(output_dir, filename)

    def format_pdf_filename(self, row):
        """Return the target PDF path for a result *row*."""
        return self._build_filename(row, '.pdf', PDF_OUTPUT_DIR)

    def format_mhtml_filename(self, row):
        """Return the target MHTML path for a result *row*.

        Bug fix: this previously placed .mhtml files in PDF_OUTPUT_DIR.
        """
        return self._build_filename(row, '.mhtml', MHTML_OUTPUT_DIR)

    def fetch_data_batch(self, offset):
        """Fetch one page of pending rows, ordered by es_urltime.

        NOTE(review): OFFSET paging over a result set that shrinks as rows are
        marked done can skip records within a run; the outer polling loop
        picks them up on the next cycle.
        """
        with self.get_db_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos "
                    "WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) "
                    "AND es_loadtime > %s "
                    "ORDER BY es_urltime LIMIT %s OFFSET %s",
                    (self.last_loadtime, BATCH_SIZE, offset)
                )
                return cursor.fetchall()

    def update_file_status(self, es_sid, status, retry=3):
        """Write *status* into es_video for *es_sid*, retrying up to *retry* times.

        Returns True on success, False after exhausting retries.
        """
        for attempt in range(retry):
            try:
                with self.db_lock, self.get_db_connection() as conn:
                    with conn.cursor() as cursor:
                        cursor.execute(
                            "UPDATE indeximos SET es_video = %s WHERE es_sid = %s",
                            (status, es_sid))
                    conn.commit()
                return True
            except Exception as e:
                if attempt == retry - 1:
                    logger.error(f"更新数据库失败(es_sid={es_sid}): {e}")
                    return False
                time.sleep(1)

    def extract_main_domain(self, url):
        """Return the registered (main) domain of *url*, e.g. 'example.com'."""
        extracted = tldextract.extract(url)
        return f"{extracted.domain}.{extracted.suffix}"

    @staticmethod
    def _remove_quietly(path):
        """Best-effort removal of a partial/undersized output file."""
        try:
            if os.path.exists(path):
                os.remove(path)
        except OSError:
            pass

    def _save_pdf(self, url, output_file):
        """Render *url* to *output_file* via the appropriate Selenium saver."""
        if self.use_remote_selenium(url):
            if self.remote_handler is None:
                self.remote_handler = RemotePDFSaver()
            handler = self.remote_handler
        else:
            if self.local_handler is None:
                self.local_handler = PDFSaver()
            handler = self.local_handler
        return handler.save_as_pdf(url=url, output_path=output_file, timeout=TIMEOUT)

    def _record_result(self, es_sid, url, output_file):
        """Validate a finished download's size and persist the outcome."""
        file_size = os.path.getsize(output_file)
        if file_size >= MIN_PDF_SIZE:
            # Success: es_video stores the saved file's path.
            self.update_file_status(es_sid, output_file)
            with self.stats_lock:
                self.success_count += 1
        else:
            self.update_file_status(es_sid, '-2')
            with self.stats_lock:
                self.small_file_count += 1
            logger.warning(f"文件过小({file_size}字节): {output_file}")
            self._remove_quietly(output_file)
            # Blocklist the domain for the rest of this run (no duplicates).
            domain = self.extract_main_domain(url)
            if domain not in self.skip_hosts:
                self.skip_hosts.append(domain)

    def _process_row(self, row):
        """Handle one queue item: skip-check, download, validate, record."""
        url = row[1]
        if self.extract_main_domain(url) in self.skip_hosts:
            with self.stats_lock:
                self.small_file_count += 1
                self.processed_count += 1
            print(f"小文件规避暂时跳过URL{url}")
            return
        output_file = self.format_pdf_filename(row)
        try:
            os.makedirs(os.path.dirname(output_file), exist_ok=True)
            success = self._save_pdf(url, output_file)
            if success and os.path.exists(output_file):
                self._record_result(row[0], url, output_file)
            else:
                # Outright download failure.
                self.update_file_status(row[0], '0')
                with self.stats_lock:
                    self.fail_count += 1
                self._remove_quietly(output_file)
        except Exception as e:
            logger.error(f"下载出现异常(es_sid={row[0]}, url={url}): {str(e)}")
            self.update_file_status(row[0], '-1')
            with self.stats_lock:
                self.fail_count += 1
        with self.stats_lock:
            self.processed_count += 1

    def download_worker(self):
        """Worker-thread loop: consume rows until a None sentinel arrives."""
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            if task is None:
                # Sentinel was enqueued after join(), so no task_done() needed.
                break
            try:
                self._process_row(task)
            finally:
                self.task_queue.task_done()

    def run(self):
        """Start the worker pool, feed it all pending rows, print a summary."""
        threads = []
        # Spawn the worker threads.
        for _ in range(MAX_WORKERS):
            t = threading.Thread(target=self.download_worker)
            t.start()
            threads.append(t)
        # Producer loop with a progress bar; pages through the result set.
        with tqdm(total=self.total_rows, desc="处理进度", unit="") as pbar:
            offset = 0
            while True:
                batch = self.fetch_data_batch(offset)
                if not batch:
                    break
                for row in batch:
                    self.task_queue.put(row)
                pbar.update(len(batch))
                pbar.set_postfix({
                    '成功': self.success_count,
                    '失败': self.fail_count,
                    '小文件': self.small_file_count,
                    '速度': f"{self.processed_count / (time.time() - self.start_time):.1f}条/秒"
                })
                offset += BATCH_SIZE
        # Drain outstanding work, then release each worker with a sentinel.
        self.task_queue.join()
        for _ in range(MAX_WORKERS):
            self.task_queue.put(None)
        for t in threads:
            t.join()
        total_time = time.time() - self.start_time
        print(f"\n处理完成! 总计: {self.total_rows}")
        print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}")
        print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒")
if __name__ == "__main__":
while running:
print(f"开始处理,总记录数: {PDFDownloader().get_total_rows()}")
downloader = PDFDownloader()
downloader.run()
print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...")
time.sleep(running_interval_seconds)