#!/usr/bin/env python3
"""Daily data packager.

Archives the previous day's received-data directory into a ``.tar.gz`` file
under ``DATA_PACKAGE_DIR`` and reports the new package (name, size, download
URL) to a remote HTTP API. When run as a script it processes once at start-up
and then once per day at 01:00 local time.
"""
import os
import tarfile
import datetime
import time
import requests
import json
import logging
from logging.handlers import RotatingFileHandler
from typing import Optional, Tuple

# Configuration constants
RECEIVED_DATA_DIR = "/app/data/received_data"
DATA_PACKAGE_DIR = "/app/data/data_package"
LOG_DIR = "/app/data/logs"
LOG_FILE = os.path.join(LOG_DIR, "data_packager.log")
API_URL = "http://38.54.94.107:28081/api/data-package/add/package"
DOWNLOAD_URL_PREFIX = "http://38.54.125.182:28086/data_package/"

# Make sure the output and log directories exist
os.makedirs(DATA_PACKAGE_DIR, exist_ok=True)
os.makedirs(LOG_DIR, exist_ok=True)

# Logging: rotating file handler, 10 MiB per file, 5 backups
logger = logging.getLogger("DataPackager")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=10 * 1024 * 1024,
    backupCount=5,
)
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)


def get_yesterday_folder(
    now: Optional[datetime.datetime] = None,
) -> Tuple[str, str]:
    """Return the directory path and date string for the previous day.

    Args:
        now: Reference time; defaults to the current local time. The day
            before this moment is the one that is resolved.

    Returns:
        A ``(folder_path, day)`` tuple where ``folder_path`` is
        ``RECEIVED_DATA_DIR/<YYYY>/<YYYY-MM>/<YYYY-MM-DD>`` and ``day`` is the
        ``YYYY-MM-DD`` string.
    """
    if now is None:
        now = datetime.datetime.now()
    yesterday = now - datetime.timedelta(days=1)
    year = yesterday.strftime("%Y")
    month = yesterday.strftime("%Y-%m")
    day = yesterday.strftime("%Y-%m-%d")
    folder_path = os.path.join(RECEIVED_DATA_DIR, year, month, day)
    return folder_path, day


def create_tar_gz(source_dir: str, output_filename: str) -> int:
    """Create a gzip-compressed tar archive of ``source_dir``.

    The archive is first written to ``<output_filename>.tmp`` and only moved
    to ``output_filename`` once it is complete, so a failure part-way through
    never leaves a truncated archive at the final path (which ``main`` would
    otherwise treat as "already packaged" on every later run).

    Args:
        source_dir: Path to archive; its last path component becomes the
            top-level name inside the archive.
        output_filename: Final path of the ``.tar.gz`` file.

    Returns:
        Size of the finished archive in bytes.

    Raises:
        OSError, tarfile.TarError: If the archive cannot be written; the
            temporary file is removed before the exception propagates.
    """
    temp_filename = f"{output_filename}.tmp"
    # normpath strips a trailing separator, which would otherwise make
    # basename() return an empty archive root name.
    archive_root = os.path.basename(os.path.normpath(source_dir))
    try:
        with tarfile.open(temp_filename, "w:gz") as tar:
            tar.add(source_dir, arcname=archive_root)
        os.replace(temp_filename, output_filename)
    except BaseException:
        # Best-effort cleanup (also on KeyboardInterrupt); the original
        # error is what the caller needs to see, so it is re-raised.
        try:
            os.remove(temp_filename)
        except OSError:
            pass
        raise
    return os.path.getsize(output_filename)


def call_api(package_name: str, package_size: int) -> bool:
    """Report a finished package to the remote API.

    Sends a JSON POST to ``API_URL`` containing the package name, its size and
    the download URL built from ``DOWNLOAD_URL_PREFIX``.

    Args:
        package_name: File name of the archive.
        package_size: Archive size in bytes.

    Returns:
        ``True`` when the request completed with a non-error HTTP status,
        ``False`` on any ``requests`` error (the error is logged).
    """
    download_url = f"{DOWNLOAD_URL_PREFIX}{package_name}"
    data = {
        "packageName": package_name,
        "packageSize": package_size,
        "downloadUrl": download_url,
    }

    try:
        response = requests.post(
            API_URL,
            json=data,
            headers={"Content-Type": "application/json"},
            timeout=10,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        logger.error(f"API调用失败: {e}")
        return False

    logger.info(f"API调用成功: {response.status_code} - {response.text}")
    return True


def main() -> None:
    """Package yesterday's data directory and notify the API.

    Skips (with a warning) when the source directory is missing or the
    archive already exists. Any unexpected exception is logged with its
    traceback and swallowed so the scheduler loop keeps running.
    """
    try:
        # Resolve yesterday's directory and date
        source_dir, day = get_yesterday_folder()
        package_name = f"{day}.tar.gz"
        package_path = os.path.join(DATA_PACKAGE_DIR, package_name)

        # Nothing to do if the source directory is missing
        if not os.path.exists(source_dir):
            logger.warning(f"源目录不存在,跳过处理: {source_dir}")
            return

        # Do not overwrite an existing archive
        # NOTE(review): if the API call below fails, the archive still
        # exists, so later runs take this branch and the API is never
        # notified for that day — confirm whether a retry is wanted.
        if os.path.exists(package_path):
            logger.warning(f"压缩包已存在,跳过处理: {package_path}")
            return

        logger.info(f"开始处理目录: {source_dir}")

        # Build the archive
        logger.info(f"创建压缩包: {package_path}")
        package_size = create_tar_gz(source_dir, package_path)
        logger.info(f"压缩包创建完成,大小: {package_size} 字节")

        # Notify the API
        logger.info(f"调用API: {API_URL}")
        if call_api(package_name, package_size):
            logger.info("数据处理流程完成")
        else:
            logger.error("数据处理流程完成,但API调用失败")

    except Exception as e:
        logger.exception(f"处理过程中发生错误: {e}")


def run_scheduler() -> None:
    """Run ``main`` once immediately, then every day at 01:00 local time.

    This function loops forever; it only returns by way of an exception
    (for example ``KeyboardInterrupt``).
    """
    # Run once right away
    logger.info("脚本启动,立即执行一次")
    main()

    # Then run every day at 01:00
    while True:
        now = datetime.datetime.now()
        # Time of the next 01:00; if today's has passed, use tomorrow's
        next_run = now.replace(hour=1, minute=0, second=0, microsecond=0)
        if now >= next_run:
            next_run += datetime.timedelta(days=1)

        sleep_seconds = (next_run - now).total_seconds()
        logger.info(f"下一次执行时间: {next_run}, 等待 {sleep_seconds:.0f} 秒")

        time.sleep(sleep_seconds)
        logger.info("定时执行开始")
        main()
        logger.info("定时执行完成")


if __name__ == "__main__":
    # Also echo log records to the console when run as a script
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    logger.info("数据打包服务启动")
    try:
        run_scheduler()
    except KeyboardInterrupt:
        logger.info("数据打包服务停止")
    except Exception as e:
        logger.error(f"服务异常终止: {str(e)}", exc_info=True)
        raise