| # filepath: /home/ronghanji/api/API-TRM/rhj/backend/app/models/email_verification.py |
| from . import Base |
| from sqlalchemy import ( |
| Column, Integer, String, Boolean, TIMESTAMP, text, ForeignKey |
| ) |
| from sqlalchemy.orm import relationship |
| from datetime import datetime, timedelta |
| import secrets |
| import string |
| import hashlib |
| import pytz |
| |
| |
| class EmailVerification(Base): |
| __tablename__ = 'email_verifications' |
| |
| id = Column(Integer, primary_key=True, autoincrement=True, comment='验证记录ID') |
| email = Column(String(100), nullable=False, comment='邮箱地址') |
| code = Column(String(255), nullable=False, comment='验证码(加密存储)') |
| type = Column(String(50), nullable=False, comment='验证类型:register, reset_password, email_change') |
| user_id = Column(Integer, ForeignKey('users.id'), nullable=True, comment='关联用户ID') |
| is_verified = Column(Boolean, nullable=False, default=False, comment='是否已验证') |
| expires_at = Column(TIMESTAMP, nullable=False, comment='过期时间') |
| created_at = Column( |
| TIMESTAMP, |
| nullable=False, |
| server_default=text('CURRENT_TIMESTAMP'), |
| comment='创建时间' |
| ) |
| verified_at = Column(TIMESTAMP, nullable=True, comment='验证时间') |
| |
| # 关联用户表 |
| user = relationship("User", back_populates="email_verifications") |
| |
| def __init__(self, email, verification_type, user_id=None, expires_minutes=15): |
| """初始化邮箱验证记录 |
| |
| Args: |
| email: 邮箱地址 |
| verification_type: 验证类型 |
| user_id: 用户ID(可选) |
| expires_minutes: 过期时间(分钟) |
| """ |
| self.email = email |
| self.type = verification_type |
| self.user_id = user_id |
| self.is_verified = False |
| |
| # 使用中国时区时间,确保与数据库时间一致 |
| china_tz = pytz.timezone('Asia/Shanghai') |
| current_time = datetime.now(china_tz).replace(tzinfo=None) |
| self.expires_at = current_time + timedelta(minutes=expires_minutes) |
| |
| # 生成并加密验证码 |
| raw_code = self._generate_code() |
| self.code = self._hash_code(raw_code) |
| self._raw_code = raw_code # 临时存储原始验证码用于发送邮件 |
| |
| @classmethod |
| def create_verification(cls, email, verification_type, user_id=None, expires_minutes=15): |
| """创建验证记录 |
| |
| Args: |
| email: 邮箱地址 |
| verification_type: 验证类型 |
| user_id: 用户ID(可选) |
| expires_minutes: 过期时间(分钟) |
| |
| Returns: |
| EmailVerification: 验证记录实例 |
| """ |
| return cls( |
| email=email, |
| verification_type=verification_type, |
| user_id=user_id, |
| expires_minutes=expires_minutes |
| ) |
| |
| def _generate_code(self, length=6): |
| """生成随机验证码 |
| |
| Args: |
| length: 验证码长度 |
| |
| Returns: |
| str: 验证码 |
| """ |
| characters = string.digits |
| return ''.join(secrets.choice(characters) for _ in range(length)) |
| |
| def _hash_code(self, code): |
| """对验证码进行哈希加密 |
| |
| Args: |
| code: 原始验证码 |
| |
| Returns: |
| str: 加密后的验证码 |
| """ |
| return hashlib.sha256(code.encode()).hexdigest() |
| |
| def verify(self, input_code): |
| """验证验证码 |
| |
| Args: |
| input_code: 用户输入的验证码 |
| |
| Returns: |
| bool: 验证是否成功 |
| """ |
| if self.is_verified: |
| return False |
| |
| if self.is_expired(): |
| return False |
| |
| hashed_input = self._hash_code(input_code) |
| if hashed_input == self.code: |
| # 使用中国时区时间设置验证时间 |
| china_tz = pytz.timezone('Asia/Shanghai') |
| self.verified_at = datetime.now(china_tz).replace(tzinfo=None) |
| self.is_verified = True |
| return True |
| |
| return False |
| |
| def verify_hashed(self, hashed_code): |
| """验证已经加密的验证码 |
| |
| Args: |
| hashed_code: 已经加密的验证码 |
| |
| Returns: |
| bool: 验证是否成功 |
| """ |
| if self.is_verified: |
| return False |
| |
| if self.is_expired(): |
| return False |
| |
| # 直接比较加密后的验证码 |
| if hashed_code == self.code: |
| # 使用中国时区时间设置验证时间 |
| china_tz = pytz.timezone('Asia/Shanghai') |
| self.verified_at = datetime.now(china_tz).replace(tzinfo=None) |
| self.is_verified = True |
| return True |
| |
| return False |
| |
| def is_expired(self): |
| """检查是否已过期 |
| |
| Returns: |
| bool: 是否已过期 |
| """ |
| # 使用中国时区时间进行比较 |
| china_tz = pytz.timezone('Asia/Shanghai') |
| current_time = datetime.now(china_tz).replace(tzinfo=None) |
| return current_time > self.expires_at |
| |
| def get_raw_code(self): |
| """获取原始验证码(仅在创建时可用) |
| |
| Returns: |
| str: 原始验证码 |
| """ |
| return getattr(self, '_raw_code', None) |
| |
| def to_dict(self): |
| """转换为字典格式 |
| |
| Returns: |
| dict: 对象字典表示 |
| """ |
| return { |
| 'id': self.id, |
| 'email': self.email, |
| 'type': self.type, |
| 'user_id': self.user_id, |
| 'is_verified': self.is_verified, |
| 'expires_at': self.expires_at.isoformat() if self.expires_at else None, |
| 'created_at': self.created_at.isoformat() if self.created_at else None, |
| 'verified_at': self.verified_at.isoformat() if self.verified_at else None |
| } |
| |
| def __repr__(self): |
| return f"<EmailVerification(id={self.id}, email='{self.email}', type='{self.type}', verified={self.is_verified})>" |