2026-01-19 09:17:26 +08:00

167 lines
6.9 KiB
Python

"""This module contains the ``SeleniumMiddleware`` scrapy middleware"""
from importlib import import_module
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.http import HtmlResponse
from selenium.webdriver.support.ui import WebDriverWait
import selenium
from selenium.webdriver.common.desired_capabilities import DesiredCapabilities
import time
from scrapy_selenium.middlewares import SeleniumRequest
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.remote.webdriver import WebDriver
from msedge.selenium_tools import EdgeOptions
from msedge.selenium_tools import Edge
class SeleniumMiddleware:
    """Scrapy middleware handling the requests using selenium.

    Only ``SeleniumRequest`` instances are processed; every other request is
    passed through untouched. The browser session is created lazily on the
    first handled request and shut down when the spider closes.
    """

    def __init__(self, driver_name, driver_executable_path, driver_arguments,
                 browser_executable_path):
        """Store the configuration and build the driver options.

        No browser is started here; see ``start_session``.

        Args:
            driver_name: Selenium driver name (``firefox``, ``chrome`` or
                ``edge``). Matched case-insensitively.
            driver_executable_path: URL of a remote WebDriver server
                (``http://`` / ``https://``); any other value selects the
                local browser branch of ``start_session``.
            driver_arguments: Iterable of command-line arguments added to the
                driver options. ``None`` is treated as "no arguments".
            browser_executable_path: Accepted so the constructor matches the
                settings read in ``from_crawler``.
                NOTE(review): this value is currently not used anywhere.
        """
        self.driver_executable_path = driver_executable_path
        self.driver_name = driver_name
        self.proxy_count = 0
        self.driver = None

        # Lower-case once so the module lookup agrees with the
        # case-insensitive comparisons used everywhere else in this class.
        normalized_name = driver_name.lower()
        if normalized_name == 'edge':
            # Edge options come from msedge-selenium-tools, not from selenium.
            self.driver_options = EdgeOptions()
        else:
            options_module = import_module(
                f'selenium.webdriver.{normalized_name}.options'
            )
            self.driver_options = getattr(options_module, 'Options')()

        # The setting may be absent, in which case the value is None.
        for argument in driver_arguments or ():
            self.driver_options.add_argument(argument)

    @staticmethod
    def _is_remote_executor(executable_path):
        """Return True when *executable_path* is an http(s) WebDriver URL."""
        normalized = executable_path.strip().lower()
        return normalized.startswith(('http://', 'https://'))

    @staticmethod
    def _split_proxy(proxy_url):
        """Return ``(host, port)`` parsed from a ``scheme://host:port`` proxy URL.

        Credentials and any path component are ignored.

        Raises:
            ValueError: If the URL has no host, no port, or an invalid port.
        """
        # Local import keeps this block independent of the file's import list.
        from urllib.parse import urlsplit

        parts = urlsplit(proxy_url)
        host = parts.hostname
        port = parts.port  # raises ValueError for a non-numeric port
        if not host or port is None:
            raise ValueError(
                f'Proxy must look like scheme://host:port, got {proxy_url!r}'
            )
        return host, port

    def start_session(self):
        """Create a new browser session and store it in ``self.driver``.

        Raises:
            NotConfigured: If a remote session is requested and the driver
                name is not ``firefox``, ``chrome`` or ``edge``.
        """
        if self._is_remote_executor(self.driver_executable_path):
            # Remote browser
            capabilities_by_name = {
                'firefox': DesiredCapabilities.FIREFOX,
                'chrome': DesiredCapabilities.CHROME,
                'edge': DesiredCapabilities.EDGE,
            }
            driver_type = capabilities_by_name.get(self.driver_name.lower())
            if driver_type is None:
                raise NotConfigured(
                    'SELENIUM_DRIVER_NAME IS NOT RIGHT'
                )
            self.driver = WebDriver(
                command_executor=self.driver_executable_path,
                desired_capabilities=driver_type,
                options=self.driver_options,
            )
        else:
            # Local browser.
            # NOTE(review): this branch always launches Chromium Edge through
            # a hard-coded 'msedgedriver.exe' with fresh options; the
            # configured driver name, executable path and driver arguments
            # are not applied here. Confirm whether that is intended.
            edge_options = EdgeOptions()
            edge_options.use_chromium = True
            self.driver = Edge(executable_path='msedgedriver.exe', options=edge_options)
            # Hide the ``navigator.webdriver`` flag from every new document.
            self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
                "source": """
                    Object.defineProperty(navigator, 'webdriver', {
                        get: () => undefined
                    })
                """
            })

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from the crawler settings.

        Reads ``SELENIUM_DRIVER_NAME``, ``SELENIUM_DRIVER_EXECUTABLE_PATH``,
        ``SELENIUM_BROWSER_EXECUTABLE_PATH`` and ``SELENIUM_DRIVER_ARGUMENTS``
        and connects ``spider_closed`` to the matching signal.

        Raises:
            NotConfigured: If the driver name or executable path is missing.
        """
        driver_name = crawler.settings.get('SELENIUM_DRIVER_NAME')
        driver_executable_path = crawler.settings.get('SELENIUM_DRIVER_EXECUTABLE_PATH')
        browser_executable_path = crawler.settings.get('SELENIUM_BROWSER_EXECUTABLE_PATH')
        driver_arguments = crawler.settings.get('SELENIUM_DRIVER_ARGUMENTS')

        if not driver_name or not driver_executable_path:
            raise NotConfigured(
                'SELENIUM_DRIVER_NAME and SELENIUM_DRIVER_EXECUTABLE_PATH must be set'
            )

        middleware = cls(
            driver_name=driver_name,
            driver_executable_path=driver_executable_path,
            driver_arguments=driver_arguments,
            browser_executable_path=browser_executable_path,
        )
        crawler.signals.connect(middleware.spider_closed, signals.spider_closed)
        return middleware

    def _apply_firefox_proxy(self, proxy_url):
        """Point the running Firefox session at *proxy_url* via ``about:config``."""
        ip, port = self._split_proxy(proxy_url)
        user_agent = 'Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.1; WOW64; Trident/6.0)'
        self.driver.get("about:config")
        script = '''
            var prefs = Components.classes["@mozilla.org/preferences-service;1"].getService(Components.interfaces.nsIPrefBranch);
            prefs.setIntPref("network.proxy.type", 1);
            prefs.setCharPref("network.proxy.http", "{ip}");
            prefs.setIntPref("network.proxy.http_port", "{port}");
            prefs.setCharPref("network.proxy.ssl", "{ip}");
            prefs.setIntPref("network.proxy.ssl_port", "{port}");
            prefs.setCharPref("network.proxy.ftp", "{ip}");
            prefs.setIntPref("network.proxy.ftp_port", "{port}");
            prefs.setBoolPref("general.useragent.site_specific_overrides",true);
            prefs.setBoolPref("general.useragent.updates.enabled",true);
            prefs.setCharPref("general.useragent.override","{user_agent}");
        '''.format(ip=ip, port=port, user_agent=user_agent)
        self.driver.execute_script(script)
        # Short pause after writing the preferences before the next navigation.
        time.sleep(1)

    def process_request(self, request, spider):
        """Fetch a ``SeleniumRequest`` with the browser.

        Returns:
            ``None`` for requests that are not ``SeleniumRequest`` instances,
            otherwise an ``HtmlResponse`` built from the driver's current URL
            and page source. The driver is exposed as ``request.meta['driver']``
            and, if requested, a PNG screenshot as ``request.meta['screenshot']``.

        Raises:
            ValueError: If a proxy has to be applied and ``request.meta['proxy']``
                is not of the form ``scheme://host:port``.
        """
        if not isinstance(request, SeleniumRequest):
            return None

        if self.driver is None:
            self.start_session()

        # Add the proxy IP: applied on the first handled request and then
        # refreshed once the per-request counter exceeds 20.
        proxy_url = request.meta.get('proxy')
        proxy_due = self.proxy_count == 0 or self.proxy_count > 20
        if self.driver.name == 'firefox' and proxy_url and proxy_due:
            self.proxy_count = 0
            self._apply_firefox_proxy(proxy_url)
        self.proxy_count += 1

        try:
            self.driver.get(request.url)
        except selenium.common.exceptions.TimeoutException:
            # Keep whatever has loaded so far instead of failing the request.
            self.driver.execute_script('window.stop()')

        for cookie_name, cookie_value in request.cookies.items():
            self.driver.add_cookie(
                {
                    'name': cookie_name,
                    'value': cookie_value
                }
            )

        if request.wait_until:
            WebDriverWait(self.driver, request.wait_time).until(
                request.wait_until
            )

        if request.screenshot:
            request.meta['screenshot'] = self.driver.get_screenshot_as_png()

        if request.script:
            self.driver.execute_script(request.script)

        body = self.driver.page_source.encode('utf-8')

        # Expose the driver via the "meta" attribute
        request.meta.update({'driver': self.driver})

        return HtmlResponse(
            self.driver.current_url,
            body=body,
            encoding='utf-8',
            request=request
        )

    def process_exception(self, request, exception, spider):
        """Restart the browser and retry when the WebDriver session is gone.

        Returns:
            The same request (marked ``dont_filter``) when the exception is a
            ``WebDriverException`` mentioning a missing session, else ``None``.
        """
        if not isinstance(exception, WebDriverException):
            return None

        # ``args`` may be empty and ``msg`` may be None, so read defensively.
        message = str(getattr(exception, 'msg', None) or '')
        if 'No active session with ID' not in message:
            return None

        self.start_session()
        # The request has already been seen by the scheduler's duplicate
        # filter; without this flag the retry would be dropped. The flag is
        # set in place so the SeleniumRequest-specific attributes are kept.
        request.dont_filter = True
        return request

    def spider_closed(self):
        """Shutdown the driver when spider is closed"""
        if self.driver:
            self.driver.quit()