diff --git a/spiders/MediaSpiders/MediaSpiders/spiders/FacebookUserSpider.py b/spiders/MediaSpiders/MediaSpiders/spiders/FacebookUserSpider.py index 9a297b2..339dead 100644 --- a/spiders/MediaSpiders/MediaSpiders/spiders/FacebookUserSpider.py +++ b/spiders/MediaSpiders/MediaSpiders/spiders/FacebookUserSpider.py @@ -12,6 +12,7 @@ from selenium.webdriver.common.action_chains import ActionChains from MediaSpiders.items import MediaspidersItem from MediaSpiders.utils.http_utils import http_post +from MediaSpiders.utils.login_utils import login from MediaSpiders.utils.string_utils import get_str_md5 from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp @@ -64,23 +65,32 @@ class FacebookSpider(scrapy.Spider): def parse(self, response): logger.info("login facebook") - driver = response.request.meta['driver'] - driver.maximize_window() - driver.get('https://m.facebook.com/') - time.sleep(3) + # driver = response.request.meta['driver'] + # driver.maximize_window() + # driver.get('https://m.facebook.com/') + # time.sleep(3) + # user_list = [] + # for u in login_users: + # user_list.append(json.loads(u.decode())) + # login_user = random.choice(user_list) + + # driver.find_element_by_xpath( + # '//input[@name="email"]').send_keys(login_user['uid']) + # driver.find_element_by_xpath( + # '//input[@name="pass"]').send_keys(login_user['pwd']) + # driver.find_element_by_xpath('//button[@name="login"]').click() + # time.sleep(10) + # logger.info("login as %s" % login_user['uid']) + # 获取采集登录账号并登录 login_users = self.redis_client.smembers('MediaSpiders:Facebook_login_accounts') - user_list = [] - for u in login_users: - user_list.append(json.loads(u.decode())) - login_user = random.choice(user_list) - driver.find_element_by_xpath( - '//input[@name="email"]').send_keys(login_user['uid']) - driver.find_element_by_xpath( - '//input[@name="pass"]').send_keys(login_user['pwd']) - driver.find_element_by_xpath('//button[@name="login"]').click() - time.sleep(10) - 
logger.info("login as %s" % login_user['uid']) + driver = login().login_with_selenium( + 'https://m.facebook.com/', + self.name, + login_users=login_users, + response=response + ) + # 获取待采集目标账号,并逐个请求 account_query_api = self.settings['SOCIAL_USER_QUERY_ALL_API'] account_query_api = account_query_api.format(sortBy="id", shuffleResult="true") diff --git a/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py b/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py index 2997c4b..fa210d4 100644 --- a/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py +++ b/spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserSpider.py @@ -15,6 +15,7 @@ from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.common.by import By from MediaSpiders.items import MediaspidersItem from MediaSpiders.utils.http_utils import http_post +from MediaSpiders.utils.login_utils import login from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp from selenium.webdriver.common.action_chains import ActionChains @@ -76,104 +77,102 @@ class TwitterSpider(scrapy.Spider): self.redis_client = redis.Redis(host=self.settings['REDIS_HOST'], port=self.settings['REDIS_PORT'], password=self.settings['REDIS_PWD']) self.simhash_filter_key = self.settings['TWITTER_SIMHASH_FILTER_KEY'] - logger.info("login twitter") cookie_string = None + # 获取采集登录账号并登录 + login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts') # 尝试自动化登录网页获取 cookies,若失败则从redis中 使用已有cookies try: - driver = response.request.meta['driver'] - # 隐藏指纹 - driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', { - 'source': ''' - delete navigator.__proto__.webdriver; - Object.defineProperty(navigator, 'webdriver', { get: () => undefined }); - ''' - }) - driver.maximize_window() - # 1. 打开第一个标签页 - driver.get('https://x.com/i/flow/login') - wait = WebDriverWait(driver, 15) - # 2. 
通过 JS 打开第二个标签页(新 Tab) - time.sleep(5) - driver.execute_script("window.open('');") - driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');") - - # 3. 获取所有标签页句柄 - handles = driver.window_handles # [handle1, handle2] - - # 4. 切换到第二个标签页(可选) - driver.switch_to.window(handles[1]) - # 获取采集登录账号并登录 - login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts') - user_list = [] - for u in login_users: - user_list.append(json.loads(u.decode())) - - login_user = random.choice(user_list) - logger.info(f"login as user {login_user['uid']}") - # time.sleep(random.uniform(1.5, 3.0)) - # driver.find_element_by_xpath("//input").send_keys(login_user['uid']) - # 等待并定位用户名输入框 - username_input = wait.until( - EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]')) + driver = login().login_with_selenium( + 'https://x.com/i/flow/login', + self.name, + login_users=login_users, + response=response ) + # driver = response.request.meta['driver'] + # driver.maximize_window() + # # 1. 
打开第一个标签页 + # driver.get('https://x.com/i/flow/login') + # user_list = [] + # for u in login_users: + # user_list.append(json.loads(u.decode())) + # + # login_user = random.choice(user_list) - # 模拟真人逐字输入(带随机延迟) - username = login_user['uid'] - for char in username: - username_input.send_keys(char) - time.sleep(random.uniform(0.05, 0.2)) # 每个字符间隔 50~200ms - - time.sleep(random.uniform(0.3, 0.8)) # 输入完后稍作停顿 - - # 尝试点击 "Next" 按钮(主逻辑:带文本的按钮) - try: - next_button = wait.until( - EC.element_to_be_clickable( - (By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]") - ) - ) - body = driver.find_element(By.TAG_NAME, "body") - ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform() - time.sleep(0.5) - # 模拟鼠标移动到按钮并点击 - actions = ActionChains(driver) - actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform() - - except Exception as e: - logger.info("主 Next 按钮未找到,尝试备用定位方式") - try: - # 备用:通过 role 定位第二个 button - next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]") - actions = ActionChains(driver) - actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform() - except Exception as e2: - logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}") - raise - time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载 - try: - logger.info("输入手机号验证...") - driver.find_element_by_xpath("//input[@name='text']").send_keys("+8619962025641") - # driver.find_element_by_xpath("//button[@data-testid='ocfEnterTextNextButton']").click() - driver.find_element_by_xpath(driver.find_element_by_xpath("//button[.//span[text()='下一步']]")).click() - time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载 - except Exception: - logger.info("无需输入手机号验证") - driver.find_element_by_xpath("//input[@name='password']").send_keys(login_user['pwd']) - driver.find_element_by_xpath("//button[@data-testid='LoginForm_Login_Button']").click() - time.sleep(random.uniform(1.5, 5.0)) # 等待页面加载 - try: - 
driver.find_element_by_xpath("//button[@data-testid='confirmationSheetConfirm']").click() - time.sleep(random.uniform(1.5, 10.0)) # 等待页面加载 - except: - time.sleep(5) + # wait = WebDriverWait(driver, 15) + # # 2. 通过 JS 打开第二个标签页(新 Tab) + # time.sleep(5) + # driver.execute_script("window.open('');") + # driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');") + # + # # 3. 获取所有标签页句柄 + # handles = driver.window_handles # [handle1, handle2] + # + # # 4. 切换到第二个标签页(可选) + # driver.switch_to.window(handles[1]) + # + # logger.info(f"login as user {login_user['uid']}") + # # time.sleep(random.uniform(1.5, 3.0)) + # # driver.find_element_by_xpath("//input").send_keys(login_user['uid']) + # # 等待并定位用户名输入框 + # username_input = wait.until( + # EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]')) + # ) + # + # # 模拟真人逐字输入(带随机延迟) + # username = login_user['uid'] + # for char in username: + # username_input.send_keys(char) + # time.sleep(random.uniform(0.05, 0.2)) # 每个字符间隔 50~200ms + # + # time.sleep(random.uniform(0.3, 0.8)) # 输入完后稍作停顿 + # + # # 尝试点击 "Next" 按钮(主逻辑:带文本的按钮) + # try: + # next_button = wait.until( + # EC.element_to_be_clickable( + # (By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]") + # ) + # ) + # body = driver.find_element(By.TAG_NAME, "body") + # ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform() + # time.sleep(0.5) + # # 模拟鼠标移动到按钮并点击 + # actions = ActionChains(driver) + # actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform() + # + # except Exception as e: + # logger.info("主 Next 按钮未找到,尝试备用定位方式") + # try: + # # 备用:通过 role 定位第二个 button + # next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]") + # actions = ActionChains(driver) + # actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform() + # except Exception as e2: + # logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}") + # raise + # 
class login:
    """Selenium login helper shared by the spiders and the WeChat link fetcher.

    One instance performs one login: ``login_with_selenium`` resolves the
    WebDriver, optionally picks a random account from the supplied account
    pool, and dispatches to the site-specific routine selected by
    ``site_name``.
    """

    # Fallback number typed into X/Twitter's extra identity prompt when the
    # chosen account record has no "phone" key.
    # NOTE(review): this is personal data living in source; consider moving it
    # into the account records or the project settings.
    DEFAULT_VERIFY_PHONE = "+8619962025641"

    def __init__(self):
        self.name = None
        self.url = None
        # Set by login_with_selenium(); initialised here so that a missing
        # account is detected explicitly instead of via AttributeError.
        self.login_user = None

    @staticmethod
    def _choose_account(login_users):
        """Parse the JSON account records and return one picked at random.

        :param login_users: iterable of JSON documents (``bytes`` or ``str``)
        :raises ValueError: if the iterable holds no records
        """
        # json.loads accepts both bytes and str, so this works whether or not
        # the Redis client was created with decode_responses=True.
        accounts = [json.loads(raw) for raw in login_users]
        if not accounts:
            raise ValueError("login_users is empty; no account available for login")
        return random.choice(accounts)

    def _require_account(self):
        """Fail early (before the browser is touched) when no account is set."""
        if self.login_user is None:
            raise ValueError(f"{self.name} login requires login_users, but none were provided")

    def login_with_selenium(self, login_url, site_name, login_users=None, response=None, drivers=None):
        """Log in to ``site_name`` with Selenium and return the WebDriver.

        :param login_url: URL of the login page
        :param site_name: selects the login routine; recognised values are
            'FacebookUserSpider', 'TwitterUserSpider' and 'wechat_links_fetcher'
        :param login_users: iterable of JSON account records (with "uid" and
            "pwd" keys); one record is picked at random
        :param response: response whose ``request.meta['driver']`` holds the
            WebDriver; takes precedence over ``drivers``
        :param drivers: an already created WebDriver instance
        :return: the WebDriver that was used for the login
        :raises ValueError: if no WebDriver can be resolved, if ``login_users``
            is empty, or if the selected routine needs an account and none
            was supplied
        """
        self.name = site_name
        self.url = login_url
        logger.info(f"Starting login to {site_name}...")

        if response is not None:
            driver = response.request.meta['driver']
        elif drivers is not None:
            driver = drivers
        else:
            raise ValueError("login_with_selenium requires either `response` or `drivers`")

        if login_users is not None:
            self.login_user = self._choose_account(login_users)

        routines = {
            'FacebookUserSpider': self.facebook_login,
            'TwitterUserSpider': self.twitter_login,
            'wechat_links_fetcher': self.wechat_links_login,
        }
        routine = routines.get(site_name)
        if routine is None:
            logger.warning(f"No login routine registered for {site_name}; returning driver as-is")
        else:
            routine(driver)

        time.sleep(10)  # fixed settle time after login (could become an explicit wait)

        return driver

    def facebook_login(self, driver):
        """Fill in and submit the Facebook login form at ``self.url``."""
        self._require_account()
        driver.maximize_window()
        time.sleep(3)
        driver.get(self.url)

        driver.find_element(By.XPATH, '//input[@name="email"]').send_keys(self.login_user['uid'])
        driver.find_element(By.XPATH, '//input[@name="pass"]').send_keys(self.login_user['pwd'])
        driver.find_element(By.XPATH, '//button[@name="login"]').click()
        time.sleep(10)
        logger.info(f"Logged in to {self.name} as {self.login_user['uid']}")

    def twitter_login(self, driver):
        """Walk through the X/Twitter login flow (user name, optional phone prompt, password)."""
        self._require_account()
        driver.maximize_window()

        # Register the fingerprint-hiding script BEFORE the first navigation:
        # it is only evaluated in documents created after registration.
        driver.execute_cdp_cmd('Page.addScriptToEvaluateOnNewDocument', {
            'source': '''
                delete navigator.__proto__.webdriver;
                Object.defineProperty(navigator, 'webdriver', { get: () => undefined });
            '''
        })
        time.sleep(3)
        driver.get(self.url)

        wait = WebDriverWait(driver, 15)
        # Open additional tabs via JS and continue in the second window handle.
        time.sleep(5)
        driver.execute_script("window.open('');")
        driver.execute_script("window.open('https://x.com/i/flow/login', '_blank');")

        handles = driver.window_handles
        # NOTE(review): two window.open calls precede this, so handles[1] may be
        # the blank tab rather than the login tab - confirm which is intended.
        driver.switch_to.window(handles[1])

        logger.info(f"login as user {self.login_user['uid']}")
        username_input = wait.until(
            EC.element_to_be_clickable((By.CSS_SELECTOR, 'input[autocomplete="username"]'))
        )

        # Type the user name character by character with random 50-200 ms gaps.
        for char in self.login_user['uid']:
            username_input.send_keys(char)
            time.sleep(random.uniform(0.05, 0.2))

        time.sleep(random.uniform(0.3, 0.8))  # short pause after typing

        # Primary strategy: the button whose label is "Next" / "下一步".
        try:
            next_button = wait.until(
                EC.element_to_be_clickable(
                    (By.XPATH, "//button[.//span[contains(text(), 'Next') or contains(text(), '下一步')]]")
                )
            )
            body = driver.find_element(By.TAG_NAME, "body")
            ActionChains(driver).move_to_element_with_offset(body, 100, 100).perform()
            time.sleep(0.5)
            # Move the pointer onto the button, pause briefly, then click.
            actions = ActionChains(driver)
            actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
        except Exception as e:
            logger.info("主 Next 按钮未找到,尝试备用定位方式")
            try:
                # Fallback strategy: second element matching button[@role='button'].
                next_button = driver.find_element(By.XPATH, "//button[@role='button'][2]")
                actions = ActionChains(driver)
                actions.move_to_element(next_button).pause(random.uniform(0.2, 0.6)).click().perform()
            except Exception as e2:
                logger.error(f"两种方式均无法点击 Next 按钮: {e}, {e2}")
                raise
        time.sleep(random.uniform(1.5, 5.0))  # wait for the next step to load

        # Optional step: an extra text prompt asking for the phone number.
        try:
            phone_input = driver.find_element(By.XPATH, "//input[@name='text']")
        except Exception:
            logger.info("无需输入手机号验证")
        else:
            logger.info("输入手机号验证...")
            try:
                phone_input.send_keys(self.login_user.get('phone', self.DEFAULT_VERIFY_PHONE))
                # The original passed a WebElement as the XPath here, so the
                # click always failed; locate the button once and click it.
                driver.find_element(By.XPATH, "//button[.//span[text()='下一步']]").click()
                time.sleep(random.uniform(1.5, 5.0))  # wait for the next step to load
            except Exception as exc:
                # Stay best-effort, but make the failure visible in the log.
                logger.warning(f"Phone verification step failed: {exc}")

        driver.find_element(By.XPATH, "//input[@name='password']").send_keys(self.login_user['pwd'])
        driver.find_element(By.XPATH, "//button[@data-testid='LoginForm_Login_Button']").click()
        time.sleep(random.uniform(1.5, 5.0))  # wait for the post-login page
        try:
            # Optional confirmation sheet that may follow the login.
            driver.find_element(By.XPATH, "//button[@data-testid='confirmationSheetConfirm']").click()
            time.sleep(random.uniform(1.5, 10.0))
        except Exception:
            time.sleep(5)
        logger.info(f"Logged in to {self.name} as {self.login_user['uid']}")

    def wechat_links_login(self, driver):
        """Open ``self.url`` and block until the browser URL contains 'token='.

        No credentials are typed by this routine; it only polls the current
        URL every 5-11 seconds and has no timeout.
        """
        driver.maximize_window()
        driver.get(self.url)
        print("等待打开登录后的页面...")
        while True:
            time.sleep(random.randint(5, 11))
            if 'token=' in driver.current_url:
                print("登录成功!")
                logger.info(f"Logged in to {self.name}")
                break
chrome.exe 的完整路径 +chrome_options.binary_location = r"C:\Users\DELL\Downloads\chrome-win64\chrome.exe" +# chrome_options.use_chromium = True +driver = webdriver.Chrome( + executable_path=r"C:\Users\DELL\Downloads\chromedriver-win64\chromedriver.exe", + options=chrome_options +) driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { "source": """ Object.defineProperty(navigator, 'webdriver', { @@ -27,25 +35,11 @@ redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PWD) if __name__ == "__main__": count_per_account = 200 total_count = 0 - driver.maximize_window() - query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false") - post_body = { - 'userType': WECHAT_USER_TYPE, - 'userFlag': 0 - } - account_rsp = json.loads( - http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text) - official_accounts = [] - if account_rsp['code'] == 200: - official_accounts = account_rsp['content'] - driver.get('https://mp.weixin.qq.com/') - print("等待打开登录后的页面...") - while True: - delay = random.randint(5, 11) - time.sleep(delay) - if 'token=' in driver.current_url: - print("登录成功!") - break + driver = login().login_with_selenium( + 'https://mp.weixin.qq.com/', + 'wechat_links_fetcher', + drivers=driver + ) break_flag = False token_index = driver.current_url.rfind('token=') token = driver.current_url[token_index + 6:] @@ -61,6 +55,16 @@ if __name__ == "__main__": 'Referer': f'https://mp.weixin.qq.com/cgi-bin/appmsg?t=media/' f'appmsg_edit_v2&action=edit&isNew=1&type=77&createType=0&token={token}&lang=zh_CN' } + query_api = SOCIAL_USER_QUERY_ALL_API.format(sortBy="updateTime", shuffleResult="false") + post_body = { + 'userType': WECHAT_USER_TYPE, + 'userFlag': 0 + } + account_rsp = json.loads( + http_post(query_api, json.dumps(post_body), headers={"Content-Type": "application/json"}).text) + official_accounts = [] + if account_rsp['code'] == 200: + official_accounts = account_rsp['content'] 
for account_line in official_accounts: try: if break_flag: diff --git a/spiders/MediaSpiders/run.py b/spiders/MediaSpiders/run.py index d6da25d..8f74cad 100644 --- a/spiders/MediaSpiders/run.py +++ b/spiders/MediaSpiders/run.py @@ -4,7 +4,19 @@ import sys from scrapy.cmdline import execute +""" + 命令行启动: + 1、 Win CMD + D:\dev\code\PythonCode\osc\spiders\MediaSpiders\.venv\Scripts\activate.bat + scrapy crawl FacebookUserSpider -a params="{}" + + 2、Windows PowerShell + D:\dev\code\PythonCode\osc\spiders\MediaSpiders\.venv\Scripts\Activate.ps1 + scrapy crawl FacebookUserSpider -a params="{}" +""" + dirpath = os.path.dirname(os.path.abspath(__file__)) sys.path.append(dirpath) -execute(['scrapy', 'crawl', 'TaobaoUserSpider', '-a', 'params={}']) +# 等效于:scrapy crawl FacebookUserSpider -a params="{}" +execute(['scrapy', 'crawl', 'TwitterUserSpider', '-a', 'params={}'])