TRM-coding | 3127efa | 2025-06-18 22:54:25 +0800 | [diff] [blame] | 1 | """ |
| 2 | 定时任务管理器模块 |
| 3 | 用于管理LightGCN图重建的定时任务 |
| 4 | """ |
| 5 | import logging |
| 6 | import os |
| 7 | from datetime import datetime |
| 8 | from apscheduler.schedulers.background import BackgroundScheduler |
| 9 | from apscheduler.triggers.interval import IntervalTrigger |
| 10 | from apscheduler.executors.pool import ThreadPoolExecutor |
| 11 | from .graph_build import build_user_post_graph |
| 12 | |
# Set up root logging at import time (INFO level, timestamped records),
# then create the logger used throughout this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
| 19 | |
class SchedulerManager:
    """Manage the periodic job that rebuilds the LightGCN graph.

    The manager owns one background scheduler, registers a single interval
    job on it, and keeps simple counters describing past rebuild attempts.

    Attributes:
        scheduler: The background scheduler, or ``None`` until
            ``init_scheduler`` has been called.
        is_running: Whether the rebuild job is currently registered.
        last_rebuild_time: ``datetime`` of the last successful rebuild.
        rebuild_count: Number of successful rebuilds.
        error_count: Number of failed rebuilds.
        last_error: Message of the most recent failure; cleared on success.
    """

    # Identifier of the single rebuild job registered with the scheduler.
    JOB_ID = 'rebuild_lightgcn_graph'

    def __init__(self):
        self.scheduler = None
        self.is_running = False
        self.last_rebuild_time = None
        self.rebuild_count = 0
        self.error_count = 0
        self.last_error = None

    @staticmethod
    def _config_value(config, key, default):
        """Return ``key`` from ``config``, falling back to ``default``.

        ``config`` may be a dict (or dict subclass), in which case the key is
        looked up as an item, or a plain object exposing settings as
        attributes. Attribute lookup is kept as the fallback so callers that
        relied on it keep working.
        """
        if isinstance(config, dict) and key in config:
            return config[key]
        return getattr(config, key, default)

    def init_scheduler(self, app):
        """Create the background scheduler from the application config.

        Reads ``SCHEDULER_TIMEZONE`` (default ``'Asia/Shanghai'``) and
        ``MAX_SCHEDULER_THREADS`` (default ``5``) from ``app.config`` and
        stores this manager on ``app.scheduler_manager``. Does nothing if a
        scheduler has already been created.

        Args:
            app: Application object exposing a ``config`` attribute.
        """
        if self.scheduler is not None:
            return

        # Item lookup first: on a dict-like config, plain getattr() never
        # sees the keys and would always yield the defaults.
        timezone = self._config_value(
            app.config, 'SCHEDULER_TIMEZONE', 'Asia/Shanghai'
        )
        max_threads = self._config_value(
            app.config, 'MAX_SCHEDULER_THREADS', 5
        )

        executors = {
            'default': ThreadPoolExecutor(max_threads),
        }

        self.scheduler = BackgroundScheduler(
            executors=executors,
            timezone=timezone,
        )

        app.scheduler_manager = self
        logger.info(f"调度器初始化完成,时区: {timezone}, 最大线程数: {max_threads}")

    def rebuild_graph_job(self):
        """Rebuild the LightGCN graph once and record the outcome.

        Exceptions raised by the build are caught, counted and logged rather
        than propagated, so one failed run is only recorded, not raised.
        """
        try:
            logger.info(f"开始第{self.rebuild_count + 1}次重新构建LightGCN图...")
            start_time = datetime.now()

            build_user_post_graph()

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            self.last_rebuild_time = end_time
            self.rebuild_count += 1
            self.last_error = None  # a success clears the previous error

            logger.info(f"LightGCN图重新构建完成,耗时: {duration:.2f}秒")

        except Exception as e:
            self.error_count += 1
            self.last_error = str(e)
            # logger.exception keeps the traceback for diagnosis.
            logger.exception(f"LightGCN图重新构建失败: {str(e)}")

    def start_graph_rebuild_task(self, interval_minutes=1):
        """Register the rebuild job and start the scheduler if needed.

        Args:
            interval_minutes: Minutes between two rebuilds.

        Raises:
            RuntimeError: If ``init_scheduler`` has not been called yet.
        """
        if self.scheduler is None:
            raise RuntimeError("调度器未初始化")

        # Drop any previous registration so the new interval takes effect.
        if self.scheduler.get_job(self.JOB_ID):
            self.scheduler.remove_job(self.JOB_ID)

        self.scheduler.add_job(
            func=self.rebuild_graph_job,
            trigger=IntervalTrigger(minutes=interval_minutes),
            id=self.JOB_ID,
            name=f'重新构建LightGCN图(每{interval_minutes}分钟)',
            replace_existing=True,
            max_instances=1,  # never run two rebuilds concurrently
        )

        if not self.scheduler.running:
            self.scheduler.start()

        self.is_running = True
        logger.info(f"图重建定时任务已启动,间隔: {interval_minutes}分钟")

    def stop_graph_rebuild_task(self):
        """Remove the rebuild job; the scheduler itself keeps running."""
        if self.scheduler and self.scheduler.running:
            if self.scheduler.get_job(self.JOB_ID):
                self.scheduler.remove_job(self.JOB_ID)

        self.is_running = False
        logger.info("图重建定时任务已停止")

    def update_task_interval(self, interval_minutes):
        """Restart the rebuild job with a new interval if it is running.

        Args:
            interval_minutes: New number of minutes between two rebuilds.
        """
        if self.is_running:
            self.stop_graph_rebuild_task()
            self.start_graph_rebuild_task(interval_minutes)
        else:
            # NOTE(review): nothing is stored here; the interval only takes
            # effect when passed to start_graph_rebuild_task() later.
            logger.info(f"任务间隔已更新为{interval_minutes}分钟,但任务当前未运行")

    def get_task_status(self):
        """Return a JSON-friendly snapshot of the job and rebuild counters.

        Returns:
            dict: Scheduler/job state, ISO-formatted timestamps (or ``None``),
            the success and error counters, the last error message and
            ``success_rate`` as a percentage of all attempts (``0`` when no
            attempt has been made yet).
        """
        job = self.scheduler.get_job(self.JOB_ID) if self.scheduler else None

        # A job may not expose next_run_time yet; read it defensively.
        next_run = getattr(job, 'next_run_time', None) if job else None

        # rebuild_count counts successes only, so the number of attempts is
        # successes plus failures.
        attempts = self.rebuild_count + self.error_count
        success_rate = (self.rebuild_count / attempts * 100) if attempts > 0 else 0

        return {
            'is_running': self.is_running,
            'scheduler_running': self.scheduler.running if self.scheduler else False,
            'job_exists': job is not None,
            'job_name': job.name if job else None,
            'next_run_time': next_run.isoformat() if next_run else None,
            'last_rebuild_time': (
                self.last_rebuild_time.isoformat() if self.last_rebuild_time else None
            ),
            'rebuild_count': self.rebuild_count,
            'error_count': self.error_count,
            'last_error': self.last_error,
            'success_rate': success_rate,
        }

    def shutdown(self):
        """Shut the scheduler down if it is running and mark the task stopped."""
        if self.scheduler and self.scheduler.running:
            self.scheduler.shutdown()
            # No job can fire once the scheduler is stopped.
            self.is_running = False
            logger.info("调度器已关闭")