#!/usr/bin/env python3
"""Daily data packaging service.

Once at start-up and then every day at 01:00 local time, this script archives
the previous day's directory under ``RECEIVED_DATA_DIR`` into
``DATA_PACKAGE_DIR/<YYYY-MM-DD>.tar.gz`` and reports the new package (name,
size in bytes, download URL) to the HTTP endpoint ``API_URL``.

Provenance: introduced as ``dsp/data_packager.py`` by commit
89b951beb49825d173ae21a0b2b068339a9b1896 ("Create data_packager.py",
author yuxin-pc, Mon, 9 Jun 2025 14:53:53 +0800). Original commit message:
"数据自动打包" (automatic data packaging).
"""
import os
import tarfile
import datetime
import time
import requests
import json
import logging
from logging.handlers import RotatingFileHandler

# Configuration constants
RECEIVED_DATA_DIR = "/app/data/received_data"
DATA_PACKAGE_DIR = "/app/data/data_package"
LOG_DIR = "/app/data/logs"
LOG_FILE = os.path.join(LOG_DIR, "data_packager.log")
API_URL = "http://38.54.94.107:28081/api/data-package/add/package"
DOWNLOAD_URL_PREFIX = "http://38.54.125.182:28086/data_package/"

# Make sure the output and log directories exist before the file handler
# below tries to open LOG_FILE.
os.makedirs(DATA_PACKAGE_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# Logging: rotating file handler, 10 MiB per file, 5 backups.
logger = logging.getLogger("DataPackager")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(
    LOG_FILE, maxBytes=10 * 1024 * 1024, backupCount=5
)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)


def get_yesterday_folder():
    """Return the previous day's data directory and its date string.

    Returns:
        A ``(folder_path, day)`` tuple where ``day`` is formatted
        ``YYYY-MM-DD`` and ``folder_path`` is
        ``RECEIVED_DATA_DIR/<YYYY>/<YYYY-MM>/<YYYY-MM-DD>``. The date is
        derived from the local (naive) system clock.
    """
    yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
    year = yesterday.strftime("%Y")
    month = yesterday.strftime("%Y-%m")
    day = yesterday.strftime("%Y-%m-%d")

    folder_path = os.path.join(RECEIVED_DATA_DIR, year, month, day)
    return folder_path, day


def create_tar_gz(source_dir, output_filename) -> int:
    """Create a gzip-compressed tar archive of ``source_dir``.

    The archive is written to ``<output_filename>.part`` first and moved into
    place with ``os.replace`` only after it has been written completely.
    ``main()`` skips any day whose archive already exists, so a truncated
    archive left at the final path by an interrupted run would otherwise never
    be rebuilt.

    Args:
        source_dir: Directory to archive. It is stored in the archive under
            its own base name.
        output_filename: Final path of the ``.tar.gz`` file.

    Returns:
        Size of the finished archive in bytes.

    Raises:
        OSError, tarfile.TarError: If the archive cannot be written. The
            partial file is removed before the exception propagates.
    """
    partial_path = f"{output_filename}.part"
    # normpath drops a trailing slash, which would make basename() return "".
    archive_root = os.path.basename(os.path.normpath(source_dir))
    try:
        with tarfile.open(partial_path, "w:gz") as tar:
            tar.add(source_dir, arcname=archive_root)
        os.replace(partial_path, output_filename)
    finally:
        # After a successful replace the partial file is already gone; after
        # a failure this removes the incomplete archive.
        try:
            os.remove(partial_path)
        except FileNotFoundError:
            pass
    return os.path.getsize(output_filename)


def call_api(package_name, package_size) -> bool:
    """Report a finished package to the API.

    POSTs a JSON body with ``packageName``, ``packageSize`` and
    ``downloadUrl`` (``DOWNLOAD_URL_PREFIX`` + ``package_name``) to
    ``API_URL`` with a 10-second timeout.

    Args:
        package_name: File name of the archive.
        package_size: Archive size in bytes.

    Returns:
        ``True`` if the request succeeded with a non-error HTTP status,
        ``False`` if any ``requests`` exception was raised (it is logged,
        not propagated).
    """
    download_url = f"{DOWNLOAD_URL_PREFIX}{package_name}"
    data = {
        "packageName": package_name,
        "packageSize": package_size,
        "downloadUrl": download_url
    }

    try:
        response = requests.post(
            API_URL,
            json=data,
            headers={"Content-Type": "application/json"},
            timeout=10
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"API调用失败: {str(e)}")
        return False

    logger.info(f"API调用成功: {response.status_code} - {response.text}")
    return True


def main() -> None:
    """Package yesterday's data directory and report it to the API.

    Skips the run (with a warning) when the source directory is missing or
    the archive for that day already exists. Any exception is logged with its
    traceback and swallowed so that the scheduler loop keeps running.
    """
    try:
        # Resolve yesterday's directory and the archive name derived from it.
        source_dir, day = get_yesterday_folder()
        package_name = f"{day}.tar.gz"
        package_path = os.path.join(DATA_PACKAGE_DIR, package_name)

        # Require an actual directory, not merely an existing path.
        if not os.path.isdir(source_dir):
            logger.warning(f"源目录不存在,跳过处理: {source_dir}")
            return

        # Never overwrite an archive that has already been published.
        # NOTE(review): if an earlier run built the archive but the API call
        # failed, this check also prevents the API from ever being notified
        # for that day - confirm whether a retry is wanted.
        if os.path.exists(package_path):
            logger.warning(f"压缩包已存在,跳过处理: {package_path}")
            return

        logger.info(f"开始处理目录: {source_dir}")

        # Build the archive.
        logger.info(f"创建压缩包: {package_path}")
        package_size = create_tar_gz(source_dir, package_path)
        logger.info(f"压缩包创建完成,大小: {package_size} 字节")

        # Report it.
        logger.info(f"调用API: {API_URL}")
        if call_api(package_name, package_size):
            logger.info("数据处理流程完成")
        else:
            logger.error("数据处理流程完成,但API调用失败")

    except Exception as e:
        # Top-level boundary: log with traceback and keep the service alive.
        logger.exception(f"处理过程中发生错误: {e}")


def _next_run_time(now):
    """Return the first 01:00:00 (local, naive) strictly after ``now``."""
    next_run = now.replace(hour=1, minute=0, second=0, microsecond=0)
    if now >= next_run:
        next_run += datetime.timedelta(days=1)
    return next_run


def run_scheduler() -> None:
    """Run ``main()`` once immediately, then every day at 01:00 local time.

    This function loops forever; it returns only by raising (for example
    ``KeyboardInterrupt`` during the sleep).
    """
    # Run once right away on start-up.
    logger.info("脚本启动,立即执行一次")
    main()

    while True:
        now = datetime.datetime.now()
        next_run = _next_run_time(now)

        sleep_seconds = (next_run - now).total_seconds()
        logger.info(f"下一次执行时间: {next_run}, 等待 {sleep_seconds:.0f} 秒")
        time.sleep(sleep_seconds)

        logger.info("定时执行开始")
        main()
        logger.info("定时执行完成")


if __name__ == "__main__":
    # Mirror log output to the console when run as a script.
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    logger.info("数据打包服务启动")
    try:
        run_scheduler()
    except KeyboardInterrupt:
        logger.info("数据打包服务停止")
    except Exception as e:
        logger.exception(f"服务异常终止: {e}")
        raise