Compare commits

...

2 Commits

Author SHA1 Message Date
DELL
b5384f8bcd [baidu] 百度热搜信息采集优化 2026-02-26 16:17:48 +08:00
DELL
2e9654966b [baidu] 新增百度热搜信息采集 2026-02-26 14:48:48 +08:00
4 changed files with 671 additions and 126 deletions

View File

@ -0,0 +1,373 @@
# -*- coding: utf-8 -*-
import json
import logging
import hashlib
import datetime
import re
import time
from urllib.parse import urlparse
import random
from selenium.webdriver.common.by import By
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
StaleElementReferenceException,
WebDriverException
)
import scrapy
from scrapy_selenium import SeleniumRequest
from MediaSpiders.items import MediaspidersItem
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from scrapy.exceptions import CloseSpider
from MediaSpiders.utils.string_utils import get_str_md5
from MediaSpiders.utils.time_utils import get_current_timestamp
class BaiduHotSearchSprder(scrapy.Spider):
    """Scrape the Baidu realtime hot-search board with Selenium.

    Visits https://top.baidu.com/board?tab=realtime, locates the hot-search
    entries with several fallback selector strategies, and yields one
    ``MediaspidersItem`` per entry (title, description, rank, heat, link,
    detail URL). Retries the page up to ``max_retries`` times when the
    expected structure is missing or the WebDriver fails.

    NOTE(review): the class/spider name keeps the original "Sprder" spelling
    because external launchers reference the spider by this exact name.
    """
    name = 'BaiduHotSearchSprder'
    # Kept for interface parity with sibling spiders; not used in this class.
    comment_urls = []
    custom_settings = {
        'PROTO_MODULE_PATH': 'MediaSpiders.proto.Es_pb2',
        'PROTO_CLASS_NAME': 'EsSets',
        'PROTO_FIELD_NAME': 'Es',
        'PROTO_SAVE_FILE_NAME': 'public_info_data_',
        'IMAGES_STORE': r'/usr/local/temp_image/twitter',
        'IMAGES_RESULT_FIELD': 'es_urlimage',
        'FILES_STORE': r'/usr/local/videos',
        'FILES_RESULT_FIELD': 'es_video',
        'ZIP_FILE_NAME': 'image_data_publicinfo_',
        'FILE_ZIP_FILE_NAME': 'image_data_plane_',
        'ITEM_PIPELINES': {
            'scrapy.pipelines.images.ImagesPipeline': 2,
            'MediaSpiders.pipelines.ProtobufSavePipeline': 300,
        },
        'SPIDER_MIDDLEWARES': {
            'MediaSpiders.middlewares.DumpFilterSpiderMiddleware': 543,
            'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': None
        },
        'BATCH_SAVE_SIZE': 50
    }
    # A single URL string (not the usual Scrapy list); start_requests()
    # builds the SeleniumRequest from it directly.
    start_urls = 'https://top.baidu.com/board?tab=realtime'

    def __init__(self, params=None, *args, **kwargs):
        """``params`` is an optional JSON string, e.g.
        ``{"job_id": ..., "max_items": ...}``; parse errors are logged and
        the defaults are kept."""
        super(BaiduHotSearchSprder, self).__init__(*args, **kwargs)
        self.job_id = None
        self.collected_items = 0
        self.max_items = 50  # cap on collected entries, enforced in parse()
        self.retry_count = 0
        self.max_retries = 3
        if params:
            try:
                json_params = json.loads(params)
                if 'job_id' in json_params:
                    self.job_id = json_params['job_id']
                if 'max_items' in json_params:
                    self.max_items = int(json_params['max_items'])
            except Exception as e:
                self.logger.error(f"解析参数失败: {str(e)}")

    def start_requests(self):
        """Issue the initial Selenium-driven request for the board page."""
        self.logger.info(f"开始爬取百度热搜任务ID: {self.job_id if self.job_id else 'N/A'}")
        yield SeleniumRequest(
            url=self.start_urls,
            callback=self.parse,
            meta={'retry_count': 0},
            wait_time=5
        )

    def parse(self, response):
        """Parse the hot-search board.

        Yields ``MediaspidersItem`` objects, or a retry ``SeleniumRequest``
        when the page structure is missing / the driver fails. Raises
        ``CloseSpider`` once ``max_retries`` structural retries are exhausted.
        """
        self.logger.info("开始解析百度热搜数据...")
        driver = response.request.meta['driver']
        try:
            # Enlarge the window so entries are not hidden off-screen.
            driver.set_window_size(1400, 1000)
            # Re-visit the main domain so cookies are set correctly.
            driver.get(self.start_urls)
            # Wait for the main list to render; on timeout we still try to
            # process whatever elements are available.
            try:
                WebDriverWait(driver, 15).until(
                    EC.presence_of_element_located((By.CSS_SELECTOR, ".category-wrap_iQLoo, .board-item"))
                )
                self.logger.info("页面主要内容加载完成")
            except TimeoutException:
                self.logger.warning("等待主要内容超时,尝试直接处理可用元素")
            # Scroll so lazily-loaded entries are attached to the DOM.
            self._scroll_page(driver)
            # Try several selector strategies for robustness against
            # front-end class-name changes.
            hot_search_items = self._get_hot_search_items(driver)
            if not hot_search_items:
                self.logger.error("未找到任何热搜项,检查页面结构是否发生变化")
                self.retry_count += 1
                if self.retry_count <= self.max_retries:
                    self.logger.info(f"重试第 {self.retry_count}/{self.max_retries}")
                    driver.refresh()
                    # BUGFIX: parse() is a generator, so the retry request must
                    # be *yielded*; the original ``return SeleniumRequest(...)``
                    # silently discarded it (a generator's return value is
                    # never scheduled by Scrapy).
                    yield SeleniumRequest(
                        url=self.start_urls,
                        callback=self.parse,
                        meta={'retry_count': self.retry_count},
                        dont_filter=True,
                        wait_time=5
                    )
                    return
                else:
                    self.logger.error("达到最大重试次数,终止爬虫")
                    raise CloseSpider("页面结构可能已更改,无法提取数据")
            self.logger.info(f"找到 {len(hot_search_items)} 个热搜项")
            # Process each entry; stale or broken entries are skipped.
            for index, item in enumerate(hot_search_items):
                # BUGFIX: honour max_items — it was configured (and documented
                # as a loop guard) but never actually enforced.
                if self.collected_items >= self.max_items:
                    self.logger.info(f"已达到最大爬取数量 {self.max_items},停止提取")
                    break
                try:
                    hot_search_item = self._extract_hot_search_data(item, driver)
                    if hot_search_item:
                        self.collected_items += 1
                        self.logger.info(f"成功提取第 {self.collected_items} 条数据: {hot_search_item['es_urltitle']}")
                        yield hot_search_item
                except StaleElementReferenceException:
                    self.logger.warning(f"第 {index + 1} 项元素已失效,跳过处理")
                    continue
                except Exception as e:
                    self.logger.error(f"处理第 {index + 1} 项时出错: {str(e)}", exc_info=True)
                    continue
            self.logger.info(f"本次爬取共收集 {self.collected_items} 条有效数据")
        except WebDriverException as e:
            self.logger.error(f"WebDriver异常: {str(e)}", exc_info=True)
            if 'retry_count' not in response.meta or response.meta['retry_count'] < self.max_retries:
                retry_count = response.meta.get('retry_count', 0) + 1
                self.logger.info(f"尝试重新请求,重试次数: {retry_count}")
                yield SeleniumRequest(
                    url=self.start_urls,
                    callback=self.parse,
                    meta={'retry_count': retry_count},
                    dont_filter=True,
                    wait_time=5 + retry_count * 2  # linear back-off between retries
                )
        except Exception as e:
            self.logger.error(f"处理页面时发生未预期错误: {str(e)}", exc_info=True)

    def _scroll_page(self, driver):
        """Scroll down the page in steps (then back to the top) so that
        lazily-rendered entries are loaded; failures are only logged."""
        try:
            scroll_pause_time = 1
            screen_height = driver.execute_script("return window.screen.height;")
            scrolls = 5
            for i in range(scrolls):
                driver.execute_script(f"window.scrollTo(0, {screen_height * i});")
                # Randomised pause to look less like an automated client.
                time.sleep(scroll_pause_time * (1 + random.random()))
            driver.execute_script("window.scrollTo(0, 0);")
            time.sleep(scroll_pause_time)
            self.logger.info("页面滚动完成,确保元素加载")
        except Exception as e:
            self.logger.warning(f"页面滚动时发生异常: {str(e)}")

    def _get_hot_search_items(self, driver):
        """Locate the hot-search entry elements.

        Tries a list of CSS selectors first (primary class names, then
        fallbacks), then XPath patterns; returns the first non-empty match
        or an empty list when nothing is found.
        """
        selectors = [
            '.category-wrap_iQLoo.horizontal_1eKyQ',  # primary selector
            '.board-item',                            # fallback selector
            'div[data-index]'                         # attribute-based selector
        ]
        for selector in selectors:
            try:
                items = driver.find_elements(By.CSS_SELECTOR, selector)
                if items and len(items) > 0:
                    self.logger.info(f"使用选择器 '{selector}' 成功找到 {len(items)} 个元素")
                    return items
            except Exception as e:
                self.logger.debug(f"选择器 '{selector}' 失败: {str(e)}")
        # CSS selectors all failed: fall back to XPath.
        try:
            xpath_patterns = [
                '//div[contains(@class, "category-wrap") and contains(@class, "horizontal")]',
                '//div[contains(@class, "board-item")]',
                '//div[@data-index]'
            ]
            for xpath in xpath_patterns:
                items = driver.find_elements(By.XPATH, xpath)
                if items and len(items) > 0:
                    self.logger.info(f"使用XPath '{xpath}' 成功找到 {len(items)} 个元素")
                    return items
        except Exception as e:
            self.logger.debug(f"XPath策略失败: {str(e)}")
        return []

    def _extract_hot_search_data(self, item, driver):
        """Extract one hot-search entry into a ``MediaspidersItem``.

        Collects every field into locals first and assigns the item in a
        single place at the end. Returns ``None`` when no usable title is
        found (the entry is skipped).
        """
        # Selector constants: primary selector first, fallbacks after.
        TITLE_SELECTORS = ['.c-single-text-ellipsis', '.title_dIF3B']
        RANK_SELECTORS = ['.index_1Ew5p', '.hot-index_1Bl1a']
        LINK_SELECTORS = [
            '.look-more_3oNWC',
            'a[href*="www.baidu.com/s?"]',
            '.hot-desc_1m_jR a',
            '.content_3Kk0y a'
        ]
        DESC_SELECTORS = [
            '.hot-desc_1m_jR.large_nSuFU',
            '.hot-desc_1m_jR.small_Uvkd3',
            '.desc_3CT34',
            '.content_3Kk0y'
        ]
        HOT_NUM_SELECTOR = '.hot-index_1Bl1a'

        def find_visible_element(selectors, context=item):
            """Return the first *displayed* element matched by any selector,
            or None when nothing visible matches."""
            for selector in selectors:
                try:
                    elem = context.find_element(By.CSS_SELECTOR, selector)
                    if elem.is_displayed():
                        return elem
                except Exception as e:
                    self.logger.debug(f"选择器 '{selector}' 未匹配: {str(e)}")
            return None

        def clean_text(text, remove_phrases=("查看更多>", "查看更多", "查看全文", "展开全文")):
            """Deep-clean text: strip boilerplate phrases, then collapse all
            whitespace runs (including \\xa0 and other unicode spaces)."""
            if not text:
                return ""
            for phrase in remove_phrases:
                text = text.replace(phrase, "")
            text = re.sub(r'\s+', ' ', text).strip()
            return text.strip()

        def normalize_url(url):
            """Return an absolute http(s) URL, or "" for anything unusable
            (None, data: URIs, relative paths)."""
            if not url or not isinstance(url, str):
                return ""
            url = url.strip()
            if url.startswith('//'):
                url = 'https:' + url
            if url.startswith('data:') or not url.startswith(('http://', 'https://')):
                return ""
            return url

        # 1. Title — mandatory; the entry is skipped without one.
        title_elem = find_visible_element(TITLE_SELECTORS)
        if not title_elem:
            self.logger.warning("标题元素未找到,跳过该项")
            return None
        title = clean_text(title_elem.text)
        if not title:
            self.logger.warning("标题内容为空,跳过该项")
            return None

        # 2. Gather the remaining fields into locals.
        now_ms = int(time.time() * 1000)
        site_name = '百度热搜'
        carrier_type = 'hot_search'
        hkey = get_str_md5(title)
        # Rank: fall back to this item's 1-based collection position.
        # BUGFIX: the original used self.collected_items, which is only
        # incremented after extraction succeeds, so the fallback was off by
        # one (the first item would get rank "0").
        rank = str(self.collected_items + 1)
        rank_elem = find_visible_element(RANK_SELECTORS)
        if rank_elem:
            extracted_rank = clean_text(rank_elem.text)
            if extracted_rank:
                rank = extracted_rank
        # Link and SID: md5 of the lower-cased link, or of a per-item
        # fallback seed when no link is present.
        url_href = ""
        link_elem = find_visible_element(LINK_SELECTORS)
        if link_elem:
            raw_href = link_elem.get_attribute('href')
            url_href = normalize_url(raw_href) if raw_href else ""
        if url_href:
            sid = hashlib.md5(url_href.lower().encode('utf-8')).hexdigest()
        else:
            fallback_seed = f"no_link_{title}_{now_ms}"
            sid = hashlib.md5(fallback_seed.encode('utf-8')).hexdigest()
        # Heat index: digits only; stays 0 when missing or non-numeric.
        heat = 0
        try:
            hot_elem = item.find_element(By.CSS_SELECTOR, HOT_NUM_SELECTOR)
            hot_val = clean_text(hot_elem.text).replace(',', '')
            if hot_val.isdigit():
                heat = int(hot_val)
        except Exception as e:
            self.logger.debug(f"热度提取失败: {str(e)}")
        # Description (optional).
        desc = ""
        desc_elem = find_visible_element(DESC_SELECTORS)
        if desc_elem:
            desc = clean_text(desc_elem.text)
        # Detail-page URL from the "查看更多" anchor inside the description
        # container; stays "" on failure (downstream may fall back to the
        # main link).
        detail_url = ""
        try:
            desc_container = item.find_element(By.CSS_SELECTOR, '.hot-desc_1m_jR')
            detail_link = desc_container.find_element(By.CSS_SELECTOR, 'a.look-more_3oNWC[href*="www.baidu.com/s?"]')
            raw_href = detail_link.get_attribute('href') or ""
            detail_url = normalize_url(raw_href)
            self.logger.debug(f"成功提取详情页URL: {detail_url}")
        except Exception as e:
            self.logger.debug(f"提取详情页URL失败: {str(e)}")

        # 3. Single assignment point: build and fill the item.
        hot_search_item = MediaspidersItem()
        hot_search_item['es_sitename'] = site_name
        hot_search_item['es_urltitle'] = title
        hot_search_item['es_urlcontent'] = desc
        hot_search_item['es_carriertype'] = carrier_type
        hot_search_item['es_urltime'] = get_current_timestamp()
        hot_search_item['es_lasttime'] = get_current_timestamp()
        hot_search_item['es_loadtime'] = get_current_timestamp()
        hot_search_item['es_hkey'] = hkey
        hot_search_item['es_simrank'] = rank
        hot_search_item['es_heat'] = heat
        hot_search_item['es_sid'] = sid
        hot_search_item['es_urlname'] = detail_url
        return hot_search_item

View File

@ -9,12 +9,10 @@ import scrapy
from redisbloom.client import Client from redisbloom.client import Client
from scrapy_selenium import SeleniumRequest from scrapy_selenium import SeleniumRequest
from selenium.webdriver.common.action_chains import ActionChains from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.common.by import By
from MediaSpiders.items import MediaspidersItem from MediaSpiders.items import MediaspidersItem
from MediaSpiders.spiders.TwitterUserInfoSpider import form_cookie_dict from MediaSpiders.spiders.TwitterUserInfoSpider import form_cookie_dict
from MediaSpiders.utils.http_utils import http_post from MediaSpiders.utils.http_utils import http_post
from MediaSpiders.utils.login_utils import login
from MediaSpiders.utils.string_utils import get_str_md5 from MediaSpiders.utils.string_utils import get_str_md5
from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp
@ -275,6 +273,7 @@ class FacebookSpider(scrapy.Spider):
logger.info("用户 {} 的发文数量为 {}".format(uid, len(article_items))) logger.info("用户 {} 的发文数量为 {}".format(uid, len(article_items)))
return article_items return article_items
def comment_parse(self, response): def comment_parse(self, response):
browser = response.request.meta['driver'] browser = response.request.meta['driver']
article_id = response.request.meta['article_id'] article_id = response.request.meta['article_id']

View File

@ -2,7 +2,9 @@ import json
import random import random
import time import time
from math import ceil from math import ceil
import logging as logger
from selenium.webdriver.common.by import By
import redis import redis
import requests import requests
from selenium import webdriver from selenium import webdriver
@ -12,6 +14,7 @@ from selenium.webdriver.chrome.service import Service
from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \ from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \
WECHAT_USER_TYPE WECHAT_USER_TYPE
from MediaSpiders.spiders.TwitterUserSpider import form_cookie_dict
from MediaSpiders.utils.http_utils import http_post, UA from MediaSpiders.utils.http_utils import http_post, UA
from MediaSpiders.utils.login_utils import login from MediaSpiders.utils.login_utils import login
@ -20,133 +23,303 @@ chrome_options = Options()
chrome_options.binary_location = r"C:\Users\DELL\Downloads\chrome-win64\chrome.exe" chrome_options.binary_location = r"C:\Users\DELL\Downloads\chrome-win64\chrome.exe"
# chrome_options.use_chromium = True # chrome_options.use_chromium = True
driver = webdriver.Chrome( driver = webdriver.Chrome(
executable_path=r"C:\Users\DELL\Downloads\chromedriver-win64\chromedriver.exe", executable_path=r"D:\chromedriver.exe",
options=chrome_options options=chrome_options
) )
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """ "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined
})
"""
}) })
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD) redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD)
if __name__ == "__main__":
count_per_account = 200 def parse_cookie_string(cookie_str):
total_count = 0 """解析 cookie 字符串为 dict"""
driver = login().login_with_selenium( cookie_dict = {}
'https://mp.weixin.qq.com/', for item in cookie_str.split(';'):
'wechat_links_fetcher', if '=' in item:
drivers=driver name, value = item.split('=', 1)
) cookie_dict[name.strip()] = value.strip()
break_flag = False return cookie_dict
token_index = driver.current_url.rfind('token=')
token = driver.current_url[token_index + 6:]
print(f'获取 token 成功!当前 token 为 {token}') def add_cookie_smart(driver, name, value, target_domain='mp.weixin.qq.com'):
raw_cookies = driver.get_cookies() """
cookies = {} 智能添加 cookie先试目标域名失败则试父域再失败则跳过
for c in raw_cookies: """
cookies[c['name']] = c['value'] # 微信核心 cookie 必须用 mp.weixin.qq.com
print(f'获取 cookie 成功!') wechat_critical = ['wxuin', 'slave_sid', 'slave_user', 'bizuin', 'data_ticket', 'token']
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) ' # 腾讯通用 cookie 可尝试父域
'Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0', tencent_common = ['ptui_loginuin', 'RK', 'ptcz', 'ua_id']
'Referer': f'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/'
f'appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={token}&lang=zh_CN' # 策略 1: 核心 cookie → 精确域名
} if name in wechat_critical:
query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false") domains_to_try = [target_domain]
post_body = { # 策略 2: 腾讯通用 cookie → 先试目标域,再试父域
'userType': WECHAT_USER_TYPE, elif name in tencent_common:
'userFlag': 0 domains_to_try = [target_domain, '.weixin.qq.com', '.qq.com']
} # 策略 3: 其他 cookie → 默认 host-only不传 domain
account_rsp = json.loads( else:
http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text) domains_to_try = [None, target_domain]
official_accounts = []
if account_rsp['code'] == 200: for domain in domains_to_try:
official_accounts = account_rsp['content'] cookie = {
for account_line in official_accounts: 'name': name,
'value': value,
'path': '/',
'secure': True
}
if domain:
cookie['domain'] = domain
try: try:
if break_flag: driver.add_cookie(cookie)
break # logger.debug(f"✓ {name} added with domain={domain or 'host-only'}")
start_timestamp = int((time.time() - 500 * 24 * 3600) * 1000) return True
if 'updateTime' in account_line:
start_timestamp = account_line['updateTime']
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_timestamp / 1000))
account = account_line['userName']
search_account_api = f'https://mp.weixin.qq.com/cgi-bin/searchbiz?action=search_biz&begin=0&count=5&' \
f'query={account}&token={token}&lang=zh_CN&f=json&ajax=1'
print(f"开始搜索公众号“{account}”...")
time.sleep(3 + random.random())
response = requests.get(search_account_api, cookies=cookies, headers=headers)
rsp_body = json.loads(response.text)
index_end = ceil(count_per_account / 5)
if 'list' in rsp_body:
matched_account = {}
matched_account_flag = False
for item in rsp_body['list']:
if item['nickname'] == account:
matched_account_flag = True
matched_account = item
break
if not matched_account_flag:
print(f"未找到公众号“{account}")
continue
fake_id = matched_account['fakeid']
update_time_flag = True # 用于记录获取到的历史列表是否已经超出最早的时间限制
next_start_timestamp = int(time.time() * 1000)
for index in range(index_end):
if update_time_flag:
if next_start_timestamp - start_timestamp < 12 * 3600 * 1000:
print(f"公众号“{account}”以及后续账号在12小时内已经扫码获取过文章链接本次获取结束")
break_flag = True
else:
fetch_article_api = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=' \
f'{index * 5}&count=5&fakeid={fake_id}&type=9&query=&token={token}' \
f'&lang=zh_CN&f=json&ajax=1'
print(f"开始获取公众号“{account}”在 {start_time} 后发表的的文章列表...")
time.sleep(3 + random.random())
article_response = requests.get(fetch_article_api, cookies=cookies, headers=headers)
article_rsp_body = json.loads(article_response.text)
if 'app_msg_list' in article_rsp_body:
for article in article_rsp_body['app_msg_list']:
title = article['title']
link = article['link']
update_time = article['update_time'] * 1000
if update_time > start_timestamp:
total_count += 1
time_str = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time / 1000))
print(f"[No. {total_count}] 获取到公众号“{account}”在 {time_str} "
f"发表的文章《{title}》,链接地址:{link}")
redis_client.sadd(f"MediaSpiders:Wechat_links:{account_line['id']}", link)
else:
update_time_flag = False
break
else:
print(json.dumps(article_rsp_body, ensure_ascii=False))
if 'base_resp' in article_rsp_body:
if article_rsp_body['base_resp']['err_msg'] == "freq control":
print("接口频率限制,稍后再试,本次获取结束")
break_flag = True
break
if not break_flag:
# 本循环内只有12小时内扫过码以及接口频率限制退出会导致 break_flag 为 True这两种情况都不需要更新扫码状态
next_start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_start_timestamp / 1000))
account_line['updateTime'] = next_start_timestamp
http_post(SOCIAL_USER_UPDATE_API,
data=json.dumps(account_line, ensure_ascii=False).encode('utf-8'),
headers={'User-Agent': UA, "Content-Type": "application/json"}
)
print(f"公众号“{account}”文章获取结束,该账号下一次获取起始时间为 {next_start_time}")
else:
print(json.dumps(rsp_body, ensure_ascii=False))
if 'base_resp' in rsp_body:
if rsp_body['base_resp']['err_msg'] == "freq control":
print("接口频率限制,稍后再试,本次获取结束")
break_flag = True
break
except Exception as e: except Exception as e:
print(repr(e)) if 'invalid cookie domain' in str(e):
redis_client.close() continue # 尝试下一个 domain
driver.quit() else:
# logger.warning(f"✗ {name} failed: {e}")
return False
return False # 所有 domain 都失败
if __name__ == "__main__":
cookie_list = redis_client.lrange("MediaSpiders:WeChatLinksFetcher_Cookies", 0, -1)
cookie_parts = [
item.decode('utf-8') if isinstance(item, bytes) else str(item)
for item in cookie_list
]
# 标记是否需要手动登录
need_manual_login = True
current_cookie = None
if not cookie_parts:
logger.warning("Redis 中没有可用的 cookie需要手动登录")
need_manual_login = True
else:
# 尝试使用 Redis 中的 cookie 登录
for item in cookie_parts:
current_cookie = item
try:
driver.delete_all_cookies()
driver.get('https://mp.weixin.qq.com/')
time.sleep(2)
cookie_string = item
cookie_dict = parse_cookie_string(cookie_string)
success_count = 0
for name, value in cookie_dict.items():
if add_cookie_smart(driver, name, value):
success_count += 1
else:
logger.warning(f"跳过 cookie: {name}")
logger.info(f"成功添加 {success_count}/{len(cookie_dict)} 个 cookie")
# 验证 cookie 是否有效
driver.refresh()
time.sleep(5)
# 检查是否登录成功 - 通过检查 URL 中是否包含 token 或页面元素
current_url = driver.current_url
if 'token=' in current_url:
logger.info("使用 Redis 中的 cookie 登录成功")
need_manual_login = False
break
else:
# 二次验证:检查页面上是否有登录状态相关的元素
try:
# 检查是否有用户头像或用户名元素
driver.find_element(By.CSS_SELECTOR,
".weui-desktop-account__nickname, .userinfo_nickname, .account_nickname")
logger.info("通过页面元素验证,登录成功")
need_manual_login = False
break
except:
logger.warning("Cookie 登录失败,尝试下一个 cookie 或手动登录")
except Exception as e:
logger.error(f"使用 cookie 登录时出错: {str(e)}")
continue
# 如果自动登录失败,进行手动登录
if need_manual_login:
logger.info("所有 cookie 均无效,启动手动登录流程")
try:
driver.delete_all_cookies()
driver.get('https://mp.weixin.qq.com/')
time.sleep(2)
# 等待用户手动登录
logger.info("请在浏览器中手动完成登录(扫描二维码)")
logger.info("登录成功后,程序将自动继续执行")
# 设置最长等待时间(例如 120 秒)
max_wait_time = 120
start_time = time.time()
logged_in = False
while time.time() - start_time < max_wait_time:
current_url = driver.current_url
if 'token=' in current_url:
logged_in = True
logger.info("手动登录成功!")
break
# 检查页面元素
try:
driver.find_element(By.CSS_SELECTOR,
".weui-desktop-account__nickname, .userinfo_nickname, .account_nickname")
logged_in = True
logger.info("通过页面元素确认手动登录成功!")
break
except:
time.sleep(2)
if not logged_in:
logger.error(f"等待 {max_wait_time} 秒后仍未登录成功,程序终止")
raise Exception("手动登录超时")
# 获取新的 cookie
raw_cookies = driver.get_cookies()
new_cookie_dict = {}
for c in raw_cookies:
new_cookie_dict[c['name']] = c['value']
# 将字典转换为字符串格式
new_cookie_string = "; ".join([f"{k}={v}" for k, v in new_cookie_dict.items()])
# 更新 Redis 中的 cookie
logger.info("更新 Redis 中的 cookie")
# 删除旧的 cookie
redis_client.delete("MediaSpiders:WeChatLinksFetcher_Cookies")
# 添加新的 cookie
redis_client.lpush("MediaSpiders:WeChatLinksFetcher_Cookies", new_cookie_string)
current_cookie = new_cookie_string
logger.info("Redis cookie 更新成功")
except Exception as e:
logger.error(f"手动登录过程出错: {str(e)}")
raise
count_per_account = 200
total_count = 0
break_flag = False
token_index = driver.current_url.rfind('token=')
token = driver.current_url[token_index + 6:]
print(f'获取 token 成功!当前 token 为 {token}')
raw_cookies = driver.get_cookies()
cookies = {}
for c in raw_cookies:
cookies[c['name']] = c['value']
print(f'获取 cookie 成功!')
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/136.0.0.0 Safari/537.36 Edg/136.0.0.0',
'Referer': f'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/'
f'appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={token}&lang=zh_CN'
}
query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false")
post_body = {
'userType': WECHAT_USER_TYPE,
'userFlag': 0
}
account_rsp = json.loads(
http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text)
official_accounts = []
if account_rsp['code'] == 200:
official_accounts = account_rsp['content']
for account_line in official_accounts:
try:
if break_flag:
break
start_timestamp = int((time.time() - 500 * 24 * 3600) * 1000)
if 'updateTime' in account_line:
start_timestamp = account_line['updateTime']
start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_timestamp / 1000))
account = account_line['userName']
search_account_api = f'https://mp.weixin.qq.com/cgi-bin/searchbiz?action=search_biz&begin=0&count=5&' \
f'query={account}&token={token}&lang=zh_CN&f=json&ajax=1'
print(f"开始搜索公众号“{account}”...")
time.sleep(3 + random.random())
response = requests.get(search_account_api, cookies=cookies, headers=headers)
rsp_body = json.loads(response.text)
index_end = ceil(count_per_account / 5)
if 'list' in rsp_body:
matched_account = {}
matched_account_flag = False
for item in rsp_body['list']:
if item['nickname'] == account:
matched_account_flag = True
matched_account = item
break
if not matched_account_flag:
print(f"未找到公众号“{account}")
continue
fake_id = matched_account['fakeid']
update_time_flag = True # 用于记录获取到的历史列表是否已经超出最早的时间限制
next_start_timestamp = int(time.time() * 1000)
for index in range(index_end):
if update_time_flag:
if next_start_timestamp - start_timestamp < 12 * 3600 * 1000:
print(f"公众号“{account}”以及后续账号在12小时内已经扫码获取过文章链接本次获取结束")
break_flag = True
else:
fetch_article_api = f'https://mp.weixin.qq.com/cgi-bin/appmsg?action=list_ex&begin=' \
f'{index * 5}&count=5&fakeid={fake_id}&type=9&query=&token={token}' \
f'&lang=zh_CN&f=json&ajax=1'
print(f"开始获取公众号“{account}”在 {start_time} 后发表的的文章列表...")
time.sleep(3 + random.random())
article_response = requests.get(fetch_article_api, cookies=cookies, headers=headers)
article_rsp_body = json.loads(article_response.text)
if 'app_msg_list' in article_rsp_body:
for article in article_rsp_body['app_msg_list']:
title = article['title']
link = article['link']
update_time = article['update_time'] * 1000
if update_time > start_timestamp:
total_count += 1
time_str = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime(update_time / 1000))
print(f"[No. {total_count}] 获取到公众号“{account}”在 {time_str} "
f"发表的文章《{title}》,链接地址:{link}")
redis_client.sadd(f"MediaSpiders:Wechat_links:{account_line['id']}", link)
else:
update_time_flag = False
break
else:
print(json.dumps(article_rsp_body, ensure_ascii=False))
if 'base_resp' in article_rsp_body:
err_msg = article_rsp_body['base_resp']['err_msg']
if err_msg == "freq control" or err_msg == "invalid session":
print("接口频率限制,稍后再试,本次获取结束")
break_flag = True
break
if not break_flag:
# 本循环内只有12小时内扫过码以及接口频率限制退出会导致 break_flag 为 True这两种情况都不需要更新扫码状态
next_start_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(next_start_timestamp / 1000))
account_line['updateTime'] = next_start_timestamp
http_post(SOCIAL_USER_UPDATE_API,
data=json.dumps(account_line, ensure_ascii=False).encode('utf-8'),
headers={'User-Agent': UA, "Content-Type": "application/json"}
)
print(f"公众号“{account}”文章获取结束,该账号下一次获取起始时间为 {next_start_time}")
else:
print(json.dumps(rsp_body, ensure_ascii=False))
if 'base_resp' in rsp_body:
if rsp_body['base_resp']['err_msg'] == "freq control":
print("接口频率限制,稍后再试,本次获取结束")
break_flag = True
break
except Exception as e:
print(repr(e))
redis_client.close()
driver.quit()

View File

@ -19,5 +19,5 @@ dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath) sys.path.append(dirpath)
# 等效于scrapy crawl FacebookUserSpider -a params="{}" # 等效于scrapy crawl FacebookUserSpider -a params="{}"
# execute(['scrapy', 'crawl', 'LinkedinUserSpider', '-a', 'params={}']) # execute(['scrapy', 'crawl', 'hot_search_spider', '-a', 'params={}'])
execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}']) execute(['scrapy', 'crawl', 'BaiduHotSearchSprder', '-a', 'params={}'])