# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
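#
# A minimal registration sketch (the dotted module path is an assumption;
# adjust it to this project's actual pipelines module):
#
#     ITEM_PIPELINES = {
#         'ShipSpiders.pipelines.ProtobufSavePipeline': 300,
#     }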
import base64
import importlib
import json
import logging
import os
import tarfile
import time

from kafka import KafkaProducer

logger = logging.getLogger(__name__)

class ProtobufSavePipeline(object):
    """Serializes scraped items into a protobuf message set, publishes the
    batches to Kafka, and periodically archives downloaded images for the
    file-transfer service."""

    batch_save_number = 1000  # flush the protobuf set to Kafka every N items
    current_number = 0
    image_batch = 80  # archive downloaded images every N items
    image_number = 0

    def open_spider(self, spider):
        self.retention_hours = spider.settings['RETENTION_HOURS']
        self.module_path = spider.settings['PROTO_MODULE_PATH']
        self.klass = spider.settings['PROTO_CLASS_NAME']
        self.itemListName = spider.settings['PROTO_FIELD_NAME']
        self.saveFileName = '%s%s' % (spider.settings['PROTO_SAVE_FILE_PATH'],
                                      spider.settings['PROTO_SAVE_FILE_NAME'])
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])
        # Resolve the protobuf message class dynamically from the settings.
        self.module = importlib.import_module(self.module_path)
        self.sets = getattr(self.module, self.klass)()
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])

    def close_spider(self, spider):
        if len(getattr(self.sets, self.itemListName)) == 0:
            self.producer.close()
            return
        if spider.settings['SAVE_TO_LOCAL']:
            with open(self.saveFileName + str(int(time.time() * 1000)) + '.todist', 'wb+') as f:
                f.write(self.sets.SerializeToString())
        else:
            if spider.settings.get('IMAGES_STORE'):
                # Compress the downloaded images and move the archive into the
                # directory watched by the file-transfer service.
                fromPath = os.path.join(spider.settings['IMAGES_STORE'], 'full')
                toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                      spider.settings['IMG_ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
                self.write2zip(fromPath, toPath)
            # Serialized protobuf bytes survive a latin-1 round trip unchanged,
            # so decode('ISO-8859-1') lets them travel inside a JSON string.
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
        self.producer.close()
        # Delete files whose last-modified time is older than the retention window.
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def process_item(self, item, spider):
        # Copy every non-empty item field onto a new protobuf entry.
        protoData = getattr(self.sets, self.itemListName).add()
        for field in item.fields:
            if field in item and item[field] != '' and hasattr(protoData, field):
                setattr(protoData, field, str(item[field]))
        # Save scraped results in batches.
        if self.image_number >= self.image_batch:
            if spider.settings.get('IMAGES_STORE'):
                fromPath = os.path.join(spider.settings['IMAGES_STORE'], 'full')
                toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                      spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
                self.write2zip(fromPath, toPath)
            self.image_number = 0
        if self.current_number >= self.batch_save_number:
            # Send the accumulated batch to the message hub.
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            try:
                self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            except Exception:
                # The producer may have lost its connection; recreate it and retry once.
                logger.exception('Kafka send failed, recreating producer and retrying')
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            self.current_number = 0
            # Start a fresh, empty message set for the next batch.
            self.sets = getattr(self.module, self.klass)()
        self.image_number += 1
        self.current_number += 1
        return item

    def write2zip(self, fromPath, toPath):
        # Only archive when the directory tree contains at least one file.
        haveFile = False
        for root, dirs, files in os.walk(fromPath, topdown=True):
            if len(files) > 0:
                haveFile = True
                break
        if not haveFile:
            return
        self.mZip(fromPath, toPath, True)
        self.send_file_info_to_kafka(toPath)

    def send_file_info_to_kafka(self, file_path):
        # Publish the archive's download URL so the transfer service can fetch it.
        file_name = os.path.basename(file_path)
        HOST_IP = os.getenv('HOST_IP')
        file_url = f'http://{HOST_IP}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()

    def del_file(self, path):
        # Delete files whose last-modified time exceeds the retention window.
        if not path or not os.path.exists(path):
            return
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                filePath = os.path.join(root, file)
                st_info = os.stat(filePath)
                if time.time() - st_info.st_mtime > self.retention_hours * 60 * 60:
                    os.remove(filePath)

    def mZip(self, fromPath, toPath, delFile):
        # Despite the historical name, this writes a gzip-compressed tar archive.
        # Entries are stored relative to fromPath's parent directory.
        parentDir = os.path.abspath(os.path.dirname(fromPath))
        with tarfile.open(toPath, 'w:gz') as zipFile:
            for root, dirs, files in os.walk(fromPath, topdown=True):
                for name in files:
                    try:
                        absPath = os.path.join(root, name)  # absolute path of the file
                        arcname = os.path.relpath(absPath, parentDir)  # path inside the archive
                        if not os.path.exists(absPath):
                            continue
                        zipFile.add(absPath, arcname)
                        if delFile:
                            os.remove(absPath)
                    except Exception:
                        # A file may vanish between the walk and the add; skip it.
                        logger.warning('Failed to archive %s', os.path.join(root, name), exc_info=True)
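

# For reference, a hypothetical consumer-side sketch showing how a downstream
# service could unpack the batches published above (topic name and broker
# address are assumptions; the proto class is from this project's Ship_pb2):
#
#     import base64
#     import json
#     from kafka import KafkaConsumer
#     from ShipSpiders.proto.Ship_pb2 import ShipInfoSets
#
#     consumer = KafkaConsumer('ship-process-queue',
#                              bootstrap_servers='localhost:9092')
#     for msg in consumer:
#         payload = json.loads(base64.b64decode(msg.value))
#         raw = payload['content'].encode('ISO-8859-1')  # undo the latin-1 decode
#         sets = ShipInfoSets()
#         sets.ParseFromString(raw)
#         # sets now holds the batched ship records for downstream processing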