blob: d6310a59a8947e72d2b035b6dd3faa0d8dd6faa8 [file] [log] [blame]
from ..models.users import User as users
from ..models.email_verification import EmailVerification
from sqlalchemy.orm import Session
import hashlib
import jwt
import smtplib
import pytz
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime, timedelta
from config import Config
class FAuth:
def __init__(self, session: Session):
self.session = session
return
def hash_password(self, password):
"""密码加密"""
return hashlib.sha256(password.encode()).hexdigest()
def is_password_hashed(self, password):
"""检查密码是否已经被哈希加密
Args:
password: 密码字符串
Returns:
bool: 是否为已加密的密码(64位十六进制字符串)
"""
import re
if not password or not isinstance(password, str):
return False
# SHA256 加密后是64位十六进制字符串
return bool(re.match(r'^[a-f0-9]{64}$', password, re.IGNORECASE))
def safe_hash_password(self, password):
"""安全的密码加密函数,避免重复加密
Args:
password: 密码字符串
Returns:
str: 加密后的密码
"""
if not password:
raise ValueError('密码不能为空')
# 如果已经是加密的密码,直接返回
if self.is_password_hashed(password):
return password
# 否则进行加密
return self.hash_password(password)
def verify_password(self, password, hashed_password):
"""验证密码"""
return self.hash_password(password) == hashed_password
def generate_token(self, user_id, role='user'):
"""生成JWT令牌"""
payload = {
'user_id': user_id,
'role': role,
'exp': datetime.utcnow() + timedelta(hours=24) # 24小时过期
}
return jwt.encode(payload, Config.JWT_SECRET_KEY, algorithm='HS256')
def verify_token(self, token):
"""验证JWT令牌"""
try:
payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=['HS256'])
return {
'user_id': payload['user_id'],
'role': payload.get('role', 'user') # 默认角色为user,兼容旧令牌
}
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def login(self, username_or_email, password):
"""用户登录"""
try:
# 查找用户
user = self.session.query(users).filter(
(users.email == username_or_email)
).first()
if not user:
return {'success': False, 'message': '用户不存在'}
# 检查账号状态
if user.status != 'active':
return {'success': False, 'message': '账号已被禁用'}
# 验证密码(前端已加密,后端使用安全比较)
if not self.safe_hash_password(password) == user.password:
return {'success': False, 'message': '密码错误'}
# 生成令牌
token = self.generate_token(user.id, user.role)
return {
'success': True,
'message': '登录成功',
'token': token,
'user': user.to_dict()
}
except Exception as e:
print(f"Login error: {str(e)}")
return {'success': False, 'message': f'登录失败: {str(e)}'}
def register(self, username, email, password, verification_code):
"""用户注册"""
# 检查用户名是否存在
existing_user = self.session.query(users).filter(
(users.username == username) | (users.email == email)
).first()
if existing_user:
if existing_user.username == username:
return {'success': False, 'message': '用户名已存在'}
else:
return {'success': False, 'message': '邮箱已被注册'}
verification = self.session.query(EmailVerification).filter(
EmailVerification.email == email,
EmailVerification.type == 'register',
EmailVerification.is_verified == False
).order_by(EmailVerification.created_at.desc()).first()
if not verification:
return {
'success': False,
'message': '验证码不存在或已过期'
}
# 验证验证码(检查是否为已加密的验证码)
verification_success = False
if self.is_password_hashed(verification_code):
# 如果是已加密的验证码,直接比较
verification_success = verification.verify_hashed(verification_code)
else:
# 如果是明文验证码,先加密再比较
verification_success = verification.verify(verification_code)
if not verification_success:
return {
'success': False,
'message': '验证码错误或已过期'
}
# 如果验证码验证成功,标记为已验证
verification.is_verified = True
verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
# 创建新用户(使用安全加密函数避免重复加密)
hashed_password = self.safe_hash_password(password)
new_user = users(
username=username,
email=email,
password=hashed_password,
role='user',
status='active'
)
try:
self.session.add(new_user)
self.session.commit()
# 生成令牌
token = self.generate_token(new_user.id, new_user.role)
return {
'success': True,
'message': '注册成功',
'token': token,
'user': new_user.to_dict()
}
except Exception as e:
self.session.rollback()
return {'success': False, 'message': '注册失败,请稍后重试'}
def get_user_by_token(self, token):
"""通过令牌获取用户信息"""
token_data = self.verify_token(token)
if not token_data:
return None
user_id = token_data['user_id'] if isinstance(token_data, dict) else token_data
user = self.session.query(users).filter(users.id == user_id).first()
return user
def send_verification_email(self, email, verification_type='register', user_id=None):
"""发送邮箱验证码
Args:
email: 目标邮箱地址
verification_type: 验证类型 ('register', 'reset_password', 'email_change')
user_id: 用户ID(可选)
Returns:
dict: 发送结果
"""
try:
# 检查邮件配置
if not all([Config.MAIL_USERNAME, Config.MAIL_PASSWORD, Config.MAIL_DEFAULT_SENDER]):
return {
'success': False,
'message': '邮件服务配置不完整,请联系管理员'
}
if verification_type not in ['register', 'reset_password', 'email_change']:
return {
'success': False,
'message': '无效的验证类型'
}
if verification_type == 'reset_password' or verification_type == 'email_change':
# 检查用户是否存在
user = self.session.query(users).filter(users.email == email).first()
if not user:
return {
'success': False,
'message': '用户不存在或邮箱不匹配'
}
elif verification_type == 'register':
# 检查邮箱是否已注册
existing_user = self.session.query(users).filter(users.email == email).first()
if existing_user:
return {
'success': False,
'message': '邮箱已被注册'
}
# 创建验证记录
verification = EmailVerification.create_verification(
email=email,
verification_type=verification_type,
user_id=user_id,
expires_minutes=15 # 15分钟过期
)
# 保存到数据库
self.session.add(verification)
# 获取验证码
verification_code = verification.get_raw_code()
if not verification_code:
return {
'success': False,
'message': '验证码生成失败'
}
# 发送邮件
result = self._send_email(email, verification_code, verification_type)
if result['success']:
return {
'success': True,
'message': '验证码已发送到您的邮箱',
'verification_id': verification.id
}
else:
# 如果邮件发送失败,删除验证记录
self.session.delete(verification)
self.session.commit()
return result
except Exception as e:
self.session.rollback()
print(f"Send verification email error: {str(e)}")
return {
'success': False,
'message': f'发送验证码失败: {str(e)}'
}
def _send_email(self, to_email, verification_code, verification_type):
"""发送邮件的具体实现
Args:
to_email: 收件人邮箱
verification_code: 验证码
verification_type: 验证类型
Returns:
dict: 发送结果
"""
try:
# 根据验证类型设置邮件内容
subject_map = {
'register': '注册验证码',
'reset_password': '密码重置验证码',
'email_change': '邮箱变更验证码'
}
message_map = {
'register': '欢迎注册我们的平台!',
'reset_password': '您正在重置密码',
'email_change': '您正在变更邮箱地址'
}
subject = subject_map.get(verification_type, '验证码')
message_intro = message_map.get(verification_type, '验证码')
# 创建邮件内容
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = Config.MAIL_DEFAULT_SENDER
msg['To'] = to_email
# HTML邮件内容
html_body = f"""
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>{subject}</title>
</head>
<body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
<div style="max-width: 600px; margin: 0 auto; padding: 20px;">
<div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin-bottom: 20px;">
<h2 style="color: #007bff; margin-top: 0;">{message_intro}</h2>
<p>您的验证码是:</p>
<div style="background-color: #007bff; color: white; padding: 15px; border-radius: 5px; text-align: center; font-size: 24px; font-weight: bold; letter-spacing: 3px; margin: 20px 0;">
{verification_code}
</div>
<p style="color: #666; font-size: 14px;">
• 验证码有效期为15分钟<br>
• 请勿将验证码透露给他人<br>
• 如果这不是您的操作,请忽略此邮件
</p>
</div>
<div style="text-align: center; color: #999; font-size: 12px; border-top: 1px solid #eee; padding-top: 20px;">
<p>此邮件由系统自动发送,请勿回复</p>
</div>
</div>
</body>
</html>
"""
# 纯文本内容(备用)
text_body = f"""
{message_intro}
您的验证码是:{verification_code}
验证码有效期为15分钟
请勿将验证码透露给他人
如果这不是您的操作,请忽略此邮件
此邮件由系统自动发送,请勿回复
"""
# 添加邮件内容
text_part = MIMEText(text_body, 'plain', 'utf-8')
html_part = MIMEText(html_body, 'html', 'utf-8')
msg.attach(text_part)
msg.attach(html_part)
# 连接SMTP服务器并发送邮件
server = None
try:
server = smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT)
if Config.MAIL_USE_TLS:
server.starttls()
server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
result = server.send_message(msg)
# 检查发送结果
if result:
# 如果有失败的收件人,记录日志
print(f"邮件发送部分失败: {result}")
return {
'success': True,
'message': '邮件发送成功'
}
finally:
# 确保连接被正确关闭
if server:
try:
server.quit()
except Exception:
# 如果quit()失败,强制关闭连接
try:
server.close()
except Exception:
pass
except smtplib.SMTPAuthenticationError:
return {
'success': False,
'message': '邮件服务认证失败,请检查邮箱配置'
}
except smtplib.SMTPException as e:
return {
'success': False,
'message': f'邮件发送失败: {str(e)}'
}
except Exception as e:
return {
'success': False,
'message': f'发送邮件时发生错误: {str(e)}'
}
def verify_email_code(self, email, code, verification_type='register'):
"""验证邮箱验证码
Args:
email: 邮箱地址
code: 验证码
verification_type: 验证类型
Returns:
dict: 验证结果
"""
try:
# 查找最新的未验证的验证记录
verification = self.session.query(EmailVerification).filter(
EmailVerification.email == email,
EmailVerification.type == verification_type,
EmailVerification.is_verified == False
).order_by(EmailVerification.created_at.desc()).first()
if not verification:
return {
'success': False,
'message': '验证码不存在或已过期'
}
# 验证验证码(检查是否为已加密的验证码)
verification_success = False
if self.is_password_hashed(code):
# 如果是已加密的验证码,直接比较
verification_success = verification.verify_hashed(code)
else:
# 如果是明文验证码,先加密再比较
verification_success = verification.verify(code)
if verification_success:
# 不在这里提交事务,留给调用者决定何时提交
# self.session.commit() # 注释掉立即提交
return {
'success': True,
'message': '验证成功',
'verification_id': verification.id
}
else:
return {
'success': False,
'message': '验证码错误或已过期'
}
except Exception as e:
print(f"Verify email code error: {str(e)}")
return {
'success': False,
'message': f'验证失败: {str(e)}'
}
def reset_password(self, email, new_password, verification_code):
"""重置用户密码
Args:
email: 用户邮箱
new_password: 新密码
verification_code: 验证码
Returns:
dict: 重置结果
"""
try:
# 检查是否有最近已验证的重置密码验证记录(5分钟内)
china_tz = pytz.timezone('Asia/Shanghai')
current_time = datetime.now(china_tz).replace(tzinfo=None)
five_minutes_ago = current_time - timedelta(minutes=5)
# 查找最近5分钟内已验证的重置密码验证记录
recent_verification = self.session.query(EmailVerification).filter(
EmailVerification.email == email,
EmailVerification.type == 'reset_password',
EmailVerification.is_verified == True,
EmailVerification.verified_at >= five_minutes_ago
).order_by(EmailVerification.verified_at.desc()).first()
if not recent_verification:
return {
'success': False,
'message': '验证码未验证或已过期,请重新验证'
}
# 查找用户
user = self.session.query(users).filter(users.email == email).first()
if not user:
return {
'success': False,
'message': '用户不存在'
}
# 检查账号状态
if user.status != 'active':
return {
'success': False,
'message': '账号已被禁用,无法重置密码'
}
# 更新密码(使用安全加密函数避免重复加密)
user.password = self.safe_hash_password(new_password)
# 使用中国时区时间
china_tz = pytz.timezone('Asia/Shanghai')
user.updated_at = datetime.now(china_tz).replace(tzinfo=None)
# 提交更改
self.session.commit()
return {
'success': True,
'message': '密码重置成功'
}
except Exception as e:
self.session.rollback()
print(f"Reset password error: {str(e)}")
return {
'success': False,
'message': f'密码重置失败: {str(e)}'
}
def reset_password_with_verification(self, email, new_password, verification_code):
"""重置用户密码(一步完成验证码验证和密码重置)
Args:
email: 用户邮箱
new_password: 新密码
verification_code: 验证码
Returns:
dict: 重置结果
"""
try:
# 查找用户
user = self.session.query(users).filter(users.email == email).first()
if not user:
return {
'success': False,
'message': '用户不存在'
}
# 检查账号状态
if user.status != 'active':
return {
'success': False,
'message': '账号已被禁用,无法重置密码'
}
# 验证验证码
verification = self.session.query(EmailVerification).filter(
EmailVerification.email == email,
EmailVerification.type == 'reset_password',
EmailVerification.is_verified == False
).order_by(EmailVerification.created_at.desc()).first()
if not verification:
return {
'success': False,
'message': '验证码不存在或已过期'
}
# 验证验证码(检查是否为已加密的验证码)
verification_success = False
if self.is_password_hashed(verification_code):
# 如果是已加密的验证码,直接比较
verification_success = verification.verify_hashed(verification_code)
else:
# 如果是明文验证码,先加密再比较
verification_success = verification.verify(verification_code)
if not verification_success:
return {
'success': False,
'message': '验证码错误或已过期'
}
# 如果验证码验证成功,标记为已验证
verification.is_verified = True
verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
# 更新密码(使用安全加密函数避免重复加密)
user.password = self.safe_hash_password(new_password)
# 使用中国时区时间更新时间戳
china_tz = pytz.timezone('Asia/Shanghai')
user.updated_at = datetime.now(china_tz).replace(tzinfo=None)
# 提交更改
self.session.commit()
return {
'success': True,
'message': '密码重置成功'
}
except Exception as e:
self.session.rollback()
print(f"Reset password with verification error: {str(e)}")
return {
'success': False,
'message': f'密码重置失败: {str(e)}'
}