2025-05-28 19:16:17 +08:00

284 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
import base64
import importlib
import json
import os
import tarfile
import time
import redis
from kafka import KafkaProducer
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
from AirplaneSpiders.proto.Plane_pb2 import FligthTrackpointInfoSets # 航班轨迹信息
from AirplaneSpiders.proto.route_pb2 import RouteInfoSets
class ProtobufSavePipeline(object):
    """Batch scraped items into a dynamically-resolved protobuf message set
    and ship serialized batches to Kafka.

    Downloaded images are bundled into a ``.tar.gz`` archive and announced on
    a file-download topic; files older than the retention window are purged
    when the spider closes.
    """

    def open_spider(self, spider):
        # Hours that generated files are kept on disk before del_file purges them.
        self.retention_hours = spider.settings['RETENTION_HOURS']
        module_path = spider.settings['PROTO_MODULE_PATH']
        self.klass = spider.settings['PROTO_CLASS_NAME']
        # Name of the repeated field on the "sets" message that holds items.
        self.itemListName = spider.settings['PROTO_FIELD_NAME']
        self.saveFileName = spider.settings['PROTO_SAVE_FILE_PATH'] + '/' + spider.settings['PROTO_SAVE_FILE_NAME']
        self.batch_save_number = spider.settings['BATCH_SAVE_SIZE']
        self.current_number = 0
        # Resolve the protobuf container class at runtime so one pipeline
        # implementation can serve many spiders/proto schemas.
        self.module = importlib.import_module(module_path)
        self.sets = getattr(self.module, self.klass)()
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
        self.imgSaveList = []
        # NOTE(review): attachSaveList is never populated in this file; kept
        # for backward compatibility with code outside this view.
        self.attachSaveList = []
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])

    def _pack_images(self, spider):
        """Archive any pending downloaded images and clear the pending list.

        No-op when IMAGES_STORE is unset or empty.
        """
        if 'IMAGES_STORE' in spider.settings and spider.settings['IMAGES_STORE'] and spider.settings[
                'IMAGES_STORE'] != '':
            toPath = os.path.join(spider.settings['FILE_TRANS_PATH'],
                                  spider.settings['ZIP_FILE_NAME'] + str(int(time.time() * 1000)) + '.tar.gz')
            self.write_to_zip(self.imgSaveList, toPath)
            self.imgSaveList = []

    def _encode_sets(self, spider):
        """Serialize the current protobuf set into the base64-encoded JSON
        envelope expected by the message center, and return the bytes."""
        # ISO-8859-1 maps every byte 0-255 to a code point, so arbitrary
        # protobuf bytes survive the JSON round-trip.
        jsonData = {
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
        }
        return base64.b64encode(json.dumps(jsonData).encode('utf-8'))

    def close_spider(self, spider):
        # Nothing collected since the last batch: skip the final send entirely.
        if len(getattr(self.sets, self.itemListName)) == 0:
            return
        self._pack_images(spider)
        # Forward the final (possibly partial) batch to the message center.
        content = self._encode_sets(spider)
        self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
        self.producer.flush()
        self.producer.close()
        # Purge files whose last-modified time exceeds the retention window.
        self.del_file(spider.settings['PROTO_SAVE_FILE_PATH'])
        self.del_file(spider.settings['FILES_STORE'])
        self.del_file(spider.settings['IMAGES_STORE'])
        self.del_file(spider.settings['FILE_TRANS_PATH'])

    def process_item(self, item, spider):
        protoData = getattr(self.sets, self.itemListName).add()
        for field in item.fields:
            # Copy every populated item field that has a matching proto field.
            if field in item and item[field] != '' and hasattr(protoData, field):
                setattr(protoData, field, str(item[field]))
            # Queue downloaded image paths for the next archive; the extra
            # `field in item` guard avoids a KeyError on items that went
            # through the pipeline without any images downloaded.
            if field == spider.settings['IMAGES_RESULT_FIELD'] and field in item:
                for img in item[field]:
                    self.imgSaveList.append(os.path.join(spider.settings['IMAGES_STORE'], img['path']))
        # Flush a full batch: archive images, send the batch, reset counters.
        if self.current_number >= self.batch_save_number:
            self._pack_images(spider)
            content = self._encode_sets(spider)
            try:
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            except Exception:
                # The producer can go stale during a long crawl; rebuild it
                # once and retry (narrowed from the original bare except).
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            self.current_number = 0
            self.sets = getattr(self.module, self.klass)()
        self.current_number += 1
        return item

    def write_to_zip(self, fileList, toPath):
        """Bundle ``fileList`` into a tar.gz at ``toPath`` (entries placed
        under ``full/``), delete the originals, then announce the archive on
        Kafka. No-op for an empty list."""
        if len(fileList) == 0:
            return
        with tarfile.open(toPath, 'w:gz') as zipFile:
            for filePath in fileList:
                try:
                    if not os.path.exists(filePath):
                        continue
                    # basename handles the platform separator; the original
                    # '/'-only rindex silently skipped files on Windows paths.
                    arcname = "full/" + os.path.basename(filePath)
                    zipFile.add(filePath, arcname)
                except OSError:
                    # Best-effort: one unreadable file must not abort the batch
                    # (narrowed from the original bare except).
                    continue
        for filePath in fileList:
            if not os.path.exists(filePath):
                continue
            os.remove(filePath)
        # Announce the finished archive on the file-download topic.
        self.send_file_info_to_kafka(toPath)

    def send_file_info_to_kafka(self, file_path):
        """Publish the archive's file name and download URL on the
        ``stream-file-dl`` topic as a base64-encoded JSON payload."""
        # Accept both '/' and '\\' separators regardless of the host OS.
        last_index = max(file_path.rfind('/'), file_path.rfind('\\'))
        file_name = file_path[last_index + 1:]
        HOST_IP = os.getenv('HOST_IP')
        file_url = f'http://{HOST_IP}:28086/{file_name}'
        file_info = {
            'fileName': file_name,
            'url': file_url
        }
        content = base64.b64encode(json.dumps(file_info).encode('utf-8'))
        self.producer.send('stream-file-dl', content, partition=0)
        self.producer.flush()

    def del_file(self, path):
        """Delete every file under ``path`` whose last-modified time is older
        than the retention window; no-op when ``path`` is unset/empty."""
        # Guard: unset store settings (e.g. IMAGES_STORE missing) yield a
        # falsy path; the original crashed with TypeError in os.walk.
        if not path:
            return
        cutoff = self.retention_hours * 60 * 60
        for root, dirs, files in os.walk(path, topdown=False):
            for file in files:
                filePath = os.path.join(root, file)
                st_info = os.stat(filePath)
                if time.time() - st_info.st_mtime > cutoff:
                    os.remove(filePath)
# While crawling live aircraft positions, store each flight id so the
# flight-detail collector can consume it later.
class FlightIdSavePipeline(object):
    """Buffer (registration, flight id) pairs per source spider and write
    them to redis sets in one pipelined batch when the spider closes."""

    def open_spider(self, spider):
        # Collected pairs are held in memory until close_spider flushes them.
        self.redisQueue = []

    def close_spider(self, spider):
        self.redisClient = redis.Redis(host=spider.settings['REDIS_HOST'],
                                       port=int(spider.settings['REDIS_PORT']),
                                       password=spider.settings['REDIS_PWD'])
        # transaction=False: plain pipelining, no MULTI/EXEC needed for SADDs.
        with self.redisClient.pipeline(transaction=False) as pipe:
            for entry in self.redisQueue:
                for redis_key, payload in entry.items():
                    pipe.sadd(redis_key, json.dumps(payload))
            pipe.execute()
        self.redisClient.close()

    def process_item(self, item, spider):
        if spider.name == 'rb24_airplane_live':
            # Drop the 2-char source prefix from radarbox flight ids.
            pair = {'areg': item['PlaneREG'], 'flightid': item['FlightID'][2:]}
            self.redisQueue.append({'AirplaneSpiders:aRegs': pair})
        elif spider.name == 'flightradar24_airplane_live':
            # Same pairing for flightradar24, kept under a separate set key.
            pair = {'areg': item['PlaneREG'], 'flightid': item['FlightID'][2:]}
            self.redisQueue.append({'AirplaneSpiders:aRegs_2': pair})
        return item
class RouteInfoSavePipeline(object):
    """Accumulate route items into a ``RouteInfoSets`` protobuf and forward
    serialized batches (500 items each) to Kafka."""

    def open_spider(self, spider):
        self.sets = RouteInfoSets()
        self.saveFileName = spider.settings['PROTO_SAVE_FILE_PATH'] + '/' + spider.settings['PROTO_SAVE_FILE_NAME']
        # Fixed batch size for route data (not driven by BATCH_SAVE_SIZE).
        self.batch_save_number = 500
        self.current_number = 0
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
        if not os.path.exists(spider.settings['PROTO_SAVE_FILE_PATH']):
            os.makedirs(spider.settings['PROTO_SAVE_FILE_PATH'])

    def _encode_sets(self, spider):
        """Serialize the current set into the base64-encoded JSON envelope
        expected by the message center, and return the bytes."""
        # ISO-8859-1 preserves raw protobuf bytes through the JSON round-trip.
        jsonData = {
            'content': self.sets.SerializeToString().decode('ISO-8859-1'),
            'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
            'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
        }
        return base64.b64encode(json.dumps(jsonData).encode('utf-8'))

    def close_spider(self, spider):
        # Ship the final (possibly partial) batch; skip the send when nothing
        # was collected (consistent with ProtobufSavePipeline — the original
        # sent an empty message unconditionally).
        if len(self.sets.RouteInfo) > 0:
            content = self._encode_sets(spider)
            self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
            # Flush explicitly instead of relying on close() to drain buffers.
            self.producer.flush()
        self.producer.close()

    def process_item(self, item, spider):
        routeinfo = self.sets.RouteInfo.add()
        routeinfo.Name = item['Name']
        routeinfo.StartBaseCode = item['StartBaseCode']
        routeinfo.EndBaseCode = item['EndBaseCode']
        routeinfo.Type = item['Type']
        routeinfo.UpdateTime = item['UpdateTime']
        routeinfo.LastTime = item['LastTime']
        # Copy every navpoint of the route into the nested repeated field.
        for nav_point_item in item['Navpoints']:
            p = routeinfo.Navpoints.add()
            p.NavpointCode = nav_point_item['NavpointCode']
            p.Name = nav_point_item['Name']
            p.Longitude = nav_point_item['Longitude']
            p.Latitude = nav_point_item['Latitude']
            p.Angle = nav_point_item['Angle']
            p.Seq = nav_point_item['Seq']
            p.UpdateTime = nav_point_item['UpdateTime']
            p.LastTime = nav_point_item['LastTime']
        # Flush a full batch to the message center and start a fresh set.
        if self.current_number > self.batch_save_number:
            content = self._encode_sets(spider)
            try:
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            except Exception:
                # Rebuild a stale producer once and retry (narrowed from the
                # original bare except).
                self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                self.producer.flush()
            self.sets = RouteInfoSets()
            self.current_number = 0
        self.current_number += 1
        return item
class HistoryTrackSavePipeline(object):
    """Buffer historical track points in redis during the crawl, then drain
    them into ``FligthTrackpointInfoSets`` batches sent to Kafka on close."""

    def open_spider(self, spider):
        self.saveFileName = spider.settings['PROTO_SAVE_FILE_PATH'] + '/' + spider.settings['PROTO_SAVE_FILE_NAME']
        self.redisClient = redis.Redis(host=spider.settings['REDIS_HOST'], port=int(spider.settings['REDIS_PORT']),
                                       password=spider.settings['REDIS_PWD'])
        self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])

    def close_spider(self, spider):
        # Drain redis: each pass pops at most one track point per flight id
        # and ships them as one protobuf set; flight ids whose list is empty
        # are removed from the index set, so the outer loop terminates.
        flightIds = self.redisClient.smembers('planeHistoryFids')
        while len(flightIds) > 0:
            trackSets = FligthTrackpointInfoSets()
            for fid in flightIds:
                if self.redisClient.llen(fid) > 0:
                    item = json.loads(self.redisClient.lpop(fid))
                    trackInfo = trackSets.FligthTrackpointInfo.add()
                    # Copy only keys that exist on the proto message.
                    for key in item.keys():
                        if hasattr(trackInfo, key):
                            setattr(trackInfo, key, str(item[key]))
                else:
                    self.redisClient.srem('planeHistoryFids', fid)
            # Skip the send when this pass collected nothing — the original
            # always sent one trailing empty batch on the final pass.
            if len(trackSets.FligthTrackpointInfo) > 0:
                # ISO-8859-1 preserves raw protobuf bytes through JSON.
                jsonData = {
                    'content': trackSets.SerializeToString().decode('ISO-8859-1'),
                    'protoName': spider.settings['PROTO_SAVE_FILE_NAME'],
                    'processQueue': spider.settings['KAFKA_PROCESS_QUEUE']
                }
                content = base64.b64encode(json.dumps(jsonData).encode('utf-8'))
                try:
                    self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
                except Exception:
                    # Rebuild a stale producer once and retry (narrowed from
                    # the original bare except).
                    self.producer = KafkaProducer(bootstrap_servers=spider.settings['KAFKA_SERVER'])
                    self.producer.send(spider.settings['KAFKA_TOPIC'], content, partition=0)
            flightIds = self.redisClient.smembers('planeHistoryFids')
        # Drain and release the producer — the original leaked it (never
        # flushed or closed), risking loss of buffered messages at shutdown.
        self.producer.flush()
        self.producer.close()
        self.redisClient.close()

    def process_item(self, item, spider):
        # Index the flight id, then push this point onto that flight's list.
        fid = str(item['FlightID'])
        self.redisClient.sadd('planeHistoryFids', fid)
        self.redisClient.lpush(fid, item.jsonEncoder())
        return item