# -*- coding: utf-8 -*-
import base64
import importlib
import json
import os
import tarfile
import time

import redis
from kafka import KafkaProducer

# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
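#
# For illustration only -- a minimal sketch of registering the first pipeline
# (the module path and order value 300 are assumptions; use this project's
# actual settings.py):
#
# ITEM_PIPELINES = {
#     'AirplaneSpiders.pipelines.ProtobufSavePipeline': 300,
# }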
from AirplaneSpiders.proto.Plane_pb2 import FligthTrackpointInfoSets  # flight track information
from AirplaneSpiders.proto.route_pb2 import RouteInfoSets


class ProtobufSavePipeline(object):
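    """Accumulate scraped items into a protobuf message set and push serialized,
    base64-wrapped batches to Kafka; downloaded images are archived as tar.gz
    files and announced on a separate file-download topic.
    """
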
    def open_spider(self, spider):
        self.retention_hours = spider.settings['RETENTION_HOURS']
        module_path = spider.settings['PROTO_MODULE_PATH']
        self.klass = spider.settings['PROTO_CLASS_NAME']
        self.itemListName = spider.settings['PROTO_FIELD_NAME']
        self.saveFileName = spider.settings['PROTO_SAVE_FILE_PATH'] + '/' + spider.settings['PROTO_SAVE_FILE_NAME']
        self.batch_save_number = spider.settings['BATCH_SAVE_SIZE']
        self.current_number = 0
        self.module = importlib.import_module(module_path)
        self.sets = getattr(self.module, self.klass)()
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
        self.imgSaveList = []
        self.attachSaveList = []

        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])

    def close_spider(self, spider):
        if len(getattr(self.sets, self.itemListName)) == 0:
            return
        # with open(self.saveFileName +str(int(time.time() * 1000))+'.todist','wb+') as f:
        #     f.write(self.sets.SerializeToString())
        if spider.settings.get('IMAGES_STORE'):
            # fromPath = os.path.join(spider.settings['IMAGES_STORE'],'full')
            toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                  spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
            self.write_to_zip(self.imgSaveList, toPath)
            self.imgSaveList = []

        # Send the data to the message center
        jsonData = {
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
        }
        content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
        self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
        self.producer.flush()
        self.producer.close()
        # Delete files whose last-modified time exceeds the retention window
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def process_item(self, item, spider):
        protoData = getattr(self.sets, self.itemListName).add()
        for field in item.fields:
            if field in item and item[field] != '' and hasattr(protoData, field):
                setattr(protoData, field, str(item[field]))
            if field == spider.settings['IMAGES_RESULT_FIELD'] and field in item:
                for img in item[field]:
                    self.imgSaveList.append(os.path.join(spider.settings['IMAGES_STORE'], img['path']))

        # Save the scraped results in batches
        if self.current_number >= self.batch_save_number:
            # Archive the downloaded images
            if spider.settings.get('IMAGES_STORE'):
                # fromPath = os.path.join(spider.settings['IMAGES_STORE'],'full')
                toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                      spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
                self.write_to_zip(self.imgSaveList, toPath)
                self.imgSaveList = []
            # Send the data to the message center
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            try:
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            except Exception:
                # Recreate the producer once and retry if sending failed
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            self.current_number = 0
            self.sets = getattr(self.module, self.klass)()
        self.current_number += 1
        return item

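    # Pack the given files into a tar.gz archive, delete the originals,
    # then announce the archive on Kafka.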
    def write_to_zip(self, fileList, toPath):
        if len(fileList) == 0:
            return
        with tarfile.open(toPath, 'w:gz') as zipFile:
            for filePath in fileList:
                try:
                    if not os.path.exists(filePath):
                        continue
                    arcname = os.path.join('full', os.path.basename(filePath))  # path of the file inside the archive
                    zipFile.add(filePath, arcname)
                except Exception:
                    pass
        for filePath in fileList:
            if not os.path.exists(filePath):
                continue
            os.remove(filePath)
        # Announce the archive on kafka
        self.send_file_info_to_kafka(toPath)

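    # Publish the archive's name and download URL on the 'stream-file-dl' topic.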
    def send_file_info_to_kafka(self, file_path):
        file_name = os.path.basename(file_path)
        HOST_IP = os.getenv('HOST_IP')
        file_url = f'http://{HOST_IP}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()

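    # Recursively delete files under path whose last-modified time exceeds the retention window.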
    def del_file(self, path):
        # Guard against unset settings: os.walk(None) would raise a TypeError
        if not path or not os.path.isdir(path):
            return
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                filePath = os.path.join(root, file)
                st_info = os.stat(filePath)
                if time.time() - st_info.st_mtime > self.retention_hours * 60 * 60:
                    os.remove(filePath)


# While scraping real-time aircraft positions, store each flight id for the flight collector to use
class FlightIdSavePipeline(object):
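    """Buffer (registration number, flightid) pairs per source site and flush
    them into redis sets when the spider closes.
    """
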
    def open_spider(self, spider):
        # Cache the scraped registration-number/flightid pairs; they are written to redis in one batch when the spider closes
        self.redisQueue = []

    def close_spider(self, spider):
        self.redisClient = redis.Redis(host=spider.settings['REDIS_HOST'], port=int(spider.settings['REDIS_PORT']),
                                       password=spider.settings['REDIS_PWD'])
        with self.redisClient.pipeline(transaction=False) as p:
            for i in self.redisQueue:
                for k in i.keys():
                    p.sadd(k, json.dumps(i[k]))
            p.execute()
        self.redisClient.close()

    def process_item(self, item, spider):
        if spider.name == 'rb24_airplane_live':
            # Store the aircraft registration number and the radarbox flightid in redis
            aRegs = {'areg': item['PlaneREG'], 'flightid': item['FlightID'][2:]}
            data = {'AirplaneSpiders:aRegs': aRegs}
            self.redisQueue.append(data)
        elif spider.name == 'flightradar24_airplane_live':
            # Same for flightradar24
            aRegs_2 = {'areg': item['PlaneREG'], 'flightid': item['FlightID'][2:]}
            data = {'AirplaneSpiders:aRegs_2': aRegs_2}
            self.redisQueue.append(data)
        return item


class RouteInfoSavePipeline(object):
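    """Collect route items (with their nav points) into RouteInfoSets batches
    and send each serialized batch to Kafka.
    """
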
    def open_spider(self, spider):
        self.sets = RouteInfoSets()
        self.saveFileName = spider.settings['PROTO_SAVE_FILE_PATH'] + '/' + spider.settings['PROTO_SAVE_FILE_NAME']
        self.batch_save_number = 500
        self.current_number = 0
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])

    def close_spider(self, spider):
        # with open(self.saveFileName +str(int(time.time() * 1000))+'.todist','wb+') as f:
        #     f.write(self.sets.SerializeToString())
        # Send the data to the message center
        jsonData = {
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
        }
        content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
        self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
        # Flush before closing so the final batch is not lost, matching the other pipelines
        self.producer.flush()
        self.producer.close()

    def process_item(self, item, spider):
        routeinfo = self.sets.RouteInfo.add()
        routeinfo.Name = item['Name']
        routeinfo.StartBaseCode = item['StartBaseCode']
        routeinfo.EndBaseCode = item['EndBaseCode']
        routeinfo.Type = item['Type']
        routeinfo.UpdateTime = item['UpdateTime']
        routeinfo.LastTime = item['LastTime']
        for nav_point_item in item['Navpoints']:
            p = routeinfo.Navpoints.add()
            p.NavpointCode = nav_point_item['NavpointCode']
            p.Name = nav_point_item['Name']
            p.Longitude = nav_point_item['Longitude']
            p.Latitude = nav_point_item['Latitude']
            p.Angle = nav_point_item['Angle']
            p.Seq = nav_point_item['Seq']
            p.UpdateTime = nav_point_item['UpdateTime']
            p.LastTime = nav_point_item['LastTime']
        if self.current_number > self.batch_save_number:
            # Send the data to the message center
            jsonData = {
                'content': self.sets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            try:
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            except Exception:
                # Recreate the producer once and retry if sending failed
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            self.sets = RouteInfoSets()
            self.current_number = 0

        self.current_number += 1
        return item


class HistoryTrackSavePipeline(object):
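    """Queue per-flight track points in redis while the spider runs, then drain
    them into FligthTrackpointInfoSets messages for Kafka on close.
    """
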
    def open_spider(self, spider):
        self.saveFileName = spider.settings['PROTO_SAVE_FILE_PATH'] + '/' + spider.settings['PROTO_SAVE_FILE_NAME']
        self.redisClient = redis.Redis(host=spider.settings['REDIS_HOST'], port=int(spider.settings['REDIS_PORT']),
                                       password=spider.settings['REDIS_PWD'])
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])

    def close_spider(self, spider):
        flightIds = self.redisClient.smembers('planeHistoryFids')
        while len(flightIds) > 0:
            trackSets = FligthTrackpointInfoSets()
            for fid in flightIds:
                if self.redisClient.llen(fid) > 0:
                    item = json.loads(self.redisClient.lpop(fid))
                    trackInfo = trackSets.FligthTrackpointInfo.add()
                    for key in item.keys():
                        if hasattr(trackInfo, key):
                            setattr(trackInfo, key, str(item[key]))
                else:
                    self.redisClient.srem('planeHistoryFids', fid)
            # with open(self.saveFileName +str(int(time.time() * 1000))+'.todist','wb+') as f:
            #     f.write(trackSets.SerializeToString())
            # Send the data to the message center
            jsonData = {
                'content': trackSets.SerializeToString().decode('ISO-8859-1'),
                'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
            }
            content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
            try:
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
            except Exception:
                # Recreate the producer once and retry if sending failed
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
            flightIds = self.redisClient.smembers('planeHistoryFids')
        # Flush pending messages before shutdown so the final batches are not lost
        self.producer.flush()
        self.redisClient.close()

    def process_item(self, item, spider):
        fid = str(item['FlightID'])
        self.redisClient.sadd('planeHistoryFids', fid)
        self.redisClient.lpush(fid, item.jsonEncoder())
        return item