2025-05-28 19:16:17 +08:00

107 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# Define here the models for your scraped Extensions
import json
import time
from datetime import timedelta
from kafka import KafkaProducer
from scrapy import signals
from scrapy.exceptions import NotConfigured
from scrapy.utils.project import get_project_settings
from twisted.internet import task
from WikipediaSpiders.utils.http_utils import http_put
project_settings = get_project_settings()
class SetCrawlerStatusExtensions(object):
    """Scrapy extension that periodically reports crawl progress to an
    external crawl-job API.

    Every ``interval`` seconds a Twisted ``LoopingCall`` snapshots the
    crawler stats into ``oscm_log_map`` and pushes it via HTTP PUT to
    ``CRAWL_JOB_UPDATE_API``. The spider is force-closed when either
    no new items have been scraped for a full hour ('No items pause')
    or the total running time exceeds one hour ('Running time exceeded').
    A final status report is sent on spider close.
    """

    def __init__(self, stats, interval, crawler):
        """
        :param stats: the crawler's StatsCollector (read-only access here)
        :param interval: reporting period in seconds
        :param crawler: the Crawler, used to close the spider via its engine
        """
        self.stats = stats
        self.crawler = crawler
        self.interval = interval
        self.multiplier = 60.0 / self.interval  # ticks per minute (currently unused)
        self.task = None  # LoopingCall; created in spider_opened, stopped in spider_closed
        self.last_period_items = 0  # item count observed at the previous tick
        self.no_items_seconds = 0   # consecutive seconds with no new items
        # NOTE(review): producer is created but only used by the commented-out
        # Kafka path in update_status — confirm whether it is still needed.
        self.producer = KafkaProducer(bootstrap_servers=project_settings['KAFKA_SERVER'])
        self.crawl_job_update_api = project_settings['CRAWL_JOB_UPDATE_API']
        self.job_id = ''
        self.oscm_log_map = {}

    def parse_log(self):
        """Rebuild ``oscm_log_map`` from the current crawler stats and
        update the no-new-items stall counter."""
        current_period_items = self.stats.get_value('item_scraped_count', 0)
        if current_period_items == self.last_period_items:
            # No progress since the last tick: accumulate stall time.
            self.no_items_seconds += self.interval
        else:
            # Fix: progress resumed — reset so 'No items pause' only fires
            # after a full hour of *consecutive* inactivity (the original
            # accumulated stall time across the whole run).
            self.no_items_seconds = 0
        self.last_period_items = current_period_items
        self.oscm_log_map = {
            "jobId": self.job_id,
            "itemsCount": current_period_items,
            "pagesCount": self.stats.get_value('response_received_count', 0),
            "filesCount": self.stats.get_value('file_count', 0),
            "downloadBytes": self.stats.get_value('downloader/response_bytes', 0),
            "updateTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            # Overwrite the DB log's start time with the engine's start_time
            # (shifted +8h to UTC+8 local time) so 'time' stays accurate.
            # NOTE(review): start_time default of 0 would raise TypeError on
            # the timedelta addition — callers rely on the surrounding
            # try/except to absorb that.
            "startTime": (self.stats.get_value('start_time', 0) + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S")
        }

    @classmethod
    def from_crawler(cls, crawler):
        """Standard Scrapy extension factory: build the extension and wire
        it to the spider_opened/spider_closed signals."""
        # Update crawl results every 10 seconds (the original comment said
        # 15s, which contradicted the value below).
        interval = 10.0
        if not interval:
            raise NotConfigured
        o = cls(crawler.stats, interval, crawler)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider):
        """Start the periodic status-reporting loop for this spider."""
        self.job_id = spider.job_id
        self.task = task.LoopingCall(self.update_status, spider)
        self.task.start(self.interval)

    def update_status(self, spider):
        """Periodic tick: refresh stats, enforce the stall/runtime limits,
        and PUT the status map to the crawl-job API. Best-effort: any
        failure is logged and swallowed so the loop keeps running."""
        try:
            self.parse_log()
            if self.no_items_seconds > 60 * 60:
                # One hour without new items: stop the crawl.
                self.crawler.engine.close_spider(spider, 'No items pause')
            self.oscm_log_map['status'] = '0'
            # Seconds elapsed since the engine's (naive UTC) start_time;
            # time.timezone compensates for timestamp() interpreting the
            # naive datetime as local time.
            running_time = round(time.time() + time.timezone - self.stats.get_value('start_time', 0).timestamp())
            if running_time > 60 * 60:
                # Hard cap of one hour total running time.
                self.crawler.engine.close_spider(spider, 'Running time exceeded')
            self.oscm_log_map['time'] = running_time
            spider.logger.info(self.oscm_log_map)
            # json_data = {
            #     'content': json.dumps(self.osdp_log_map)
            # }
            # content = base64.b64encode(json.dumps(json_data).encode('utf-8'))
            # self.producer.send('stream-log', content, partition=0)
            # self.producer.flush()
            http_put(self.crawl_job_update_api, json.dumps(self.oscm_log_map, ensure_ascii=False))
        except Exception:
            # Narrowed from a bare except: (which also trapped SystemExit /
            # KeyboardInterrupt). Deliberately best-effort: log and continue.
            spider.logger.warning('更新OSCM日志状态失败')

    def spider_closed(self, spider, reason):
        """Send the final status report and stop the reporting loop.

        ``result`` is '1' (success) for expected finish reasons and '0'
        otherwise; ``status`` '2' marks the job as finished.
        """
        finish_reason = self.stats.get_value('finish_reason', 0)
        # Fixed log typo: 'season' -> 'reason'.
        spider.logger.info('Finish reason: %s' % finish_reason)
        if finish_reason in ['Waiting time exceeded', 'finished', 'No items pause', 'Running time exceeded',
                             'shutdown']:
            crawl_result = '1'
        else:
            crawl_result = '0'
        try:
            self.parse_log()
            self.oscm_log_map['status'] = '2'
            self.oscm_log_map['result'] = crawl_result
            self.oscm_log_map['time'] = round(self.stats.get_value('elapsed_time_seconds', 0))
            self.oscm_log_map['finishTime'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            spider.logger.info(self.oscm_log_map)
            http_put(self.crawl_job_update_api, json.dumps(self.oscm_log_map, ensure_ascii=False))
        except Exception:
            # Narrowed from a bare except:. (Runtime string kept verbatim,
            # including the OSDP/OSCM naming inconsistency.)
            spider.logger.warning('更新OSDP日志状态失败')
        # Stop the LoopingCall last so a failed final report cannot leave it running.
        if self.task and self.task.running:
            self.task.stop()