| from flask import Blueprint, render_template |
| from .functions.Fpost import Fpost; |
| from sqlalchemy import create_engine |
| from sqlalchemy.orm import sessionmaker |
| from config import Config |
| from flask import jsonify,request |
| |
| |
| main = Blueprint('main', __name__) |
| |
| |
| @main.route('/sgiveadmin',methods=['POST','GET']) |
| def giveadmin(): |
| data=request.get_json() |
| print(data) |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'superadmin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要超级管理员才能执行修改用户角色的操作,但是当前用户不是超级管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.giveadmin(data['targetid']) |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| f"尝试修改用户{data['targetid']}角色为admin失败,用户不存在", |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'User not found'}) |
| f.recordlog(data['userid'], |
| 'behavior', |
| f'用户角色为admin修改成功,用户ID: {data["targetid"]} 被修改为管理员', |
| request.remote_addr) |
| return jsonify({'status': 'success', 'message': 'User role updated to admin'}) |
| |
| @main.route('/sgiveuser',methods=['POST','GET']) |
| def giveuser(): |
| data=request.get_json() |
| print(data) |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'superadmin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要超级管理员才能执行修改用户角色的操作,但是当前用户不是超级管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.giveuser(data['targetid']) |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| f"尝试修改用户{data['targetid']}为user失败,用户不存在", |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'User not found'}) |
| f.recordlog(data['userid'], |
| 'behavior', |
| f'用户角色修改成功,用户ID: {data["targetid"]} 被修改为普通用户', |
| request.remote_addr) |
| return jsonify({'status': 'success', 'message': 'User role updated to user'}) |
| |
| |
| @main.route('/sgivesuperadmin',methods=['POST','GET']) |
| def givesuperadmin(): |
| data=request.get_json() |
| print(data) |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'superadmin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要超级管理员才能执行修改用户角色的操作,但是当前用户不是超级管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.givesuperadmin(data['targetid']) |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| f'尝试修改用户{data["targetid"]}角色为superadmin失败,用户不存在', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'User not found'}) |
| f.recordlog(data['userid'], |
| 'behavior', |
| f'用户角色修改成功,用户ID: {data["targetid"]} 被修改为超级管理员', |
| request.remote_addr) |
| return jsonify({'status': 'success', 'message': 'User role updated to superadmin'}) |
| |
| @main.route('/sgetuserlist',methods=['POST','GET']) |
| def userlist(): |
| data=request.get_json() |
| print(data) |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'superadmin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要超级管理员才能执行获取用户列表的操作,但是当前用户不是超级管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| res=f.getuserlist() |
| respons=[] |
| for datai in res: |
| respons.append({ |
| 'id': datai[0], |
| 'username': datai[1], |
| 'role': datai[2] |
| }) |
| |
| f.recordlog(data['userid'], |
| 'access', |
| '获取用户列表成功', |
| request.remote_addr) |
| return jsonify(respons) |
| |
| @main.route('/apostlist',methods=['POST','GET']) |
| def postlist(): |
| data=request.get_json() |
| print(data) |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'admin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要管理员才能执行获取帖子列表的操作,但是当前用户不是管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| res=f.getlist() |
| respons=[] |
| for datai in res: |
| respons.append({ |
| 'id': datai[0], |
| 'title': datai[1], |
| 'status': datai[2] |
| }) |
| f.recordlog(data['userid'], |
| 'access', |
| '获取帖子列表成功', |
| request.remote_addr) |
| return jsonify(respons) |
| |
| @main.route('/agetpost',methods=['POST','GET']) |
| def post(): |
| data=request.get_json() |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'admin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要管理员才能执行获取帖子详情的操作,但是当前用户不是管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| res=f.getpost(data['postid']) |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| f'尝试获取帖子{data["postid"]}失败,帖子不存在', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Post not found'}) |
| f.recordlog(data['userid'], |
| 'access', |
| f'获取帖子详情成功,帖子ID: {data["postid"]}', |
| request.remote_addr) |
| return jsonify(res.to_dict() if res else {}) |
| |
| @main.route('/areview',methods=['POST','GET']) |
| def review(): |
| data=request.get_json() |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'admin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要管理员才能执行帖子审核的操作,但是当前用户不是管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.review(data['postid'],data['status']) |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| f'尝试审核帖子{data["postid"]}失败,帖子不存在', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Post not found'}) |
| f.recordlog(data['userid'], |
| 'behavior', |
| f'帖子审核成功,帖子ID: {data["postid"]} 状态更新为 {data["status"]}', |
| request.remote_addr) |
| return jsonify({'status': 'success', 'message': 'Post reviewed successfully'}) |
| |
| |
| |
| @main.route('/nginxauth',methods=['POST','GET']) |
| def nginxauth(): |
| data=request.get_json() |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'admin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要管理员才能执行Nginx认证的操作,但是当前用户不是管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.nginxauth(data['postid'],data['status']) |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| f'尝试更新Nginx认证状态失败,帖子{data["postid"]}不存在', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Post not found'}) |
| f.recordlog(data['userid'], |
| 'behavior', |
| f'Nginx认证状态更新成功,帖子ID: {data["postid"]} 状态更新为 {data["status"]}', |
| request.remote_addr) |
| return jsonify({'status': 'success', 'message': 'Nginx auth updated successfully'}) |
| |
| @main.route('/getsyscost',methods=['POST','GET']) |
| def getsyscost(): |
| data=request.get_json() |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'superadmin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要管理员才能执行获取系统性能消耗的操作,但是当前用户不是管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.getsyscost() |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| '尝试获取系统性能消耗数据失败,数据不存在', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'No performance data found'}) |
| |
| f.recordlog(data['userid'], |
| 'access', |
| '获取系统性能消耗数据成功', |
| request.remote_addr) |
| resdata = [] |
| for datai in res: |
| resdata.append({ |
| 'id': datai.id, |
| 'record_time': datai.record_time.isoformat(), |
| 'endpoint': datai.endpoint, |
| 'elapsed_time': datai.elapsed_time, |
| 'cpu_user': datai.cpu_user, |
| 'cpu_system': datai.cpu_system, |
| 'memory_rss': datai.memory_rss |
| }) |
| return jsonify(resdata) |
| @main.route('/getrecordlog',methods=['POST','GET']) |
| def getrecordlog(): |
| data=request.get_json() |
| engine=create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f=Fpost(session) |
| checres=f.checkid(data['userid'],'superadmin') |
| if(not checres): |
| f.recordlog(data['userid'], |
| 'error', |
| '系统需要管理员才能执行获取日志的操作,但是当前用户不是管理员', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'Unauthorized'}) |
| |
| res=f.getrecordlog() |
| if not res: |
| f.recordlog(data['userid'], |
| 'error', |
| '尝试获取日志失败,日志不存在', |
| request.remote_addr) |
| return jsonify({'status': 'error', 'message': 'No logs found'}) |
| |
| f.recordlog(data['userid'], |
| 'access', |
| '获取日志成功', |
| request.remote_addr) |
| |
| resdata = [] |
| for datai in res: |
| resdata.append({ |
| 'id': datai.id, |
| 'user_id': datai.user_id, |
| 'type': datai.type, |
| 'content': datai.content, |
| 'ip': datai.ip, |
| 'created_at': datai.created_at.isoformat() |
| }) |
| |
| return jsonify(resdata) |
| |