import hashlib
import logging
import secrets
from datetime import datetime, timedelta

import requests
from sqlalchemy.orm import Session

from config import Config

from ..models.logs import Log
from ..models.post import Post as post
from ..models.syscost import PerformanceData
# from ..models.token import Token
from ..models.users import User as users
| |
class Fpost:
    """Data-access helper for posts, user roles, audit logs and performance rows.

    Every method works through the SQLAlchemy session passed to the
    constructor. Methods that modify data commit immediately.
    """

    # Seconds to wait for the /verify_user endpoint before giving up, so a
    # stalled auth service cannot block the caller forever.
    VERIFY_TIMEOUT = 10

    _logger = logging.getLogger(__name__)

    def __init__(self, session: "Session"):
        """Store the session used by all queries issued from this object."""
        self.session = session

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _commit(self):
        """Commit the session; on failure roll back and re-raise unchanged."""
        try:
            self.session.commit()
        except Exception:
            # Roll back so the session can be reused after a failed commit,
            # mirroring what recordlog/recordsyscost already do.
            self.session.rollback()
            raise

    def _setrole(self, userid, role):
        """Set ``role`` on the user with id ``userid``.

        :return: False when no such user exists, True once the change is
                 committed.
        """
        account = self.session.query(users).filter(users.id == userid).first()
        if account is None:
            return False
        account.role = role
        self._commit()
        return True

    # ------------------------------------------------------------------
    # Listings
    # ------------------------------------------------------------------
    def getlist(self):
        """Return a query over (id, title, status) of all posts.

        The query object itself is returned (it is not executed here), so
        callers may iterate it or refine it further.
        """
        return self.session.query(post.id, post.title, post.status)

    def getuserlist(self):
        """Return a query over (id, username, role) of all users.

        The query object itself is returned (it is not executed here).
        """
        return self.session.query(users.id, users.username, users.role)

    # ------------------------------------------------------------------
    # Role management
    # ------------------------------------------------------------------
    def giveadmin(self, userid):
        """Set the user's role to 'admin'; False if the user does not exist."""
        return self._setrole(userid, 'admin')

    def giveuser(self, userid):
        """Set the user's role to 'user'; False if the user does not exist."""
        return self._setrole(userid, 'user')

    def givesuperadmin(self, userid):
        """Set the user's role to 'superadmin'; False if the user does not exist."""
        return self._setrole(userid, 'superadmin')

    # ------------------------------------------------------------------
    # Posts and verification
    # ------------------------------------------------------------------
    def getpost(self, postid):
        """Return the post with id ``postid``, or None when it does not exist."""
        return self.session.query(post).filter(post.id == postid).first()

    def checkid(self, token, status=''):
        """Validate a front-end token against the ``/verify_user`` endpoint.

        :param token: token received from the front end; sent both as a
                      Bearer header and in the JSON body.
        :param status: role name the verified user is expected to have.
        :return: ``False`` when the request fails, the response is not
                 HTTP 200, the body cannot be read as a JSON object, or its
                 ``success`` field is falsy. Otherwise the tuple
                 ``(role == status, user_id)``.

        NOTE: the success value is a non-empty tuple and is therefore truthy
        even when the role does not match; callers must look at its first
        element instead of testing the tuple itself.
        """
        try:
            resp = requests.post(
                f"{Config.API_BASE_URL}/verify_user",
                headers={
                    'Authorization': f'Bearer {token}',
                    'Content-Type': 'application/json'
                },
                json={'token': token},
                timeout=self.VERIFY_TIMEOUT,
            )
            if resp.status_code != 200:
                return False
            data = resp.json()
            if not data.get('success'):
                return False
            return data.get('role') == status, data.get('user_id')
        except Exception:
            # Fail closed: any error while verifying is treated as
            # "not verified", but it is logged instead of vanishing.
            self._logger.exception("verify_user check failed")
            return False

    def review(self, postid, status):
        """Set the status of post ``postid``.

        :return: False when the post does not exist, True once committed.
        """
        target = self.session.query(post).filter(post.id == postid).first()
        if target is None:
            return False
        target.status = status
        self._commit()
        return True

    # Disabled sketch, kept for reference: it needs the Token model, whose
    # import is commented out at the top of the file.
    # def createtoken(self, userid):
    #     """
    #     Create a token for userid and insert it into the database.
    #     :param userid: user ID
    #     :return: the generated token string
    #     """
    #     # Generate a random salt
    #     salt = secrets.token_hex(16)
    #
    #     # Hash input: userid + current timestamp + random salt
    #     current_time = str(datetime.now().timestamp())
    #     hash_input = f"{userid}_{current_time}_{salt}"
    #
    #     # Use the SHA256 digest as the token
    #     token = hashlib.sha256(hash_input.encode()).hexdigest()
    #
    #     # Timestamps
    #     created_time = datetime.now()
    #     expires_time = created_time + timedelta(days=1)  # expires after one day
    #
    #     try:
    #         # Create the new token record
    #         new_token = Token(
    #             token=token,
    #             expires_at=expires_time,
    #             created_at=created_time
    #         )
    #
    #         # Assumes self.session is the database session object
    #         self.session.add(new_token)
    #         self.session.commit()
    #
    #         return token
    #
    #     except Exception as e:
    #         self.session.rollback()
    #         raise Exception(f"创建token失败: {str(e)}")

    # ------------------------------------------------------------------
    # Audit log
    # ------------------------------------------------------------------
    def recordlog(self, user_id, log_type, content, ip):
        """Insert one log row and commit it.

        :param user_id: user ID
        :param log_type: log type: 'access', 'error', 'behavior' or 'system'
        :param content: log content
        :param ip: IP address
        :raises Exception: when the insert or commit fails; the session is
                           rolled back first and the original error is
                           chained as the cause.
        """
        try:
            entry = Log(
                user_id=user_id,
                type=log_type,
                content=content,
                ip=ip
            )
            self.session.add(entry)
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise Exception(f"记录日志失败: {str(e)}") from e

    def getrecordlog(self):
        """Return every log row as a list."""
        return self.session.query(Log).all()

    # ------------------------------------------------------------------
    # Performance data
    # ------------------------------------------------------------------
    def recordsyscost(self, endpoint: str, elapsed_time: float, cpu_user: float, cpu_system: float, memory_rss: int):
        """Insert one performance-data row and commit it.

        :param endpoint: request endpoint path
        :param elapsed_time: total elapsed time (seconds)
        :param cpu_user: user-mode CPU time delta (seconds)
        :param cpu_system: system-mode CPU time delta (seconds)
        :param memory_rss: RSS memory delta (bytes)
        :raises Exception: when the insert or commit fails; the session is
                           rolled back first and the original error is
                           chained as the cause.
        """
        try:
            sample = PerformanceData(
                endpoint=endpoint,
                elapsed_time=elapsed_time,
                cpu_user=cpu_user,
                cpu_system=cpu_system,
                memory_rss=memory_rss
            )
            self.session.add(sample)
            self.session.commit()
        except Exception as e:
            self.session.rollback()
            raise Exception(f"记录系统性能消耗失败: {e}") from e

    def getsyscost(self, limit: int = 200):
        """Return the most recent performance rows, newest first.

        :param limit: maximum number of rows to return (defaults to 200).
        """
        newest_first = self.session.query(PerformanceData).order_by(
            PerformanceData.record_time.desc()
        )
        return newest_first.limit(limit).all()