# -*- coding: utf-8 -*-
|
|||
|
|
|
|||
|
|
# Scrapy extension that periodically reports crawl-job status to the OSCM API.
|
|||
|
|
import json
|
|||
|
|
import time
|
|||
|
|
from datetime import timedelta
|
|||
|
|
|
|||
|
|
from kafka import KafkaProducer
|
|||
|
|
from scrapy import signals
|
|||
|
|
from scrapy.exceptions import NotConfigured
|
|||
|
|
from scrapy.utils.project import get_project_settings
|
|||
|
|
from twisted.internet import task
|
|||
|
|
|
|||
|
|
from MediaSpiders.utils.http_utils import http_put
|
|||
|
|
|
|||
|
|
project_settings = get_project_settings()
|
|||
|
|
|
|||
|
|
|
|||
|
|
class SetCrawlerStatusExtensions(object):
    """Scrapy extension that periodically reports crawl-job status.

    Every ``interval`` seconds a Twisted ``LoopingCall`` snapshots the
    crawler stats into ``oscm_log_map`` and HTTP-PUTs it to the crawl-job
    update API; a final payload (status ``'2'``) is sent on spider close.
    """

    def __init__(self, stats, interval, crawler):
        self.stats = stats
        self.crawler = crawler
        # Reporting period in seconds (see from_crawler).
        self.interval = interval
        # Factor converting per-interval counts to per-minute rates.
        self.multiplier = 60.0 / self.interval
        self.task = None
        self.last_period_items = 0
        # Seconds elapsed without any newly scraped item.
        self.no_items_seconds = 0
        # NOTE(review): producer is created but never used to send in this
        # file — presumably used by subclasses; it is now closed on spider_closed.
        self.producer = KafkaProducer(bootstrap_servers=project_settings['KAFKA_SERVER'])
        self.crawl_job_update_api = project_settings['CRAWL_JOB_UPDATE_API']
        self.job_id = ''
        self.oscm_log_map = {}

    def parse_log(self):
        """Rebuild ``self.oscm_log_map`` from the current crawler stats."""
        current_period_items = self.stats.get_value('item_scraped_count', 0)
        # Accumulate idle time while the item count stays flat.
        if current_period_items == self.last_period_items:
            self.no_items_seconds += self.interval
        self.last_period_items = current_period_items
        self.oscm_log_map = {
            "jobId": self.job_id,
            "itemsCount": current_period_items,
            "pagesCount": self.stats.get_value('response_received_count', 0),
            "filesCount": self.stats.get_value('file_count', 0),
            "downloadBytes": self.stats.get_value('downloader/response_bytes', 0),
            "updateTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        }
        # Override the DB log's start time with the engine's start_time so
        # it stays accurate. Scrapy records start_time as a naive UTC
        # datetime; shift by +8h for local (UTC+8) display. Guarded: the
        # original unconditionally added timedelta to the default 0 and
        # crashed (silently) when start_time was absent.
        start_time = self.stats.get_value('start_time')
        if start_time is not None:
            self.oscm_log_map["startTime"] = (
                (start_time + timedelta(hours=8)).strftime("%Y-%m-%d %H:%M:%S"))

    @classmethod
    def from_crawler(cls, crawler):
        """Standard Scrapy extension hook: build and wire up the instance."""
        # Update crawl results every 10 seconds. (The former
        # `if not interval: raise NotConfigured` was dead code — a literal
        # 10.0 is always truthy — and has been removed.)
        interval = 10.0
        o = cls(crawler.stats, interval, crawler)
        crawler.signals.connect(o.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(o.spider_closed, signal=signals.spider_closed)
        return o

    def spider_opened(self, spider):
        """Start the periodic status-reporting loop for this spider."""
        self.job_id = spider.job_id
        self.task = task.LoopingCall(self.update_status, spider)
        self.task.start(self.interval)

    def update_status(self, spider):
        """Periodic tick: push a 'running' (status '0') update to the API."""
        try:
            self.parse_log()
            self.oscm_log_map['status'] = '0'
            start_time = self.stats.get_value('start_time')
            if start_time is not None:
                # start_time is naive UTC; time.timezone compensates for
                # .timestamp() interpreting a naive datetime as local time.
                self.oscm_log_map['time'] = round(
                    time.time() + time.timezone - start_time.timestamp())
            spider.logger.info(self.oscm_log_map)
            http_put(self.crawl_job_update_api,
                     json.dumps(self.oscm_log_map, ensure_ascii=False))
        except Exception:
            # Best-effort reporting: never let a failure kill the loop.
            # (Was a bare `except:`, which also swallowed SystemExit.)
            spider.logger.warning('更新日志状态失败!')

    def spider_closed(self, spider, reason):
        """Send the final (status '2') update, then release resources."""
        finish_reason = self.stats.get_value('finish_reason', 0)
        spider.logger.info('Finish reason: %s' % finish_reason)
        # Only these finish reasons count as a successful crawl.
        if finish_reason in ['Waiting time exceeded', 'finished', 'shutdown']:
            crawl_result = '1'
        else:
            crawl_result = '0'
        try:
            self.parse_log()
            self.oscm_log_map['status'] = '2'  # '2' == job finished
            self.oscm_log_map['result'] = crawl_result
            self.oscm_log_map['time'] = round(
                self.stats.get_value('elapsed_time_seconds', 0))
            self.oscm_log_map['finishTime'] = time.strftime(
                "%Y-%m-%d %H:%M:%S", time.localtime())
            spider.logger.info(self.oscm_log_map)
            http_put(self.crawl_job_update_api,
                     json.dumps(self.oscm_log_map, ensure_ascii=False))
        except Exception:
            spider.logger.warning('更新日志状态失败!')
        finally:
            # Cleanup must run even if the final PUT failed (the original
            # skipped task.stop() on exception and never closed the producer).
            if self.task and self.task.running:
                self.task.stop()
            try:
                self.producer.close()
            except Exception:
                pass