# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html

import base64
import importlib
import json
import logging
import os
import tarfile
import time

from kafka import KafkaProducer
from scrapy.utils.project import get_project_settings

logger = logging.getLogger(__name__)
settings = get_project_settings()


class ProtobufSavePipeline(object):
    # Flush the accumulated protobuf message set to Kafka every
    # `batch_save_number` items and archive downloaded images every
    # `image_batch` items.
    batch_save_number = 1000
    current_number = 0
    image_batch = 80
    image_number = 0

    def open_spider(self, spider):
        self.retention_hours = spider.settings['RETENTION_HOURS']
        self.module_path = spider.settings['PROTO_MODULE_PATH']
        self.klass = spider.settings['PROTO_CLASS_NAME']
        self.itemListName = spider.settings['PROTO_FIELD_NAME']
        self.saveFileName = '%s%s' % (spider.settings['PROTO_SAVE_FILE_PATH'],
                                      spider.settings['PROTO_SAVE_FILE_NAME'])
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])
        # The protobuf message class (e.g. ShipInfoSets or
        # VoyageTrackpointInfoSets from ShipSpiders.proto.Ship_pb2) is loaded
        # dynamically from the settings.
        self.module = importlib.import_module(self.module_path)
        self.sets = getattr(self.module, self.klass)()
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])

    def close_spider(self, spider):
        if len(getattr(self.sets, self.itemListName)) == 0:
            self.producer.close()
            return
        if settings['SAVE_TO_LOCAL']:
            with open(self.saveFileName + str(int(time.time() * 1000)) + '.todist', 'wb') as f:
                f.write(self.sets.SerializeToString())
        else:
            if spider.settings.get('IMAGES_STORE'):
                # Compress the downloaded images and move the archive into the
                # directory watched by the transfer service. mZip writes a
                # gzipped tar, so the archive gets a .tar.gz suffix.
                fromPath = os.path.join(spider.settings['IMAGES_STORE'], 'full')
                toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                      spider.settings['IMG_ZIP_FILE_NAME']
                                      + str(int(time.time() * 1000)) + '.tar.gz')
                self.write2zip(fromPath, toPath)
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE'],
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            self.producer.close()
        # Delete files whose last-modified time is older than the retention window.
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def process_item(self, item, spider):
        # Copy every non-empty item field onto a new protobuf entry.
        protoData = getattr(self.sets, self.itemListName).add()
        for field in item.fields:
            if field in item and item[field] != '' and hasattr(protoData, field):
                setattr(protoData, field, str(item[field]))
        # Save the scraped results in batches.
        if self.image_number >= self.image_batch:
            if spider.settings.get('IMAGES_STORE'):
                fromPath = os.path.join(spider.settings['IMAGES_STORE'], 'full')
                toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                      spider.settings['ZIP_FILE_NAME']
                                      + str(int(time.time() * 1000)) + '.tar.gz')
                self.write2zip(fromPath, toPath)
            self.image_number = 0
        if self.current_number >= self.batch_save_number:
            # Send the accumulated batch to the message hub.
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE'],
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            try:
                self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            except Exception:
                # The producer may have been closed or lost its connection;
                # recreate it once and retry the send.
                logger.exception('Kafka send failed, recreating producer and retrying')
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            self.current_number = 0
            self.sets = getattr(self.module, self.klass)()
        self.image_number += 1
        self.current_number += 1
        return item

    def write2zip(self, fromPath, toPath):
        # Only archive when the tree contains at least one file.
        haveFile = False
        for root, dirs, files in os.walk(fromPath, topdown=True):
            if len(files) > 0:
                haveFile = True
                break
        if not haveFile:
            return
        self.mZip(fromPath, toPath, True)
        self.send_file_info_to_kafka(toPath)

    def send_file_info_to_kafka(self, file_path):
        # Announce the archive on the download topic so the transfer service
        # can fetch it over HTTP.
        file_name = os.path.basename(file_path)
        host_ip = os.getenv('HOST_IP')
        file_url = f'http://{host_ip}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url,
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()

    def del_file(self, path):
        # Settings like IMAGES_STORE may be unset; guard before walking.
        if not path or not os.path.isdir(path):
            return
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                filePath = os.path.join(root, file)
                st_info = os.stat(filePath)
                if time.time() - st_info.st_mtime > self.retention_hours * 60 * 60:
                    os.remove(filePath)

    def mZip(self, fromPath, toPath, delFile):
        # Pack fromPath into a gzipped tar at toPath, optionally deleting each
        # file once it has been archived.
        parentDir = os.path.abspath(os.path.dirname(fromPath))
        with tarfile.open(toPath, 'w:gz') as tar:
            for root, dirs, files in os.walk(fromPath, topdown=True):
                for name in files:
                    try:
                        absPath = os.path.join(root, name)  # absolute path of the file
                        arcname = os.path.relpath(absPath, parentDir)  # path inside the archive
                        if not os.path.exists(absPath):
                            continue
                        tar.add(absPath, arcname)
                        if delFile:
                            os.remove(absPath)
                    except OSError:
                        # A file can disappear between the walk and the add;
                        # skip it rather than abort the whole archive.
                        logger.warning('Skipping %s while archiving', name, exc_info=True)
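
# ---------------------------------------------------------------------------
# Example configuration. A minimal sketch of the settings this pipeline reads,
# meant for settings.py; every concrete value below (paths, host, queue and
# proto names) is a placeholder assumption, not the project's real
# configuration. Only the setting keys themselves come from the code above.
#
#   ITEM_PIPELINES = {
#       'ShipSpiders.pipelines.ProtobufSavePipeline': 300,
#   }
#   RETENTION_HOURS = 24                     # delete artifacts older than this
#   SAVE_TO_LOCAL = False                    # True: write .todist files instead of Kafka
#   PROTO_MODULE_PATH = 'ShipSpiders.proto.Ship_pb2'
#   PROTO_CLASS_NAME = 'ShipInfoSets'
#   PROTO_FIELD_NAME = 'shipInfos'           # repeated field on the message (assumed name)
#   PROTO_SAVE_FILE_PATH = '/data/proto/'    # trailing slash: concatenated, not joined
#   PROTO_SAVE_FILE_NAME = 'ship_info'
#   KAFKA_SERVER = 'localhost:9092'
#   KAFKA_PROCESS_QUEUE = ['ship-process']   # first entry is the topic published to
#   FILE_TRANS_PATH = '/data/trans/'
#   FILES_STORE = '/data/files/'
#   IMAGES_STORE = '/data/images/'           # optional; enables image archiving
#   IMG_ZIP_FILE_NAME = 'ship_imgs_'
#   ZIP_FILE_NAME = 'ship_imgs_batch_'
# ---------------------------------------------------------------------------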