2026-01-28 11:00:03 +08:00

177 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""This module contains the ``SeleniumMiddleware`` scrapy middleware"""
from importlib import import_module
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.http import HtmlResponse
from selenium.webdriver.support.ui import WebDriverWait
import selenium
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import time
from scrapy_selenium.middlewares import SeleniumRequest
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.remote.webdriver import WebDriver
from msedge.selenium_tools import EdgeOptions
from msedge.selenium_tools import Edge
class SeleniumMiddleware:
    """Scrapy downloader middleware that renders ``SeleniumRequest`` objects in a browser.

    A single WebDriver session is created lazily on the first ``SeleniumRequest``
    and reused for every later one.  When ``SELENIUM_DRIVER_EXECUTABLE_PATH`` is an
    ``http(s)://`` URL the session is opened against that remote endpoint using
    the options built from ``SELENIUM_DRIVER_NAME``; otherwise a local
    Chromium-based Edge browser is launched with the configured driver binary.
    """

    # User agent announced by the local Edge session and by the Firefox
    # ``general.useragent.override`` preference.
    USER_AGENT = ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0')

    # The Firefox proxy preferences are re-applied once more than this many
    # requests have been handled since they were last written.
    PROXY_REFRESH_AFTER = 20

    def __init__(self, driver_name, driver_executable_path, driver_arguments,
                 browser_executable_path):
        """Prepare the browser options; the browser itself is started lazily.

        Parameters
        ----------
        driver_name
            Selenium driver name (``firefox``, ``chrome``, ``edge``, ...).
        driver_executable_path
            Path of the local driver binary, or the ``http(s)://`` URL of a
            remote WebDriver endpoint.
        driver_arguments
            Iterable of command-line arguments for the browser; ``None`` is
            treated as "no arguments".
        browser_executable_path
            Accepted for interface compatibility; not used by this class.
        """
        self.driver_executable_path = driver_executable_path
        self.driver_name = driver_name
        self.proxy_count = 0
        self.driver = None

        if driver_name.lower() == 'edge':
            # Edge uses the options class shipped with msedge-selenium-tools.
            self.driver_options = EdgeOptions()
        else:
            webdriver_base_path = f'selenium.webdriver.{driver_name}'
            driver_options_module = import_module(f'{webdriver_base_path}.options')
            driver_options_klass = getattr(driver_options_module, 'Options')
            self.driver_options = driver_options_klass()

        # The SELENIUM_DRIVER_ARGUMENTS setting is optional, so it may be None.
        for argument in driver_arguments or []:
            self.driver_options.add_argument(argument)

    @staticmethod
    def _is_remote_executor(path):
        """Return True when *path* is an ``http(s)://`` URL rather than a file path."""
        return str(path).startswith(('http://', 'https://'))

    @staticmethod
    def _parse_proxy(proxy):
        """Split a proxy such as ``http://1.2.3.4:8080`` into ``(host, port)``.

        Credentials (``user:pw@``) are ignored and a missing scheme is tolerated.

        Raises
        ------
        ValueError
            If the host or the port cannot be determined.
        """
        # Function-scope import keeps this block independent of the file header.
        from urllib.parse import urlsplit

        parts = urlsplit(proxy if '://' in proxy else f'//{proxy}')
        if not parts.hostname or parts.port is None:
            raise ValueError(f'proxy must look like scheme://host:port, got {proxy!r}')
        return parts.hostname, parts.port

    def start_session(self):
        """Create a new WebDriver session and store it in ``self.driver``.

        Raises
        ------
        NotConfigured
            If a remote endpoint is configured and the driver name is not one
            of ``firefox``, ``chrome`` or ``edge``.
        """
        if self._is_remote_executor(self.driver_executable_path):
            # Remote browser
            capabilities_by_driver = {
                'firefox': DesiredCapabilities.FIREFOX,
                'chrome': DesiredCapabilities.CHROME,
                'edge': DesiredCapabilities.EDGE,
            }
            driver_type = capabilities_by_driver.get(self.driver_name.lower())
            if driver_type is None:
                raise NotConfigured(
                    'SELENIUM_DRIVER_NAME IS NOT RIGHT'
                )
            self.driver = WebDriver(
                command_executor=self.driver_executable_path,
                desired_capabilities=driver_type,
                options=self.driver_options,
            )
            return

        # Local browser.
        # NOTE: a local session always launches Chromium-based Edge with the
        # options below; ``driver_name`` and ``self.driver_options`` are not
        # consulted on this path.
        edge_options = EdgeOptions()
        edge_options.use_chromium = True
        # Hide the "controlled by automated test software" info bar
        edge_options.add_argument('--disable-blink-features=AutomationControlled')
        edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])
        # Disable the automation extension
        edge_options.add_experimental_option('useAutomationExtension', False)
        edge_options.add_argument(f"--user-agent={self.USER_AGENT}")
        edge_options.add_argument("--window-size=1920,1080")
        # Advanced browser preferences
        prefs = {
            "credentials_enable_service": False,  # no "save password" prompt
            "profile.password_manager_enabled": False,  # password manager off
        }
        edge_options.add_experimental_option("prefs", prefs)
        # Use the configured driver binary instead of a machine-specific path.
        self.driver = Edge(executable_path=self.driver_executable_path,
                           options=edge_options)

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from the crawler settings.

        Reads ``SELENIUM_DRIVER_NAME``, ``SELENIUM_DRIVER_EXECUTABLE_PATH``,
        ``SELENIUM_BROWSER_EXECUTABLE_PATH`` and ``SELENIUM_DRIVER_ARGUMENTS``
        and connects ``spider_closed`` to the matching signal.

        Raises
        ------
        NotConfigured
            If the driver name or the driver executable path is missing.
        """
        settings = crawler.settings
        driver_name = settings.get('SELENIUM_DRIVER_NAME')
        driver_executable_path = settings.get('SELENIUM_DRIVER_EXECUTABLE_PATH')
        browser_executable_path = settings.get('SELENIUM_BROWSER_EXECUTABLE_PATH')
        driver_arguments = settings.get('SELENIUM_DRIVER_ARGUMENTS')

        if not driver_name or not driver_executable_path:
            raise NotConfigured(
                'SELENIUM_DRIVER_NAME and SELENIUM_DRIVER_EXECUTABLE_PATH must be set'
            )

        middleware = cls(
            driver_name=driver_name,
            driver_executable_path=driver_executable_path,
            driver_arguments=driver_arguments,
            browser_executable_path=browser_executable_path
        )
        crawler.signals.connect(middleware.spider_closed, signals.spider_closed)
        return middleware

    def _apply_firefox_proxy(self, proxy):
        """Write the proxy and user-agent preferences through ``about:config``."""
        ip, port = self._parse_proxy(proxy)
        self.driver.get("about:config")
        script = '''
            var prefs = Components.classes["@mozilla.org/preferences-service;1"].getService(Components.interfaces.nsIPrefBranch);
            prefs.setIntPref("network.proxy.type", 1);
            prefs.setCharPref("network.proxy.http", "{ip}");
            prefs.setIntPref("network.proxy.http_port", "{port}");
            prefs.setCharPref("network.proxy.ssl", "{ip}");
            prefs.setIntPref("network.proxy.ssl_port", "{port}");
            prefs.setCharPref("network.proxy.ftp", "{ip}");
            prefs.setIntPref("network.proxy.ftp_port", "{port}");
            prefs.setBoolPref("general.useragent.site_specific_overrides",true);
            prefs.setBoolPref("general.useragent.updates.enabled",true);
            prefs.setCharPref("general.useragent.override","{user_agent}");
            '''.format(ip=ip, port=port, user_agent=self.USER_AGENT)
        self.driver.execute_script(script)
        time.sleep(1)

    def process_request(self, request, spider):
        """Render a ``SeleniumRequest`` in the browser and return its HTML.

        Requests of any other type are left to the rest of the downloader
        chain (``None`` is returned).  The returned ``HtmlResponse`` carries
        the rendered page source, and the driver is exposed to the callback
        as ``request.meta['driver']``.
        """
        if not isinstance(request, SeleniumRequest):
            return None

        if self.driver is None:
            self.start_session()

        # Apply the proxy IP (Firefox only), on the first request and again
        # once PROXY_REFRESH_AFTER requests have gone by.
        proxy = request.meta.get('proxy')
        refresh_due = (self.proxy_count == 0
                       or self.proxy_count > self.PROXY_REFRESH_AFTER)
        if self.driver.name == 'firefox' and proxy and refresh_due:
            self.proxy_count = 0
            self._apply_firefox_proxy(proxy)
        self.proxy_count += 1

        try:
            self.driver.get(request.url)
        except selenium.common.exceptions.TimeoutException:
            # Keep whatever has been loaded so far instead of failing the request.
            self.driver.execute_script('window.stop()')

        for cookie_name, cookie_value in request.cookies.items():
            self.driver.add_cookie(
                {
                    'name': cookie_name,
                    'value': cookie_value
                }
            )

        if request.wait_until:
            WebDriverWait(self.driver, request.wait_time).until(
                request.wait_until
            )

        if request.screenshot:
            request.meta['screenshot'] = self.driver.get_screenshot_as_png()

        if request.script:
            self.driver.execute_script(request.script)

        body = str.encode(self.driver.page_source)

        # Expose the driver via the "meta" attribute
        request.meta.update({'driver': self.driver})

        return HtmlResponse(
            self.driver.current_url,
            body=body,
            encoding='utf-8',
            request=request
        )

    def process_exception(self, request, exception, spider):
        """Restart the browser and retry when the WebDriver session was lost.

        Returns the request for rescheduling when *exception* is a
        ``WebDriverException`` reporting "No active session with ID";
        returns ``None`` for every other exception.
        """
        # str(exception) is used because ``exception.args`` may be empty or
        # hold None, depending on how the exception was constructed.
        session_lost = (isinstance(exception, WebDriverException)
                        and 'No active session with ID' in str(exception))
        if not session_lost:
            return None

        self.start_session()
        # The request has already been seen by the scheduler; without this
        # flag the duplicate filter would drop the retry.
        request.dont_filter = True
        return request

    def spider_closed(self):
        """Shutdown the driver when spider is closed"""
        if self.driver:
            self.driver.quit()
            self.driver = None