blob: d063e84412757fb2b04259f32186cc46ec1fcaba [file] [log] [blame]
"""
定时任务管理器模块
用于管理LightGCN图重建的定时任务
"""
import logging
import os
from collections.abc import Mapping
from datetime import datetime

from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger

from .graph_build import build_user_post_graph
# Module-wide logging setup: configure the root logger at import time
# (INFO level, timestamped format), then grab this module's named logger.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)
class SchedulerManager:
    """Own the APScheduler scheduler that periodically rebuilds the LightGCN graph.

    Lifecycle: call :meth:`init_scheduler` once with the application, then
    :meth:`start_graph_rebuild_task` to register the interval job and start
    the scheduler. Success/failure counters updated by
    :meth:`rebuild_graph_job` are reported by :meth:`get_task_status`.
    """

    # ID of the single rebuild job this manager registers with the scheduler.
    _JOB_ID = 'rebuild_lightgcn_graph'

    def __init__(self):
        # Created lazily by init_scheduler(); None until then.
        self.scheduler = None
        # True between a successful start_graph_rebuild_task() and stop/shutdown.
        self.is_running = False
        # Local (naive) datetime at which the last successful rebuild finished.
        self.last_rebuild_time = None
        # Number of rebuilds that completed without raising.
        self.rebuild_count = 0
        # Number of rebuilds that raised.
        self.error_count = 0
        # str() of the most recent failure; reset to None after a success.
        self.last_error = None
        # Interval used when start_graph_rebuild_task() is called without one.
        self.interval_minutes = 1

    @staticmethod
    def _read_config(config, key, default):
        """Return *key* from a mapping-style or attribute-style config object.

        ``getattr`` alone never sees the keys of a dict-like config (e.g.
        Flask's ``app.config`` is a dict subclass), so mappings are queried
        with ``.get`` and everything else falls back to ``getattr``.
        """
        if isinstance(config, Mapping):
            return config.get(key, default)
        return getattr(config, key, default)

    @staticmethod
    def _validate_interval(interval_minutes):
        """Raise ValueError unless *interval_minutes* is a positive number."""
        if interval_minutes <= 0:
            raise ValueError(
                f"interval_minutes must be positive, got {interval_minutes!r}"
            )

    def init_scheduler(self, app):
        """Create the background scheduler (once) and attach this manager to *app*.

        Reads ``SCHEDULER_TIMEZONE`` (default ``'Asia/Shanghai'``) and
        ``MAX_SCHEDULER_THREADS`` (default 5) from ``app.config``. The
        scheduler is created but not started here;
        :meth:`start_graph_rebuild_task` starts it.
        """
        if self.scheduler is None:
            config = app.config
            timezone = self._read_config(config, 'SCHEDULER_TIMEZONE', 'Asia/Shanghai')
            max_threads = self._read_config(config, 'MAX_SCHEDULER_THREADS', 5)
            executors = {
                'default': ThreadPoolExecutor(max_threads),
            }
            self.scheduler = BackgroundScheduler(
                executors=executors,
                timezone=timezone,
            )
            logger.info(f"调度器初始化完成,时区: {timezone}, 最大线程数: {max_threads}")
        # Attach on every call so any app passed in can reach the manager.
        app.scheduler_manager = self

    def rebuild_graph_job(self):
        """Run one graph rebuild and record its outcome.

        This is the scheduler's job function. Any exception raised by the
        build is caught, counted in ``error_count``, stored in ``last_error``
        and logged with its traceback instead of being propagated.
        """
        # Attempt number includes earlier failures, not just successes.
        attempt = self.rebuild_count + self.error_count + 1
        try:
            logger.info(f"开始第{attempt}次重新构建LightGCN图...")
            start_time = datetime.now()
            build_user_post_graph()
            end_time = datetime.now()
        except Exception as e:  # job boundary: record and log, never propagate
            self.error_count += 1
            self.last_error = str(e)
            logger.exception(f"LightGCN图重新构建失败: {e}")
            return
        duration = (end_time - start_time).total_seconds()
        self.last_rebuild_time = end_time
        self.rebuild_count += 1
        self.last_error = None  # a success clears the previous failure
        logger.info(f"LightGCN图重新构建完成,耗时: {duration:.2f}秒")

    def start_graph_rebuild_task(self, interval_minutes=None):
        """Register (or replace) the rebuild job and make sure the scheduler runs.

        Args:
            interval_minutes: Minutes between rebuilds. ``None`` (the default)
                reuses the interval last set via this method or
                :meth:`update_task_interval`, which starts out as 1.

        Raises:
            RuntimeError: If :meth:`init_scheduler` has not been called.
            ValueError: If the interval is not positive.
        """
        if self.scheduler is None:
            raise RuntimeError("调度器未初始化")
        if interval_minutes is None:
            interval_minutes = self.interval_minutes
        self._validate_interval(interval_minutes)
        self.interval_minutes = interval_minutes

        job_id = self._JOB_ID
        # Drop any previous registration so the new interval takes effect.
        if self.scheduler.get_job(job_id):
            self.scheduler.remove_job(job_id)
        self.scheduler.add_job(
            func=self.rebuild_graph_job,
            trigger=IntervalTrigger(minutes=interval_minutes),
            id=job_id,
            name=f'重新构建LightGCN图(每{interval_minutes}分钟)',
            replace_existing=True,
            max_instances=1,  # never run two rebuilds concurrently
        )
        if not self.scheduler.running:
            self.scheduler.start()
        self.is_running = True
        logger.info(f"图重建定时任务已启动,间隔: {interval_minutes}分钟")

    def stop_graph_rebuild_task(self):
        """Remove the rebuild job; the scheduler itself keeps running.

        Does nothing when the scheduler is missing or not running.
        """
        if self.scheduler is None or not self.scheduler.running:
            return
        if self.scheduler.get_job(self._JOB_ID):
            self.scheduler.remove_job(self._JOB_ID)
        self.is_running = False
        logger.info("图重建定时任务已停止")

    def update_task_interval(self, interval_minutes):
        """Change the rebuild interval.

        If the job is running it is re-registered with the new interval;
        otherwise the interval is stored and used by the next
        :meth:`start_graph_rebuild_task` call that omits an interval.

        Raises:
            ValueError: If the interval is not positive.
        """
        self._validate_interval(interval_minutes)
        if self.is_running:
            self.stop_graph_rebuild_task()
            self.start_graph_rebuild_task(interval_minutes)
        else:
            self.interval_minutes = interval_minutes
            logger.info(f"任务间隔已更新为{interval_minutes}分钟,但任务当前未运行")

    def get_task_status(self):
        """Return a JSON-friendly snapshot of the scheduler and job state.

        ``success_rate`` is the percentage of finished rebuild attempts that
        succeeded (0 when nothing has run yet).
        """
        job = self.scheduler.get_job(self._JOB_ID) if self.scheduler is not None else None
        # NOTE: APScheduler pending jobs (scheduler not yet started) may lack
        # a next_run_time attribute, so read it defensively.
        next_run_time = getattr(job, 'next_run_time', None)
        attempts = self.rebuild_count + self.error_count
        return {
            'is_running': self.is_running,
            'scheduler_running': self.scheduler.running if self.scheduler is not None else False,
            'job_exists': job is not None,
            'job_name': job.name if job else None,
            'next_run_time': next_run_time.isoformat() if next_run_time else None,
            'last_rebuild_time': self.last_rebuild_time.isoformat() if self.last_rebuild_time else None,
            'rebuild_count': self.rebuild_count,
            'error_count': self.error_count,
            'last_error': self.last_error,
            'success_rate': (self.rebuild_count / attempts * 100) if attempts > 0 else 0,
            'interval_minutes': self.interval_minutes,
        }

    def shutdown(self, wait=True):
        """Shut the scheduler down if it is running and mark the task stopped.

        Args:
            wait: Passed to the scheduler's ``shutdown``; whether to wait for
                currently executing jobs to finish.
        """
        if self.scheduler is not None and self.scheduler.running:
            self.scheduler.shutdown(wait=wait)
            logger.info("调度器已关闭")
        self.is_running = False