blob: 248ed13975b0b67c10bd0d1a8feebe9272ddb7f3 [file] [log] [blame]
from ..models.users import User as users
from ..models.post import Post as post
import secrets
import hashlib
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from ..models.logs import Log
from ..models.syscost import PerformanceData
# from ..models.token import Token
from config import Config
import requests
class Fpost:
def __init__(self,session:Session):
self.session=session
return
def getlist(self):
results = self.session.query(post.id, post.title,post.status)
return results
def getuserlist(self):
results= self.session.query(users.id, users.username, users.role)
return results
def giveadmin(self,userid):
res=self.session.query(users).filter(users.id==userid).first()
if not res:
return False
res.role='admin'
self.session.commit()
return True
def giveuser(self,userid):
res=self.session.query(users).filter(users.id==userid).first()
if not res:
return False
res.role='user'
self.session.commit()
return True
def givesuperadmin(self,userid):
res=self.session.query(users).filter(users.id==userid).first()
if not res:
return False
res.role='superadmin'
self.session.commit()
return True
def getpost(self,postid):
res=self.session.query(post).filter(post.id==postid).first()
return res
def checkid(self, token, status=''):
"""
使用前端传来的 token 调用 /verify_user 接口校验,
如果接口返回 success=False 或者角色不匹配则返回 False,否则 True。
"""
print("---------------------------------------------------------")
try:
url = f"{Config.API_BASE_URL}/verify_user"
headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
payload = {'token': token}
resp = requests.post(url, headers=headers, json=payload)
print(resp.json())
if resp.status_code != 200:
return False
data = resp.json()
if not data.get('success'):
return False
return data.get('role') == status,data.get('user_id')
except Exception:
return False
def review(self,postid,status):
print(status)
res=self.session.query(post).filter(post.id==postid).first()
if not res:
return False
res.status=status
self.session.commit()
return True
# def createtoken(self, userid):
# """
# 根据userid创建token并插入到数据库
# :param userid: 用户ID
# :return: 生成的token字符串
# """
# # 生成随机盐值
# salt = secrets.token_hex(16)
# # 创建哈希值:userid + 当前时间戳 + 随机盐值
# current_time = str(datetime.now().timestamp())
# hash_input = f"{userid}_{current_time}_{salt}"
# # 生成SHA256哈希值作为token
# token = hashlib.sha256(hash_input.encode()).hexdigest()
# # 设置时间
# created_time = datetime.now()
# expires_time = created_time + timedelta(days=1) # 一天后过期
# try:
# # 创建新的token记录
# new_token = Token(
# token=token,
# expires_at=expires_time,
# created_at=created_time
# )
# # 假设self.session是数据库会话对象
# self.session.add(new_token)
# self.session.commit()
# return token
# except Exception as e:
# self.session.rollback()
# raise Exception(f"创建token失败: {str(e)}")
def recordlog(self,user_id,log_type,content,ip):
"""
记录日志
:param user_id: 用户ID
:param log_type: 日志类型,'access','error','behavior','system'
:param content: 日志内容
:param ip: IP地址
"""
try:
new_log = Log(
user_id=user_id,
type=log_type,
content=content,
ip=ip
)
self.session.add(new_log)
self.session.commit()
except Exception as e:
self.session.rollback()
raise Exception(f"记录日志失败: {str(e)}")
def getrecordlog(self):
res= self.session.query(Log).all()
return res
def recordsyscost(self, endpoint: str, elapsed_time: float, cpu_user: float, cpu_system: float, memory_rss: int):
"""
记录系统性能消耗到 performance_data 表
:param endpoint: 请求接口路径
:param elapsed_time: 总耗时(秒)
:param cpu_user: 用户态 CPU 时间差(秒)
:param cpu_system: 系统态 CPU 时间差(秒)
:param memory_rss: RSS 内存增量(字节)
"""
try:
new_record = PerformanceData(
endpoint=endpoint,
elapsed_time=elapsed_time,
cpu_user=cpu_user,
cpu_system=cpu_system,
memory_rss=memory_rss
)
self.session.add(new_record)
self.session.commit()
except Exception as e:
self.session.rollback()
raise Exception(f"记录系统性能消耗失败: {e}")
def getsyscost(self):
res = self.session.query(PerformanceData).order_by(PerformanceData.record_time.desc()).limit(200).all()
return res