# Batch PDF downloader: renders pending URLs from a MySQL work table to PDF files.
import logging
import os
import queue
import threading
import time
from datetime import datetime

import pymysql
import tldextract
from tqdm import tqdm

from save_page_as_pdf import PDFSaver
from save_page_as_mhtml import MHTMLSaver
from save_remote_as_mhtml import RemoteMHTMLSaver
from save_remote_as_pdf import RemotePDFSaver

# Logging: console plus a log file next to the script.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('pdf_downloader.log')
    ]
)
logger = logging.getLogger(__name__)

# =============== MySQL configuration ===============
# NOTE(review): credentials are hard-coded; consider moving them to
# environment variables or a config file outside version control.
MYSQL_CONFIG = {
    'host': '47.113.231.200',
    'port': 28089,
    'user': 'root',
    'password': 'passok123A',
    'database': 'dsp',
    'charset': 'utf8mb4',
    'autocommit': False  # transactions are committed manually
}
# ===================================================

# Runtime parameters
BATCH_SIZE = 500
MAX_WORKERS = 1
TIMEOUT = 10
PDF_OUTPUT_DIR = 'D:/data/output/pdf'
MIN_PDF_SIZE = 80 * 1024  # 80KB: smaller results are treated as failed renders

MHTML_OUTPUT_DIR = 'D:/data/output/mhtml'
os.makedirs(PDF_OUTPUT_DIR, exist_ok=True)
# Bug fix: MHTML_OUTPUT_DIR was configured but its directory was never created.
os.makedirs(MHTML_OUTPUT_DIR, exist_ok=True)

running = True
running_interval_seconds = 15

# Hosts that must be rendered through the remote Selenium instance.
remote_host_name = [
    'epochtimes.com',
    # 'secretchina.com'
]
||
class PDFDownloader:
    """Pulls pending rows from MySQL and renders each URL to a PDF file."""

    def __init__(self):
        # Serializes database writes across worker threads.
        self.db_lock = threading.Lock()
        # Bounded queue so the producer cannot run far ahead of the workers.
        self.task_queue = queue.Queue(maxsize=MAX_WORKERS * 3)
        # Progress counters (with MAX_WORKERS == 1 a plain int is sufficient).
        self.processed_count = 0
        self.success_count = 0
        self.fail_count = 0
        self.small_file_count = 0  # count of files below MIN_PDF_SIZE
        # last_loadtime must be fetched first: get_total_rows() reads it.
        self.last_loadtime = self.get_last_loadtime()
        self.total_rows = self.get_total_rows()
        self.start_time = time.time()
        # Domains that recently produced undersized files; skipped for this run.
        self.skip_hosts = []
        # Selenium wrappers, created lazily on first use.
        self.local_handler = None
        self.remote_handler = None
||
|
||
# 替换 MYSQL_CONFIG 中的连接方式
|
||
def get_db_connection(self):
|
||
return pymysql.connect(
|
||
host=MYSQL_CONFIG['host'],
|
||
port=MYSQL_CONFIG['port'],
|
||
user=MYSQL_CONFIG['user'],
|
||
password=MYSQL_CONFIG['password'],
|
||
database=MYSQL_CONFIG['database'],
|
||
charset='utf8mb4',
|
||
autocommit=False
|
||
)
|
||
|
||
def get_total_rows(self):
|
||
"""获取总记录数"""
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"SELECT COUNT(*) FROM indeximos "
|
||
"WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) "
|
||
"AND es_loadtime > %s", self.last_loadtime
|
||
)
|
||
return cursor.fetchone()[0]
|
||
|
||
def get_last_loadtime(self):
|
||
"""获取上次导出数据的时间"""
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"SELECT config_value FROM config "
|
||
"WHERE config_name = 'last_loadtime' "
|
||
)
|
||
return cursor.fetchone()[0]
|
||
|
||
def use_remote_selenium(self, url):
|
||
for host in remote_host_name:
|
||
if host in url:
|
||
return True
|
||
return False
|
||
|
||
def format_pdf_filename(self, row):
|
||
"""格式化PDF文件名"""
|
||
es_urltitle = row[2] or 'untitled'
|
||
es_urltime = str(row[3]) or '19700101_000000'
|
||
es_sitename = row[4] or 'anonymous'
|
||
|
||
def clean_filename(text):
|
||
if not text:
|
||
return ''
|
||
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
|
||
for char in invalid_chars:
|
||
text = text.replace(char, '_')
|
||
return text.strip()[:100]
|
||
|
||
try:
|
||
dt = datetime.strptime(es_urltime, '%Y-%m-%d %H:%M:%S')
|
||
es_urltime_fix = dt.strftime('%Y%m%d_%H%M%S')
|
||
except:
|
||
es_urltime_fix = '19700101_000000'
|
||
|
||
filename = f"{clean_filename(es_urltitle)}_{es_urltime_fix}_{es_sitename}.pdf"
|
||
return os.path.join(PDF_OUTPUT_DIR, filename)
|
||
|
||
def format_mhtml_filename(self, row):
|
||
"""格式化PDF文件名"""
|
||
es_urltitle = row[2] or 'untitled'
|
||
es_urltime = str(row[3]) or '19700101_000000'
|
||
es_sitename = row[4] or 'anonymous'
|
||
|
||
def clean_filename(text):
|
||
if not text:
|
||
return ''
|
||
invalid_chars = ['<', '>', ':', '"', '/', '\\', '|', '?', '*']
|
||
for char in invalid_chars:
|
||
text = text.replace(char, '_')
|
||
return text.strip()[:100]
|
||
|
||
try:
|
||
dt = datetime.strptime(es_urltime, '%Y-%m-%d %H:%M:%S')
|
||
es_urltime_fix = dt.strftime('%Y%m%d_%H%M%S')
|
||
except:
|
||
es_urltime_fix = '19700101_000000'
|
||
|
||
filename = f"{clean_filename(es_urltitle)}_{es_urltime_fix}_{es_sitename}.mhtml"
|
||
return os.path.join(PDF_OUTPUT_DIR, filename)
|
||
|
||
def fetch_data_batch(self, offset):
|
||
"""分页获取数据"""
|
||
with self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"SELECT es_sid, es_urlname, es_urltitle, es_urltime, es_sitename, es_authors FROM indeximos "
|
||
"WHERE (es_video IS NULL OR es_video IN ('-2', '-1')) "
|
||
"AND es_loadtime > %s "
|
||
"ORDER BY es_urltime LIMIT %s OFFSET %s",
|
||
(self.last_loadtime, BATCH_SIZE, offset)
|
||
)
|
||
return cursor.fetchall()
|
||
|
||
def update_file_status(self, es_sid, status, retry=3):
|
||
"""更新数据库状态"""
|
||
for attempt in range(retry):
|
||
try:
|
||
with self.db_lock, self.get_db_connection() as conn:
|
||
cursor = conn.cursor()
|
||
cursor.execute(
|
||
"UPDATE indeximos SET es_video = %s WHERE es_sid = %s",
|
||
(status, es_sid))
|
||
conn.commit()
|
||
return True
|
||
except Exception as e:
|
||
if attempt == retry - 1:
|
||
logger.error(f"更新数据库失败(es_sid={es_sid}): {e}")
|
||
return False
|
||
time.sleep(1)
|
||
|
||
def extract_main_domain(self, url):
|
||
extracted = tldextract.extract(url)
|
||
# 组合注册域名(主域名)
|
||
main_domain = f"{extracted.domain}.{extracted.suffix}"
|
||
return main_domain
|
||
|
||
def download_worker(self):
|
||
"""工作线程函数"""
|
||
while True:
|
||
try:
|
||
task = self.task_queue.get(timeout=1)
|
||
if task is None:
|
||
break
|
||
|
||
row = task
|
||
url = row[1]
|
||
if self.extract_main_domain(url) in self.skip_hosts:
|
||
self.small_file_count += 1
|
||
self.processed_count += 1
|
||
self.task_queue.task_done()
|
||
print(f"小文件规避,暂时跳过URL:{url}")
|
||
continue
|
||
output_file = self.format_pdf_filename(row) # 获取格式化后的文件名
|
||
|
||
try:
|
||
os.makedirs(os.path.dirname(output_file), exist_ok=True)
|
||
|
||
# 调用下载函数
|
||
if self.use_remote_selenium(url):
|
||
if self.remote_handler is None:
|
||
self.remote_handler = RemotePDFSaver()
|
||
success = self.remote_handler.save_as_pdf(
|
||
url=url,
|
||
output_path=output_file,
|
||
timeout=TIMEOUT
|
||
)
|
||
else:
|
||
if self.local_handler is None:
|
||
self.local_handler = PDFSaver()
|
||
success = self.local_handler.save_as_pdf(
|
||
url=url,
|
||
output_path=output_file,
|
||
timeout=TIMEOUT
|
||
)
|
||
|
||
# 验证下载结果
|
||
if success and os.path.exists(output_file):
|
||
file_size = os.path.getsize(output_file)
|
||
|
||
if file_size >= MIN_PDF_SIZE: # 文件大小合格
|
||
self.update_file_status(row[0], output_file)
|
||
self.success_count += 1
|
||
else: # 文件太小
|
||
self.update_file_status(row[0], '-2')
|
||
self.small_file_count += 1
|
||
logger.warning(f"文件过小({file_size}字节): {output_file}")
|
||
try:
|
||
os.remove(output_file)
|
||
self.skip_hosts.append(self.extract_main_domain(url))
|
||
except:
|
||
pass
|
||
else: # 下载失败
|
||
self.update_file_status(row[0], '0')
|
||
self.fail_count += 1
|
||
if os.path.exists(output_file):
|
||
try:
|
||
os.remove(output_file)
|
||
except:
|
||
pass
|
||
|
||
except Exception as e:
|
||
logger.error(f"下载出现异常(es_sid={row[0]}, url={url}): {str(e)}")
|
||
self.update_file_status(row[0], '-1')
|
||
self.fail_count += 1
|
||
|
||
self.processed_count += 1
|
||
self.task_queue.task_done()
|
||
|
||
except queue.Empty:
|
||
continue
|
||
|
||
def run(self):
|
||
"""启动下载任务"""
|
||
threads = []
|
||
|
||
# 创建工作线程
|
||
for _ in range(MAX_WORKERS):
|
||
t = threading.Thread(target=self.download_worker)
|
||
t.start()
|
||
threads.append(t)
|
||
|
||
# 使用进度条显示进度
|
||
with tqdm(total=self.total_rows, desc="处理进度", unit="条") as pbar:
|
||
offset = 0
|
||
while True:
|
||
batch = self.fetch_data_batch(offset)
|
||
if not batch:
|
||
break
|
||
|
||
for row in batch:
|
||
self.task_queue.put(row)
|
||
|
||
pbar.update(len(batch))
|
||
pbar.set_postfix({
|
||
'成功': self.success_count,
|
||
'失败': self.fail_count,
|
||
'小文件': self.small_file_count,
|
||
'速度': f"{self.processed_count / (time.time() - self.start_time):.1f}条/秒"
|
||
})
|
||
|
||
offset += BATCH_SIZE
|
||
|
||
self.task_queue.join()
|
||
|
||
for _ in range(MAX_WORKERS):
|
||
self.task_queue.put(None)
|
||
|
||
for t in threads:
|
||
t.join()
|
||
|
||
total_time = time.time() - self.start_time
|
||
print(f"\n处理完成! 总计: {self.total_rows}条")
|
||
print(f"成功: {self.success_count}条, 失败: {self.fail_count}条, 小文件: {self.small_file_count}条")
|
||
print(f"总耗时: {total_time:.2f}秒, 平均速度: {self.total_rows / total_time:.2f}条/秒")
|
||
|
||
|
||
if __name__ == "__main__":
    # Endless polling loop: process everything pending, sleep, repeat.
    while running:
        # Bug fix: the original constructed a second PDFDownloader just to
        # print the count, opening extra DB connections; reuse one instance
        # (its __init__ already computes total_rows).
        downloader = PDFDownloader()
        print(f"开始处理,总记录数: {downloader.total_rows}")
        downloader.run()
        print(f"运行完成,暂停{running_interval_seconds}秒后开始下一次运行...")
        time.sleep(running_interval_seconds)