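"""Scrapy item pipelines for persisting crawl results.

ProtobufSavePipeline batches scraped items into a protobuf message set, packs the
downloaded images and attachments into .tar.gz archives, and publishes both the
serialized data and the archive metadata to Kafka.

StoreToJsonFilePipeline batches scraped items and writes them to local JSON files.
"""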

import base64
import importlib
import json
import logging
import os
import tarfile
import time

from kafka import KafkaProducer

logger = logging.getLogger(__name__)


class ProtobufSavePipeline(object):

    def open_spider(self, spider):
        self.retention_hours = spider.settings['RETENTION_HOURS']
        module_path = spider.settings['PROTO_MODULE_PATH']
        self.klass = spider.settings['PROTO_CLASS_NAME']
        self.itemListName = spider.settings['PROTO_FIELD_NAME']
        self.saveFileName = os.path.join(spider.settings['PROTO_SAVE_FILE_PATH'],
                                         spider.settings['PROTO_SAVE_FILE_NAME'])
        self.module = importlib.import_module(module_path)
        self.sets = getattr(self.module, self.klass)()
        self.batch_save_number = 5
        self.current_number = 0
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
        self.imgSaveList = []
        self.attachSaveList = []

    def close_spider(self, spider):
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])
        if len(getattr(self.sets, self.itemListName)) == 0:
            return
        toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                              spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
        self.write2zip(self.imgSaveList, toPath)
        self.imgSaveList = []
        # Send the remaining data to the message center
        jsonData = {
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
        }
        content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
        self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
        self.producer.flush()
        self.producer.close()
        # Delete files whose last-modified time is older than the retention window
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def del_file(self, path):
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                filePath = os.path.join(root, file)
                st_info = os.stat(filePath)
                if time.time() - st_info.st_mtime > self.retention_hours * 60 * 60:
                    os.remove(filePath)

    def process_item(self, item, spider):
        # Add a new entry to the message set following the agreed proto schema;
        # protoData carries the expected fields and stays bound to self.sets.
        protoData = getattr(self.sets, self.itemListName).add()
        for field in item.fields:
            # Only copy fields that exist on protoData and have a non-empty value on the item
            if field in item and item[field] != '' and hasattr(protoData, field):
                # Every protoData field is a string; sequence values are serialized
                # to a string with json.dumps
                if isinstance(item[field], list):
                    setattr(protoData, field, json.dumps(item[field]))
                    # for t in item[field]:
                    #     getattr(protoData, field).append(t)
                else:
                    setattr(protoData, field, str(item[field]))
            # Collect the image download paths: the images pipeline runs at a higher
            # priority than this one, so the images have already been downloaded.
            # The same applies to files/attachments.
            if field == spider.settings['IMAGES_RESULT_FIELD'] and field in item:
                for img in item[field]:
                    self.imgSaveList.append(os.path.join(spider.settings['IMAGES_STORE'], img['path']))
            if field == spider.settings['FILES_RESULT_FIELD'] and field in item:
                for attach in item[field]:
                    self.attachSaveList.append(os.path.join(spider.settings['FILES_STORE'], attach['path']))
                    # self.attachSaveList.append(spider.settings['FILES_STORE'] + '/' + attach['path'])
        # Save the crawl results batch by batch
        if self.current_number >= self.batch_save_number:
            if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
                os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])
            # Pack the downloaded images into a .tar.gz archive
            toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                  spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
            self.write2zip(self.imgSaveList, toPath)
            self.imgSaveList = []
            # Pack the attachments and move them to the directory watched by the transfer service
            toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                  spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
            self.write2zip(self.attachSaveList, toPath)
            self.attachSaveList = []
            # Send the data to the message center: serialize the message set and decode it
            # with ISO-8859-1 so the raw bytes survive as a JSON-safe string
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
            }
            # JSON-encode and base64-encode the payload
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            # Send the payload to the first topic listed in KAFKA_PROCESS_QUEUE
            self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
            # After sending, reset the batch
            self.producer.flush()
            self.current_number = 0
            self.sets = getattr(self.module, self.klass)()
        self.current_number += 1
        return item

    def write2zip(self, fileList, toPath):
        if len(fileList) == 0:
            return
        with tarfile.open(toPath, 'w:gz') as zipFile:
            for filePath in fileList:
                try:
                    if not os.path.exists(filePath):
                        continue
                    # Relative path of the file inside the archive (under a "full/" prefix)
                    arcname = "full/" + os.path.basename(filePath)
                    zipFile.add(filePath, arcname)
                except Exception:
                    # Skip files that cannot be added to the archive
                    pass
        for filePath in fileList:
            if not os.path.exists(filePath):
                continue
            os.remove(filePath)
        # Publish the archive location to Kafka
        self.send_file_info_to_kafka(toPath)

    def send_file_info_to_kafka(self, file_path):
        # Derive the archive file name, whichever path separator was used
        last_index = file_path.rfind('/')
        if last_index < file_path.rfind('\\'):
            last_index = file_path.rfind('\\')
        file_name = file_path[last_index + 1:]
        # Build a download URL for the archive, exposed over HTTP on port 28086
        HOST_IP = os.getenv('HOST_IP')
        file_url = f'http://{HOST_IP}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()
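
# Example configuration (a minimal sketch): ProtobufSavePipeline reads the custom
# settings below from the spider. Only the setting names come from the code above;
# every value, the 'myproject.*' paths, and the proto message layout are
# illustrative assumptions, not taken from this repository. The generated protobuf
# module is expected to expose a top-level message (PROTO_CLASS_NAME) with a
# repeated sub-message field (PROTO_FIELD_NAME) whose string fields mirror the
# Scrapy item fields.
#
# ITEM_PIPELINES = {
#     'scrapy.pipelines.images.ImagesPipeline': 100,   # must run before this pipeline
#     'scrapy.pipelines.files.FilesPipeline': 200,
#     'myproject.pipelines.ProtobufSavePipeline': 300,
# }
# PROTO_MODULE_PATH = 'myproject.proto.forum_pb2'      # assumed generated _pb2 module
# PROTO_CLASS_NAME = 'ForumItemSet'                    # assumed top-level message name
# PROTO_FIELD_NAME = 'items'                           # assumed repeated field name
# PROTO_SAVE_FILE_PATH = '/data/proto'
# PROTO_SAVE_FILE_NAME = 'forum.proto'
# FILE_TRANS_PATH = '/data/trans'                      # directory watched by the transfer service
# ZIP_FILE_NAME = 'forum_'
# KAFKA_SERVER = 'localhost:9092'
# KAFKA_PROCESS_QUEUE = ['forum-process']
# RETENTION_HOURS = 24
# IMAGES_STORE = '/data/images'
# FILES_STORE = '/data/files'
# IMAGES_RESULT_FIELD = 'images'
# FILES_RESULT_FIELD = 'files'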


class StoreToJsonFilePipeline:

    def __init__(self):
        self.current_number = 0
        self.batch_save_number = 5
        self.entity_array = []
        self.json_save_path = r'E:/yuxin/forum-data/json'

    def open_spider(self, spider):
        logger.info("Store to JSON File")
        batch_save_number = spider.settings.get("ITEM_BATCH_SAVE_NUMBER")
        if batch_save_number:
            self.batch_save_number = int(batch_save_number)
        else:
            logger.error("ITEM_BATCH_SAVE_NUMBER is not defined, using the default of 5")

    def close_spider(self, spider):
        self.save_to_json_file()

    def process_item(self, item, spider):
        item_dict = {}
        for field in item.fields:
            if field in item and item[field] != '':
                item_dict[field] = item[field]
        self.entity_array.append(item_dict)
        self.current_number += 1
        if self.current_number >= self.batch_save_number:
            self.save_to_json_file()
        return item

    def save_to_json_file(self):
        if len(self.entity_array) > 0:
            file_name = "{}/forum_{}_({}).json".format(self.json_save_path, str(int(time.time())),
                                                       len(self.entity_array))
            with open(file_name, 'w', encoding='utf-8') as result_file:
                result_file.write(json.dumps(self.entity_array, ensure_ascii=False))
        self.current_number = 0
        self.entity_array = []
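
# StoreToJsonFilePipeline only needs a pipeline entry and, optionally, a batch
# size. A minimal sketch (the module path and priority are assumptions; only
# ITEM_BATCH_SAVE_NUMBER appears in the code above):
#
# ITEM_PIPELINES = {
#     'myproject.pipelines.StoreToJsonFilePipeline': 400,
# }
# ITEM_BATCH_SAVE_NUMBER = 5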