osc/spiders/MediaSpiders/MediaSpiders/utils/wechat_links_fetcher.py
2026-04-14 09:50:18 +08:00

324 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import random
import time
from math import ceil
import logging as logger
from selenium.webdriver.common.by import By
import redis
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \
WECHAT_USER_TYPE
from MediaSpiders.utils.http_utils import http_post, UA
chrome_options = Options()
# 指定 chrome.exe 的完整路径
chrome_options.binary_location = "D:/chrome-win64/chrome.exe"
# chrome_options.use_chromium = True
driver = webdriver.Chrome(
executable_path=r"D:\chromedriver.exe",
options=chrome_options
)
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
})
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD)
def parse_cookie_string(cookie_str):
"""解析 cookie 字符串为 dict"""
cookie_dict = {}
for item in cookie_str.split(';'):
if '=' in item:
name, value = item.split('=', 1)
cookie_dict[name.strip()] = value.strip()
return cookie_dict
def add_cookie_smart(driver, name, value, target_domain='mp.weixin.qq.com'):
"""
智能添加 cookie先试目标域名失败则试父域再失败则跳过
"""
# 微信核心 cookie 必须用 mp.weixin.qq.com
wechat_critical = ['wxuin', 'slave_sid', 'slave_user', 'bizuin', 'data_ticket', 'token']
# 腾讯通用 cookie 可尝试父域
tencent_common = ['ptui_loginuin', 'RK', 'ptcz', 'ua_id']
# 策略 1: 核心 cookie → 精确域名
if name in wechat_critical:
domains_to_try = [target_domain]
# 策略 2: 腾讯通用 cookie → 先试目标域,再试父域
elif name in tencent_common:
domains_to_try = [target_domain, '.weixin.qq.com', '.qq.com']
# 策略 3: 其他 cookie → 默认 host-only不传 domain
else:
domains_to_try = [None, target_domain]
for domain in domains_to_try:
cookie = {
'name': name,
'value': value,
'path': '/',
'secure': True
}
if domain:
cookie['domain'] = domain
try:
driver.add_cookie(cookie)
# logger.debug(f"✓ {name} added with domain={domain or 'host-only'}")
return True
except Exception as e:
if 'invalid cookie domain' in str(e):
continue # 尝试下一个 domain
else:
# logger.warning(f"✗ {name} failed: {e}")
return False
return False # 所有 domain 都失败
if __name__ == "__main__":
cookie_list = redis_client.lrange("MediaSpiders:WeChatLinksFetcher_Cookies", 0, -1)
cookie_parts = [
item.decode('utf-8') if isinstance(item, bytes) else str(item)
for item in cookie_list
]
# 标记是否需要手动登录
need_manual_login = True
current_cookie = None
if not cookie_parts:
logger.warning("Redis 中没有可用的 cookie需要手动登录")
need_manual_login = True
else:
# 尝试使用 Redis 中的 cookie 登录
for item in cookie_parts:
current_cookie = item
try:
driver.delete_all_cookies()
driver.get('https://mp.weixin.qq.com/')
time.sleep(2)
cookie_string = item
cookie_dict = parse_cookie_string(cookie_string)
success_count = 0
for name, value in cookie_dict.items():
if add_cookie_smart(driver, name, value):
success_count += 1
else:
logger.warning(f"跳过 cookie: {name}")
logger.info(f"成功添加 {success_count}/{len(cookie_dict)} 个 cookie")
# 验证 cookie 是否有效
driver.refresh()
time.sleep(5)
# 检查是否登录成功 - 通过检查 URL 中是否包含 token 或页面元素
current_url = driver.current_url
if 'token=' in current_url:
logger.info("使用 Redis 中的 cookie 登录成功")
need_manual_login = False
else:
# 二次验证:检查页面上是否有登录状态相关的元素
try:
# 检查是否有用户头像或用户名元素
driver.find_element(By.CSS_SELECTOR,
".weui-desktop-account__nickname, .userinfo_nickname, .account_nickname")
logger.info("通过页面元素验证,登录成功")
need_manual_login = False
except:
logger.warning("Cookie 登录失败,尝试下一个 cookie 或手动登录")
except Exception as e:
logger.error(f"使用 cookie 登录时出错: {str(e)}")
continue
# 如果自动登录失败,进行手动登录
if need_manual_login:
logger.info("所有 cookie 均无效,启动手动登录流程")
try:
driver.delete_all_cookies()
driver.get('https://mp.weixin.qq.com/')
time.sleep(2)
# 等待用户手动登录
logger.info("请在浏览器中手动完成登录(扫描二维码)")
logger.info("登录成功后,程序将自动继续执行")
# 设置最长等待时间(例如 120 秒)
max_wait_time = 120
start_time = time.time()
logged_in = False
while time.time() - start_time < max_wait_time:
current_url = driver.current_url
if 'token=' in current_url:
logged_in = True
logger.info("手动登录成功!")
break
# 检查页面元素
try:
driver.find_element(By.CSS_SELECTOR,
".weui-desktop-account__nickname, .userinfo_nickname, .account_nickname")
logged_in = True
logger.info("通过页面元素确认手动登录成功!")
break
except:
time.sleep(2)
if not logged_in:
logger.error(f"等待 {max_wait_time} 秒后仍未登录成功,程序终止")
raise Exception("手动登录超时")
# 获取新的 cookie
raw_cookies = driver.get_cookies()
new_cookie_dict = {}
for c in raw_cookies:
new_cookie_dict[c['name']] = c['value']
# 将字典转换为字符串格式
new_cookie_string = "; ".join([f"{k}={v}" for k, v in new_cookie_dict.items()])
# 更新 Redis 中的 cookie
logger.info("更新 Redis 中的 cookie")
# 删除旧的 cookie
redis_client.delete("MediaSpiders:WeChatLinksFetcher_Cookies")
# 添加新的 cookie
redis_client.lpush("MediaSpiders:WeChatLinksFetcher_Cookies", new_cookie_string)
current_cookie = new_cookie_string
logger.info("Redis cookie 更新成功")
except Exception as e:
logger.error(f"手动登录过程出错: {str(e)}")
raise
count_per_account = 200
total_count = 0
break_flag = False
token_index = driver.current_url.rfind('token=')
token = driver.current_url[token_index + 6:]
print(f'获取 token 成功!当前 token 为 {token}')
raw_cookies = driver.get_cookies()
cookies = {}
for c in raw_cookies:
cookies[c['name']] = c['value']
print(f'获取 cookie 成功!')
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
'Referer': f'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/'
f'appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={token}&lang=zh_CN'
}
query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false")
post_body = {
'userType': WECHAT_USER_TYPE,
'userFlag': 0
}
account_rsp = json.loads(
http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text)
official_accounts = []
if account_rsp['code'] == 200:
official_accounts = account_rsp['content']
for account_line in official_accounts:
try:
if break_flag:
break
start_timestamp = int((time.time() - 500 * 24 * 3600) * 1000)
if 'updateTime' in account_line:
start_timestamp = account_line['updateTime']
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_timestamp / 1000))
account = account_line['userName']
search_account_api = f'https://mp.weixin.qq.com/cgi-bin/searchbiz?action=search_biz&begin=0&count=5&' \
f'query={account}&token={token}&lang=zh_CN&f=json&ajax=1'
print(f"开始搜索公众号“{account}”...")
time.sleep(3 + random.random())
response = requests.get(search_account_api, cookies=cookies, headers=headers)
rsp_body = json.loads(response.text)
index_end = ceil(count_per_account / 5)
if 'list' in rsp_body:
matched_account = {}
matched_account_flag = False
for item in rsp_body['list']:
if item['nickname'] == account:
matched_account_flag = True
matched_account = item
break
if not matched_account_flag:
print(f"未找到公众号“{account}")
continue
fake_id = matched_account['fakeid']
update_time_flag = True # 用于记录获取到的历史列表是否已经超出最早的时间限制
next_start_timestamp = int(time.time() * 1000)
for index in range(index_end):
if update_time_flag:
if next_start_timestamp - start_timestamp < 12 * 3600 * 1000:
print(f"公众号“{account}”以及后续账号在12小时内已经扫码获取过文章链接本次获取结束")
break_flag = True
else:
fetch_article_api = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=' \
f'{index * 5}&count=5&fakeid={fake_id}&type=9&query=&token={token}' \
f'&lang=zh_CN&f=json&ajax=1'
print(f"开始获取公众号“{account}”在 {start_time} 后发表的的文章列表...")
time.sleep(3 + random.random())
article_response = requests.get(fetch_article_api, cookies=cookies, headers=headers)
article_rsp_body = json.loads(article_response.text)
if 'app_msg_list' in article_rsp_body:
for article in article_rsp_body['app_msg_list']:
title = article['title']
link = article['link']
update_time = article['update_time'] * 1000
if update_time > start_timestamp:
total_count += 1
time_str = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time / 1000))
print(f"[No. {total_count}] 获取到公众号“{account}”在 {time_str} "
f"发表的文章《{title}》,链接地址:{link}")
redis_client.sadd(f"MediaSpiders:Wechat_links:{account_line['id']}",
link)
else:
update_time_flag = False
break
else:
print(json.dumps(article_rsp_body, ensure_ascii=False))
if 'base_resp' in article_rsp_body:
err_msg = article_rsp_body['base_resp']['err_msg']
if err_msg == "freq control" or err_msg == "invalid session":
print("接口频率限制,稍后再试,本次获取结束")
break_flag = True
break
if not break_flag:
# 本循环内只有12小时内扫过码以及接口频率限制退出会导致 break_flag 为 True这两种情况都不需要更新扫码状态
next_start_time = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(next_start_timestamp / 1000))
account_line['updateTime'] = next_start_timestamp
http_post(SOCIAL_USER_UPDATE_API,
data=json.dumps(account_line, ensure_ascii=False).encode('utf-8'),
headers={'User-Agent': UA, "Content-Type": "application/json"}
)
print(f"公众号“{account}”文章获取结束,该账号下一次获取起始时间为 {next_start_time}")
else:
print(json.dumps(rsp_body, ensure_ascii=False))
if 'base_resp' in rsp_body:
if rsp_body['base_resp']['err_msg'] == "freq control":
print("接口频率限制,稍后再试,本次获取结束")
break_flag = True
break
except Exception as e:
print(repr(e))
redis_client.close()
driver.quit()