[20260119]1、微信公众号扫码的脚本,改成调用Selenium Chrome,2、将TW、FB、微信公众号扫描调用Selenium的部分,抽象成一个方法;3、scrapy 框架 命令行启动注释

This commit is contained in:
DELL 2026-01-19 17:18:53 +08:00
parent 488bc2fdca
commit 9a36e9c5b5
5 changed files with 322 additions and 127 deletions

View File

@ -12,6 +12,7 @@ from selenium.webdriver.common.action_chains import ActionChains
from MediaSpiders.items import MediaspidersItem
from MediaSpiders.utils.http_utils import http_post
from MediaSpiders.utils.login_utils import login
from MediaSpiders.utils.string_utils import get_str_md5
from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp
@ -64,23 +65,32 @@ class FacebookSpider(scrapy.Spider):
def parse(self, response):
logger.info("login facebook")
driver = response.request.meta['driver']
driver.maximize_window()
driver.get('https://m.facebook.com/')
time.sleep(3)
# driver = response.request.meta['driver']
# driver.maximize_window()
# driver.get('https://m.facebook.com/')
# time.sleep(3)
# user_list = []
# for u in login_users:
# user_list.append(json.loads(u.decode()))
# login_user = random.choice(user_list)
# driver.find_element_by_xpath(
# '//input[@name="email"]').send_keys(login_user['uid'])
# driver.find_element_by_xpath(
# '//input[@name="pass"]').send_keys(login_user['pwd'])
# driver.find_element_by_xpath('//button[@name="login"]').click()
# time.sleep(10)
# logger.info("login as %s" % login_user['uid'])
# 获取采集登录账号并登录
login_users = self.redis_client.smembers('MediaSpiders:Facebook_login_accounts')
user_list = []
for u in login_users:
user_list.append(json.loads(u.decode()))
login_user = random.choice(user_list)
driver.find_element_by_xpath(
'//input[@name="email"]').send_keys(login_user['uid'])
driver.find_element_by_xpath(
'//input[@name="pass"]').send_keys(login_user['pwd'])
driver.find_element_by_xpath('//button[@name="login"]').click()
time.sleep(10)
logger.info("login as %s" % login_user['uid'])
driver = login().login_with_selenium(
'https://m.facebook.com/',
self.name,
login_users=login_users,
response=response
)
# 获取待采集目标账号,并逐个请求
account_query_api = self.settings['SOCIAL_USER_QUERY_ALL_API']
account_query_api = account_query_api.format(sortBy="id", shuffleResult="true")

View File

@ -15,6 +15,7 @@ from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from MediaSpiders.items import MediaspidersItem
from MediaSpiders.utils.http_utils import http_post
from MediaSpiders.utils.login_utils import login
from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp
from selenium.webdriver.common.action_chains import ActionChains
@ -76,104 +77,102 @@ class TwitterSpider(scrapy.Spider):
self.redis_client = redis.Redis(host=self.settings['REDIS_HOST'], port=self.settings['REDIS_PORT'],
password=self.settings['REDIS_PWD'])
self.simhash_filter_key = self.settings['TWITTER_SIMHASH_FILTER_KEY']
logger.info("login twitter")
cookie_string = None
# 尝试自动化登录网页获取 cookies若失败则从redis中 使用已有cookies
try:
driver = response.request.meta['driver']
# 隐藏指纹
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
delete navigator.__proto__.webdriver;
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
'''
})
driver.maximize_window()
# 1. 打开第一个标签页
driver.get('https://x.com/i/flow/login')
wait = WebDriverWait(driver, 15)
# 2. 通过 JS 打开第二个标签页(新 Tab
time.sleep(5)
driver.execute_script("window.open('');")
driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');")
# 3. 获取所有标签页句柄
handles = driver.window_handles # [handle1, handle2]
# 4. 切换到第二个标签页(可选)
driver.switch_to.window(handles[1])
# 获取采集登录账号并登录
login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts')
user_list = []
for u in login_users:
user_list.append(json.loads(u.decode()))
# 尝试自动化登录网页获取 cookies若失败则从redis中 使用已有cookies
try:
login_user = random.choice(user_list)
logger.info(f"login as user {login_user['uid']}")
# time.sleep(random.uniform(1.5, 3.0))
# driver.find_element_by_xpath("//input").send_keys(login_user['uid'])
# 等待并定位用户名输入框
username_input = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]'))
driver = login().login_with_selenium(
'https://x.com/i/flow/login',
self.name,
login_users=login_users,
response=response
)
# driver = response.request.meta['driver']
# driver.maximize_window()
# # 1. 打开第一个标签页
# driver.get('https://x.com/i/flow/login')
# user_list = []
# for u in login_users:
# user_list.append(json.loads(u.decode()))
#
# login_user = random.choice(user_list)
# 模拟真人逐字输入(带随机延迟)
username = login_user['uid']
for char in username:
username_input.send_keys(char)
time.sleep(random.uniform(0.05, 0.2)) # 每个字符间隔 50~200ms
time.sleep(random.uniform(0.3, 0.8)) # 输入完后稍作停顿
# 尝试点击 "Next" 按钮(主逻辑:带文本的按钮)
try:
next_button = wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]")
)
)
body = driver.find_element(By.TAG_NAME, "body")
ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform()
time.sleep(0.5)
# 模拟鼠标移动到按钮并点击
actions = ActionChains(driver)
actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
except Exception as e:
logger.info("主 Next 按钮未找到,尝试备用定位方式")
try:
# 备用:通过 role 定位第二个 button
next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]")
actions = ActionChains(driver)
actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
except Exception as e2:
logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}")
raise
time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
try:
logger.info("输入手机号验证...")
driver.find_element_by_xpath("//input[@name='text']").send_keys("+8619962025641")
# driver.find_element_by_xpath("//button[@data-testid='ocfEnterTextNextButton']").click()
driver.find_element_by_xpath(driver.find_element_by_xpath("//button[.//span[text()='下一步']]")).click()
time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
except Exception:
logger.info("无需输入手机号验证")
driver.find_element_by_xpath("//input[@name='password']").send_keys(login_user['pwd'])
driver.find_element_by_xpath("//button[@data-testid='LoginForm_Login_Button']").click()
time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
try:
driver.find_element_by_xpath("//button[@data-testid='confirmationSheetConfirm']").click()
time.sleep(random.uniform(1.5, 10.0)) # 等待页面加载
except:
time.sleep(5)
# wait = WebDriverWait(driver, 15)
# # 2. 通过 JS 打开第二个标签页(新 Tab
# time.sleep(5)
# driver.execute_script("window.open('');")
# driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');")
#
# # 3. 获取所有标签页句柄
# handles = driver.window_handles # [handle1, handle2]
#
# # 4. 切换到第二个标签页(可选)
# driver.switch_to.window(handles[1])
#
# logger.info(f"login as user {login_user['uid']}")
# # time.sleep(random.uniform(1.5, 3.0))
# # driver.find_element_by_xpath("//input").send_keys(login_user['uid'])
# # 等待并定位用户名输入框
# username_input = wait.until(
# EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]'))
# )
#
# # 模拟真人逐字输入(带随机延迟)
# username = login_user['uid']
# for char in username:
# username_input.send_keys(char)
# time.sleep(random.uniform(0.05, 0.2)) # 每个字符间隔 50~200ms
#
# time.sleep(random.uniform(0.3, 0.8)) # 输入完后稍作停顿
#
# # 尝试点击 "Next" 按钮(主逻辑:带文本的按钮)
# try:
# next_button = wait.until(
# EC.element_to_be_clickable(
# (By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]")
# )
# )
# body = driver.find_element(By.TAG_NAME, "body")
# ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform()
# time.sleep(0.5)
# # 模拟鼠标移动到按钮并点击
# actions = ActionChains(driver)
# actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
#
# except Exception as e:
# logger.info("主 Next 按钮未找到,尝试备用定位方式")
# try:
# # 备用:通过 role 定位第二个 button
# next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]")
# actions = ActionChains(driver)
# actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
# except Exception as e2:
# logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}")
# raise
# time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
# try:
# logger.info("输入手机号验证...")
# driver.find_element_by_xpath("//input[@name='text']").send_keys("+8619962025641")
# # driver.find_element_by_xpath("//button[@data-testid='ocfEnterTextNextButton']").click()
# driver.find_element_by_xpath(driver.find_element_by_xpath("//button[.//span[text()='下一步']]")).click()
# time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
# except Exception:
# logger.info("无需输入手机号验证")
# driver.find_element_by_xpath("//input[@name='password']").send_keys(login_user['pwd'])
# driver.find_element_by_xpath("//button[@data-testid='LoginForm_Login_Button']").click()
# time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
# try:
# driver.find_element_by_xpath("//button[@data-testid='confirmationSheetConfirm']").click()
# time.sleep(random.uniform(1.5, 10.0)) # 等待页面加载
# except:
# time.sleep(5)
cookies = driver.get_cookies()
# cookies = json.loads(response.text)['cookies']
# 取cookie中的ct0为x-csrf-token取gt为x-guest-token
self.cookie_dict = {}
for cookie in cookies:
self.cookie_dict[cookie['name']] = cookie['value']
except Exception as e:
logger.info("自动化获取cookies失败")
cookie_string = self.redis_client.get("MediaSpiders:Twitter_Cookies").decode()

View File

@ -0,0 +1,170 @@
import random
import json
import time
import logging as logger
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \
WECHAT_USER_TYPE
from MediaSpiders.utils.http_utils import http_post, UA
class login:
def __init__(self):
self.name = None
self.url = None
def login_with_selenium(self, login_url, site_name, login_users=None, response=None, drivers=None):
"""
使用 Selenium 自动登录指定站点 Redis 账号池中随机选一个账号
:param driver: Selenium WebDriver 实例
:param login_url: 登录页面 URL
:param site_name: 站点名称 'Facebook'
:param login_users: Redis 客户端获取的账号密码
"""
self.name = site_name
self.url = login_url
logger.info(f"Starting login to {site_name}...")
if response is not None:
driver = response.request.meta['driver']
elif drivers is not None:
driver = drivers
if login_users is not None:
# 解析 redis 中 账号密码
user_list = [json.loads(u.decode()) for u in login_users]
self.login_user = random.choice(user_list)
if self.name == 'FacebookUserSpider':
self.facebook_login(driver)
elif self.name == 'TwitterUserSpider':
self.twitter_login(driver)
elif self.name == 'wechat_links_fetcher':
self.wechat_links_login(driver)
time.sleep(10) # 等待登录完成(可优化为显式等待)
return driver
"""
FaceBook 登录 获取cookie
"""
def facebook_login(self, driver):
# 打开登录页
driver.maximize_window() # 注意:原代码中有空格!
time.sleep(3)
driver.get(self.url)
driver.find_element_by_xpath(
'//input[@name="email"]').send_keys(self.login_user['uid'])
driver.find_element_by_xpath(
'//input[@name="pass"]').send_keys(self.login_user['pwd'])
driver.find_element_by_xpath('//button[@name="login"]').click()
time.sleep(10)
logger.info(f"Logged in to {self.name} as {self.login_user['uid']}")
"""
Twitter 登录 获取cookie
"""
def twitter_login(self, driver):
# 打开登录页
driver.maximize_window() # 注意:原代码中有空格!
time.sleep(3)
driver.get(self.url)
# 隐藏指纹
driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
'source': '''
delete navigator.__proto__.webdriver;
Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
'''
})
wait = WebDriverWait(driver, 15)
# 2. 通过 JS 打开第二个标签页(新 Tab
time.sleep(5)
driver.execute_script("window.open('');")
driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');")
# 3. 获取所有标签页句柄
handles = driver.window_handles # [handle1, handle2]
# 4. 切换到第二个标签页(可选)
driver.switch_to.window(handles[1])
logger.info(f"login as user {self.login_user['uid']}")
# time.sleep(random.uniform(1.5, 3.0))
# driver.find_element_by_xpath("//input").send_keys(login_user['uid'])
# 等待并定位用户名输入框
username_input = wait.until(
EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]'))
)
# 模拟真人逐字输入(带随机延迟)
username = self.login_user['uid']
for char in username:
username_input.send_keys(char)
time.sleep(random.uniform(0.05, 0.2)) # 每个字符间隔 50~200ms
time.sleep(random.uniform(0.3, 0.8)) # 输入完后稍作停顿
# 尝试点击 "Next" 按钮(主逻辑:带文本的按钮)
try:
next_button = wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]")
)
)
body = driver.find_element(By.TAG_NAME, "body")
ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform()
time.sleep(0.5)
# 模拟鼠标移动到按钮并点击
actions = ActionChains(driver)
actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
except Exception as e:
logger.info("主 Next 按钮未找到,尝试备用定位方式")
try:
# 备用:通过 role 定位第二个 button
next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]")
actions = ActionChains(driver)
actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
except Exception as e2:
logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}")
raise
time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
try:
logger.info("输入手机号验证...")
driver.find_element_by_xpath("//input[@name='text']").send_keys("+8619962025641")
# driver.find_element_by_xpath("//button[@data-testid='ocfEnterTextNextButton']").click()
driver.find_element_by_xpath(driver.find_element_by_xpath("//button[.//span[text()='下一步']]")).click()
time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
except Exception:
logger.info("无需输入手机号验证")
driver.find_element_by_xpath("//input[@name='password']").send_keys(self.login_user['pwd'])
driver.find_element_by_xpath("//button[@data-testid='LoginForm_Login_Button']").click()
time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载
try:
driver.find_element_by_xpath("//button[@data-testid='confirmationSheetConfirm']").click()
time.sleep(random.uniform(1.5, 10.0)) # 等待页面加载
except:
time.sleep(5)
logger.info(f"Logged in to {self.name} as {self.login_user['uid']}")
def wechat_links_login(self, driver):
driver.maximize_window()
driver.get(self.url)
print("等待打开登录后的页面...")
while True:
delay = random.randint(5, 11)
time.sleep(delay)
if 'token=' in driver.current_url:
print("登录成功!")
logger.info(f"Logged in to {self.name}")
break

View File

@ -5,16 +5,24 @@ from math import ceil
import redis
import requests
from msedge.selenium_tools import Edge
from msedge.selenium_tools import EdgeOptions
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from MediaSpiders.settings import REDIS_HOST, REDIS_PORT, REDIS_PWD, SOCIAL_USER_QUERY_ALL_API, SOCIAL_USER_UPDATE_API, \
WECHAT_USER_TYPE
from MediaSpiders.utils.http_utils import http_post, UA
from MediaSpiders.utils.login_utils import login
edge_options = EdgeOptions()
edge_options.use_chromium = True
driver = Edge(executable_path='msedgedriver.exe', options=edge_options)
chrome_options = Options()
# 指定 chrome.exe 的完整路径
chrome_options.binary_location = r"C:\Users\DELL\Downloads\chrome-win64\chrome.exe"
# chrome_options.use_chromium = True
driver = webdriver.Chrome(
executable_path=r"C:\Users\DELL\Downloads\chromedriver-win64\chromedriver.exe",
options=chrome_options
)
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', {
@ -27,25 +35,11 @@ redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD)
if __name__ == "__main__":
count_per_account = 200
total_count = 0
driver.maximize_window()
query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false")
post_body = {
'userType': WECHAT_USER_TYPE,
'userFlag': 0
}
account_rsp = json.loads(
http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text)
official_accounts = []
if account_rsp['code'] == 200:
official_accounts = account_rsp['content']
driver.get('https://mp.weixin.qq.com/')
print("等待打开登录后的页面...")
while True:
delay = random.randint(5, 11)
time.sleep(delay)
if 'token=' in driver.current_url:
print("登录成功!")
break
driver = login().login_with_selenium(
'https://mp.weixin.qq.com/',
'wechat_links_fetcher',
drivers=driver
)
break_flag = False
token_index = driver.current_url.rfind('token=')
token = driver.current_url[token_index + 6:]
@ -61,6 +55,16 @@ if __name__ == "__main__":
'Referer': f'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/'
f'appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={token}&lang=zh_CN'
}
query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false")
post_body = {
'userType': WECHAT_USER_TYPE,
'userFlag': 0
}
account_rsp = json.loads(
http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text)
official_accounts = []
if account_rsp['code'] == 200:
official_accounts = account_rsp['content']
for account_line in official_accounts:
try:
if break_flag:

View File

@ -4,7 +4,19 @@ import sys
from scrapy.cmdline import execute
"""
命令行启动
1 Win CMD
D:\dev\code\PythonCode\osc\spiders\MediaSpiders\.venv\Scripts\activate.bat
scrapy crawl FacebookUserSpider -a params="{}"
2Windows PowerShell
D:\dev\code\PythonCode\osc\spiders\MediaSpiders\.venv\Scripts\Activate.ps1
scrapy crawl FacebookUserSpider -a params="{}"
"""
dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath)
execute(['scrapy', 'crawl', 'TaobaoUserSpider', '-a', 'params={}'])
# 等效于scrapy crawl FacebookUserSpider -a params="{}"
execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}'])