"""Batch downloader that renders pending article URLs to PDF via Selenium.

Workflow:
  1. Page through rows of the `indeximos` table whose `es_video` column is
     NULL or '-1' (not yet rendered) and newer than the configured
     `last_loadtime` checkpoint.
  2. Feed rows to worker threads that drive a local ``PDFSaver``.
  3. Write the resulting file path (success), or an error code
     ('0' = download failed, '-1' = exception, '-2' = file too small),
     back into `es_video`.
"""

import logging
import os
import queue
import random
import threading
import time
from datetime import datetime

import pymysql
import tldextract
from tqdm import tqdm

from save_page_as_pdf import PDFSaver
from save_page_as_mhtml import MHTMLSaver
from save_remote_as_mhtml import RemoteMHTMLSaver
from save_remote_as_pdf import RemotePDFSaver

# Logging: mirror everything to the console and to a flat log file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('pdf_downloader.log')
    ]
)
logger = logging.getLogger(__name__)

# =============== MySQL configuration ===============
# NOTE(review): credentials are hard-coded in source. Move them to
# environment variables or an untracked config file before distributing.
MYSQL_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
    'autocommit': False  # transactions are committed manually
}
# ===================================================

# Tuning parameters.
BATCH_SIZE = 500
# NOTE: the per-row counters below are incremented without a lock, so keep
# MAX_WORKERS at 1 unless they are made thread-safe first.
MAX_WORKERS = 1
TIMEOUT = 10  # per-page render timeout, seconds
PDF_OUTPUT_DIR = 'D:/data/output/pdf'
MIN_PDF_SIZE = 5 * 1024  # 5 KB — anything smaller is treated as a failed render
MHTML_OUTPUT_DIR = 'D:/data/output/mhtml'

os.makedirs(PDF_OUTPUT_DIR, exist_ok=True)
# Bug fix: MHTML_OUTPUT_DIR was declared but never created.
os.makedirs(MHTML_OUTPUT_DIR, exist_ok=True)

running = True
running_interval_seconds = 10  # pause between full passes, seconds

# Hosts that would need the (currently disabled) remote Selenium instance;
# rows matching these are skipped for now.
skip_host_name = [
    'epochtimes.com',
    'secretchina.com',
    # 'rodong.rep.kp',
    # 'kcna.kp'
]

# Characters Windows rejects in filenames.
_INVALID_FILENAME_CHARS = '<>:"/\\|?*'


class PDFDownloader:
    """Coordinates DB paging, worker threads and result bookkeeping."""

    def __init__(self):
        self.db_lock = threading.Lock()   # serializes writes on the shared connection
        self.db_connection = None         # opened lazily by get_db_connection()
        self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3)
        self.processed_count = 0
        self.success_count = 0
        self.fail_count = 0
        self.small_file_count = 0         # renders below MIN_PDF_SIZE
        self.last_loadtime = self.get_last_loadtime()
        self.total_rows = self.get_total_rows()
        self.start_time = time.time()
        self.skip_hosts = []              # domains that produced tiny files this run
        self.local_handler = None         # lazily created PDFSaver
        self.remote_handler = None        # lazily created RemotePDFSaver (disabled)

    def get_db_connection(self):
        """Open (and cache on self) a fresh pymysql connection."""
        self.db_connection = pymysql.connect(
            host=MYSQL_CONFIG['host'],
            port=MYSQL_CONFIG['port'],
            user=MYSQL_CONFIG['user'],
            password=MYSQL_CONFIG['password'],
            database=MYSQL_CONFIG['database'],
            charset='utf8mb4',
            autocommit=False
        )

    def get_total_rows(self):
        """Count rows still pending (es_video NULL or '-1') newer than last_loadtime."""
        if self.db_connection is None:
            self.get_db_connection()
        # Bug fix: cursor was never closed; the context manager closes it.
        with self.db_connection.cursor() as cursor:
            cursor.execute(
                "SELECT COUNT(*) FROM indeximos "
                "WHERE (es_video IS NULL OR es_video IN ('-1')) "
                "AND es_loadtime > %s",
                (self.last_loadtime,)
            )
            return cursor.fetchone()[0]

    def get_last_loadtime(self):
        """Read the last-export checkpoint from the config table."""
        if self.db_connection is None:
            self.get_db_connection()
        with self.db_connection.cursor() as cursor:
            cursor.execute(
                "SELECT config_value FROM config "
                "WHERE config_name = 'last_loadtime' "
            )
            return cursor.fetchone()[0]

    def use_remote_selenium(self, url):
        """True when the URL contains a host listed in skip_host_name."""
        return any(host in url for host in skip_host_name)

    @staticmethod
    def _clean_filename(text):
        """Replace filesystem-invalid characters with '_' and cap length at 100."""
        if not text:
            return ''
        for char in _INVALID_FILENAME_CHARS:
            text = text.replace(char, '_')
        return text.strip()[:100]

    def _format_filename(self, row, extension, output_dir):
        """Build '<title>_<yyyymmdd_hhmmss>_<site><extension>' under output_dir.

        `row` is (es_sid, es_urlname, es_urltitle, es_urltime, es_sitename,
        es_authors) as selected by fetch_data_batch().
        """
        es_urltitle = row[2] or 'untitled'
        # Bug fix: `str(row[3]) or fallback` never fell back, because
        # str(None) == 'None' is truthy; only the except rescued it.
        es_urltime = str(row[3]) if row[3] else ''
        es_sitename = row[4] or 'anonymous'
        try:
            dt = datetime.strptime(es_urltime, '%Y-%m-%d %H:%M:%S')
            es_urltime_fix = dt.strftime('%Y%m%d_%H%M%S')
        except ValueError:  # was a bare except; strptime on a str raises ValueError
            es_urltime_fix = '19700101_000000'
        filename = f"{self._clean_filename(es_urltitle)}_{es_urltime_fix}_{es_sitename}{extension}"
        return os.path.join(output_dir, filename)

    def format_pdf_filename(self, row):
        """Target PDF path for a result row."""
        return self._format_filename(row, '.pdf', PDF_OUTPUT_DIR)

    def format_mhtml_filename(self, row):
        """Target MHTML path for a result row.

        Bug fix: previously joined against PDF_OUTPUT_DIR even though the
        dedicated MHTML_OUTPUT_DIR constant existed (copy-paste error).
        """
        return self._format_filename(row, '.mhtml', MHTML_OUTPUT_DIR)

    def fetch_data_batch(self, offset):
        """Fetch one page of pending rows ordered by es_urltime.

        NOTE(review): OFFSET paging over a result set whose rows change
        status mid-run can skip records; a keyset cursor (WHERE es_sid > ?)
        would be safer — left as-is to preserve behavior.
        """
        if self.db_connection is None:
            self.get_db_connection()
        with self.db_connection.cursor() as cursor:
            cursor.execute(
                "SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos "
                "WHERE (es_video IS NULL OR es_video IN ('-1')) "
                "AND es_loadtime > %s "
                "ORDER BY es_urltime LIMIT %s OFFSET %s",
                (self.last_loadtime, BATCH_SIZE, offset)
            )
            return cursor.fetchall()

    def update_file_status(self, es_sid, status, retry=3):
        """Write `status` into es_video for one row, retrying transient errors.

        Returns True on success, False after `retry` failed attempts.
        """
        for attempt in range(retry):
            try:
                with self.db_lock:
                    if self.db_connection is None:
                        self.get_db_connection()
                    with self.db_connection.cursor() as cursor:
                        cursor.execute(
                            "UPDATE indeximos SET es_video = %s WHERE es_sid = %s",
                            (status, es_sid))
                    self.db_connection.commit()
                return True
            except Exception as e:
                if attempt == retry - 1:
                    logger.error(f"更新数据库失败(es_sid={es_sid}): {e}")
                    return False
                time.sleep(1)

    def extract_main_domain(self, url):
        """Registered (main) domain of a URL, e.g. 'news.bbc.co.uk' -> 'bbc.co.uk'."""
        extracted = tldextract.extract(url)
        return f"{extracted.domain}.{extracted.suffix}"

    def download_worker(self):
        """Worker-thread loop: consume rows from the queue until a None sentinel."""
        while True:
            try:
                task = self.task_queue.get(timeout=1)
            except queue.Empty:
                continue
            if task is None:  # shutdown sentinel from run()
                break
            row = task
            url = row[1]
            # Domains that already produced a too-small file are skipped
            # for the rest of this run.
            if self.extract_main_domain(url) in self.skip_hosts:
                self.small_file_count += 1
                self.processed_count += 1
                self.task_queue.task_done()
                print(f"小文件规避,暂时跳过URL:{url}")
                continue
            output_file = self.format_pdf_filename(row)
            try:
                os.makedirs(os.path.dirname(output_file), exist_ok=True)
                if self.use_remote_selenium(url):
                    # Remote rendering is disabled; row keeps its pending
                    # status and will be re-fetched on the next pass.
                    self.processed_count += 1
                    self.task_queue.task_done()
                    continue
                    # if self.remote_handler is None:
                    #     self.remote_handler = RemotePDFSaver()
                    # success = self.remote_handler.save_as_pdf(
                    #     url=url,
                    #     output_path=output_file,
                    #     timeout=TIMEOUT
                    # )
                else:
                    if self.local_handler is None:
                        self.local_handler = PDFSaver(headless=False)
                    success = self.local_handler.save_as_pdf(
                        url=url,
                        output_path=output_file,
                        timeout=TIMEOUT,
                        wait_time=5
                    )
                # Validate the rendered file.
                if success and os.path.exists(output_file):
                    file_size = os.path.getsize(output_file)
                    if file_size >= MIN_PDF_SIZE:
                        # Success: store the file path itself as the status.
                        self.update_file_status(row[0], output_file)
                        self.success_count += 1
                    else:
                        # Too small — almost certainly a blocked/blank page.
                        self.update_file_status(row[0], '-2')
                        self.small_file_count += 1
                        logger.warning(f"文件过小({file_size}字节): {output_file}")
                        try:
                            os.remove(output_file)
                        except OSError:  # best-effort cleanup
                            pass
                        domain = self.extract_main_domain(url)
                        if domain not in self.skip_hosts:  # avoid duplicates
                            self.skip_hosts.append(domain)
                else:
                    # Render reported failure or produced no file.
                    self.update_file_status(row[0], '0')
                    self.fail_count += 1
                    if os.path.exists(output_file):
                        try:
                            os.remove(output_file)
                        except OSError:
                            pass
            except Exception as e:
                logger.error(f"下载出现异常(es_sid={row[0]}, url={url}): {str(e)}")
                self.update_file_status(row[0], '-1')
                self.fail_count += 1
            self.processed_count += 1
            self.task_queue.task_done()

    def run(self):
        """Start workers, stream batches into the queue, and wait for completion."""
        threads = []
        for _ in range(MAX_WORKERS):
            t = threading.Thread(target=self.download_worker)
            t.start()
            threads.append(t)

        with tqdm(total=self.total_rows, desc="处理进度", unit="条") as pbar:
            offset = 0
            while True:
                batch = self.fetch_data_batch(offset)
                if not batch:
                    break
                # Shuffle within the batch so one slow site does not
                # dominate a contiguous stretch of work.
                batch_list = list(batch)
                random.shuffle(batch_list)
                batch = tuple(batch_list)
                for row in batch:
                    self.task_queue.put(row)  # blocks when the queue is full
                # NOTE: progress advances when rows are queued, not finished.
                pbar.update(len(batch))
                pbar.set_postfix({
                    '成功': self.success_count,
                    '失败': self.fail_count,
                    '小文件': self.small_file_count,
                    '速度': f"{self.processed_count / (time.time() - self.start_time):.1f}条/秒"
                })
                offset += BATCH_SIZE

        self.task_queue.join()
        # One sentinel per worker stops the worker loops.
        for _ in range(MAX_WORKERS):
            self.task_queue.put(None)
        for t in threads:
            t.join()

        total_time = time.time() - self.start_time
        print(f"\n处理完成! 总计: {self.total_rows}条")
        print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}条")
        print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒")

    def terminate(self):
        """Release browser handlers and the DB connection."""
        if self.local_handler:
            self.local_handler.quit()
        if self.remote_handler:
            self.remote_handler.quit()
        # Bug fix: was unconditional and crashed when the connection was
        # never opened (e.g. __init__ failed part-way).
        if self.db_connection:
            self.db_connection.close()


if __name__ == "__main__":
    while running:
        try:
            # Bug fix: previously a throwaway PDFDownloader() was built just
            # to print the count, leaking one DB connection per pass.
            downloader = PDFDownloader()
            print(f"开始处理,总记录数: {downloader.total_rows}")
            downloader.run()
            print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...")
            downloader.terminate()
            time.sleep(running_interval_seconds)
        except Exception as e:
            print(repr(e))