# blob: 23ff49bc3825fb3bd246c4b9531c32f59258e479 [file] [log] [blame]
import logging
from datetime import datetime
from functools import wraps

from flask import Blueprint, request, jsonify
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from config import Config
from .functions.FAuth import FAuth
# Blueprint on which the routes of this module are registered via @main.route.
main = Blueprint('main', __name__)
def token_required(f):
    """Decorator that rejects requests lacking a valid access token.

    The token is read from the ``Authorization`` header; a leading
    ``"Bearer "`` prefix is stripped when present. The token is resolved to a
    user through ``FAuth.get_user_by_token`` and, on success, the wrapped view
    is called with that user as its first positional argument.

    Args:
        f: View function to protect. It is invoked as
            ``f(user, *args, **kwargs)``.

    Returns:
        The wrapped view. It answers with a JSON body and HTTP 401 when the
        header is missing, when no user matches the token, or when the token
        lookup itself raises; otherwise it returns whatever ``f`` returns.
        Exceptions raised by ``f`` are propagated unchanged.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        token = request.headers.get('Authorization')
        if not token:
            return jsonify({'success': False, 'message': '缺少访问令牌'}), 401

        # Strip the "Bearer " scheme prefix when present.
        if token.startswith('Bearer '):
            token = token[7:]

        engine = None
        session = None
        try:
            try:
                engine = create_engine(Config.SQLURL)
                session = sessionmaker(bind=engine)()
                user = FAuth(session).get_user_by_token(token)
            except Exception:
                # Only a failure of the token lookup is reported as 401.
                logging.getLogger(__name__).exception('Token verification failed')
                if session is not None:
                    session.rollback()
                return jsonify({'success': False, 'message': '令牌验证失败'}), 401

            if not user:
                return jsonify({'success': False, 'message': '无效的访问令牌'}), 401

            # Hand the user to the route function. The call sits outside the
            # inner try so that an error raised by the view is not mislabelled
            # as a token failure; the session stays open while the view runs.
            return f(user, *args, **kwargs)
        finally:
            if session is not None:
                session.close()
            if engine is not None:
                # The engine is created per request, so release its pool here.
                engine.dispose()

    return decorated
@main.route('/login', methods=['POST'])
def login():
    """User login endpoint.

    Expects a JSON object with non-empty ``email`` and ``password`` fields.

    Returns:
        A ``(response, status)`` tuple:

        * 400 when the body is not a JSON object or a required field is
          missing or empty;
        * 200 with the ``FAuth.login`` result when ``result['success']`` is
          truthy (the session is committed first);
        * 401 with the ``FAuth.login`` result otherwise;
        * 500 when an unexpected exception is raised.
    """
    engine = None
    session = None
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so the client gets the 400 below, not a 500.
        data = request.get_json(silent=True)

        # Validate required fields; a non-object body (e.g. a JSON list) is
        # rejected as well because it has no fields to read.
        if not isinstance(data, dict) or not data.get('email') or not data.get('password'):
            return jsonify({
                'success': False,
                'message': '用户名和密码不能为空'
            }), 400

        email = data['email']
        password = data['password']

        # Open the database connection.
        engine = create_engine(Config.SQLURL)
        session = sessionmaker(bind=engine)()

        # Perform the login.
        result = FAuth(session).login(email, password)
        if result['success']:
            session.commit()
            return jsonify(result), 200
        return jsonify(result), 401
    except Exception:
        logging.getLogger(__name__).exception('Login failed')
        if session is not None:
            session.rollback()
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500
    finally:
        if session is not None:
            session.close()
        if engine is not None:
            # The engine is created per request, so release its pool here.
            engine.dispose()
@main.route('/register', methods=['POST'])
def register():
    """User registration endpoint.

    Expects a JSON object with non-empty ``username``, ``email``,
    ``password`` and ``verification_code`` fields.

    Returns:
        A ``(response, status)`` tuple:

        * 400 when the body is not a JSON object, a required field is missing
          or empty, the e-mail lacks ``@`` or ``.``, or the password is
          shorter than 6 characters;
        * 201 with the ``FAuth.register`` result when ``result['success']``
          is truthy (the session is committed first);
        * 400 with the ``FAuth.register`` result otherwise;
        * 500 when an unexpected exception is raised.
    """
    engine = None
    session = None
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so the client gets the 400 below, not a 500.
        data = request.get_json(silent=True)

        # Validate required fields; a non-object body is rejected too.
        required = ('username', 'email', 'password', 'verification_code')
        if not isinstance(data, dict) or not all(data.get(key) for key in required):
            return jsonify({
                'success': False,
                # The verification code is required as well, so name it here.
                'message': '用户名、邮箱、密码和验证码不能为空'
            }), 400

        username = data['username']
        email = data['email']
        password = data['password']
        verification_code = data['verification_code']

        # Simple e-mail format check.
        if '@' not in email or '.' not in email:
            return jsonify({
                'success': False,
                'message': '邮箱格式不正确'
            }), 400

        # Password length check.
        if len(password) < 6:
            return jsonify({
                'success': False,
                'message': '密码长度不能少于6位'
            }), 400

        # Connect only after validation, and inside the try so that a setup
        # failure still yields the JSON 500 response.
        engine = create_engine(Config.SQLURL)
        session = sessionmaker(bind=engine)()

        # Perform the registration.
        result = FAuth(session).register(username, email, password, verification_code)
        if result['success']:
            session.commit()
            return jsonify(result), 201
        return jsonify(result), 400
    except Exception:
        logging.getLogger(__name__).exception('Registration failed')
        if session is not None:
            session.rollback()
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500
    finally:
        if session is not None:
            session.close()
        if engine is not None:
            # The engine is created per request, so release its pool here.
            engine.dispose()
@main.route('/profile', methods=['GET'])
@token_required
def get_profile(current_user):
    """Return the authenticated user's profile (login required).

    Args:
        current_user: User object supplied by ``token_required``; its
            ``to_dict()`` result is embedded in the response.

    Returns:
        A ``(response, status)`` tuple: 200 with the serialised user, or 500
        when serialising the user raises.
    """
    try:
        return jsonify({
            'success': True,
            'user': current_user.to_dict()
        }), 200
    except Exception:
        # Log the cause; the client only receives a generic message.
        logging.getLogger(__name__).exception('Failed to build the profile response')
        return jsonify({
            'success': False,
            'message': '获取用户信息失败'
        }), 500
@main.route('/logout', methods=['POST'])
@token_required
def logout(current_user):
    """User logout endpoint (login required).

    No server-side state is changed: the handler only acknowledges the
    request. ``current_user`` is supplied by ``token_required`` and unused.

    Returns:
        A ``(response, status)`` tuple: 200 on success, 500 if building the
        response raises.
    """
    try:
        # The token could be added to a blacklist here, if ever needed.
        acknowledgement = {'success': True, 'message': '登出成功'}
        return jsonify(acknowledgement), 200
    except Exception:
        failure = {'success': False, 'message': '登出失败'}
        return jsonify(failure), 500
@main.route('/send-verification-code', methods=['POST'])
def send_verification_code():
    """Send an e-mail verification code.

    Expects a JSON object with a non-empty ``email`` field and an optional
    ``type`` field, which defaults to ``'register'``.

    Returns:
        A ``(response, status)`` tuple:

        * 400 when the body is not a JSON object, ``email`` is missing or
          empty, or the e-mail lacks ``@`` or ``.``;
        * 200 with the ``FAuth.send_verification_email`` result when
          ``result['success']`` is truthy (the session is committed first);
        * 400 with that result otherwise;
        * 500 when an unexpected exception is raised.
    """
    engine = None
    session = None
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so the client gets the 400 below, not a 500.
        data = request.get_json(silent=True)

        # Validate required fields; a non-object body is rejected too.
        if not isinstance(data, dict) or not data.get('email'):
            return jsonify({
                'success': False,
                'message': '邮箱地址不能为空'
            }), 400

        email = data['email']
        verification_type = data.get('type', 'register')  # registration code by default

        # Simple e-mail format check.
        if '@' not in email or '.' not in email:
            return jsonify({
                'success': False,
                'message': '邮箱格式不正确'
            }), 400

        # Connect only after validation, and inside the try so that a setup
        # failure still yields the JSON 500 response.
        engine = create_engine(Config.SQLURL)
        session = sessionmaker(bind=engine)()

        # Send the verification code.
        result = FAuth(session).send_verification_email(email, verification_type)
        if result['success']:
            session.commit()
            return jsonify(result), 200
        return jsonify(result), 400
    except Exception:
        logging.getLogger(__name__).exception('Sending the verification code failed')
        if session is not None:
            session.rollback()
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500
    finally:
        if session is not None:
            session.close()
        if engine is not None:
            # The engine is created per request, so release its pool here.
            engine.dispose()
@main.route('/reset-password', methods=['POST'])
def reset_password():
    """Reset a password using an e-mail verification code.

    Expects a JSON object with non-empty ``email``, ``new_password`` and
    ``verification_code`` fields.

    Returns:
        A ``(response, status)`` tuple:

        * 400 when the body is not a JSON object, a required field is missing
          or empty, the e-mail lacks ``@`` or ``.``, or the new password is
          shorter than 6 characters;
        * 200 with the ``FAuth.reset_password_with_verification`` result when
          ``result['success']`` is truthy (the session is committed first);
        * 400 with that result otherwise;
        * 500 when an unexpected exception is raised.
    """
    engine = None
    session = None
    try:
        # silent=True returns None for a missing or malformed JSON body
        # instead of raising, so the client gets the 400 below, not a 500.
        data = request.get_json(silent=True)

        # Validate required fields; a non-object body is rejected too.
        required = ('email', 'new_password', 'verification_code')
        if not isinstance(data, dict) or not all(data.get(key) for key in required):
            return jsonify({
                'success': False,
                'message': '邮箱地址、新密码和验证码不能为空'
            }), 400

        email = data['email']
        new_password = data['new_password']
        verification_code = data['verification_code']

        # Simple e-mail format check.
        if '@' not in email or '.' not in email:
            return jsonify({
                'success': False,
                'message': '邮箱格式不正确'
            }), 400

        # Password length check.
        if len(new_password) < 6:
            return jsonify({
                'success': False,
                'message': '密码长度不能少于6位'
            }), 400

        # Connect only after validation, and inside the try so that a setup
        # failure still yields the JSON 500 response.
        engine = create_engine(Config.SQLURL)
        session = sessionmaker(bind=engine)()

        # Reset the password.
        result = FAuth(session).reset_password_with_verification(
            email, new_password, verification_code
        )
        if result['success']:
            session.commit()
            return jsonify(result), 200
        return jsonify(result), 400
    except Exception:
        logging.getLogger(__name__).exception('Password reset failed')
        if session is not None:
            session.rollback()
        return jsonify({
            'success': False,
            'message': '服务器内部错误'
        }), 500
    finally:
        if session is not None:
            session.close()
        if engine is not None:
            # The engine is created per request, so release its pool here.
            engine.dispose()
@main.route('/test-jwt', methods=['POST'])
@token_required
def test_jwt(current_user):
    """JWT test endpoint (login required).

    Echoes the authenticated user and a 20-character preview of the bearer
    token from the ``Authorization`` header. If the optional JSON body
    contains a ``token`` field, that token is additionally passed to
    ``FAuth.verify_token`` and the outcome is included in the response.

    Args:
        current_user: User object supplied by ``token_required``.

    Returns:
        A ``(response, status)`` tuple: 200 with the verification details, or
        500 with the error text when an unexpected exception is raised.
    """
    logger = logging.getLogger(__name__)
    try:
        # Token of the current request, taken from the Authorization header
        # that the decorator has already verified. It is None when the header
        # carries no "Bearer " prefix.
        auth_header = request.headers.get('Authorization')
        current_token = auth_header[7:] if auth_header and auth_header.startswith('Bearer ') else None

        # Deliberately keep the token itself out of the logs.
        logger.debug(
            'test_jwt user=%s id=%s role=%s',
            current_user.username, current_user.id, current_user.role,
        )

        # The body is optional: silent=True returns None for a missing or
        # malformed JSON body instead of raising, and a non-object body is
        # treated as carrying no extra token.
        data = request.get_json(silent=True)
        additional_token = data.get('token') if isinstance(data, dict) else None

        response_data = {
            'success': True,
            'message': 'JWT令牌验证成功',
            'user': current_user.to_dict(),
            'token_info': {
                'header_token_verified': True,
                'token_preview': current_token[:20] + "..." if current_token else None
            }
        }

        # If the body carries an extra token, verify it as well.
        if additional_token:
            try:
                additional_result = FAuth.verify_token(additional_token)
                response_data['additional_token_verification'] = additional_result
                logger.debug('Additional token verification result: %s', additional_result)
            except Exception as e:
                response_data['additional_token_verification'] = {
                    'success': False,
                    'message': f'额外token验证失败: {str(e)}'
                }

        return jsonify(response_data), 200
    except Exception as e:
        logger.exception('test_jwt failed')
        return jsonify({
            'success': False,
            'message': f'JWT令牌验证失败: {str(e)}'
        }), 500