Create data_packager.py
Automatic data packaging: each day at 1:00 AM, archive the previous day's received data into a tar.gz package and register it via the data-package API.
parent 9910c1479a
commit 89b951beb4
144 dsp/data_packager.py Normal file
@@ -0,0 +1,144 @@
#!/usr/bin/env python3
import os
import tarfile
import datetime
import time
import requests
import json
import logging
from logging.handlers import RotatingFileHandler

# Configuration constants
RECEIVED_DATA_DIR = "/app/data/received_data"
DATA_PACKAGE_DIR = "/app/data/data_package"
LOG_DIR = "/app/data/logs"
LOG_FILE = os.path.join(LOG_DIR, "data_packager.log")
API_URL = "http://38.54.94.107:28081/api/data-package/add/package"
DOWNLOAD_URL_PREFIX = "http://38.54.125.182:28086/data_package/"

# Ensure directories exist
os.makedirs(DATA_PACKAGE_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# Configure logging
logger = logging.getLogger("DataPackager")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(
    LOG_FILE, maxBytes=10*1024*1024, backupCount=5
)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)

def get_yesterday_folder():
    """Return the previous day's directory path and date string."""
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    year = yesterday.strftime("%Y")
    month = yesterday.strftime("%Y-%m")
    day = yesterday.strftime("%Y-%m-%d")

    folder_path = os.path.join(RECEIVED_DATA_DIR, year, month, day)
    return folder_path, day

def create_tar_gz(source_dir, output_filename):
    """Create a tar.gz archive and return its size in bytes."""
    with tarfile.open(output_filename, "w:gz") as tar:
        tar.add(source_dir, arcname=os.path.basename(source_dir))
    return os.path.getsize(output_filename)

def call_api(package_name, package_size):
    """Register the package with the data-package API."""
    download_url = f"{DOWNLOAD_URL_PREFIX}{package_name}"
    data = {
        "packageName": package_name,
        "packageSize": package_size,
        "downloadUrl": download_url
    }

    try:
        response = requests.post(
            API_URL,
            json=data,
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        response.raise_for_status()
        logger.info(f"API call succeeded: {response.status_code} - {response.text}")
        return True
    except requests.exceptions.RequestException as e:
        logger.error(f"API call failed: {str(e)}")
        return False

def main():
    """Main processing logic."""
    try:
        # Get the previous day's directory and date string
        source_dir, day = get_yesterday_folder()
        package_name = f"{day}.tar.gz"
        package_path = os.path.join(DATA_PACKAGE_DIR, package_name)

        # Check whether the source directory exists
        if not os.path.exists(source_dir):
            logger.warning(f"Source directory does not exist, skipping: {source_dir}")
            return

        # Check whether the target archive already exists
        if os.path.exists(package_path):
            logger.warning(f"Archive already exists, skipping: {package_path}")
            return

        logger.info(f"Started processing directory: {source_dir}")

        # Create the archive
        logger.info(f"Creating archive: {package_path}")
        package_size = create_tar_gz(source_dir, package_path)
        logger.info(f"Archive created, size: {package_size} bytes")

        # Call the API
        logger.info(f"Calling API: {API_URL}")
        if call_api(package_name, package_size):
            logger.info("Data processing pipeline completed")
        else:
            logger.error("Data processing pipeline completed, but the API call failed")

    except Exception as e:
        logger.error(f"Error during processing: {str(e)}", exc_info=True)

def run_scheduler():
    """Run the scheduled task loop."""
    # Run once immediately on startup
    logger.info("Script started, running once immediately")
    main()

    # Then run every day at 1:00 AM
    while True:
        now = datetime.datetime.now()
        # Compute the time until the next 1:00 AM
        next_run = now.replace(hour=1, minute=0, second=0, microsecond=0)
        if now >= next_run:
            next_run += datetime.timedelta(days=1)

        sleep_seconds = (next_run - now).total_seconds()
        logger.info(f"Next run at: {next_run}, waiting {sleep_seconds:.0f} seconds")
        time.sleep(sleep_seconds)

        logger.info("Scheduled run started")
        main()
        logger.info("Scheduled run finished")

if __name__ == "__main__":
    # Also log to the console
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    logger.info("Data packaging service started")
    try:
        run_scheduler()
    except KeyboardInterrupt:
        logger.info("Data packaging service stopped")
    except Exception as e:
        logger.error(f"Service terminated abnormally: {str(e)}", exc_info=True)
        raise