| from flask import Blueprint, request, jsonify |
| from .functions.FAuth import FAuth |
| from .services.recommendation_service import RecommendationService |
| from sqlalchemy import create_engine |
| from sqlalchemy.orm import sessionmaker |
| from config import Config |
| from functools import wraps |
| from datetime import datetime |
| |
| main = Blueprint('main', __name__) |
| |
| # 初始化推荐服务 |
| recommendation_service = RecommendationService() |
| |
| def token_required(f): |
| """装饰器:需要令牌验证""" |
| @wraps(f) |
| def decorated(*args, **kwargs): |
| token = request.headers.get('Authorization') |
| if not token: |
| return jsonify({'success': False, 'message': '缺少访问令牌'}), 401 |
| |
| session = None |
| try: |
| # 移除Bearer前缀 |
| if token.startswith('Bearer '): |
| token = token[7:] |
| |
| engine = create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| f_auth = FAuth(session) |
| |
| user = f_auth.get_user_by_token(token) |
| if not user: |
| return jsonify({'success': False, 'message': '无效的访问令牌'}), 401 |
| |
| # 将用户信息传递给路由函数 |
| return f(user, *args, **kwargs) |
| except Exception as e: |
| if session: |
| session.rollback() |
| return jsonify({'success': False, 'message': '令牌验证失败'}), 401 |
| finally: |
| if session: |
| session.close() |
| |
| return decorated |
| |
| @main.route('/login', methods=['POST']) |
| def login(): |
| """用户登录接口""" |
| session = None |
| try: |
| data = request.get_json() |
| |
| # 验证必填字段 |
| if not data or not data.get('email') or not data.get('password'): |
| return jsonify({ |
| 'success': False, |
| 'message': '用户名和密码不能为空' |
| }), 400 |
| |
| email = data['email'] |
| password = data['password'] |
| |
| # 创建数据库连接 |
| engine = create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| |
| # 执行登录 |
| f_auth = FAuth(session) |
| result = f_auth.login(email, password) |
| |
| if result['success']: |
| session.commit() |
| return jsonify(result), 200 |
| else: |
| return jsonify(result), 401 |
| |
| except Exception as e: |
| if session: |
| session.rollback() |
| return jsonify({ |
| 'success': False, |
| 'message': '服务器内部错误' |
| }), 500 |
| finally: |
| if session: |
| session.close() |
| |
| @main.route('/register', methods=['POST']) |
| def register(): |
| """用户注册接口""" |
| |
| engine = create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| |
| try: |
| data = request.get_json() |
| |
| # 验证必填字段 |
| if not data or not data.get('username') or not data.get('email') or not data.get('password') or not data.get('verification_code'): |
| return jsonify({ |
| 'success': False, |
| 'message': '用户名、邮箱和密码不能为空' |
| }), 400 |
| |
| username = data['username'] |
| email = data['email'] |
| password = data['password'] |
| verification_code = data['verification_code'] |
| |
| # 简单的邮箱格式验证 |
| if '@' not in email or '.' not in email: |
| return jsonify({ |
| 'success': False, |
| 'message': '邮箱格式不正确' |
| }), 400 |
| |
| # 密码长度验证 |
| if len(password) < 6: |
| return jsonify({ |
| 'success': False, |
| 'message': '密码长度不能少于6位' |
| }), 400 |
| |
| # 执行注册 |
| f_auth = FAuth(session) |
| result = f_auth.register(username, email, password, verification_code) |
| |
| if result['success']: |
| session.commit() |
| return jsonify(result), 201 |
| else: |
| return jsonify(result), 400 |
| |
| except Exception as e: |
| session.rollback() |
| return jsonify({ |
| 'success': False, |
| 'message': '服务器内部错误' |
| }), 500 |
| finally: |
| session.close() |
| |
| @main.route('/profile', methods=['GET']) |
| @token_required |
| def get_profile(current_user): |
| """获取用户信息接口(需要登录)""" |
| try: |
| return jsonify({ |
| 'success': True, |
| 'user': current_user.to_dict() |
| }), 200 |
| except Exception as e: |
| return jsonify({ |
| 'success': False, |
| 'message': '获取用户信息失败' |
| }), 500 |
| |
| @main.route('/logout', methods=['POST']) |
| @token_required |
| def logout(current_user): |
| """用户登出接口(需要登录)""" |
| try: |
| # 这里可以将令牌加入黑名单(如果需要的话) |
| return jsonify({ |
| 'success': True, |
| 'message': '登出成功' |
| }), 200 |
| except Exception as e: |
| return jsonify({ |
| 'success': False, |
| 'message': '登出失败' |
| }), 500 |
| |
| @main.route('/send-verification-code', methods=['POST']) |
| def send_verification_code(): |
| """发送邮箱验证码接口""" |
| |
| engine = create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| |
| try: |
| data = request.get_json() |
| |
| # 验证必填字段 |
| if not data or not data.get('email'): |
| return jsonify({ |
| 'success': False, |
| 'message': '邮箱地址不能为空' |
| }), 400 |
| |
| email = data['email'] |
| verification_type = data.get('type', 'register') # 默认为注册验证码 |
| |
| # 简单的邮箱格式验证 |
| if '@' not in email or '.' not in email: |
| return jsonify({ |
| 'success': False, |
| 'message': '邮箱格式不正确' |
| }), 400 |
| |
| # 发送验证码 |
| f_auth = FAuth(session) |
| result = f_auth.send_verification_email(email, verification_type) |
| |
| if result['success']: |
| session.commit() |
| return jsonify(result), 200 |
| else: |
| return jsonify(result), 400 |
| |
| except Exception as e: |
| session.rollback() |
| return jsonify({ |
| 'success': False, |
| 'message': '服务器内部错误' |
| }), 500 |
| finally: |
| session.close() |
| |
| @main.route('/reset-password', methods=['POST']) |
| def reset_password(): |
| """重置密码接口""" |
| |
| engine = create_engine(Config.SQLURL) |
| SessionLocal = sessionmaker(bind=engine) |
| session = SessionLocal() |
| |
| try: |
| data = request.get_json() |
| |
| # 验证必填字段 |
| if not data or not data.get('email') or not data.get('new_password') or not data.get('verification_code'): |
| return jsonify({ |
| 'success': False, |
| 'message': '邮箱地址、新密码和验证码不能为空' |
| }), 400 |
| |
| email = data['email'] |
| new_password = data['new_password'] |
| verification_code = data['verification_code'] |
| |
| # 简单的邮箱格式验证 |
| if '@' not in email or '.' not in email: |
| return jsonify({ |
| 'success': False, |
| 'message': '邮箱格式不正确' |
| }), 400 |
| |
| # 密码长度验证 |
| if len(new_password) < 6: |
| return jsonify({ |
| 'success': False, |
| 'message': '密码长度不能少于6位' |
| }), 400 |
| |
| # 重置密码 |
| f_auth = FAuth(session) |
| result = f_auth.reset_password_with_verification(email, new_password, verification_code) |
| |
| if result['success']: |
| session.commit() |
| return jsonify(result), 200 |
| else: |
| return jsonify(result), 400 |
| |
| except Exception as e: |
| session.rollback() |
| return jsonify({ |
| 'success': False, |
| 'message': '服务器内部错误' |
| }), 500 |
| finally: |
| session.close() |
| |
| @main.route('/test-jwt', methods=['POST']) |
| @token_required |
| def test_jwt(current_user): |
| """测试JWT令牌接口(需要登录)""" |
| try: |
| # 获取当前请求的token(从装饰器已验证的Authorization header) |
| auth_header = request.headers.get('Authorization') |
| current_token = auth_header[7:] if auth_header and auth_header.startswith('Bearer ') else None |
| |
| print(f"当前用户: {current_user.username}") |
| print(f"当前用户ID: {current_user.id}") |
| print(current_user.role) |
| print(f"Token验证成功: {current_token[:20]}..." if current_token else "No token") |
| |
| # 可选:检查请求体中是否有额外的token需要验证 |
| data = request.get_json() or {} |
| additional_token = data.get('token') |
| |
| response_data = { |
| 'success': True, |
| 'message': 'JWT令牌验证成功', |
| 'user': current_user.to_dict(), |
| 'token_info': { |
| 'header_token_verified': True, |
| 'token_preview': current_token[:20] + "..." if current_token else None |
| } |
| } |
| |
| # 如果请求体中有额外的token,也验证一下 |
| if additional_token: |
| try: |
| additional_result = FAuth.verify_token(additional_token) |
| response_data['additional_token_verification'] = additional_result |
| print(f"额外token验证结果: {additional_result}") |
| except Exception as e: |
| response_data['additional_token_verification'] = { |
| 'success': False, |
| 'message': f'额外token验证失败: {str(e)}' |
| } |
| |
| return jsonify(response_data), 200 |
| |
| except Exception as e: |
| print(f"test_jwt 错误: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': f'JWT令牌验证失败: {str(e)}' |
| }), 500 |
| |
| @main.route('/verify_user', methods=['POST']) |
| @token_required |
| def verify_user(current_user): |
| """测试JWT令牌接口(需要登录)""" |
| try: |
| # 获取当前请求的token(从装饰器已验证的Authorization header) |
| auth_header = request.headers.get('Authorization') |
| current_token = auth_header[7:] if auth_header and auth_header.startswith('Bearer ') else None |
| |
| print(f"当前用户: {current_user.username}") |
| print(f"当前用户ID: {current_user.id}") |
| print(current_user.role) |
| print(f"Token验证成功: {current_token[:20]}..." if current_token else "No token") |
| |
| # 可选:检查请求体中是否有额外的token需要验证 |
| data = request.get_json() or {} |
| additional_token = data.get('token') |
| |
| response_data = { |
| 'success': True, |
| 'userid': current_user.id, |
| 'role': current_user.role, |
| } |
| |
| return jsonify(response_data), 200 |
| |
| except Exception as e: |
| print(f"用户验证错误: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': f'JWT令牌验证失败: {str(e)}' |
| }), 500 |
| |
| @main.route('/recommend', methods=['POST']) |
| @token_required |
| def get_recommendations(current_user): |
| """获取个性化推荐接口""" |
| try: |
| data = request.get_json() or {} |
| user_id = data.get('user_id') or current_user.id |
| topk = data.get('topk', 10) # 默认推荐10个 |
| |
| print(f"为用户 {user_id} 获取推荐,数量: {topk}") |
| |
| # 调用推荐系统 |
| recommendations = recommendation_service.get_recommendations(user_id, topk) |
| |
| return jsonify({ |
| 'success': True, |
| 'data': { |
| 'user_id': user_id, |
| 'recommendations': recommendations, |
| 'count': len(recommendations), |
| 'type': 'personalized' |
| }, |
| 'message': '个性化推荐获取成功' |
| }), 200 |
| |
| except Exception as e: |
| print(f"推荐系统错误: {str(e)}") |
| return jsonify({ |
| 'success': False, |
| 'message': f'推荐获取失败: {str(e)}' |
| }), 500 |