[微信公众号] 采集优化

This commit is contained in:
DELL 2026-03-02 13:50:42 +08:00
parent 85158d00db
commit 8c3f2ffad0
4 changed files with 287 additions and 15 deletions

View File

@ -0,0 +1,282 @@
# -*- coding: utf-8 -*-
import json
import logging as logger
import random
import time
from math import ceil
import redis
import requests
import scrapy
from scrapy_selenium import SeleniumRequest
from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \
WECHAT_USER_TYPE
from MediaSpiders.utils.http_utils import http_post, UA
from MediaSpiders.utils.time_utils import get_current_timestamp
class WechatLinksFetcherSpider(scrapy.Spider):
    """Spider that logs into the WeChat MP (mp.weixin.qq.com) backend using
    cookies cached in Redis, then collects article links for a batch of
    official accounts and stores them in Redis sets keyed by account id.
    """
    name = 'WechatLinksFetcherSpider'
    # Per-spider Scrapy settings: protobuf export config, image/file pipelines,
    # archive naming, and the project's dump-filter spider middleware.
    # NOTE(review): IMAGES_STORE points at a "twitter" temp dir — looks copied
    # from another spider; confirm the path is intended here.
    custom_settings = {
        'PROTO_MODULE_PATH': 'MediaSpiders.proto.Es_pb2',
        'PROTO_CLASS_NAME': 'EsSets',
        'PROTO_FIELD_NAME': 'Es',
        'PROTO_SAVE_FILE_NAME': 'public_info_data_',
        'IMAGES_STORE': r'/usr/local/temp_image/twitter',
        'IMAGES_RESULT_FIELD': 'es_urlimage',
        'FILES_STORE': r'/usr/local/videos',
        'FILES_RESULT_FIELD': 'es_video',
        'ZIP_FILE_NAME': 'image_data_ship_',
        'FILE_ZIP_FILE_NAME': 'image_data_plane_',
        'ITEM_PIPELINES': {
            'scrapy.pipelines.images.ImagesPipeline': 2,
            'scrapy.pipelines.files.FilesPipeline': 1,
            'MediaSpiders.pipelines.ProtobufSavePipeline': 300,
        },
        'SPIDER_MIDDLEWARES': {
            'MediaSpiders.middlewares.DumpFilterSpiderMiddleware': 543,
            'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': None
        }
    }
    # Constants.
    # NOTE(review): PAGE_LOAD_TIMEOUT / ELEMENT_WAIT_TIMEOUT / MAX_NEWS_PER_HOT /
    # MAX_HOT_ITEMS / SKIP_PATTERNS are not referenced anywhere in this class —
    # possibly leftovers from a hot-search spider; confirm before removing.
    PAGE_LOAD_TIMEOUT = 10
    ELEMENT_WAIT_TIMEOUT = 5
    MAX_NEWS_PER_HOT = 6
    MAX_HOT_ITEMS = 10
    # Text patterns to filter out (copyright notices, editor bylines, etc.).
    SKIP_PATTERNS = ['版权', '声明', '邮箱', '记者', '编辑', '来源', '投稿', '责任编辑']

    def __init__(self, params=None, *args, **kwargs):
        """Initialize the spider.

        :param params: optional JSON string with keys ``totalNum``,
            ``authorization`` and ``job_id``; malformed input is logged
            and ignored, leaving the defaults in place.
        """
        super().__init__(*args, **kwargs)
        self.url_time = get_current_timestamp()
        self.total_num = 0
        self.authorization = None
        self.job_id = None
        # Shared Redis connection used for both the cached login cookies and
        # the collected article-link sets; closed at the end of parse().
        self.redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD)
        if params:
            try:
                json_params = json.loads(params)
                self.total_num = int(json_params.get('totalNum', 0))
                self.authorization = json_params.get('authorization')
                self.job_id = json_params.get('job_id')
            except (json.JSONDecodeError, ValueError) as e:
                self.logger.error(f"解析参数失败: {e}")

    def start_requests(self):
        """Issue the initial Selenium request to the WeChat MP homepage."""
        yield SeleniumRequest(
            url='https://mp.weixin.qq.com/',
            callback=self.parse,
        )

    def parse(self, response):
        """Log in with Redis-cached cookies, then walk each configured
        official account and push newly published article links into Redis.

        Stops early (``break_flag``) when the backend signals rate limiting
        ("freq control"), an invalid session, or when an account was already
        fetched within the last 12 hours.
        """
        driver = response.request.meta['driver']
        cookie_list = self.redis_client.lrange("MediaSpiders:WeChatLinksFetcher_Cookies", 0, -1)
        cookie_parts = [
            item.decode('utf-8') if isinstance(item, bytes) else str(item)
            for item in cookie_list
        ]
        # Try logging in with each cookie string cached in Redis.
        # NOTE(review): the loop never breaks on a successful login, so every
        # cached cookie set is applied in turn and the LAST one wins — confirm
        # whether an early break after a verified login was intended.
        for item in cookie_parts:
            try:
                driver.delete_all_cookies()
                driver.get('https://mp.weixin.qq.com/')
                time.sleep(2)
                cookie_string = item
                cookie_dict = parse_cookie_string(cookie_string)
                success_count = 0
                for name, value in cookie_dict.items():
                    if add_cookie_smart(driver, name, value):
                        success_count += 1
                    else:
                        logger.warning(f"跳过 cookie: {name}")
                logger.info(f"成功添加 {success_count}/{len(cookie_dict)} 个 cookie")
                # Refresh so the server can act on the cookies; a valid login
                # redirects to a URL that carries the admin token.
                driver.refresh()
                time.sleep(5)
            except Exception as e:
                logger.error(f"使用 cookie 登录时出错: {str(e)}")
                continue
        count_per_account = 200  # max articles fetched per account (5 per page)
        total_count = 0
        break_flag = False
        # Extract the admin token from the post-login URL.
        # NOTE(review): if login failed, rfind returns -1 and this silently
        # yields a garbage token — consider validating token_index >= 0.
        token_index = driver.current_url.rfind('token=')
        token = driver.current_url[token_index + 6:]
        logger.info(f'获取 token 成功!当前 token 为 {token}')
        # Convert Selenium's cookie list into the dict form requests expects.
        raw_cookies = driver.get_cookies()
        cookies = {}
        for c in raw_cookies:
            cookies[c['name']] = c['value']
        logger.info(f'获取 cookie 成功!')
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                          'Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
            'Referer': f'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/'
                       f'appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={token}&lang=zh_CN'
        }
        # Fetch the list of official accounts to crawl from the social-user API.
        query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false")
        post_body = {
            'userType': WECHAT_USER_TYPE,
            'userFlag': 0
        }
        account_rsp = json.loads(
            http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text)
        official_accounts = []
        if account_rsp['code'] == 200:
            # Only the 10 least-recently-updated accounts per run.
            official_accounts = account_rsp['content'][:10]
        for account_line in official_accounts:
            try:
                if break_flag:
                    break
                # Default lookback window: 500 days; overridden by the
                # account's recorded updateTime (epoch millis) when present.
                start_timestamp = int((time.time() - 500 * 24 * 3600) * 1000)
                if 'updateTime' in account_line:
                    start_timestamp = account_line['updateTime']
                start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_timestamp / 1000))
                account = account_line['userName']
                search_account_api = f'https://mp.weixin.qq.com/cgi-bin/searchbiz?action=search_biz&begin=0&count=5&' \
                                     f'query={account}&token={token}&lang=zh_CN&f=json&ajax=1'
                logger.info(f"开始搜索公众号“{account}”...")
                # Randomized delay to avoid tripping rate limits.
                time.sleep(3 + random.random())
                # NOTE(review): this rebinds the parse() `response` parameter —
                # harmless here but easy to misread; consider renaming.
                response = requests.get(search_account_api, cookies=cookies, headers=headers)
                rsp_body = json.loads(response.text)
                index_end = ceil(count_per_account / 5)
                if 'list' in rsp_body:
                    # Find the exact nickname match among search results.
                    matched_account = {}
                    matched_account_flag = False
                    for item in rsp_body['list']:
                        if item['nickname'] == account:
                            matched_account_flag = True
                            matched_account = item
                            break
                    if not matched_account_flag:
                        logger.info(f"未找到公众号“{account}")
                        continue
                    fake_id = matched_account['fakeid']
                    # Tracks whether the fetched history is still within the
                    # earliest allowed time window.
                    update_time_flag = True
                    next_start_timestamp = int(time.time() * 1000)
                    for index in range(index_end):
                        if update_time_flag:
                            if next_start_timestamp - start_timestamp < 12 * 3600 * 1000:
                                logger.info(f"公众号“{account}”以及后续账号在12小时内已经扫码获取过文章链接本次获取结束")
                                break_flag = True
                                # NOTE(review): no `break` here — the inner loop
                                # keeps iterating and re-logging this message
                                # until index_end is exhausted; confirm a break
                                # was intended.
                            else:
                                fetch_article_api = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=' \
                                                    f'{index * 5}&count=5&fakeid={fake_id}&type=9&query=&token={token}' \
                                                    f'&lang=zh_CN&f=json&ajax=1'
                                logger.info(f"开始获取公众号“{account}”在 {start_time} 后发表的的文章列表...")
                                time.sleep(3 + random.random())
                                article_response = requests.get(fetch_article_api, cookies=cookies, headers=headers)
                                article_rsp_body = json.loads(article_response.text)
                                if 'app_msg_list' in article_rsp_body:
                                    for article in article_rsp_body['app_msg_list']:
                                        title = article['title']
                                        link = article['link']
                                        # update_time arrives in seconds; convert to millis.
                                        update_time = article['update_time'] * 1000
                                        if update_time > start_timestamp:
                                            total_count += 1
                                            time_str = time.strftime("%Y-%m-%d %H:%M:%S",
                                                                     time.localtime(update_time / 1000))
                                            logger.info(f"[No. {total_count}] 获取到公众号“{account}”在 {time_str} "
                                                        f"发表的文章《{title}》,链接地址:{link}")
                                            self.redis_client.sadd(
                                                f"MediaSpiders:Wechat_links:{account_line['id']}",
                                                link)
                                        else:
                                            # Articles are newest-first; once one is older
                                            # than the window, stop paging this account.
                                            update_time_flag = False
                                            break
                                else:
                                    logger.info(json.dumps(article_rsp_body, ensure_ascii=False))
                                    if 'base_resp' in article_rsp_body:
                                        err_msg = article_rsp_body['base_resp']['err_msg']
                                        if err_msg == "freq control" or err_msg == "invalid session":
                                            logger.info("接口频率限制,稍后再试,本次获取结束")
                                            break_flag = True
                                            break
                    if not break_flag:
                        # Within this loop break_flag only becomes True for the
                        # "fetched within 12 hours" and rate-limit cases; neither
                        # should advance the account's recorded fetch time.
                        next_start_time = time.strftime("%Y-%m-%d %H:%M:%S",
                                                        time.localtime(next_start_timestamp / 1000))
                        account_line['updateTime'] = next_start_timestamp
                        http_post(SOCIAL_USER_UPDATE_API,
                                  data=json.dumps(account_line, ensure_ascii=False).encode('utf-8'),
                                  headers={'User-Agent': UA, "Content-Type": "application/json"}
                                  )
                        logger.info(f"公众号“{account}”文章获取结束,该账号下一次获取起始时间为 {next_start_time}")
                else:
                    logger.info(json.dumps(rsp_body, ensure_ascii=False))
                    if 'base_resp' in rsp_body:
                        if rsp_body['base_resp']['err_msg'] == "freq control":
                            logger.info("接口频率限制,稍后再试,本次获取结束")
                            break_flag = True
                            break
            except Exception as e:
                # NOTE(review): swallows every per-account error at info level —
                # consider logger.exception so stack traces are not lost.
                logger.info(repr(e))
        self.redis_client.close()
        driver.quit()
def parse_cookie_string(cookie_str):
    """Turn a raw ``name=value; name2=value2`` cookie header into a dict.

    Segments without an ``=`` are silently ignored; whitespace around
    names and values is stripped. Only the first ``=`` splits a segment,
    so values may themselves contain ``=``.
    """
    pairs = {}
    for chunk in cookie_str.split(';'):
        key, sep, val = chunk.partition('=')
        if sep:
            pairs[key.strip()] = val.strip()
    return pairs
def add_cookie_smart(driver, name, value, target_domain='mp.weixin.qq.com'):
    """Add a cookie to the Selenium driver, choosing domains by cookie class.

    Three strategies, keyed on the cookie name:
      1. WeChat-critical cookies -> the exact target domain only.
      2. Common Tencent cookies  -> target domain, then parent domains.
      3. Everything else         -> host-only first (no domain key), then
         the target domain.

    Returns True as soon as one ``add_cookie`` call succeeds. An
    "invalid cookie domain" error moves on to the next candidate; any
    other error (or exhausting all candidates) returns False.
    """
    # Core WeChat cookies must live on mp.weixin.qq.com.
    wechat_critical = ['wxuin', 'slave_sid', 'slave_user', 'bizuin', 'data_ticket', 'token']
    # Generic Tencent cookies may fall back to a parent domain.
    tencent_common = ['ptui_loginuin', 'RK', 'ptcz', 'ua_id']

    if name in wechat_critical:
        candidates = [target_domain]
    elif name in tencent_common:
        candidates = [target_domain, '.weixin.qq.com', '.qq.com']
    else:
        candidates = [None, target_domain]

    for candidate in candidates:
        payload = {
            'name': name,
            'value': value,
            'path': '/',
            'secure': True
        }
        if candidate:
            payload['domain'] = candidate
        try:
            driver.add_cookie(payload)
            return True
        except Exception as exc:
            # Only a domain mismatch is worth retrying with the next candidate.
            if 'invalid cookie domain' not in str(exc):
                return False
    return False

View File

@ -43,11 +43,6 @@ class HotSearchSpider(scrapy.Spider):
super(HotSearchSpider, self).__init__(*args, **kwargs)
self.job_id = None
self.collected_items = []
self.redis_client = redis.Redis(
host=self.settings['REDIS_HOST'],
port=self.settings['REDIS_PORT'],
password=self.settings['REDIS_PWD']
)
if params:
try:
json_params = json.loads(params)
@ -96,6 +91,3 @@ class HotSearchSpider(scrapy.Spider):
except Exception as e:
self.logger.exception(f"解析异常: {str(e)}")

View File

@ -9,14 +9,10 @@ import redis
import requests
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \
WECHAT_USER_TYPE
from MediaSpiders.spiders.TwitterUserSpider import form_cookie_dict
from MediaSpiders.utils.http_utils import http_post, UA
from MediaSpiders.utils.login_utils import login
chrome_options = Options()
# 指定 chrome.exe 的完整路径
@ -290,7 +286,8 @@ if __name__ == "__main__":
time.localtime(update_time / 1000))
print(f"[No. {total_count}] 获取到公众号“{account}”在 {time_str} "
f"发表的文章《{title}》,链接地址:{link}")
redis_client.sadd(f"MediaSpiders:Wechat_links:{account_line['id']}", link)
redis_client.sadd(f"MediaSpiders:Wechat_links:{account_line['id']}",
link)
else:
update_time_flag = False
break
@ -305,7 +302,8 @@ if __name__ == "__main__":
if not break_flag:
# 本循环内只有12小时内扫过码以及接口频率限制退出会导致 break_flag 为 True这两种情况都不需要更新扫码状态
next_start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_start_timestamp / 1000))
next_start_time = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(next_start_timestamp / 1000))
account_line['updateTime'] = next_start_timestamp
http_post(SOCIAL_USER_UPDATE_API,
data=json.dumps(account_line, ensure_ascii=False).encode('utf-8'),

View File

@ -20,4 +20,4 @@ dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath)
# 等效于scrapy crawl FacebookUserSpider -a params="{}"
# execute(['scrapy', 'crawl', 'hot_search_spider', '-a', 'params={}'])
execute(['scrapy', 'crawl', 'HotSearchRedisSpider', '-a', 'params={}'])
execute(['scrapy', 'crawl', 'WechatLinksFetcherSpider', '-a', 'params={}'])