登陆注册与忘记密码前后端与jwt配置
Change-Id: Ide4ca3ea34609fdb33ea027e28169852fa41784a
diff --git a/rhj/backend/app/__init__.py b/rhj/backend/app/__init__.py
new file mode 100644
index 0000000..41611ae
--- /dev/null
+++ b/rhj/backend/app/__init__.py
@@ -0,0 +1,19 @@
+from flask import Flask
+from flask_cors import CORS
+
+def create_app():
+ app = Flask(__name__)
+
+ # 启用CORS支持跨域请求
+ CORS(app)
+
+ # Load configuration
+ app.config.from_object('config.Config')
+
+ # Register blueprints or routes
+ from .routes import main as main_blueprint
+ app.register_blueprint(main_blueprint)
+
+ return app
+
+app = create_app()
diff --git a/rhj/backend/app/__pycache__/__init__.cpython-312.pyc b/rhj/backend/app/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..5c357bc
--- /dev/null
+++ b/rhj/backend/app/__pycache__/__init__.cpython-312.pyc
Binary files differ
diff --git a/rhj/backend/app/__pycache__/routes.cpython-312.pyc b/rhj/backend/app/__pycache__/routes.cpython-312.pyc
new file mode 100644
index 0000000..0ec74bd
--- /dev/null
+++ b/rhj/backend/app/__pycache__/routes.cpython-312.pyc
Binary files differ
diff --git a/rhj/backend/app/functions/FAuth.py b/rhj/backend/app/functions/FAuth.py
new file mode 100644
index 0000000..d6310a5
--- /dev/null
+++ b/rhj/backend/app/functions/FAuth.py
@@ -0,0 +1,611 @@
+from ..models.users import User as users
+from ..models.email_verification import EmailVerification
+from sqlalchemy.orm import Session
+import hashlib
+import jwt
+import smtplib
+import pytz
+from email.mime.text import MIMEText
+from email.mime.multipart import MIMEMultipart
+from datetime import datetime, timedelta
+from config import Config
+
+class FAuth:
+ def __init__(self, session: Session):
+ self.session = session
+ return
+
+ def hash_password(self, password):
+ """密码加密"""
+ return hashlib.sha256(password.encode()).hexdigest()
+
+ def is_password_hashed(self, password):
+ """检查密码是否已经被哈希加密
+
+ Args:
+ password: 密码字符串
+
+ Returns:
+ bool: 是否为已加密的密码(64位十六进制字符串)
+ """
+ import re
+ if not password or not isinstance(password, str):
+ return False
+ # SHA256 加密后是64位十六进制字符串
+ return bool(re.match(r'^[a-f0-9]{64}$', password, re.IGNORECASE))
+
+ def safe_hash_password(self, password):
+ """安全的密码加密函数,避免重复加密
+
+ Args:
+ password: 密码字符串
+
+ Returns:
+ str: 加密后的密码
+ """
+ if not password:
+ raise ValueError('密码不能为空')
+
+ # 如果已经是加密的密码,直接返回
+ if self.is_password_hashed(password):
+ return password
+
+ # 否则进行加密
+ return self.hash_password(password)
+
+ def verify_password(self, password, hashed_password):
+ """验证密码"""
+ return self.hash_password(password) == hashed_password
+
+ def generate_token(self, user_id, role='user'):
+ """生成JWT令牌"""
+ payload = {
+ 'user_id': user_id,
+ 'role': role,
+ 'exp': datetime.utcnow() + timedelta(hours=24) # 24小时过期
+ }
+ return jwt.encode(payload, Config.JWT_SECRET_KEY, algorithm='HS256')
+
+ def verify_token(self, token):
+ """验证JWT令牌"""
+ try:
+ payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=['HS256'])
+ return {
+ 'user_id': payload['user_id'],
+ 'role': payload.get('role', 'user') # 默认角色为user,兼容旧令牌
+ }
+ except jwt.ExpiredSignatureError:
+ return None
+ except jwt.InvalidTokenError:
+ return None
+
+ def login(self, username_or_email, password):
+ """用户登录"""
+ try:
+ # 查找用户
+ user = self.session.query(users).filter(
+ (users.email == username_or_email)
+ ).first()
+
+ if not user:
+ return {'success': False, 'message': '用户不存在'}
+
+ # 检查账号状态
+ if user.status != 'active':
+ return {'success': False, 'message': '账号已被禁用'}
+
+ # 验证密码(前端已加密,后端使用安全比较)
+ if not self.safe_hash_password(password) == user.password:
+ return {'success': False, 'message': '密码错误'}
+
+ # 生成令牌
+ token = self.generate_token(user.id, user.role)
+
+ return {
+ 'success': True,
+ 'message': '登录成功',
+ 'token': token,
+ 'user': user.to_dict()
+ }
+ except Exception as e:
+ print(f"Login error: {str(e)}")
+ return {'success': False, 'message': f'登录失败: {str(e)}'}
+
+ def register(self, username, email, password, verification_code):
+ """用户注册"""
+ # 检查用户名是否存在
+ existing_user = self.session.query(users).filter(
+ (users.username == username) | (users.email == email)
+ ).first()
+
+ if existing_user:
+ if existing_user.username == username:
+ return {'success': False, 'message': '用户名已存在'}
+ else:
+ return {'success': False, 'message': '邮箱已被注册'}
+
+ verification = self.session.query(EmailVerification).filter(
+ EmailVerification.email == email,
+ EmailVerification.type == 'register',
+ EmailVerification.is_verified == False
+ ).order_by(EmailVerification.created_at.desc()).first()
+
+ if not verification:
+ return {
+ 'success': False,
+ 'message': '验证码不存在或已过期'
+ }
+
+ # 验证验证码(检查是否为已加密的验证码)
+ verification_success = False
+ if self.is_password_hashed(verification_code):
+ # 如果是已加密的验证码,直接比较
+ verification_success = verification.verify_hashed(verification_code)
+ else:
+ # 如果是明文验证码,先加密再比较
+ verification_success = verification.verify(verification_code)
+ if not verification_success:
+ return {
+ 'success': False,
+ 'message': '验证码错误或已过期'
+ }
+ # 如果验证码验证成功,标记为已验证
+ verification.is_verified = True
+ verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
+
+
+ # 创建新用户(使用安全加密函数避免重复加密)
+ hashed_password = self.safe_hash_password(password)
+ new_user = users(
+ username=username,
+ email=email,
+ password=hashed_password,
+ role='user',
+ status='active'
+ )
+
+ try:
+ self.session.add(new_user)
+ self.session.commit()
+
+ # 生成令牌
+ token = self.generate_token(new_user.id, new_user.role)
+
+ return {
+ 'success': True,
+ 'message': '注册成功',
+ 'token': token,
+ 'user': new_user.to_dict()
+ }
+ except Exception as e:
+ self.session.rollback()
+ return {'success': False, 'message': '注册失败,请稍后重试'}
+
+ def get_user_by_token(self, token):
+ """通过令牌获取用户信息"""
+ token_data = self.verify_token(token)
+ if not token_data:
+ return None
+
+ user_id = token_data['user_id'] if isinstance(token_data, dict) else token_data
+ user = self.session.query(users).filter(users.id == user_id).first()
+ return user
+
+ def send_verification_email(self, email, verification_type='register', user_id=None):
+ """发送邮箱验证码
+
+ Args:
+ email: 目标邮箱地址
+ verification_type: 验证类型 ('register', 'reset_password', 'email_change')
+ user_id: 用户ID(可选)
+
+ Returns:
+ dict: 发送结果
+ """
+ try:
+ # 检查邮件配置
+ if not all([Config.MAIL_USERNAME, Config.MAIL_PASSWORD, Config.MAIL_DEFAULT_SENDER]):
+ return {
+ 'success': False,
+ 'message': '邮件服务配置不完整,请联系管理员'
+ }
+
+ if verification_type not in ['register', 'reset_password', 'email_change']:
+ return {
+ 'success': False,
+ 'message': '无效的验证类型'
+ }
+
+ if verification_type == 'reset_password' or verification_type == 'email_change':
+ # 检查用户是否存在
+ user = self.session.query(users).filter(users.email == email).first()
+ if not user:
+ return {
+ 'success': False,
+ 'message': '用户不存在或邮箱不匹配'
+ }
+ elif verification_type == 'register':
+ # 检查邮箱是否已注册
+ existing_user = self.session.query(users).filter(users.email == email).first()
+ if existing_user:
+ return {
+ 'success': False,
+ 'message': '邮箱已被注册'
+ }
+
+ # 创建验证记录
+ verification = EmailVerification.create_verification(
+ email=email,
+ verification_type=verification_type,
+ user_id=user_id,
+ expires_minutes=15 # 15分钟过期
+ )
+
+ # 保存到数据库
+ self.session.add(verification)
+
+ # 获取验证码
+ verification_code = verification.get_raw_code()
+ if not verification_code:
+ return {
+ 'success': False,
+ 'message': '验证码生成失败'
+ }
+
+ # 发送邮件
+ result = self._send_email(email, verification_code, verification_type)
+
+ if result['success']:
+ return {
+ 'success': True,
+ 'message': '验证码已发送到您的邮箱',
+ 'verification_id': verification.id
+ }
+ else:
+ # 如果邮件发送失败,删除验证记录
+ self.session.delete(verification)
+ self.session.commit()
+ return result
+
+ except Exception as e:
+ self.session.rollback()
+ print(f"Send verification email error: {str(e)}")
+ return {
+ 'success': False,
+ 'message': f'发送验证码失败: {str(e)}'
+ }
+
+ def _send_email(self, to_email, verification_code, verification_type):
+ """发送邮件的具体实现
+
+ Args:
+ to_email: 收件人邮箱
+ verification_code: 验证码
+ verification_type: 验证类型
+
+ Returns:
+ dict: 发送结果
+ """
+ try:
+ # 根据验证类型设置邮件内容
+ subject_map = {
+ 'register': '注册验证码',
+ 'reset_password': '密码重置验证码',
+ 'email_change': '邮箱变更验证码'
+ }
+
+ message_map = {
+ 'register': '欢迎注册我们的平台!',
+ 'reset_password': '您正在重置密码',
+ 'email_change': '您正在变更邮箱地址'
+ }
+
+ subject = subject_map.get(verification_type, '验证码')
+ message_intro = message_map.get(verification_type, '验证码')
+
+ # 创建邮件内容
+ msg = MIMEMultipart('alternative')
+ msg['Subject'] = subject
+ msg['From'] = Config.MAIL_DEFAULT_SENDER
+ msg['To'] = to_email
+
+ # HTML邮件内容
+ html_body = f"""
+ <!DOCTYPE html>
+ <html>
+ <head>
+ <meta charset="utf-8">
+ <title>{subject}</title>
+ </head>
+ <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
+ <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
+ <div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin-bottom: 20px;">
+ <h2 style="color: #007bff; margin-top: 0;">{message_intro}</h2>
+ <p>您的验证码是:</p>
+ <div style="background-color: #007bff; color: white; padding: 15px; border-radius: 5px; text-align: center; font-size: 24px; font-weight: bold; letter-spacing: 3px; margin: 20px 0;">
+ {verification_code}
+ </div>
+ <p style="color: #666; font-size: 14px;">
+ • 验证码有效期为15分钟<br>
+ • 请勿将验证码透露给他人<br>
+ • 如果这不是您的操作,请忽略此邮件
+ </p>
+ </div>
+ <div style="text-align: center; color: #999; font-size: 12px; border-top: 1px solid #eee; padding-top: 20px;">
+ <p>此邮件由系统自动发送,请勿回复</p>
+ </div>
+ </div>
+ </body>
+ </html>
+ """
+
+ # 纯文本内容(备用)
+ text_body = f"""
+ {message_intro}
+
+ 您的验证码是:{verification_code}
+
+ 验证码有效期为15分钟
+ 请勿将验证码透露给他人
+ 如果这不是您的操作,请忽略此邮件
+
+ 此邮件由系统自动发送,请勿回复
+ """
+
+ # 添加邮件内容
+ text_part = MIMEText(text_body, 'plain', 'utf-8')
+ html_part = MIMEText(html_body, 'html', 'utf-8')
+
+ msg.attach(text_part)
+ msg.attach(html_part)
+
+ # 连接SMTP服务器并发送邮件
+ server = None
+ try:
+ server = smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT)
+ if Config.MAIL_USE_TLS:
+ server.starttls()
+
+ server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
+ result = server.send_message(msg)
+
+ # 检查发送结果
+ if result:
+ # 如果有失败的收件人,记录日志
+ print(f"邮件发送部分失败: {result}")
+
+ return {
+ 'success': True,
+ 'message': '邮件发送成功'
+ }
+ finally:
+ # 确保连接被正确关闭
+ if server:
+ try:
+ server.quit()
+ except Exception:
+ # 如果quit()失败,强制关闭连接
+ try:
+ server.close()
+ except Exception:
+ pass
+
+ except smtplib.SMTPAuthenticationError:
+ return {
+ 'success': False,
+ 'message': '邮件服务认证失败,请检查邮箱配置'
+ }
+ except smtplib.SMTPException as e:
+ return {
+ 'success': False,
+ 'message': f'邮件发送失败: {str(e)}'
+ }
+ except Exception as e:
+ return {
+ 'success': False,
+ 'message': f'发送邮件时发生错误: {str(e)}'
+ }
+
+ def verify_email_code(self, email, code, verification_type='register'):
+ """验证邮箱验证码
+
+ Args:
+ email: 邮箱地址
+ code: 验证码
+ verification_type: 验证类型
+
+ Returns:
+ dict: 验证结果
+ """
+ try:
+ # 查找最新的未验证的验证记录
+ verification = self.session.query(EmailVerification).filter(
+ EmailVerification.email == email,
+ EmailVerification.type == verification_type,
+ EmailVerification.is_verified == False
+ ).order_by(EmailVerification.created_at.desc()).first()
+
+ if not verification:
+ return {
+ 'success': False,
+ 'message': '验证码不存在或已过期'
+ }
+
+ # 验证验证码(检查是否为已加密的验证码)
+ verification_success = False
+ if self.is_password_hashed(code):
+ # 如果是已加密的验证码,直接比较
+ verification_success = verification.verify_hashed(code)
+ else:
+ # 如果是明文验证码,先加密再比较
+ verification_success = verification.verify(code)
+
+ if verification_success:
+ # 不在这里提交事务,留给调用者决定何时提交
+ # self.session.commit() # 注释掉立即提交
+ return {
+ 'success': True,
+ 'message': '验证成功',
+ 'verification_id': verification.id
+ }
+ else:
+ return {
+ 'success': False,
+ 'message': '验证码错误或已过期'
+ }
+
+ except Exception as e:
+ print(f"Verify email code error: {str(e)}")
+ return {
+ 'success': False,
+ 'message': f'验证失败: {str(e)}'
+ }
+
+ def reset_password(self, email, new_password, verification_code):
+ """重置用户密码
+
+ Args:
+ email: 用户邮箱
+ new_password: 新密码
+ verification_code: 验证码
+
+ Returns:
+ dict: 重置结果
+ """
+ try:
+ # 检查是否有最近已验证的重置密码验证记录(5分钟内)
+
+ china_tz = pytz.timezone('Asia/Shanghai')
+ current_time = datetime.now(china_tz).replace(tzinfo=None)
+ five_minutes_ago = current_time - timedelta(minutes=5)
+
+ # 查找最近5分钟内已验证的重置密码验证记录
+ recent_verification = self.session.query(EmailVerification).filter(
+ EmailVerification.email == email,
+ EmailVerification.type == 'reset_password',
+ EmailVerification.is_verified == True,
+ EmailVerification.verified_at >= five_minutes_ago
+ ).order_by(EmailVerification.verified_at.desc()).first()
+
+ if not recent_verification:
+ return {
+ 'success': False,
+ 'message': '验证码未验证或已过期,请重新验证'
+ }
+
+ # 查找用户
+ user = self.session.query(users).filter(users.email == email).first()
+ if not user:
+ return {
+ 'success': False,
+ 'message': '用户不存在'
+ }
+
+ # 检查账号状态
+ if user.status != 'active':
+ return {
+ 'success': False,
+ 'message': '账号已被禁用,无法重置密码'
+ }
+
+ # 更新密码(使用安全加密函数避免重复加密)
+ user.password = self.safe_hash_password(new_password)
+ # 使用中国时区时间
+ china_tz = pytz.timezone('Asia/Shanghai')
+ user.updated_at = datetime.now(china_tz).replace(tzinfo=None)
+
+ # 提交更改
+ self.session.commit()
+
+ return {
+ 'success': True,
+ 'message': '密码重置成功'
+ }
+
+ except Exception as e:
+ self.session.rollback()
+ print(f"Reset password error: {str(e)}")
+ return {
+ 'success': False,
+ 'message': f'密码重置失败: {str(e)}'
+ }
+
+ def reset_password_with_verification(self, email, new_password, verification_code):
+ """重置用户密码(一步完成验证码验证和密码重置)
+
+ Args:
+ email: 用户邮箱
+ new_password: 新密码
+ verification_code: 验证码
+
+ Returns:
+ dict: 重置结果
+ """
+ try:
+ # 查找用户
+ user = self.session.query(users).filter(users.email == email).first()
+ if not user:
+ return {
+ 'success': False,
+ 'message': '用户不存在'
+ }
+
+ # 检查账号状态
+ if user.status != 'active':
+ return {
+ 'success': False,
+ 'message': '账号已被禁用,无法重置密码'
+ }
+
+ # 验证验证码
+ verification = self.session.query(EmailVerification).filter(
+ EmailVerification.email == email,
+ EmailVerification.type == 'reset_password',
+ EmailVerification.is_verified == False
+ ).order_by(EmailVerification.created_at.desc()).first()
+
+ if not verification:
+ return {
+ 'success': False,
+ 'message': '验证码不存在或已过期'
+ }
+
+ # 验证验证码(检查是否为已加密的验证码)
+ verification_success = False
+ if self.is_password_hashed(verification_code):
+ # 如果是已加密的验证码,直接比较
+ verification_success = verification.verify_hashed(verification_code)
+ else:
+ # 如果是明文验证码,先加密再比较
+ verification_success = verification.verify(verification_code)
+ if not verification_success:
+ return {
+ 'success': False,
+ 'message': '验证码错误或已过期'
+ }
+ # 如果验证码验证成功,标记为已验证
+ verification.is_verified = True
+ verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
+
+ # 更新密码(使用安全加密函数避免重复加密)
+ user.password = self.safe_hash_password(new_password)
+
+ # 使用中国时区时间更新时间戳
+ china_tz = pytz.timezone('Asia/Shanghai')
+ user.updated_at = datetime.now(china_tz).replace(tzinfo=None)
+
+ # 提交更改
+ self.session.commit()
+
+ return {
+ 'success': True,
+ 'message': '密码重置成功'
+ }
+
+ except Exception as e:
+ self.session.rollback()
+ print(f"Reset password with verification error: {str(e)}")
+ return {
+ 'success': False,
+ 'message': f'密码重置失败: {str(e)}'
+ }
diff --git a/rhj/backend/app/functions/__pycache__/FAuth.cpython-312.pyc b/rhj/backend/app/functions/__pycache__/FAuth.cpython-312.pyc
new file mode 100644
index 0000000..49086a6
--- /dev/null
+++ b/rhj/backend/app/functions/__pycache__/FAuth.cpython-312.pyc
Binary files differ
diff --git a/rhj/backend/app/models/__init__.py b/rhj/backend/app/models/__init__.py
new file mode 100644
index 0000000..179ba58
--- /dev/null
+++ b/rhj/backend/app/models/__init__.py
@@ -0,0 +1,7 @@
+from sqlalchemy.ext.declarative import declarative_base
+
+Base = declarative_base()
+
+# 先定义好 Base,再把所有 model import 进来,让 SQLAlchemy 一次性注册它们
+from .users import User
+from .email_verification import EmailVerification
diff --git a/rhj/backend/app/models/__pycache__/__init__.cpython-312.pyc b/rhj/backend/app/models/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..a0814d8
--- /dev/null
+++ b/rhj/backend/app/models/__pycache__/__init__.cpython-312.pyc
Binary files differ
diff --git a/rhj/backend/app/models/__pycache__/email_verification.cpython-312.pyc b/rhj/backend/app/models/__pycache__/email_verification.cpython-312.pyc
new file mode 100644
index 0000000..c1d6dfa
--- /dev/null
+++ b/rhj/backend/app/models/__pycache__/email_verification.cpython-312.pyc
Binary files differ
diff --git a/rhj/backend/app/models/__pycache__/users.cpython-312.pyc b/rhj/backend/app/models/__pycache__/users.cpython-312.pyc
new file mode 100644
index 0000000..58af35e
--- /dev/null
+++ b/rhj/backend/app/models/__pycache__/users.cpython-312.pyc
Binary files differ
diff --git a/rhj/backend/app/models/email_verification.py b/rhj/backend/app/models/email_verification.py
new file mode 100644
index 0000000..2d3f7da
--- /dev/null
+++ b/rhj/backend/app/models/email_verification.py
@@ -0,0 +1,189 @@
+# filepath: /home/ronghanji/api/API-TRM/rhj/backend/app/models/email_verification.py
+from . import Base
+from sqlalchemy import (
+ Column, Integer, String, Boolean, TIMESTAMP, text, ForeignKey
+)
+from sqlalchemy.orm import relationship
+from datetime import datetime, timedelta
+import secrets
+import string
+import hashlib
+import pytz
+
+
+class EmailVerification(Base):
+ __tablename__ = 'email_verifications'
+
+ id = Column(Integer, primary_key=True, autoincrement=True, comment='验证记录ID')
+ email = Column(String(100), nullable=False, comment='邮箱地址')
+ code = Column(String(255), nullable=False, comment='验证码(加密存储)')
+ type = Column(String(50), nullable=False, comment='验证类型:register, reset_password, email_change')
+ user_id = Column(Integer, ForeignKey('users.id'), nullable=True, comment='关联用户ID')
+ is_verified = Column(Boolean, nullable=False, default=False, comment='是否已验证')
+ expires_at = Column(TIMESTAMP, nullable=False, comment='过期时间')
+ created_at = Column(
+ TIMESTAMP,
+ nullable=False,
+ server_default=text('CURRENT_TIMESTAMP'),
+ comment='创建时间'
+ )
+ verified_at = Column(TIMESTAMP, nullable=True, comment='验证时间')
+
+ # 关联用户表
+ user = relationship("User", back_populates="email_verifications")
+
+ def __init__(self, email, verification_type, user_id=None, expires_minutes=15):
+ """初始化邮箱验证记录
+
+ Args:
+ email: 邮箱地址
+ verification_type: 验证类型
+ user_id: 用户ID(可选)
+ expires_minutes: 过期时间(分钟)
+ """
+ self.email = email
+ self.type = verification_type
+ self.user_id = user_id
+ self.is_verified = False
+
+ # 使用中国时区时间,确保与数据库时间一致
+ china_tz = pytz.timezone('Asia/Shanghai')
+ current_time = datetime.now(china_tz).replace(tzinfo=None)
+ self.expires_at = current_time + timedelta(minutes=expires_minutes)
+
+ # 生成并加密验证码
+ raw_code = self._generate_code()
+ self.code = self._hash_code(raw_code)
+ self._raw_code = raw_code # 临时存储原始验证码用于发送邮件
+
+ @classmethod
+ def create_verification(cls, email, verification_type, user_id=None, expires_minutes=15):
+ """创建验证记录
+
+ Args:
+ email: 邮箱地址
+ verification_type: 验证类型
+ user_id: 用户ID(可选)
+ expires_minutes: 过期时间(分钟)
+
+ Returns:
+ EmailVerification: 验证记录实例
+ """
+ return cls(
+ email=email,
+ verification_type=verification_type,
+ user_id=user_id,
+ expires_minutes=expires_minutes
+ )
+
+ def _generate_code(self, length=6):
+ """生成随机验证码
+
+ Args:
+ length: 验证码长度
+
+ Returns:
+ str: 验证码
+ """
+ characters = string.digits
+ return ''.join(secrets.choice(characters) for _ in range(length))
+
+ def _hash_code(self, code):
+ """对验证码进行哈希加密
+
+ Args:
+ code: 原始验证码
+
+ Returns:
+ str: 加密后的验证码
+ """
+ return hashlib.sha256(code.encode()).hexdigest()
+
+ def verify(self, input_code):
+ """验证验证码
+
+ Args:
+ input_code: 用户输入的验证码
+
+ Returns:
+ bool: 验证是否成功
+ """
+ if self.is_verified:
+ return False
+
+ if self.is_expired():
+ return False
+
+ hashed_input = self._hash_code(input_code)
+ if hashed_input == self.code:
+ # 使用中国时区时间设置验证时间
+ china_tz = pytz.timezone('Asia/Shanghai')
+ self.verified_at = datetime.now(china_tz).replace(tzinfo=None)
+ self.is_verified = True
+ return True
+
+ return False
+
+ def verify_hashed(self, hashed_code):
+ """验证已经加密的验证码
+
+ Args:
+ hashed_code: 已经加密的验证码
+
+ Returns:
+ bool: 验证是否成功
+ """
+ if self.is_verified:
+ return False
+
+ if self.is_expired():
+ return False
+
+ # 直接比较加密后的验证码
+ if hashed_code == self.code:
+ # 使用中国时区时间设置验证时间
+ china_tz = pytz.timezone('Asia/Shanghai')
+ self.verified_at = datetime.now(china_tz).replace(tzinfo=None)
+ self.is_verified = True
+ return True
+
+ return False
+
+ def is_expired(self):
+ """检查是否已过期
+
+ Returns:
+ bool: 是否已过期
+ """
+ # 使用中国时区时间进行比较
+ china_tz = pytz.timezone('Asia/Shanghai')
+ current_time = datetime.now(china_tz).replace(tzinfo=None)
+ return current_time > self.expires_at
+
+ def get_raw_code(self):
+ """获取原始验证码(仅在创建时可用)
+
+ Returns:
+ str: 原始验证码
+ """
+ return getattr(self, '_raw_code', None)
+
+ def to_dict(self):
+ """转换为字典格式
+
+ Returns:
+ dict: 对象字典表示
+ """
+ return {
+ 'id': self.id,
+ 'email': self.email,
+ 'type': self.type,
+ 'user_id': self.user_id,
+ 'is_verified': self.is_verified,
+ 'expires_at': self.expires_at.isoformat() if self.expires_at else None,
+ 'created_at': self.created_at.isoformat() if self.created_at else None,
+ 'verified_at': self.verified_at.isoformat() if self.verified_at else None
+ }
+
+ def __repr__(self):
+ return f"<EmailVerification(id={self.id}, email='{self.email}', type='{self.type}', verified={self.is_verified})>"
\ No newline at end of file
diff --git a/rhj/backend/app/models/users.py b/rhj/backend/app/models/users.py
new file mode 100644
index 0000000..8edc8be
--- /dev/null
+++ b/rhj/backend/app/models/users.py
@@ -0,0 +1,53 @@
+from . import Base
+from sqlalchemy import (
+ Column, Integer, String, Enum, TIMESTAMP, text
+)
+from sqlalchemy.ext.declarative import declarative_base
+from sqlalchemy.orm import relationship
+
+
+class User(Base):
+ __tablename__ = 'users'
+
+ def to_dict(self):
+ return {
+ 'id': self.id,
+ 'username': self.username if self.username else None,
+ 'email': self.email if self.email else None,
+ 'avatar': self.avatar if self.avatar else None,
+ 'role': self.role if self.role else None,
+ 'bio': self.bio if self.bio else None,
+ 'status': self.status if self.status else None,
+ 'created_at': self.created_at.isoformat() if self.created_at else None,
+ 'updated_at': self.updated_at.isoformat() if self.updated_at else None
+ }
+
+ id = Column(Integer, primary_key=True, autoincrement=True, comment='用户ID')
+ username = Column(String(50), nullable=False, unique=True, comment='用户名')
+ password = Column(String(255), nullable=False, comment='加密密码')
+ email = Column(String(100), nullable=False, unique=True, comment='邮箱')
+ avatar = Column(String(255), comment='头像URL')
+ role = Column(Enum('user', 'admin', 'superadmin', name='user_role'), comment='角色')
+ bio = Column(String(255), comment='个人简介')
+ status = Column(
+ Enum('active','banned','muted', name='user_status'),
+ nullable=False,
+ server_default=text("'active'"),
+ comment='账号状态'
+ )
+ created_at = Column(
+ TIMESTAMP,
+ nullable=True,
+ server_default=text('CURRENT_TIMESTAMP'),
+ comment='创建时间'
+ )
+ updated_at = Column(
+ TIMESTAMP,
+ nullable=True,
+ server_default=text('CURRENT_TIMESTAMP'),
+ onupdate=text('CURRENT_TIMESTAMP'),
+ comment='更新时间'
+ )
+
+ # 关联关系
+ email_verifications = relationship("EmailVerification", back_populates="user")
diff --git a/rhj/backend/app/routes.py b/rhj/backend/app/routes.py
new file mode 100644
index 0000000..23ff49b
--- /dev/null
+++ b/rhj/backend/app/routes.py
@@ -0,0 +1,325 @@
+from flask import Blueprint, request, jsonify
+from .functions.FAuth import FAuth
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from config import Config
+from functools import wraps
+from datetime import datetime
+
+main = Blueprint('main', __name__)
+
+def token_required(f):
+ """装饰器:需要令牌验证"""
+ @wraps(f)
+ def decorated(*args, **kwargs):
+ token = request.headers.get('Authorization')
+ if not token:
+ return jsonify({'success': False, 'message': '缺少访问令牌'}), 401
+
+ session = None
+ try:
+ # 移除Bearer前缀
+ if token.startswith('Bearer '):
+ token = token[7:]
+
+ engine = create_engine(Config.SQLURL)
+ SessionLocal = sessionmaker(bind=engine)
+ session = SessionLocal()
+ f_auth = FAuth(session)
+
+ user = f_auth.get_user_by_token(token)
+ if not user:
+ return jsonify({'success': False, 'message': '无效的访问令牌'}), 401
+
+ # 将用户信息传递给路由函数
+ return f(user, *args, **kwargs)
+ except Exception as e:
+ if session:
+ session.rollback()
+ return jsonify({'success': False, 'message': '令牌验证失败'}), 401
+ finally:
+ if session:
+ session.close()
+
+ return decorated
+
+@main.route('/login', methods=['POST'])
+def login():
+ """用户登录接口"""
+ session = None
+ try:
+ data = request.get_json()
+
+ # 验证必填字段
+ if not data or not data.get('email') or not data.get('password'):
+ return jsonify({
+ 'success': False,
+ 'message': '用户名和密码不能为空'
+ }), 400
+
+ email = data['email']
+ password = data['password']
+
+ # 创建数据库连接
+ engine = create_engine(Config.SQLURL)
+ SessionLocal = sessionmaker(bind=engine)
+ session = SessionLocal()
+
+ # 执行登录
+ f_auth = FAuth(session)
+ result = f_auth.login(email, password)
+
+ if result['success']:
+ session.commit()
+ return jsonify(result), 200
+ else:
+ return jsonify(result), 401
+
+ except Exception as e:
+ if session:
+ session.rollback()
+ return jsonify({
+ 'success': False,
+ 'message': '服务器内部错误'
+ }), 500
+ finally:
+ if session:
+ session.close()
+
+@main.route('/register', methods=['POST'])
+def register():
+ """用户注册接口"""
+
+ engine = create_engine(Config.SQLURL)
+ SessionLocal = sessionmaker(bind=engine)
+ session = SessionLocal()
+
+ try:
+ data = request.get_json()
+
+ # 验证必填字段
+ if not data or not data.get('username') or not data.get('email') or not data.get('password') or not data.get('verification_code'):
+ return jsonify({
+ 'success': False,
+ 'message': '用户名、邮箱和密码不能为空'
+ }), 400
+
+ username = data['username']
+ email = data['email']
+ password = data['password']
+ verification_code = data['verification_code']
+
+ # 简单的邮箱格式验证
+ if '@' not in email or '.' not in email:
+ return jsonify({
+ 'success': False,
+ 'message': '邮箱格式不正确'
+ }), 400
+
+ # 密码长度验证
+ if len(password) < 6:
+ return jsonify({
+ 'success': False,
+ 'message': '密码长度不能少于6位'
+ }), 400
+
+ # 执行注册
+ f_auth = FAuth(session)
+ result = f_auth.register(username, email, password, verification_code)
+
+ if result['success']:
+ session.commit()
+ return jsonify(result), 201
+ else:
+ return jsonify(result), 400
+
+ except Exception as e:
+ session.rollback()
+ return jsonify({
+ 'success': False,
+ 'message': '服务器内部错误'
+ }), 500
+ finally:
+ session.close()
+
+@main.route('/profile', methods=['GET'])
+@token_required
+def get_profile(current_user):
+ """获取用户信息接口(需要登录)"""
+ try:
+ return jsonify({
+ 'success': True,
+ 'user': current_user.to_dict()
+ }), 200
+ except Exception as e:
+ return jsonify({
+ 'success': False,
+ 'message': '获取用户信息失败'
+ }), 500
+
+@main.route('/logout', methods=['POST'])
+@token_required
+def logout(current_user):
+ """用户登出接口(需要登录)"""
+ try:
+ # 这里可以将令牌加入黑名单(如果需要的话)
+ return jsonify({
+ 'success': True,
+ 'message': '登出成功'
+ }), 200
+ except Exception as e:
+ return jsonify({
+ 'success': False,
+ 'message': '登出失败'
+ }), 500
+
+@main.route('/send-verification-code', methods=['POST'])
+def send_verification_code():
+ """发送邮箱验证码接口"""
+
+ engine = create_engine(Config.SQLURL)
+ SessionLocal = sessionmaker(bind=engine)
+ session = SessionLocal()
+
+ try:
+ data = request.get_json()
+
+ # 验证必填字段
+ if not data or not data.get('email'):
+ return jsonify({
+ 'success': False,
+ 'message': '邮箱地址不能为空'
+ }), 400
+
+ email = data['email']
+ verification_type = data.get('type', 'register') # 默认为注册验证码
+
+ # 简单的邮箱格式验证
+ if '@' not in email or '.' not in email:
+ return jsonify({
+ 'success': False,
+ 'message': '邮箱格式不正确'
+ }), 400
+
+ # 发送验证码
+ f_auth = FAuth(session)
+ result = f_auth.send_verification_email(email, verification_type)
+
+ if result['success']:
+ session.commit()
+ return jsonify(result), 200
+ else:
+ return jsonify(result), 400
+
+ except Exception as e:
+ session.rollback()
+ return jsonify({
+ 'success': False,
+ 'message': '服务器内部错误'
+ }), 500
+ finally:
+ session.close()
+
+@main.route('/reset-password', methods=['POST'])
+def reset_password():
+ """重置密码接口"""
+
+ engine = create_engine(Config.SQLURL)
+ SessionLocal = sessionmaker(bind=engine)
+ session = SessionLocal()
+
+ try:
+ data = request.get_json()
+
+ # 验证必填字段
+ if not data or not data.get('email') or not data.get('new_password') or not data.get('verification_code'):
+ return jsonify({
+ 'success': False,
+ 'message': '邮箱地址、新密码和验证码不能为空'
+ }), 400
+
+ email = data['email']
+ new_password = data['new_password']
+ verification_code = data['verification_code']
+
+ # 简单的邮箱格式验证
+ if '@' not in email or '.' not in email:
+ return jsonify({
+ 'success': False,
+ 'message': '邮箱格式不正确'
+ }), 400
+
+ # 密码长度验证
+ if len(new_password) < 6:
+ return jsonify({
+ 'success': False,
+ 'message': '密码长度不能少于6位'
+ }), 400
+
+ # 重置密码
+ f_auth = FAuth(session)
+ result = f_auth.reset_password_with_verification(email, new_password, verification_code)
+
+ if result['success']:
+ session.commit()
+ return jsonify(result), 200
+ else:
+ return jsonify(result), 400
+
+ except Exception as e:
+ session.rollback()
+ return jsonify({
+ 'success': False,
+ 'message': '服务器内部错误'
+ }), 500
+ finally:
+ session.close()
+
+@main.route('/test-jwt', methods=['POST'])
+@token_required
+def test_jwt(current_user):
+ """测试JWT令牌接口(需要登录)"""
+ try:
+ # 获取当前请求的token(从装饰器已验证的Authorization header)
+ auth_header = request.headers.get('Authorization')
+ current_token = auth_header[7:] if auth_header and auth_header.startswith('Bearer ') else None
+
+ print(f"当前用户: {current_user.username}")
+ print(f"当前用户ID: {current_user.id}")
+ print(current_user.role)
+ print(f"Token验证成功: {current_token[:20]}..." if current_token else "No token")
+
+ # 可选:检查请求体中是否有额外的token需要验证
+ data = request.get_json() or {}
+ additional_token = data.get('token')
+
+ response_data = {
+ 'success': True,
+ 'message': 'JWT令牌验证成功',
+ 'user': current_user.to_dict(),
+ 'token_info': {
+ 'header_token_verified': True,
+ 'token_preview': current_token[:20] + "..." if current_token else None
+ }
+ }
+
+ # 如果请求体中有额外的token,也验证一下
+ if additional_token:
+ try:
+ additional_result = FAuth.verify_token(additional_token)
+ response_data['additional_token_verification'] = additional_result
+ print(f"额外token验证结果: {additional_result}")
+ except Exception as e:
+ response_data['additional_token_verification'] = {
+ 'success': False,
+ 'message': f'额外token验证失败: {str(e)}'
+ }
+
+ return jsonify(response_data), 200
+
+ except Exception as e:
+ print(f"test_jwt 错误: {str(e)}")
+ return jsonify({
+ 'success': False,
+ 'message': f'JWT令牌验证失败: {str(e)}'
+ }), 500
\ No newline at end of file