blob: 2d3f7da1091eeaa89d5011bcd30c3791ea18cb67 [file] [log] [blame]
# filepath: /home/ronghanji/api/API-TRM/rhj/backend/app/models/email_verification.py
from . import Base
from sqlalchemy import (
Column, Integer, String, Boolean, TIMESTAMP, text, ForeignKey
)
from sqlalchemy.orm import relationship
from datetime import datetime, timedelta
import secrets
import string
import hashlib
import pytz
class EmailVerification(Base):
__tablename__ = 'email_verifications'
id = Column(Integer, primary_key=True, autoincrement=True, comment='验证记录ID')
email = Column(String(100), nullable=False, comment='邮箱地址')
code = Column(String(255), nullable=False, comment='验证码(加密存储)')
type = Column(String(50), nullable=False, comment='验证类型:register, reset_password, email_change')
user_id = Column(Integer, ForeignKey('users.id'), nullable=True, comment='关联用户ID')
is_verified = Column(Boolean, nullable=False, default=False, comment='是否已验证')
expires_at = Column(TIMESTAMP, nullable=False, comment='过期时间')
created_at = Column(
TIMESTAMP,
nullable=False,
server_default=text('CURRENT_TIMESTAMP'),
comment='创建时间'
)
verified_at = Column(TIMESTAMP, nullable=True, comment='验证时间')
# 关联用户表
user = relationship("User", back_populates="email_verifications")
def __init__(self, email, verification_type, user_id=None, expires_minutes=15):
"""初始化邮箱验证记录
Args:
email: 邮箱地址
verification_type: 验证类型
user_id: 用户ID(可选)
expires_minutes: 过期时间(分钟)
"""
self.email = email
self.type = verification_type
self.user_id = user_id
self.is_verified = False
# 使用中国时区时间,确保与数据库时间一致
china_tz = pytz.timezone('Asia/Shanghai')
current_time = datetime.now(china_tz).replace(tzinfo=None)
self.expires_at = current_time + timedelta(minutes=expires_minutes)
# 生成并加密验证码
raw_code = self._generate_code()
self.code = self._hash_code(raw_code)
self._raw_code = raw_code # 临时存储原始验证码用于发送邮件
@classmethod
def create_verification(cls, email, verification_type, user_id=None, expires_minutes=15):
"""创建验证记录
Args:
email: 邮箱地址
verification_type: 验证类型
user_id: 用户ID(可选)
expires_minutes: 过期时间(分钟)
Returns:
EmailVerification: 验证记录实例
"""
return cls(
email=email,
verification_type=verification_type,
user_id=user_id,
expires_minutes=expires_minutes
)
def _generate_code(self, length=6):
"""生成随机验证码
Args:
length: 验证码长度
Returns:
str: 验证码
"""
characters = string.digits
return ''.join(secrets.choice(characters) for _ in range(length))
def _hash_code(self, code):
"""对验证码进行哈希加密
Args:
code: 原始验证码
Returns:
str: 加密后的验证码
"""
return hashlib.sha256(code.encode()).hexdigest()
def verify(self, input_code):
"""验证验证码
Args:
input_code: 用户输入的验证码
Returns:
bool: 验证是否成功
"""
if self.is_verified:
return False
if self.is_expired():
return False
hashed_input = self._hash_code(input_code)
if hashed_input == self.code:
# 使用中国时区时间设置验证时间
china_tz = pytz.timezone('Asia/Shanghai')
self.verified_at = datetime.now(china_tz).replace(tzinfo=None)
self.is_verified = True
return True
return False
def verify_hashed(self, hashed_code):
"""验证已经加密的验证码
Args:
hashed_code: 已经加密的验证码
Returns:
bool: 验证是否成功
"""
if self.is_verified:
return False
if self.is_expired():
return False
# 直接比较加密后的验证码
if hashed_code == self.code:
# 使用中国时区时间设置验证时间
china_tz = pytz.timezone('Asia/Shanghai')
self.verified_at = datetime.now(china_tz).replace(tzinfo=None)
self.is_verified = True
return True
return False
def is_expired(self):
"""检查是否已过期
Returns:
bool: 是否已过期
"""
# 使用中国时区时间进行比较
china_tz = pytz.timezone('Asia/Shanghai')
current_time = datetime.now(china_tz).replace(tzinfo=None)
return current_time > self.expires_at
def get_raw_code(self):
"""获取原始验证码(仅在创建时可用)
Returns:
str: 原始验证码
"""
return getattr(self, '_raw_code', None)
def to_dict(self):
"""转换为字典格式
Returns:
dict: 对象字典表示
"""
return {
'id': self.id,
'email': self.email,
'type': self.type,
'user_id': self.user_id,
'is_verified': self.is_verified,
'expires_at': self.expires_at.isoformat() if self.expires_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'verified_at': self.verified_at.isoformat() if self.verified_at else None
}
def __repr__(self):
return f"<EmailVerification(id={self.id}, email='{self.email}', type='{self.type}', verified={self.is_verified})>"