| from ..models.users import User as users |
| from ..models.email_verification import EmailVerification |
| from sqlalchemy.orm import Session |
| import hashlib |
| import jwt |
| import smtplib |
| import pytz |
| from email.mime.text import MIMEText |
| from email.mime.multipart import MIMEMultipart |
| from datetime import datetime, timedelta |
| from config import Config |
| |
| class FAuth: |
| def __init__(self, session: Session): |
| self.session = session |
| return |
| |
| def hash_password(self, password): |
| """密码加密""" |
| return hashlib.sha256(password.encode()).hexdigest() |
| |
| def is_password_hashed(self, password): |
| """检查密码是否已经被哈希加密 |
| |
| Args: |
| password: 密码字符串 |
| |
| Returns: |
| bool: 是否为已加密的密码(64位十六进制字符串) |
| """ |
| import re |
| if not password or not isinstance(password, str): |
| return False |
| # SHA256 加密后是64位十六进制字符串 |
| return bool(re.match(r'^[a-f0-9]{64}$', password, re.IGNORECASE)) |
| |
| def safe_hash_password(self, password): |
| """安全的密码加密函数,避免重复加密 |
| |
| Args: |
| password: 密码字符串 |
| |
| Returns: |
| str: 加密后的密码 |
| """ |
| if not password: |
| raise ValueError('密码不能为空') |
| |
| # 如果已经是加密的密码,直接返回 |
| if self.is_password_hashed(password): |
| return password |
| |
| # 否则进行加密 |
| return self.hash_password(password) |
| |
| def verify_password(self, password, hashed_password): |
| """验证密码""" |
| return self.hash_password(password) == hashed_password |
| |
| def generate_token(self, user_id, role='user'): |
| """生成JWT令牌""" |
| payload = { |
| 'user_id': user_id, |
| 'role': role, |
| 'exp': datetime.utcnow() + timedelta(hours=24) # 24小时过期 |
| } |
| return jwt.encode(payload, Config.JWT_SECRET_KEY, algorithm='HS256') |
| |
| def verify_token(self, token): |
| """验证JWT令牌""" |
| try: |
| payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=['HS256']) |
| return { |
| 'user_id': payload['user_id'], |
| 'role': payload.get('role', 'user') # 默认角色为user,兼容旧令牌 |
| } |
| except jwt.ExpiredSignatureError: |
| return None |
| except jwt.InvalidTokenError: |
| return None |
| |
| def login(self, username_or_email, password): |
| """用户登录""" |
| try: |
| # 查找用户 |
| user = self.session.query(users).filter( |
| (users.email == username_or_email) |
| ).first() |
| |
| if not user: |
| return {'success': False, 'message': '用户不存在'} |
| |
| # 检查账号状态 |
| if user.status != 'active': |
| return {'success': False, 'message': '账号已被禁用'} |
| |
| # 验证密码(前端已加密,后端使用安全比较) |
| if not self.safe_hash_password(password) == user.password: |
| return {'success': False, 'message': '密码错误'} |
| |
| # 生成令牌 |
| token = self.generate_token(user.id, user.role) |
| |
| return { |
| 'success': True, |
| 'message': '登录成功', |
| 'token': token, |
| 'user': user.to_dict() |
| } |
| except Exception as e: |
| print(f"Login error: {str(e)}") |
| return {'success': False, 'message': f'登录失败: {str(e)}'} |
| |
| def register(self, username, email, password, verification_code): |
| """用户注册""" |
| # 检查用户名是否存在 |
| existing_user = self.session.query(users).filter( |
| (users.username == username) | (users.email == email) |
| ).first() |
| |
| if existing_user: |
| if existing_user.username == username: |
| return {'success': False, 'message': '用户名已存在'} |
| else: |
| return {'success': False, 'message': '邮箱已被注册'} |
| |
| verification = self.session.query(EmailVerification).filter( |
| EmailVerification.email == email, |
| EmailVerification.type == 'register', |
| EmailVerification.is_verified == False |
| ).order_by(EmailVerification.created_at.desc()).first() |
| |
| if not verification: |
| return { |
| 'success': False, |
| 'message': '验证码不存在或已过期' |
| } |
| |
| # 验证验证码(检查是否为已加密的验证码) |
| verification_success = False |
| if self.is_password_hashed(verification_code): |
| # 如果是已加密的验证码,直接比较 |
| verification_success = verification.verify_hashed(verification_code) |
| else: |
| # 如果是明文验证码,先加密再比较 |
| verification_success = verification.verify(verification_code) |
| if not verification_success: |
| return { |
| 'success': False, |
| 'message': '验证码错误或已过期' |
| } |
| # 如果验证码验证成功,标记为已验证 |
| verification.is_verified = True |
| verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None) |
| |
| |
| # 创建新用户(使用安全加密函数避免重复加密) |
| hashed_password = self.safe_hash_password(password) |
| new_user = users( |
| username=username, |
| email=email, |
| password=hashed_password, |
| role='user', |
| status='active' |
| ) |
| |
| try: |
| self.session.add(new_user) |
| self.session.commit() |
| |
| # 生成令牌 |
| token = self.generate_token(new_user.id, new_user.role) |
| |
| return { |
| 'success': True, |
| 'message': '注册成功', |
| 'token': token, |
| 'user': new_user.to_dict() |
| } |
| except Exception as e: |
| self.session.rollback() |
| return {'success': False, 'message': '注册失败,请稍后重试'} |
| |
| def get_user_by_token(self, token): |
| """通过令牌获取用户信息""" |
| token_data = self.verify_token(token) |
| if not token_data: |
| return None |
| |
| user_id = token_data['user_id'] if isinstance(token_data, dict) else token_data |
| user = self.session.query(users).filter(users.id == user_id).first() |
| return user |
| |
| def send_verification_email(self, email, verification_type='register', user_id=None): |
| """发送邮箱验证码 |
| |
| Args: |
| email: 目标邮箱地址 |
| verification_type: 验证类型 ('register', 'reset_password', 'email_change') |
| user_id: 用户ID(可选) |
| |
| Returns: |
| dict: 发送结果 |
| """ |
| try: |
| # 检查邮件配置 |
| if not all([Config.MAIL_USERNAME, Config.MAIL_PASSWORD, Config.MAIL_DEFAULT_SENDER]): |
| return { |
| 'success': False, |
| 'message': '邮件服务配置不完整,请联系管理员' |
| } |
| |
| if verification_type not in ['register', 'reset_password', 'email_change']: |
| return { |
| 'success': False, |
| 'message': '无效的验证类型' |
| } |
| |
| if verification_type == 'reset_password' or verification_type == 'email_change': |
| # 检查用户是否存在 |
| user = self.session.query(users).filter(users.email == email).first() |
| if not user: |
| return { |
| 'success': False, |
| 'message': '用户不存在或邮箱不匹配' |
| } |
| elif verification_type == 'register': |
| # 检查邮箱是否已注册 |
| existing_user = self.session.query(users).filter(users.email == email).first() |
| if existing_user: |
| return { |
| 'success': False, |
| 'message': '邮箱已被注册' |
| } |
| |
| # 创建验证记录 |
| verification = EmailVerification.create_verification( |
| email=email, |
| verification_type=verification_type, |
| user_id=user_id, |
| expires_minutes=15 # 15分钟过期 |
| ) |
| |
| # 保存到数据库 |
| self.session.add(verification) |
| |
| # 获取验证码 |
| verification_code = verification.get_raw_code() |
| if not verification_code: |
| return { |
| 'success': False, |
| 'message': '验证码生成失败' |
| } |
| |
| # 发送邮件 |
| result = self._send_email(email, verification_code, verification_type) |
| |
| if result['success']: |
| return { |
| 'success': True, |
| 'message': '验证码已发送到您的邮箱', |
| 'verification_id': verification.id |
| } |
| else: |
| # 如果邮件发送失败,删除验证记录 |
| self.session.delete(verification) |
| self.session.commit() |
| return result |
| |
| except Exception as e: |
| self.session.rollback() |
| print(f"Send verification email error: {str(e)}") |
| return { |
| 'success': False, |
| 'message': f'发送验证码失败: {str(e)}' |
| } |
| |
| def _send_email(self, to_email, verification_code, verification_type): |
| """发送邮件的具体实现 |
| |
| Args: |
| to_email: 收件人邮箱 |
| verification_code: 验证码 |
| verification_type: 验证类型 |
| |
| Returns: |
| dict: 发送结果 |
| """ |
| try: |
| # 根据验证类型设置邮件内容 |
| subject_map = { |
| 'register': '注册验证码', |
| 'reset_password': '密码重置验证码', |
| 'email_change': '邮箱变更验证码' |
| } |
| |
| message_map = { |
| 'register': '欢迎注册我们的平台!', |
| 'reset_password': '您正在重置密码', |
| 'email_change': '您正在变更邮箱地址' |
| } |
| |
| subject = subject_map.get(verification_type, '验证码') |
| message_intro = message_map.get(verification_type, '验证码') |
| |
| # 创建邮件内容 |
| msg = MIMEMultipart('alternative') |
| msg['Subject'] = subject |
| msg['From'] = Config.MAIL_DEFAULT_SENDER |
| msg['To'] = to_email |
| |
| # HTML邮件内容 |
| html_body = f""" |
| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <title>{subject}</title> |
| </head> |
| <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;"> |
| <div style="max-width: 600px; margin: 0 auto; padding: 20px;"> |
| <div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin-bottom: 20px;"> |
| <h2 style="color: #007bff; margin-top: 0;">{message_intro}</h2> |
| <p>您的验证码是:</p> |
| <div style="background-color: #007bff; color: white; padding: 15px; border-radius: 5px; text-align: center; font-size: 24px; font-weight: bold; letter-spacing: 3px; margin: 20px 0;"> |
| {verification_code} |
| </div> |
| <p style="color: #666; font-size: 14px;"> |
| • 验证码有效期为15分钟<br> |
| • 请勿将验证码透露给他人<br> |
| • 如果这不是您的操作,请忽略此邮件 |
| </p> |
| </div> |
| <div style="text-align: center; color: #999; font-size: 12px; border-top: 1px solid #eee; padding-top: 20px;"> |
| <p>此邮件由系统自动发送,请勿回复</p> |
| </div> |
| </div> |
| </body> |
| </html> |
| """ |
| |
| # 纯文本内容(备用) |
| text_body = f""" |
| {message_intro} |
| |
| 您的验证码是:{verification_code} |
| |
| 验证码有效期为15分钟 |
| 请勿将验证码透露给他人 |
| 如果这不是您的操作,请忽略此邮件 |
| |
| 此邮件由系统自动发送,请勿回复 |
| """ |
| |
| # 添加邮件内容 |
| text_part = MIMEText(text_body, 'plain', 'utf-8') |
| html_part = MIMEText(html_body, 'html', 'utf-8') |
| |
| msg.attach(text_part) |
| msg.attach(html_part) |
| |
| # 连接SMTP服务器并发送邮件 |
| server = None |
| try: |
| server = smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT) |
| if Config.MAIL_USE_TLS: |
| server.starttls() |
| |
| server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD) |
| result = server.send_message(msg) |
| |
| # 检查发送结果 |
| if result: |
| # 如果有失败的收件人,记录日志 |
| print(f"邮件发送部分失败: {result}") |
| |
| return { |
| 'success': True, |
| 'message': '邮件发送成功' |
| } |
| finally: |
| # 确保连接被正确关闭 |
| if server: |
| try: |
| server.quit() |
| except Exception: |
| # 如果quit()失败,强制关闭连接 |
| try: |
| server.close() |
| except Exception: |
| pass |
| |
| except smtplib.SMTPAuthenticationError: |
| return { |
| 'success': False, |
| 'message': '邮件服务认证失败,请检查邮箱配置' |
| } |
| except smtplib.SMTPException as e: |
| return { |
| 'success': False, |
| 'message': f'邮件发送失败: {str(e)}' |
| } |
| except Exception as e: |
| return { |
| 'success': False, |
| 'message': f'发送邮件时发生错误: {str(e)}' |
| } |
| |
| def verify_email_code(self, email, code, verification_type='register'): |
| """验证邮箱验证码 |
| |
| Args: |
| email: 邮箱地址 |
| code: 验证码 |
| verification_type: 验证类型 |
| |
| Returns: |
| dict: 验证结果 |
| """ |
| try: |
| # 查找最新的未验证的验证记录 |
| verification = self.session.query(EmailVerification).filter( |
| EmailVerification.email == email, |
| EmailVerification.type == verification_type, |
| EmailVerification.is_verified == False |
| ).order_by(EmailVerification.created_at.desc()).first() |
| |
| if not verification: |
| return { |
| 'success': False, |
| 'message': '验证码不存在或已过期' |
| } |
| |
| # 验证验证码(检查是否为已加密的验证码) |
| verification_success = False |
| if self.is_password_hashed(code): |
| # 如果是已加密的验证码,直接比较 |
| verification_success = verification.verify_hashed(code) |
| else: |
| # 如果是明文验证码,先加密再比较 |
| verification_success = verification.verify(code) |
| |
| if verification_success: |
| # 不在这里提交事务,留给调用者决定何时提交 |
| # self.session.commit() # 注释掉立即提交 |
| return { |
| 'success': True, |
| 'message': '验证成功', |
| 'verification_id': verification.id |
| } |
| else: |
| return { |
| 'success': False, |
| 'message': '验证码错误或已过期' |
| } |
| |
| except Exception as e: |
| print(f"Verify email code error: {str(e)}") |
| return { |
| 'success': False, |
| 'message': f'验证失败: {str(e)}' |
| } |
| |
| def reset_password(self, email, new_password, verification_code): |
| """重置用户密码 |
| |
| Args: |
| email: 用户邮箱 |
| new_password: 新密码 |
| verification_code: 验证码 |
| |
| Returns: |
| dict: 重置结果 |
| """ |
| try: |
| # 检查是否有最近已验证的重置密码验证记录(5分钟内) |
| |
| china_tz = pytz.timezone('Asia/Shanghai') |
| current_time = datetime.now(china_tz).replace(tzinfo=None) |
| five_minutes_ago = current_time - timedelta(minutes=5) |
| |
| # 查找最近5分钟内已验证的重置密码验证记录 |
| recent_verification = self.session.query(EmailVerification).filter( |
| EmailVerification.email == email, |
| EmailVerification.type == 'reset_password', |
| EmailVerification.is_verified == True, |
| EmailVerification.verified_at >= five_minutes_ago |
| ).order_by(EmailVerification.verified_at.desc()).first() |
| |
| if not recent_verification: |
| return { |
| 'success': False, |
| 'message': '验证码未验证或已过期,请重新验证' |
| } |
| |
| # 查找用户 |
| user = self.session.query(users).filter(users.email == email).first() |
| if not user: |
| return { |
| 'success': False, |
| 'message': '用户不存在' |
| } |
| |
| # 检查账号状态 |
| if user.status != 'active': |
| return { |
| 'success': False, |
| 'message': '账号已被禁用,无法重置密码' |
| } |
| |
| # 更新密码(使用安全加密函数避免重复加密) |
| user.password = self.safe_hash_password(new_password) |
| # 使用中国时区时间 |
| china_tz = pytz.timezone('Asia/Shanghai') |
| user.updated_at = datetime.now(china_tz).replace(tzinfo=None) |
| |
| # 提交更改 |
| self.session.commit() |
| |
| return { |
| 'success': True, |
| 'message': '密码重置成功' |
| } |
| |
| except Exception as e: |
| self.session.rollback() |
| print(f"Reset password error: {str(e)}") |
| return { |
| 'success': False, |
| 'message': f'密码重置失败: {str(e)}' |
| } |
| |
| def reset_password_with_verification(self, email, new_password, verification_code): |
| """重置用户密码(一步完成验证码验证和密码重置) |
| |
| Args: |
| email: 用户邮箱 |
| new_password: 新密码 |
| verification_code: 验证码 |
| |
| Returns: |
| dict: 重置结果 |
| """ |
| try: |
| # 查找用户 |
| user = self.session.query(users).filter(users.email == email).first() |
| if not user: |
| return { |
| 'success': False, |
| 'message': '用户不存在' |
| } |
| |
| # 检查账号状态 |
| if user.status != 'active': |
| return { |
| 'success': False, |
| 'message': '账号已被禁用,无法重置密码' |
| } |
| |
| # 验证验证码 |
| verification = self.session.query(EmailVerification).filter( |
| EmailVerification.email == email, |
| EmailVerification.type == 'reset_password', |
| EmailVerification.is_verified == False |
| ).order_by(EmailVerification.created_at.desc()).first() |
| |
| if not verification: |
| return { |
| 'success': False, |
| 'message': '验证码不存在或已过期' |
| } |
| |
| # 验证验证码(检查是否为已加密的验证码) |
| verification_success = False |
| if self.is_password_hashed(verification_code): |
| # 如果是已加密的验证码,直接比较 |
| verification_success = verification.verify_hashed(verification_code) |
| else: |
| # 如果是明文验证码,先加密再比较 |
| verification_success = verification.verify(verification_code) |
| if not verification_success: |
| return { |
| 'success': False, |
| 'message': '验证码错误或已过期' |
| } |
| # 如果验证码验证成功,标记为已验证 |
| verification.is_verified = True |
| verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None) |
| |
| # 更新密码(使用安全加密函数避免重复加密) |
| user.password = self.safe_hash_password(new_password) |
| |
| # 使用中国时区时间更新时间戳 |
| china_tz = pytz.timezone('Asia/Shanghai') |
| user.updated_at = datetime.now(china_tz).replace(tzinfo=None) |
| |
| # 提交更改 |
| self.session.commit() |
| |
| return { |
| 'success': True, |
| 'message': '密码重置成功' |
| } |
| |
| except Exception as e: |
| self.session.rollback() |
| print(f"Reset password with verification error: {str(e)}") |
| return { |
| 'success': False, |
| 'message': f'密码重置失败: {str(e)}' |
| } |