osc/dsp/data_packager.py
yuxin-pc 89b951beb4 Create data_packager.py
数据自动打包
2025-06-09 14:53:53 +08:00

144 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import os
import tarfile
import datetime
import time
import requests
import json
import logging
from logging.handlers import RotatingFileHandler
# Configuration constants.
# RECEIVED_DATA_DIR is the root of the per-day input folders; finished
# archives are written to DATA_PACKAGE_DIR and logs to LOG_DIR.
RECEIVED_DATA_DIR = "/app/data/received_data"
DATA_PACKAGE_DIR = "/app/data/data_package"
LOG_DIR = "/app/data/logs"
LOG_FILE = os.path.join(LOG_DIR, "data_packager.log")
# Endpoint that is notified about each new package, and the URL prefix under
# which the package file name is advertised for download.
API_URL = "http://38.54.94.107:28081/api/data-package/add/package"
DOWNLOAD_URL_PREFIX = "http://38.54.125.182:28086/data_package/"

# Make sure the output and log directories exist before anything writes to
# them (the log handler below opens LOG_FILE right away).
os.makedirs(DATA_PACKAGE_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# Logging setup: INFO level, rotating file of at most 10 MiB with 5 backups.
logger = logging.getLogger("DataPackager")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
    # The log messages are Chinese text; pin the encoding so the log file does
    # not depend on the locale the process happens to be started with.
    encoding="utf-8",
)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
def get_yesterday_folder():
    """Locate the received-data folder that belongs to the previous day.

    The folder layout is ``RECEIVED_DATA_DIR/YYYY/YYYY-MM/YYYY-MM-DD``, based
    on the local clock at call time minus one day.

    Returns:
        A ``(folder_path, day)`` tuple, where ``day`` is the ``YYYY-MM-DD``
        string of the previous day.
    """
    target = datetime.datetime.now() - datetime.timedelta(days=1)
    day = f"{target:%Y-%m-%d}"
    # Path components from coarsest to finest: year / year-month / full date.
    components = (f"{target:%Y}", f"{target:%Y-%m}", day)
    return os.path.join(RECEIVED_DATA_DIR, *components), day
def create_tar_gz(source_dir, output_filename):
    """Pack ``source_dir`` into a gzip-compressed tar archive.

    The archive is written to a temporary ``<output_filename>.part`` file and
    only moved to ``output_filename`` once it is complete, so a failure while
    archiving never leaves a truncated file at the final path.

    Args:
        source_dir: Directory to archive. Its contents are stored under the
            directory's own base name inside the archive.
        output_filename: Path of the ``.tar.gz`` file to create.

    Returns:
        Size of the finished archive in bytes.

    Raises:
        OSError, tarfile.TarError: If reading the source or writing the
            archive fails. The partial file is removed before re-raising.
    """
    partial_path = f"{output_filename}.part"
    # normpath drops a trailing separator, which would otherwise make
    # basename() return "" and lose the top-level folder name in the archive.
    top_level_name = os.path.basename(os.path.normpath(source_dir))
    try:
        with tarfile.open(partial_path, "w:gz") as tar:
            tar.add(source_dir, arcname=top_level_name)
        # Atomic publish: the final name only ever refers to a complete file.
        os.replace(partial_path, output_filename)
    finally:
        # After a successful os.replace the partial file no longer exists;
        # it is only still present here when something above failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)
    return os.path.getsize(output_filename)
def call_api(package_name, package_size):
    """Report a finished package to the registration API.

    POSTs a JSON body with the package name, its size, and a download URL
    built from ``DOWNLOAD_URL_PREFIX`` plus the package name.

    Args:
        package_name: File name of the archive.
        package_size: Size of the archive in bytes.

    Returns:
        True when the request succeeded with a non-error HTTP status, False
        when any ``requests`` exception occurred (the error is logged).
    """
    payload = {
        "packageName": package_name,
        "packageSize": package_size,
        "downloadUrl": f"{DOWNLOAD_URL_PREFIX}{package_name}",
    }
    request_options = {
        "json": payload,
        "headers": {"Content-Type": "application/json"},
        "timeout": 10,
    }
    try:
        response = requests.post(API_URL, **request_options)
        # Turn 4xx/5xx answers into exceptions so they are handled below.
        response.raise_for_status()
    except requests.exceptions.RequestException as err:
        logger.error(f"API调用失败: {err!s}")
        return False
    else:
        logger.info(f"API调用成功: {response.status_code} - {response.text}")
        return True
def main():
    """Run one packaging pass for yesterday's data.

    Steps: resolve yesterday's source folder, skip if it is missing or if the
    archive already exists, otherwise create ``<day>.tar.gz`` in
    ``DATA_PACKAGE_DIR`` and notify the API. Any exception is logged with a
    traceback and swallowed so the caller keeps running.
    """
    try:
        # Resolve the folder for the previous day and derive the archive path.
        src_folder, date_label = get_yesterday_folder()
        archive_name = f"{date_label}.tar.gz"
        archive_path = os.path.join(DATA_PACKAGE_DIR, archive_name)

        # Nothing to do when there is no data for that day.
        source_missing = not os.path.exists(src_folder)
        if source_missing:
            logger.warning(f"源目录不存在,跳过处理: {src_folder}")
            return

        # Never overwrite an archive that was produced earlier.
        already_packaged = os.path.exists(archive_path)
        if already_packaged:
            logger.warning(f"压缩包已存在,跳过处理: {archive_path}")
            return

        logger.info(f"开始处理目录: {src_folder}")

        # Build the archive.
        logger.info(f"创建压缩包: {archive_path}")
        archive_size = create_tar_gz(src_folder, archive_path)
        logger.info(f"压缩包创建完成,大小: {archive_size} 字节")

        # Register the new package with the API.
        logger.info(f"调用API: {API_URL}")
        notified = call_api(archive_name, archive_size)
        if not notified:
            logger.error("数据处理流程完成但API调用失败")
        else:
            logger.info("数据处理流程完成")
    except Exception as err:
        logger.error(f"处理过程中发生错误: {err!s}", exc_info=True)
def run_scheduler():
    """Run ``main`` once immediately, then again every day at 01:00.

    Loops forever: each iteration computes the next 01:00 on the local clock,
    sleeps until then, and runs ``main``. Never returns normally.
    """
    # One pass right away on start-up.
    logger.info("脚本启动,立即执行一次")
    main()

    one_day = datetime.timedelta(days=1)
    while True:
        now = datetime.datetime.now()
        # Today's 01:00; if that moment has already passed, use tomorrow's.
        next_run = now.replace(hour=1, minute=0, second=0, microsecond=0)
        if next_run <= now:
            next_run = next_run + one_day
        delay = (next_run - now).total_seconds()
        logger.info(f"下一次执行时间: {next_run}, 等待 {delay:.0f}")
        time.sleep(delay)

        logger.info("定时执行开始")
        main()
        logger.info("定时执行完成")
if __name__ == "__main__":
    # When run as a script, mirror log records to the console using the same
    # format as the log file.
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)

    logger.info("数据打包服务启动")
    try:
        run_scheduler()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the service; exit quietly.
        logger.info("数据打包服务停止")
    except Exception as err:
        # Record the failure, then re-raise so the process exits with an error.
        logger.error(f"服务异常终止: {err!s}", exc_info=True)
        raise