230 lines
10 KiB
Python
230 lines
10 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import base64
|
||
import importlib
|
||
import json
|
||
import logging
|
||
import os
|
||
import tarfile
|
||
import time
|
||
|
||
from kafka import KafkaProducer
|
||
from scrapy.pipelines.images import ImagesPipeline
|
||
from scrapy.utils.misc import md5sum
|
||
|
||
from CustomWebsite.utils.file_utils import set_file_attribs
|
||
from CustomWebsite.utils.http_utils import filter_unsupported_file_name_char, custom_download, parse_file_name_from_url
|
||
from CustomWebsite.utils.string_utils import get_str_md5
|
||
|
||
|
||
class CustomAttachmentDownload(object):
    """Pipeline that downloads the files listed in ``item['es_attachment']``
    and replaces that field with ``{"path", "url"}`` records for each file
    that was actually retrieved. Items of carrier type ``article`` (reports)
    that end up without any attachment are demoted to ``news`` so they do not
    appear in the report module."""

    def open_spider(self, spider):
        logging.info("Use CustomAttachmentDownload pipeline")

    def close_spider(self, spider):
        logging.info("Close CustomAttachmentDownload pipeline")

    def process_item(self, item, spider):
        urls = item['es_attachment'] if 'es_attachment' in item else None
        if not urls:
            # No attachments at all: a report item degrades to plain news.
            if item['es_carriertype'] == 'article':
                item['es_carriertype'] = 'news'
            return item

        downloaded = []
        for url in urls:
            # Base the local file name on the URL, unless the item carries an
            # explicit title in es_attention.
            name = parse_file_name_from_url(url)
            if 'es_attention' in item and item['es_attention']:
                name = ''.join(item['es_attention']).strip()
            # Short uppercase fingerprint of the URL keeps names unique even
            # when titles collide.
            fingerprint = get_str_md5(url)[:8].upper()
            safe_name = filter_unsupported_file_name_char(
                f'{name[:100]} - {fingerprint}JLIT.pdf')
            stored = custom_download(url, safe_name)
            if stored:
                logging.info("Attachment file url {} downloaded".format(url))
                downloaded.append({
                    "path": "full/" + stored,
                    "url": url
                })
        item['es_attachment'] = downloaded

        # If every download failed, a report item degrades to plain news so
        # it is not shown in the report module.
        if not downloaded and item['es_carriertype'] == 'article':
            item['es_carriertype'] = 'news'
        return item
|
||
|
||
|
||
class ProtobufSavePipeline(object):
    """Accumulates scraped items into a protobuf message set, ships batches
    of serialized data to Kafka, and moves downloaded images/attachments into
    a watched transfer folder as tar.gz archives.

    Fixes relative to the previous revision:
    - ``spider.logging.debug(e)`` raised ``AttributeError`` (Scrapy spiders
      have no ``logging`` attribute); now uses the module-level ``logging``.
    - ``write_to_zip`` used ``str.rindex('/')``, which raises ``ValueError``
      on paths without a slash; now uses ``os.path.basename``.
    - The Kafka payload construction was duplicated; extracted into
      ``_build_payload``.
    """

    def __init__(self):
        # All of these are populated in open_spider from spider settings.
        self.retention_hours = None           # max file age (hours) before cleanup
        self.module = None                    # module that declares the proto classes
        self.proto_class_name = None          # class name of the message-set type
        self.proto_field_name = None          # repeated field holding one entry per item
        self.proto_save_file_path_name = None # full path for the proto save file
        self.zip_file_path = None
        self.sets = None                      # current protobuf message set
        self.batch_save_number = None         # items per Kafka batch
        self.current_number = 0               # items accumulated in the current batch
        self.producer = None                  # KafkaProducer instance
        self.img_save_list = []               # image paths pending archiving
        self.attach_save_list = []            # attachment paths pending archiving

    def open_spider(self, spider):
        """Read settings, instantiate the protobuf container and the Kafka
        producer, and make sure the proto save directory exists."""
        self.retention_hours = spider.settings['RETENTION_HOURS']
        module_path = spider.settings['PROTO_MODULE_PATH']
        self.module = importlib.import_module(module_path)
        self.proto_class_name = spider.settings['PROTO_CLASS_NAME']
        self.proto_field_name = spider.settings['PROTO_FIELD_NAME']
        self.proto_save_file_path_name = os.path.join(spider.settings['PROTO_SAVE_FILE_PATH'],
                                                      spider.settings['PROTO_SAVE_FILE_NAME'])
        self.zip_file_path = spider.settings['ZIP_FILE_PATH']
        self.sets = getattr(self.module, self.proto_class_name)()

        self.batch_save_number = spider.settings['BATCH_SAVE_SIZE']
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])

    def _build_payload(self, spider):
        """Return the base64-encoded JSON envelope carrying the serialized
        protobuf set for the message center."""
        json_data = {
            # ISO-8859-1 maps every byte 1:1 to a code point, so the raw
            # protobuf bytes survive the JSON round trip.
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
        }
        return base64.b64encode(json.dumps(json_data).encode('utf-8'))

    def close_spider(self, spider):
        """Flush any remaining items to Kafka/the transfer folder, close the
        producer, then purge files older than the retention window."""
        if len(getattr(self.sets, self.proto_field_name)) == 0:
            self.producer.close()
            return
        self.move_to_watched_folder(spider.settings['FILE_TRANS_PATH'],
                                    spider.settings['IMG_ZIP_FILE_NAME'],
                                    spider.settings['FILE_ZIP_FILE_NAME'])
        # Send the remaining data to the message center.
        content = self._build_payload(spider)
        self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
        self.producer.flush()
        self.producer.close()

        # Delete files whose last-modified time exceeds the retention period.
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def process_item(self, item, spider):
        """Copy item fields into the protobuf set; once BATCH_SAVE_SIZE items
        have accumulated, publish the batch to Kafka and move the collected
        media files to the transfer folder."""
        if item:
            proto_data = getattr(self.sets, self.proto_field_name).add()
            if item['es_carriertype'] == 'article':
                item['es_catalog2'] = 'pdf'
            # Tag items that mention the Israel/Palestine conflict.
            for topic_word in ['Israel', 'Palestine', 'Hamas', '以色列', '巴勒斯坦', '巴以', '哈马斯']:
                if topic_word in item['es_urlcontent']:
                    item['es_tags'] = 'PALISRWAR'
                    break
            for field in item.fields:
                if field in item and item[field] != '' and hasattr(proto_data, field):
                    setattr(proto_data, field, str(item[field]))
                    if field == spider.settings['IMAGES_RESULT_FIELD']:
                        img_list = item[field]
                        for img in img_list:
                            img_abs_path = os.path.join(spider.settings['IMAGES_STORE'], img['path'])
                            self.img_save_list.append(img_abs_path)
                            # Rewrite remote image URLs to local relative paths.
                            item['es_urlcontent'] = item['es_urlcontent'].replace(img['url'], '/' + img['path'])
                        # Re-store the content now that URLs were rewritten.
                        setattr(proto_data, 'es_urlcontent', str(item['es_urlcontent']))
                    if field == spider.settings['FILES_RESULT_FIELD']:
                        for attach in item[field]:
                            self.attach_save_list.append(spider.settings['FILES_STORE'] + '/' + attach['path'])
            self.current_number += 1
            # Persist crawl results in batches.
            if self.current_number >= self.batch_save_number:
                # Send the data to the message center.
                content = self._build_payload(spider)
                try:
                    self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)
                except Exception as e:
                    # FIX: was spider.logging.debug(e) — Spider objects have no
                    # `logging` attribute, so the handler itself crashed.
                    logging.debug(e)
                    # Recreate the producer and retry once.
                    self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                    self.producer.send(spider.settings['KAFKA_PROCESS_QUEUE'][0], content, partition=0)

                self.move_to_watched_folder(spider.settings['FILE_TRANS_PATH'],
                                            spider.settings['IMG_ZIP_FILE_NAME'],
                                            spider.settings['FILE_ZIP_FILE_NAME'])
                self.current_number = 0
                self.sets = getattr(self.module, self.proto_class_name)()
        return item

    def move_to_watched_folder(self, file_trans_path, img_zip_name, file_zip_name):
        """Archive pending images and attachments into the transfer service's
        watched folder, then reset the pending lists."""
        # Compress images into the watched transfer directory.
        to_path = os.path.join(file_trans_path, img_zip_name + str(int(time.time() * 1000)) + '.tar.gz')
        self.write_to_zip(self.img_save_list, to_path)
        self.img_save_list = []
        # Compress attachments into the watched transfer directory.
        to_path = os.path.join(file_trans_path, file_zip_name + str(int(time.time() * 1000)) + '.tar.gz')
        self.write_to_zip(self.attach_save_list, to_path)
        self.attach_save_list = []

    def write_to_zip(self, file_list, to_path):
        """Pack file_list into a gzipped tar at to_path and announce it on
        Kafka. A no-op when file_list is empty."""
        if len(file_list) == 0:
            return
        with tarfile.open(to_path, 'w:gz') as archive:
            for file_path in file_list:
                try:
                    if not os.path.exists(file_path):
                        continue
                    # Entries live under "full/<basename>" inside the archive.
                    # FIX: os.path.basename instead of rindex('/') — rindex
                    # raised ValueError for paths without a slash.
                    arc_name = 'full/' + os.path.basename(file_path)
                    archive.add(file_path, arc_name)
                except Exception as e:
                    # Best-effort: skip entries that cannot be added.
                    logging.warning(e)
        # Announce the archive on Kafka.
        self.send_file_info_to_kafka(to_path)

    def send_file_info_to_kafka(self, file_path):
        """Publish name/URL of a generated archive on the 'stream-file-dl'
        topic so the transfer service can fetch it over HTTP."""
        # Take the basename, tolerating both '/' and '\\' separators.
        last_index = max(file_path.rfind('/'), file_path.rfind('\\'))
        file_name = file_path[last_index + 1:]
        host_ip = os.getenv('HOST_IP')
        file_url = f'http://{host_ip}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()

    def del_file(self, path):
        """Recursively delete files under ``path`` whose last modification is
        older than ``retention_hours`` hours."""
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                file_path = os.path.join(root, file)
                st_info = os.stat(file_path)
                if time.time() - st_info.st_mtime > self.retention_hours * 60 * 60:
                    os.remove(file_path)
|
||
|
||
|
||
class ImageDownloadAndTagPipeline(ImagesPipeline):
    """ImagesPipeline variant that, after persisting each downloaded image,
    stamps the stored file with the item's tags via file attributes."""

    def __init__(self, store_uri, download_func=None, settings=None):
        super().__init__(store_uri, download_func, settings)
        # Root of the image store; used below to rebuild absolute file paths.
        self.image_root_path = store_uri

    def image_downloaded(self, response, request, info, *, item=None):
        """Persist the image(s) produced for this response, then write the
        item's ``es_tags`` onto the stored file as an ``image_description``
        attribute. Returns the md5 checksum of the first image buffer."""
        checksum = None
        for path, image, buf in self.get_images(response, request, info, item=item):
            if checksum is None:
                # Rewind before hashing: get_images leaves the buffer at EOF.
                buf.seek(0)
                checksum = md5sum(buf)
            width, height = image.size
            self.store.persist_file(
                path,
                buf,
                info,
                meta={"width": width, "height": height},
                # NOTE(review): Content-Type is hard-coded to JPEG even though
                # get_images may yield other formats — confirm this is intended.
                headers={"Content-Type": "image/jpeg"},
            )
        # NOTE(review): `path` here is the last value from the loop, so only
        # the final stored variant gets tagged, and an empty get_images result
        # would raise NameError. Presumably one image per request — verify.
        tags = item['es_tags']
        full_path = f'{self.image_root_path}/{path}'
        set_file_attribs(full_path, {'image_description': tags})
        return checksum