"""
定时任务管理器模块
用于管理LightGCN图重建的定时任务
"""
import logging
import os
import time
from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

from .graph_build import build_user_post_graph

# Root logging setup, executed once when this module is imported.
# logging.basicConfig does nothing if the root logger already has handlers,
# so an application that configures logging first keeps its own settings.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class SchedulerManager:
    """Own a background scheduler that periodically rebuilds the LightGCN graph.

    Typical lifecycle::

        manager = SchedulerManager()
        manager.init_scheduler(app)          # create the scheduler once
        manager.start_graph_rebuild_task(5)  # register the job, start the scheduler
        manager.get_task_status()            # JSON-friendly status snapshot
        manager.stop_graph_rebuild_task()    # remove the job
        manager.shutdown()                   # stop the scheduler
    """

    # Fixed job id so the rebuild job can be looked up, replaced and removed.
    JOB_ID = 'rebuild_lightgcn_graph'
    # Interval (minutes) used until a caller chooses another one.
    DEFAULT_INTERVAL_MINUTES = 1

    def __init__(self):
        self.scheduler = None  # BackgroundScheduler; created by init_scheduler()
        self.is_running = False  # True while the rebuild job is registered
        self.last_rebuild_time = None  # datetime of the last successful rebuild
        self.rebuild_count = 0  # number of successful rebuilds
        self.error_count = 0  # number of failed rebuilds
        self.last_error = None  # message of the latest failure; cleared on success
        # Interval of the registered job, or the one the next start will use.
        self.interval_minutes = self.DEFAULT_INTERVAL_MINUTES

    @staticmethod
    def _config_value(config, key, default):
        """Read *key* from a dict-style config, falling back to attribute access.

        Dict-style configs (e.g. a dict subclass such as Flask's ``app.config``)
        do not expose their keys as attributes, so plain ``getattr`` would
        always return *default* for them.
        """
        if isinstance(config, dict) and key in config:
            return config[key]
        return getattr(config, key, default)

    @staticmethod
    def _validate_interval(interval_minutes):
        """Raise ValueError unless *interval_minutes* is a positive number."""
        # `not x > 0` also rejects NaN.
        if not interval_minutes > 0:
            raise ValueError(f"任务间隔必须为正数,当前值: {interval_minutes}")

    def init_scheduler(self, app):
        """Create the background scheduler once and attach this manager to *app*.

        Reads ``SCHEDULER_TIMEZONE`` (default ``'Asia/Shanghai'``) and
        ``MAX_SCHEDULER_THREADS`` (default 5) from ``app.config`` and sets
        ``app.scheduler_manager = self``. Calls made while a scheduler already
        exists do nothing.
        """
        if self.scheduler is not None:
            return

        config = app.config
        timezone = self._config_value(config, 'SCHEDULER_TIMEZONE', 'Asia/Shanghai')
        max_threads = self._config_value(config, 'MAX_SCHEDULER_THREADS', 5)

        self.scheduler = BackgroundScheduler(
            executors={'default': ThreadPoolExecutor(max_threads)},
            timezone=timezone,
        )

        app.scheduler_manager = self
        logger.info(f"调度器初始化完成,时区: {timezone}, 最大线程数: {max_threads}")

    def rebuild_graph_job(self):
        """Rebuild the LightGCN graph once; this is the scheduled job function.

        On success, sets ``last_rebuild_time``, increments ``rebuild_count``
        and clears ``last_error``. Any exception is caught and recorded in
        ``error_count`` / ``last_error`` instead of propagating to the caller.
        """
        logger.info(f"开始第{self.rebuild_count + 1}次重新构建LightGCN图...")
        # Monotonic clock: the duration is not skewed by wall-clock adjustments.
        started = time.perf_counter()
        try:
            build_user_post_graph()
        except Exception as e:  # deliberate catch-all: failures are recorded, not raised
            self.error_count += 1
            # Some exceptions have an empty message; keep something useful.
            self.last_error = str(e) or type(e).__name__
            # logger.exception logs at ERROR level and includes the traceback.
            logger.exception(f"LightGCN图重新构建失败: {str(e)}")
            return

        duration = time.perf_counter() - started
        self.last_rebuild_time = datetime.now()
        self.rebuild_count += 1
        self.last_error = None  # a successful run clears the previous error
        logger.info(f"LightGCN图重新构建完成,耗时: {duration:.2f}秒")

    def start_graph_rebuild_task(self, interval_minutes=None):
        """Register the rebuild job and start the scheduler if needed.

        Args:
            interval_minutes: Positive interval between runs. ``None`` uses
                ``self.interval_minutes``, which is 1 unless it was changed
                through ``update_task_interval``.

        Raises:
            RuntimeError: ``init_scheduler`` has not been called.
            ValueError: the interval is not a positive number.
        """
        if self.scheduler is None:
            raise RuntimeError("调度器未初始化")
        if interval_minutes is None:
            interval_minutes = self.interval_minutes
        self._validate_interval(interval_minutes)

        # Drop any previous registration so the new interval/name take effect.
        if self.scheduler.get_job(self.JOB_ID):
            self.scheduler.remove_job(self.JOB_ID)

        self.scheduler.add_job(
            func=self.rebuild_graph_job,
            trigger=IntervalTrigger(minutes=interval_minutes),
            id=self.JOB_ID,
            name=f'重新构建LightGCN图(每{interval_minutes}分钟)',
            replace_existing=True,
            max_instances=1,  # never run two rebuilds of this job at once
        )

        if not self.scheduler.running:
            self.scheduler.start()

        self.interval_minutes = interval_minutes
        self.is_running = True
        logger.info(f"图重建定时任务已启动,间隔: {interval_minutes}分钟")

    def stop_graph_rebuild_task(self):
        """Remove the rebuild job (if registered); the scheduler keeps running."""
        # Check regardless of scheduler.running: a job added to a not-yet-started
        # scheduler is still registered and must be removed too.
        if self.scheduler is not None and self.scheduler.get_job(self.JOB_ID):
            self.scheduler.remove_job(self.JOB_ID)

        self.is_running = False
        logger.info("图重建定时任务已停止")

    def update_task_interval(self, interval_minutes):
        """Change the rebuild interval.

        If the job is running it is re-registered immediately with the new
        interval; otherwise the interval is stored and used by the next
        ``start_graph_rebuild_task()`` call made without an explicit interval.

        Raises:
            ValueError: the interval is not a positive number.
        """
        self._validate_interval(interval_minutes)
        if self.is_running:
            self.stop_graph_rebuild_task()
            self.start_graph_rebuild_task(interval_minutes)
        else:
            self.interval_minutes = interval_minutes
            logger.info(f"任务间隔已更新为{interval_minutes}分钟,但任务当前未运行")

    def get_task_status(self):
        """Return a JSON-serializable snapshot of the job and its run statistics.

        ``success_rate`` is the percentage of successful rebuilds among all
        attempts (successes + failures), or 0 before the first attempt.
        """
        job = self.scheduler.get_job(self.JOB_ID) if self.scheduler is not None else None
        # In APScheduler 3.x a job added before the scheduler starts has no
        # next_run_time attribute yet; a paused job has it set to None.
        next_run_time = getattr(job, 'next_run_time', None) if job else None
        attempts = self.rebuild_count + self.error_count

        return {
            'is_running': self.is_running,
            'scheduler_running': self.scheduler.running if self.scheduler is not None else False,
            'job_exists': job is not None,
            'job_name': job.name if job else None,
            'next_run_time': next_run_time.isoformat() if next_run_time else None,
            'last_rebuild_time': self.last_rebuild_time.isoformat() if self.last_rebuild_time else None,
            'rebuild_count': self.rebuild_count,
            'error_count': self.error_count,
            'last_error': self.last_error,
            'interval_minutes': self.interval_minutes,
            # rebuild_count counts successes only, so the denominator is all attempts.
            'success_rate': (self.rebuild_count / attempts * 100) if attempts else 0,
        }

    def shutdown(self, wait=True):
        """Shut the scheduler down if it is running and mark the job as stopped.

        Args:
            wait: Forwarded to ``scheduler.shutdown``; whether to wait for
                currently executing jobs to finish.
        """
        if self.scheduler is not None and self.scheduler.running:
            self.scheduler.shutdown(wait=wait)
            logger.info("调度器已关闭")
        # No job can fire after shutdown, so the status must not report it running.
        self.is_running = False