blob: d063e84412757fb2b04259f32186cc46ec1fcaba [file] [log] [blame]
"""
Scheduler manager module.

Manages the scheduled (periodic) tasks that rebuild the LightGCN graph.
"""
5import logging
6import os
7from datetime import datetime
8from apscheduler.schedulers.background import BackgroundScheduler
9from apscheduler.triggers.interval import IntervalTrigger
10from apscheduler.executors.pool import ThreadPoolExecutor
11from .graph_build import build_user_post_graph
12
# Configure logging.
# NOTE(review): basicConfig at import time configures the *root* logger for the
# whole process; it is a no-op if the root logger already has handlers, so an
# application that sets up logging before importing this module keeps its own
# configuration.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)

# Logger for this module, named after the module path.
logger = logging.getLogger(__name__)
19
class SchedulerManager:
    """Manage the background scheduler that periodically rebuilds the LightGCN graph.

    Wraps an APScheduler ``BackgroundScheduler`` holding a single interval job
    (``JOB_ID``) that calls ``build_user_post_graph``, and keeps simple run
    statistics for status reporting.
    """

    # Identifier of the single rebuild job registered with the scheduler.
    JOB_ID = 'rebuild_lightgcn_graph'
    # Interval used when no interval has been given or remembered yet.
    DEFAULT_INTERVAL_MINUTES = 1

    def __init__(self):
        self.scheduler = None          # BackgroundScheduler, created by init_scheduler()
        self.is_running = False        # True while the rebuild job is meant to be active
        self.interval_minutes = None   # last interval started with / set via update_task_interval()
        self.last_rebuild_time = None  # datetime of the last *successful* rebuild
        self.rebuild_count = 0         # number of successful rebuilds
        self.error_count = 0           # number of failed rebuilds
        self.last_error = None         # message of the most recent failure; cleared on success

    @staticmethod
    def _config_value(config, key, default):
        """Return ``config[key]`` / ``config.key``, or ``default`` if neither exists.

        Flask's ``app.config`` (presumably what is passed here) is a ``dict``
        subclass, so its values are only visible through item access;
        ``getattr`` alone would always fall back to the default. Attribute-style
        config objects are still supported as a fallback.
        """
        from collections.abc import Mapping

        if isinstance(config, Mapping) and key in config:
            return config[key]
        return getattr(config, key, default)

    @staticmethod
    def _check_interval(interval_minutes):
        """Return ``interval_minutes`` unchanged; raise ``ValueError`` if it is not positive."""
        if interval_minutes <= 0:
            raise ValueError(f"任务间隔必须为正数,当前为: {interval_minutes}")
        return interval_minutes

    def init_scheduler(self, app):
        """Create the background scheduler (only once) and attach this manager to ``app``.

        Reads ``SCHEDULER_TIMEZONE`` (default ``'Asia/Shanghai'``) and
        ``MAX_SCHEDULER_THREADS`` (default ``5``) from ``app.config``. Repeated
        calls keep the existing scheduler but still set ``app.scheduler_manager``.
        """
        if self.scheduler is None:
            config = app.config
            timezone = self._config_value(config, 'SCHEDULER_TIMEZONE', 'Asia/Shanghai')
            max_threads = self._config_value(config, 'MAX_SCHEDULER_THREADS', 5)

            # Thread pool that executes scheduled jobs.
            executors = {
                'default': ThreadPoolExecutor(max_threads)
            }

            self.scheduler = BackgroundScheduler(
                executors=executors,
                timezone=timezone
            )
            logger.info(f"调度器初始化完成,时区: {timezone}, 最大线程数: {max_threads}")

        app.scheduler_manager = self

    def rebuild_graph_job(self):
        """Rebuild the LightGCN graph once; this is the function the scheduler runs.

        On success, updates ``last_rebuild_time`` / ``rebuild_count`` and clears
        ``last_error``. On failure, the exception is recorded in ``error_count`` /
        ``last_error`` and logged with its traceback instead of propagating.
        """
        # 1-based attempt number, counting both successful and failed runs.
        attempt = self.rebuild_count + self.error_count + 1
        try:
            logger.info(f"开始第{attempt}次重新构建LightGCN图...")
            start_time = datetime.now()

            build_user_post_graph()

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            self.last_rebuild_time = end_time
            self.rebuild_count += 1
            self.last_error = None

            logger.info(f"LightGCN图重新构建完成,耗时: {duration:.2f}秒")

        except Exception as e:
            self.error_count += 1
            self.last_error = str(e)
            # logger.exception logs at ERROR level and appends the traceback.
            logger.exception(f"LightGCN图重新构建失败: {str(e)}")

    def start_graph_rebuild_task(self, interval_minutes=None):
        """(Re)schedule the rebuild job and make sure the scheduler is started.

        Args:
            interval_minutes: rebuild interval in minutes. When omitted, the
                interval remembered from the previous start or from
                ``update_task_interval`` is used, else ``DEFAULT_INTERVAL_MINUTES``.

        Raises:
            RuntimeError: if ``init_scheduler`` has not been called.
            ValueError: if the interval is not positive.
        """
        if self.scheduler is None:
            raise RuntimeError("调度器未初始化")

        if interval_minutes is None:
            interval_minutes = (
                self.interval_minutes
                if self.interval_minutes is not None
                else self.DEFAULT_INTERVAL_MINUTES
            )
        self._check_interval(interval_minutes)

        job_id = self.JOB_ID

        # Drop any existing job so it is replaced with the new interval.
        if self.scheduler.get_job(job_id):
            self.scheduler.remove_job(job_id)

        self.scheduler.add_job(
            func=self.rebuild_graph_job,
            trigger=IntervalTrigger(minutes=interval_minutes),
            id=job_id,
            name=f'重新构建LightGCN图(每{interval_minutes}分钟)',
            replace_existing=True,
            max_instances=1,  # never run two rebuilds of this job concurrently
        )

        if not self.scheduler.running:
            self.scheduler.start()

        self.interval_minutes = interval_minutes
        self.is_running = True
        logger.info(f"图重建定时任务已启动,间隔: {interval_minutes}分钟")

    def stop_graph_rebuild_task(self):
        """Remove the rebuild job if present; the scheduler itself is not shut down."""
        if self.scheduler is not None and self.scheduler.get_job(self.JOB_ID):
            self.scheduler.remove_job(self.JOB_ID)

        self.is_running = False
        logger.info("图重建定时任务已停止")

    def update_task_interval(self, interval_minutes):
        """Change the rebuild interval.

        If the task is running it is rescheduled with the new interval right
        away. Otherwise the interval is remembered and used by the next
        ``start_graph_rebuild_task()`` call made without an explicit interval.

        Raises:
            ValueError: if ``interval_minutes`` is not positive. This is checked
                before the running task is touched, so a bad value never stops it.
        """
        self._check_interval(interval_minutes)
        if self.is_running:
            self.stop_graph_rebuild_task()
            self.start_graph_rebuild_task(interval_minutes)
        else:
            self.interval_minutes = interval_minutes
            logger.info(f"任务间隔已更新为{interval_minutes}分钟,但任务当前未运行")

    def get_task_status(self):
        """Return a snapshot of scheduler/job state and rebuild statistics.

        ``success_rate`` is the percentage of successful rebuilds among all
        attempts (successes + failures), or 0 before the first attempt.
        """
        job = self.scheduler.get_job(self.JOB_ID) if self.scheduler is not None else None
        attempts = self.rebuild_count + self.error_count

        return {
            'is_running': self.is_running,
            'scheduler_running': self.scheduler.running if self.scheduler else False,
            'job_exists': job is not None,
            'job_name': job.name if job else None,
            'next_run_time': job.next_run_time.isoformat() if job and job.next_run_time else None,
            'last_rebuild_time': self.last_rebuild_time.isoformat() if self.last_rebuild_time else None,
            'rebuild_count': self.rebuild_count,
            'error_count': self.error_count,
            'last_error': self.last_error,
            'success_rate': (
                (self.rebuild_count / attempts * 100)
                if attempts > 0 else 0
            ),
            'interval_minutes': self.interval_minutes,
        }

    def shutdown(self):
        """Shut the scheduler down if it is running and mark the task as stopped."""
        if self.scheduler and self.scheduler.running:
            self.scheduler.shutdown()
            logger.info("调度器已关闭")
        self.is_running = False