osc/research/pdf_downloader/decode-url-for-rodong-news.py
2026-01-19 09:17:10 +08:00

172 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import os
import re
import urllib.parse

import mysql.connector
# === Database configuration ===
# Every connection setting except the charset can be overridden through an
# environment variable; the literals are the previous hard-coded values, kept
# as defaults so that existing invocations behave exactly as before.
# NOTE(review): live credentials are committed in this file. Rotate them and
# supply DSP_DB_PASSWORD (and friends) from the environment instead of
# relying on these defaults.
DB_CONFIG = {
    'host': os.environ.get('DSP_DB_HOST', '47.113.231.200'),
    # Environment values are strings; the connector expects an integer port.
    'port': int(os.environ.get('DSP_DB_PORT', '28089')),
    'user': os.environ.get('DSP_DB_USER', 'root'),
    'password': os.environ.get('DSP_DB_PASSWORD', 'passok123A'),
    'database': os.environ.get('DSP_DB_NAME', 'dsp'),
    'charset': 'utf8mb4',
}
def decode_rodong_url(url):
    """Extract and Base64-decode the query payload of a Rodong Sinmun URL.

    The text following ``index.php?`` is treated as a Base64 string and
    decoded to UTF-8.

    Example:
        >>> decode_rodong_url(
        ...     'http://www.rodong.rep.kp/cn/index.php?'
        ...     'MTJAMjAyNi0wMS0wNS0wMDJAMUAxQEAwQDNA==')
        '12@2026-01-05-002@1@1@@0@3@'

    Args:
        url: The article URL. ``None`` and the empty string are accepted.

    Returns:
        The decoded string, or ``None`` when the URL contains no
        ``index.php?`` payload or the payload cannot be decoded. Decode
        failures are reported on stdout and never raised.
    """
    if not url or 'index.php?' not in url:
        return None
    try:
        query = urllib.parse.urlparse(url).query
        if not query:
            # Fallback for unconventional URLs for which urlparse reports
            # no query component.
            match = re.search(r'index\.php\?([A-Za-z0-9+/=]+)', url)
            if match is None:
                return None
            query = match.group(1)

        # The '=' padding may have been stored percent-encoded (%3D).
        # unquote() leaves '+' alone, so the Base64 alphabet is preserved.
        payload = urllib.parse.unquote(query)
        try:
            decoded_bytes = base64.b64decode(payload)
        except ValueError:
            # Padding was stripped or truncated: complete it to a multiple
            # of four characters and retry once. A second failure reaches
            # the outer handler.
            repaired = payload.rstrip('=')
            repaired += '=' * (-len(repaired) % 4)
            decoded_bytes = base64.b64decode(repaired)
        return decoded_bytes.decode('utf-8')
    except ValueError as e:
        # binascii.Error and UnicodeDecodeError are both ValueError
        # subclasses. Report the failure but keep the batch running.
        print(f" 解码失败 (URL: {url[:60]}...): {e}")
        return None
def main():
    """Fill ``es_tags`` for untagged Rodong Sinmun rows in ``indeximos``.

    Selects every row whose ``es_sitename`` is '劳动新闻' and whose
    ``es_tags`` is NULL or empty, decodes ``es_urlname`` with
    ``decode_rodong_url`` and stores the result in ``es_tags``. Rows whose
    URL cannot be decoded are skipped.

    Database errors and any other exception are reported on stdout instead
    of being raised; the cursor and connection are closed in every case.
    """
    # Start as None so the cleanup below can tell what was actually opened
    # without inspecting locals().
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor(buffered=True)

        # Rows are selected by site name, not by URL shape; URLs without
        # 'index.php?' are rejected later by decode_rodong_url.
        print("正在查询待处理的新闻记录...")
        cursor.execute("""
            SELECT es_sid, es_urlname
            FROM indeximos
            WHERE es_sitename = '劳动新闻'
            AND (es_tags IS NULL OR es_tags = '')
        """)
        records = cursor.fetchall()
        if not records:
            print("没有找到需要处理的记录。")
            return

        total = len(records)
        print(f"共找到 {total} 条待处理记录。")

        update_query = "UPDATE indeximos SET es_tags = %s WHERE es_sid = %s"
        updated_count = 0
        for i, (es_sid, es_urlname) in enumerate(records, 1):
            print(f"[{i}/{total}] 处理 ID={es_sid} ...", end=" ")
            decoded = decode_rodong_url(es_urlname)
            if decoded is None:
                print("跳过(无法解码)")
                continue
            cursor.execute(update_query, (decoded, es_sid))
            # Committing per row means rows already updated stay updated
            # if a later row fails.
            conn.commit()
            updated_count += 1
            print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}")

        print(f"\n✅ 完成!共更新 {updated_count} 条记录。")
    except mysql.connector.Error as db_err:
        print(f"❌ 数据库错误: {db_err}")
    except Exception as e:
        # Top-level boundary of the script: report and fall through to cleanup.
        print(f"❌ 脚本执行出错: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None and conn.is_connected():
            conn.close()
            print("数据库连接已关闭。")
# Table names cannot be bound as query parameters, so they are interpolated
# into the SQL text. Only plain identifiers are accepted to keep that safe.
_TABLE_NAME_RE = re.compile(r'[a-zA-Z_][a-zA-Z0-9_]*')


def main_with_table(table_name):
    """Fill ``es_tags`` for untagged ``index.php?`` rows of *table_name*.

    Selects every row whose ``es_urlname`` contains ``index.php?`` and whose
    ``es_tags`` is NULL or empty, decodes the URL with ``decode_rodong_url``
    and stores the result in ``es_tags``. Rows whose URL cannot be decoded
    are skipped.

    Args:
        table_name: Name of the table to process. Must be a plain SQL
            identifier (letters, digits and underscores, not starting with
            a digit).

    Raises:
        ValueError: If *table_name* is not a plain identifier. The check
            runs before any database connection is opened.

    Database errors and any other exception raised while processing are
    reported on stdout instead of being raised; the cursor and connection
    are closed in every case.
    """
    # Validate the value that is actually interpolated below. fullmatch()
    # also rejects a trailing newline, which '^...$' with match() lets through.
    if not isinstance(table_name, str) or not _TABLE_NAME_RE.fullmatch(table_name):
        raise ValueError("表名包含非法字符!")

    # Start as None so the cleanup below can tell what was actually opened
    # without inspecting locals().
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**DB_CONFIG)
        cursor = conn.cursor(buffered=True)

        query_sql = f"""
            SELECT es_sid, es_urlname
            FROM `{table_name}`
            WHERE es_urlname LIKE '%index.php?%'
            AND (es_tags IS NULL OR es_tags = '')
        """
        cursor.execute(query_sql)
        records = cursor.fetchall()
        if not records:
            print("没有找到需要处理的记录。")
            return

        total = len(records)
        print(f"共找到 {total} 条待处理记录。")

        # Only the identifier is interpolated; the values stay parameterized.
        update_sql = f"UPDATE `{table_name}` SET es_tags = %s WHERE es_sid = %s"
        updated_count = 0
        for i, (es_sid, es_urlname) in enumerate(records, 1):
            print(f"[{i}/{total}] 处理 ID={es_sid} ...", end=" ")
            decoded = decode_rodong_url(es_urlname)
            if decoded is None:
                print("跳过(无法解码)")
                continue
            cursor.execute(update_sql, (decoded, es_sid))
            # Committing per row means rows already updated stay updated
            # if a later row fails.
            conn.commit()
            updated_count += 1
            print(f"成功 → {decoded[:50]}{'...' if len(decoded) > 50 else ''}")

        print(f"\n✅ 完成!共更新 {updated_count} 条记录。")
    except mysql.connector.Error as db_err:
        print(f"❌ 数据库错误: {db_err}")
    except Exception as e:
        # Top-level boundary of the script: report and fall through to cleanup.
        print(f"❌ 脚本执行出错: {e}")
    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None and conn.is_connected():
            conn.close()
            print("数据库连接已关闭。")


if __name__ == "__main__":
    main_with_table('indeximos')