# Snapshot metadata (paste residue, not code): captured 2025-05-28 19:16:17 +08:00 — 222 lines, 10 KiB, Python

# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
import base64
import importlib
import json
import logging
import os
import tarfile
import time
from PIL import Image
from kafka import KafkaProducer
from scrapy.pipelines.files import FilesPipeline
from scrapy.pipelines.images import ImagesPipeline
from scrapy.utils.misc import md5sum
from utils.file_utils import set_file_attribs
from utils.http_utils import parse_file_name_from_url
class CustomFilesPipeline(FilesPipeline):
    """Files pipeline that keeps the original file name.

    Instead of Scrapy's default checksum-based name, each download is
    stored under ``full/`` using the name parsed from the request URL.
    """

    def file_path(self, request, response=None, info=None, *, item=None):
        """Return the relative storage path for the downloaded file."""
        original_file_name = parse_file_name_from_url(request.url)
        return f'full/{original_file_name}'
class ProtobufSavePipeline(object):
    """Accumulate scraped items into a protobuf message set and ship them.

    Responsibilities:
      * copy item fields into a protobuf ``sets`` message (one entry per item);
      * filter downloaded images by the spider's size/aspect thresholds and
        queue images/attachments for transfer;
      * every ``batch_save_number`` items, send the serialized batch to Kafka
        (as a base64-encoded JSON envelope) and tar.gz the queued files into
        the transfer-service watch folder;
      * on close, flush the final partial batch and delete files older than
        the retention window.

    All configuration comes from the spider settings in ``open_spider``.
    """

    def __init__(self):
        # Settings-derived configuration (populated in open_spider).
        self.retention_hours = None           # hours to keep files on disk
        self.module = None                    # imported protobuf module
        self.proto_class_name = None          # protobuf message-set class name
        self.proto_field_name = None          # repeated field holding the items
        self.proto_save_file_path_name = None
        self.zip_file_path = None
        # Per-batch state.
        self.sets = None                      # current protobuf message set
        self.batch_save_number = None         # items per Kafka batch
        self.current_number = 0               # items accumulated so far
        self.producer = None                  # KafkaProducer instance
        self.img_save_list = []               # image paths pending archive
        self.attach_save_list = []            # attachment paths pending archive
        self.snapshot_save_list = []

    def open_spider(self, spider):
        """Read configuration from spider settings and set up resources."""
        settings = spider.settings
        self.retention_hours = settings['RETENTION_HOURS']
        self.module = importlib.import_module(settings['PROTO_MODULE_PATH'])
        self.proto_class_name = settings['PROTO_CLASS_NAME']
        self.proto_field_name = settings['PROTO_FIELD_NAME']
        self.proto_save_file_path_name = os.path.join(
            settings['PROTO_SAVE_FILE_PATH'], settings['PROTO_SAVE_FILE_NAME'])
        self.zip_file_path = settings['ZIP_FILE_PATH']
        self.sets = getattr(self.module, self.proto_class_name)()
        self.batch_save_number = settings['BATCH_SAVE_SIZE']
        if not os.path.exists(settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(settings['PROTO_SAVE_FILE_PATH'])
        self.producer = KafkaProducer(bootstrap_servers=settings['KAFKA_SERVER'])

    def close_spider(self, spider):
        """Flush any partial batch, then delete files past retention age."""
        if len(getattr(self.sets, self.proto_field_name)) == 0:
            self.producer.close()
            return
        self.move_to_watched_folder(spider.settings['FILE_TRANS_PATH'],
                                    spider.settings['IMG_ZIP_FILE_NAME'],
                                    spider.settings['FILE_ZIP_FILE_NAME'])
        # Send the remaining data to the message center.
        content = self._batch_payload(spider)
        self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
        self.producer.flush()
        self.producer.close()
        # Delete files whose last-modified time exceeds the retention window.
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def _batch_payload(self, spider):
        """Serialize the current message set into the base64 JSON envelope
        expected by the message center (shared by close_spider/process_item)."""
        jsonData = {
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE'],
            'spider': 'ApiSpiders.website_info_common'
        }
        return base64.b64encode(json.dumps(jsonData).encode('utf-8'))

    def process_item(self, item, spider):
        """Copy item fields into a new protobuf entry, queue image/attachment
        files, and flush a full batch to Kafka."""
        protoData = getattr(self.sets, self.proto_field_name).add()
        if item['es_catalog2'] and len(item['es_catalog2']) > 0:
            item['es_attachment'] = item['es_catalog2']
        for field in item.fields:
            if field in item and item[field] != '' and hasattr(protoData, field):
                setattr(protoData, field, str(item[field]))
            if field == spider.settings['IMAGES_RESULT_FIELD']:
                for img in item[field]:
                    imgAbsPath = os.path.join(spider.settings['IMAGES_STORE'], img['path'])
                    try:
                        imgFile = Image.open(imgAbsPath)
                    except Exception as e:
                        logging.error(f"图片 {imgAbsPath} 打开错误:{repr(e)}")
                        continue
                    height = imgFile.height
                    width = imgFile.width
                    size = os.path.getsize(imgAbsPath)
                    # Keep only images meeting the spider's dimension/size/aspect
                    # thresholds.
                    try:
                        if height > spider.imageHeight and width >= spider.imageWidth \
                                and size >= spider.imageSize and width / height >= spider.imageLwRate:
                            self.img_save_list.append(imgAbsPath)
                            # Rewrite the original image URL to the relative path
                            # on our image storage service.
                            item['es_urlcontent'] = item['es_urlcontent'].replace(img['url'], '/' + img['path'])
                    except AttributeError:
                        # Spider defines no thresholds: keep every image.
                        self.img_save_list.append(imgAbsPath)
                        item['es_urlcontent'] = item['es_urlcontent'].replace(img['url'], '/' + img['path'])
                setattr(protoData, 'es_urlcontent', str(item['es_urlcontent']))
            if field == spider.settings['FILES_RESULT_FIELD']:
                for attach in item[field]:
                    self.attach_save_list.append(spider.settings['FILES_STORE'] + '/' + attach['path'])
        self.current_number += 1
        # Flush the accumulated results once the batch is full.
        if self.current_number >= self.batch_save_number:
            content = self._batch_payload(spider)
            try:
                self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            except Exception as e:
                # Producer may have gone stale; rebuild it and retry once.
                # Fixed: Scrapy spiders expose `spider.logger`, not
                # `spider.logging` (the original raised AttributeError here).
                spider.logger.debug(e)
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            self.move_to_watched_folder(spider.settings['FILE_TRANS_PATH'],
                                        spider.settings['IMG_ZIP_FILE_NAME'],
                                        spider.settings['FILE_ZIP_FILE_NAME'])
            self.current_number = 0
            self.sets = getattr(self.module, self.proto_class_name)()
        return item

    def move_to_watched_folder(self, file_trans_path, img_zip_name, file_zip_name):
        """Archive queued images and attachments into the transfer-service
        watch folder (each as a timestamped .tar.gz) and announce them."""
        # Compress images into the transfer-service watch folder.
        to_path = os.path.join(file_trans_path, img_zip_name + str(int(time.time() * 1000)) + '.tar.gz')
        self.write_to_zip(self.img_save_list, to_path)
        self.img_save_list = []
        # Compress attachments into the transfer-service watch folder.
        to_path = os.path.join(file_trans_path, file_zip_name + str(int(time.time() * 1000)) + '.tar.gz')
        self.write_to_zip(self.attach_save_list, to_path)
        self.attach_save_list = []

    def write_to_zip(self, file_list, to_path):
        """Pack ``file_list`` into a .tar.gz at ``to_path`` (entries stored
        under ``full/``) and notify Kafka; no-op when the list is empty."""
        if not file_list:
            return
        with tarfile.open(to_path, 'w:gz') as zipFile:
            for filePath in file_list:
                try:
                    if not os.path.exists(filePath):
                        continue
                    # basename handles the platform separator; the original
                    # rindex('/') raised ValueError on Windows-style paths.
                    arc_name = 'full/' + os.path.basename(filePath)
                    zipFile.add(filePath, arc_name)
                except Exception as e:
                    # Log (rather than print) and keep archiving the rest.
                    logging.error(repr(e))
        # Announce the finished archive on Kafka.
        self.send_file_info_to_kafka(to_path)

    def send_file_info_to_kafka(self, file_path):
        """Publish the archive's name and download URL on 'stream-file-dl'."""
        # Split on whichever separator occurs last ('/' or '\\'), matching
        # the original manual rfind logic.
        last_index = max(file_path.rfind('/'), file_path.rfind('\\'))
        file_name = file_path[last_index + 1:]
        HOST_IP = os.getenv('HOST_IP')
        file_url = f'http://{HOST_IP}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()

    def del_file(self, path):
        """Remove every file under ``path`` older than the retention window
        (``retention_hours``), walking bottom-up."""
        max_age_seconds = self.retention_hours * 60 * 60
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                filePath = os.path.join(root, file)
                st_info = os.stat(filePath)
                if time.time() - st_info.st_mtime > max_age_seconds:
                    os.remove(filePath)
class ImageDownloadAndTagPipeline(ImagesPipeline):
    """Images pipeline that tags each stored image file.

    After persisting every rendition of a downloaded image, the item's
    ``es_tags`` value is written onto the stored file as an
    ``image_description`` attribute via ``set_file_attribs``.
    """

    def __init__(self, store_uri, download_func=None, settings=None):
        super().__init__(store_uri, download_func, settings)
        # Remember the storage root so absolute paths can be rebuilt later.
        self.image_root_path = store_uri

    def image_downloaded(self, response, request, info, *, item=None):
        """Persist all renditions, tag the stored files, and return the md5
        checksum of the first rendition's buffer."""
        checksum = None
        for rel_path, image, buf in self.get_images(response, request, info, item=item):
            if checksum is None:
                # Checksum only the first (full-size) rendition.
                buf.seek(0)
                checksum = md5sum(buf)
            width, height = image.size
            self.store.persist_file(
                rel_path,
                buf,
                info,
                meta={"width": width, "height": height},
                headers={"Content-Type": "image/jpeg"},
            )
            # Attach the crawl tags to the file on disk for downstream use.
            set_file_attribs(
                f'{self.image_root_path}/{rel_path}',
                {'image_description': item['es_tags']},
            )
        return checksum