blob: d6310a59a8947e72d2b035b6dd3faa0d8dd6faa8 [file] [log] [blame]
Raverafc93da2025-06-15 18:12:49 +08001from ..models.users import User as users
2from ..models.email_verification import EmailVerification
3from sqlalchemy.orm import Session
4import hashlib
5import jwt
6import smtplib
7import pytz
8from email.mime.text import MIMEText
9from email.mime.multipart import MIMEMultipart
10from datetime import datetime, timedelta
11from config import Config
12
13class FAuth:
14 def __init__(self, session: Session):
15 self.session = session
16 return
17
18 def hash_password(self, password):
19 """密码加密"""
20 return hashlib.sha256(password.encode()).hexdigest()
21
22 def is_password_hashed(self, password):
23 """检查密码是否已经被哈希加密
24
25 Args:
26 password: 密码字符串
27
28 Returns:
29 bool: 是否为已加密的密码(64位十六进制字符串)
30 """
31 import re
32 if not password or not isinstance(password, str):
33 return False
34 # SHA256 加密后是64位十六进制字符串
35 return bool(re.match(r'^[a-f0-9]{64}$', password, re.IGNORECASE))
36
37 def safe_hash_password(self, password):
38 """安全的密码加密函数,避免重复加密
39
40 Args:
41 password: 密码字符串
42
43 Returns:
44 str: 加密后的密码
45 """
46 if not password:
47 raise ValueError('密码不能为空')
48
49 # 如果已经是加密的密码,直接返回
50 if self.is_password_hashed(password):
51 return password
52
53 # 否则进行加密
54 return self.hash_password(password)
55
56 def verify_password(self, password, hashed_password):
57 """验证密码"""
58 return self.hash_password(password) == hashed_password
59
60 def generate_token(self, user_id, role='user'):
61 """生成JWT令牌"""
62 payload = {
63 'user_id': user_id,
64 'role': role,
65 'exp': datetime.utcnow() + timedelta(hours=24) # 24小时过期
66 }
67 return jwt.encode(payload, Config.JWT_SECRET_KEY, algorithm='HS256')
68
69 def verify_token(self, token):
70 """验证JWT令牌"""
71 try:
72 payload = jwt.decode(token, Config.JWT_SECRET_KEY, algorithms=['HS256'])
73 return {
74 'user_id': payload['user_id'],
75 'role': payload.get('role', 'user') # 默认角色为user,兼容旧令牌
76 }
77 except jwt.ExpiredSignatureError:
78 return None
79 except jwt.InvalidTokenError:
80 return None
81
82 def login(self, username_or_email, password):
83 """用户登录"""
84 try:
85 # 查找用户
86 user = self.session.query(users).filter(
87 (users.email == username_or_email)
88 ).first()
89
90 if not user:
91 return {'success': False, 'message': '用户不存在'}
92
93 # 检查账号状态
94 if user.status != 'active':
95 return {'success': False, 'message': '账号已被禁用'}
96
97 # 验证密码(前端已加密,后端使用安全比较)
98 if not self.safe_hash_password(password) == user.password:
99 return {'success': False, 'message': '密码错误'}
100
101 # 生成令牌
102 token = self.generate_token(user.id, user.role)
103
104 return {
105 'success': True,
106 'message': '登录成功',
107 'token': token,
108 'user': user.to_dict()
109 }
110 except Exception as e:
111 print(f"Login error: {str(e)}")
112 return {'success': False, 'message': f'登录失败: {str(e)}'}
113
114 def register(self, username, email, password, verification_code):
115 """用户注册"""
116 # 检查用户名是否存在
117 existing_user = self.session.query(users).filter(
118 (users.username == username) | (users.email == email)
119 ).first()
120
121 if existing_user:
122 if existing_user.username == username:
123 return {'success': False, 'message': '用户名已存在'}
124 else:
125 return {'success': False, 'message': '邮箱已被注册'}
126
127 verification = self.session.query(EmailVerification).filter(
128 EmailVerification.email == email,
129 EmailVerification.type == 'register',
130 EmailVerification.is_verified == False
131 ).order_by(EmailVerification.created_at.desc()).first()
132
133 if not verification:
134 return {
135 'success': False,
136 'message': '验证码不存在或已过期'
137 }
138
139 # 验证验证码(检查是否为已加密的验证码)
140 verification_success = False
141 if self.is_password_hashed(verification_code):
142 # 如果是已加密的验证码,直接比较
143 verification_success = verification.verify_hashed(verification_code)
144 else:
145 # 如果是明文验证码,先加密再比较
146 verification_success = verification.verify(verification_code)
147 if not verification_success:
148 return {
149 'success': False,
150 'message': '验证码错误或已过期'
151 }
152 # 如果验证码验证成功,标记为已验证
153 verification.is_verified = True
154 verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
155
156
157 # 创建新用户(使用安全加密函数避免重复加密)
158 hashed_password = self.safe_hash_password(password)
159 new_user = users(
160 username=username,
161 email=email,
162 password=hashed_password,
163 role='user',
164 status='active'
165 )
166
167 try:
168 self.session.add(new_user)
169 self.session.commit()
170
171 # 生成令牌
172 token = self.generate_token(new_user.id, new_user.role)
173
174 return {
175 'success': True,
176 'message': '注册成功',
177 'token': token,
178 'user': new_user.to_dict()
179 }
180 except Exception as e:
181 self.session.rollback()
182 return {'success': False, 'message': '注册失败,请稍后重试'}
183
184 def get_user_by_token(self, token):
185 """通过令牌获取用户信息"""
186 token_data = self.verify_token(token)
187 if not token_data:
188 return None
189
190 user_id = token_data['user_id'] if isinstance(token_data, dict) else token_data
191 user = self.session.query(users).filter(users.id == user_id).first()
192 return user
193
194 def send_verification_email(self, email, verification_type='register', user_id=None):
195 """发送邮箱验证码
196
197 Args:
198 email: 目标邮箱地址
199 verification_type: 验证类型 ('register', 'reset_password', 'email_change')
200 user_id: 用户ID(可选)
201
202 Returns:
203 dict: 发送结果
204 """
205 try:
206 # 检查邮件配置
207 if not all([Config.MAIL_USERNAME, Config.MAIL_PASSWORD, Config.MAIL_DEFAULT_SENDER]):
208 return {
209 'success': False,
210 'message': '邮件服务配置不完整,请联系管理员'
211 }
212
213 if verification_type not in ['register', 'reset_password', 'email_change']:
214 return {
215 'success': False,
216 'message': '无效的验证类型'
217 }
218
219 if verification_type == 'reset_password' or verification_type == 'email_change':
220 # 检查用户是否存在
221 user = self.session.query(users).filter(users.email == email).first()
222 if not user:
223 return {
224 'success': False,
225 'message': '用户不存在或邮箱不匹配'
226 }
227 elif verification_type == 'register':
228 # 检查邮箱是否已注册
229 existing_user = self.session.query(users).filter(users.email == email).first()
230 if existing_user:
231 return {
232 'success': False,
233 'message': '邮箱已被注册'
234 }
235
236 # 创建验证记录
237 verification = EmailVerification.create_verification(
238 email=email,
239 verification_type=verification_type,
240 user_id=user_id,
241 expires_minutes=15 # 15分钟过期
242 )
243
244 # 保存到数据库
245 self.session.add(verification)
246
247 # 获取验证码
248 verification_code = verification.get_raw_code()
249 if not verification_code:
250 return {
251 'success': False,
252 'message': '验证码生成失败'
253 }
254
255 # 发送邮件
256 result = self._send_email(email, verification_code, verification_type)
257
258 if result['success']:
259 return {
260 'success': True,
261 'message': '验证码已发送到您的邮箱',
262 'verification_id': verification.id
263 }
264 else:
265 # 如果邮件发送失败,删除验证记录
266 self.session.delete(verification)
267 self.session.commit()
268 return result
269
270 except Exception as e:
271 self.session.rollback()
272 print(f"Send verification email error: {str(e)}")
273 return {
274 'success': False,
275 'message': f'发送验证码失败: {str(e)}'
276 }
277
278 def _send_email(self, to_email, verification_code, verification_type):
279 """发送邮件的具体实现
280
281 Args:
282 to_email: 收件人邮箱
283 verification_code: 验证码
284 verification_type: 验证类型
285
286 Returns:
287 dict: 发送结果
288 """
289 try:
290 # 根据验证类型设置邮件内容
291 subject_map = {
292 'register': '注册验证码',
293 'reset_password': '密码重置验证码',
294 'email_change': '邮箱变更验证码'
295 }
296
297 message_map = {
298 'register': '欢迎注册我们的平台!',
299 'reset_password': '您正在重置密码',
300 'email_change': '您正在变更邮箱地址'
301 }
302
303 subject = subject_map.get(verification_type, '验证码')
304 message_intro = message_map.get(verification_type, '验证码')
305
306 # 创建邮件内容
307 msg = MIMEMultipart('alternative')
308 msg['Subject'] = subject
309 msg['From'] = Config.MAIL_DEFAULT_SENDER
310 msg['To'] = to_email
311
312 # HTML邮件内容
313 html_body = f"""
314 <!DOCTYPE html>
315 <html>
316 <head>
317 <meta charset="utf-8">
318 <title>{subject}</title>
319 </head>
320 <body style="font-family: Arial, sans-serif; line-height: 1.6; color: #333;">
321 <div style="max-width: 600px; margin: 0 auto; padding: 20px;">
322 <div style="background-color: #f8f9fa; padding: 20px; border-radius: 8px; margin-bottom: 20px;">
323 <h2 style="color: #007bff; margin-top: 0;">{message_intro}</h2>
324 <p>您的验证码是:</p>
325 <div style="background-color: #007bff; color: white; padding: 15px; border-radius: 5px; text-align: center; font-size: 24px; font-weight: bold; letter-spacing: 3px; margin: 20px 0;">
326 {verification_code}
327 </div>
328 <p style="color: #666; font-size: 14px;">
329 • 验证码有效期为15分钟<br>
330 • 请勿将验证码透露给他人<br>
331 • 如果这不是您的操作,请忽略此邮件
332 </p>
333 </div>
334 <div style="text-align: center; color: #999; font-size: 12px; border-top: 1px solid #eee; padding-top: 20px;">
335 <p>此邮件由系统自动发送,请勿回复</p>
336 </div>
337 </div>
338 </body>
339 </html>
340 """
341
342 # 纯文本内容(备用)
343 text_body = f"""
344 {message_intro}
345
346 您的验证码是:{verification_code}
347
348 验证码有效期为15分钟
349 请勿将验证码透露给他人
350 如果这不是您的操作,请忽略此邮件
351
352 此邮件由系统自动发送,请勿回复
353 """
354
355 # 添加邮件内容
356 text_part = MIMEText(text_body, 'plain', 'utf-8')
357 html_part = MIMEText(html_body, 'html', 'utf-8')
358
359 msg.attach(text_part)
360 msg.attach(html_part)
361
362 # 连接SMTP服务器并发送邮件
363 server = None
364 try:
365 server = smtplib.SMTP(Config.MAIL_SERVER, Config.MAIL_PORT)
366 if Config.MAIL_USE_TLS:
367 server.starttls()
368
369 server.login(Config.MAIL_USERNAME, Config.MAIL_PASSWORD)
370 result = server.send_message(msg)
371
372 # 检查发送结果
373 if result:
374 # 如果有失败的收件人,记录日志
375 print(f"邮件发送部分失败: {result}")
376
377 return {
378 'success': True,
379 'message': '邮件发送成功'
380 }
381 finally:
382 # 确保连接被正确关闭
383 if server:
384 try:
385 server.quit()
386 except Exception:
387 # 如果quit()失败,强制关闭连接
388 try:
389 server.close()
390 except Exception:
391 pass
392
393 except smtplib.SMTPAuthenticationError:
394 return {
395 'success': False,
396 'message': '邮件服务认证失败,请检查邮箱配置'
397 }
398 except smtplib.SMTPException as e:
399 return {
400 'success': False,
401 'message': f'邮件发送失败: {str(e)}'
402 }
403 except Exception as e:
404 return {
405 'success': False,
406 'message': f'发送邮件时发生错误: {str(e)}'
407 }
408
409 def verify_email_code(self, email, code, verification_type='register'):
410 """验证邮箱验证码
411
412 Args:
413 email: 邮箱地址
414 code: 验证码
415 verification_type: 验证类型
416
417 Returns:
418 dict: 验证结果
419 """
420 try:
421 # 查找最新的未验证的验证记录
422 verification = self.session.query(EmailVerification).filter(
423 EmailVerification.email == email,
424 EmailVerification.type == verification_type,
425 EmailVerification.is_verified == False
426 ).order_by(EmailVerification.created_at.desc()).first()
427
428 if not verification:
429 return {
430 'success': False,
431 'message': '验证码不存在或已过期'
432 }
433
434 # 验证验证码(检查是否为已加密的验证码)
435 verification_success = False
436 if self.is_password_hashed(code):
437 # 如果是已加密的验证码,直接比较
438 verification_success = verification.verify_hashed(code)
439 else:
440 # 如果是明文验证码,先加密再比较
441 verification_success = verification.verify(code)
442
443 if verification_success:
444 # 不在这里提交事务,留给调用者决定何时提交
445 # self.session.commit() # 注释掉立即提交
446 return {
447 'success': True,
448 'message': '验证成功',
449 'verification_id': verification.id
450 }
451 else:
452 return {
453 'success': False,
454 'message': '验证码错误或已过期'
455 }
456
457 except Exception as e:
458 print(f"Verify email code error: {str(e)}")
459 return {
460 'success': False,
461 'message': f'验证失败: {str(e)}'
462 }
463
464 def reset_password(self, email, new_password, verification_code):
465 """重置用户密码
466
467 Args:
468 email: 用户邮箱
469 new_password: 新密码
470 verification_code: 验证码
471
472 Returns:
473 dict: 重置结果
474 """
475 try:
476 # 检查是否有最近已验证的重置密码验证记录(5分钟内)
477
478 china_tz = pytz.timezone('Asia/Shanghai')
479 current_time = datetime.now(china_tz).replace(tzinfo=None)
480 five_minutes_ago = current_time - timedelta(minutes=5)
481
482 # 查找最近5分钟内已验证的重置密码验证记录
483 recent_verification = self.session.query(EmailVerification).filter(
484 EmailVerification.email == email,
485 EmailVerification.type == 'reset_password',
486 EmailVerification.is_verified == True,
487 EmailVerification.verified_at >= five_minutes_ago
488 ).order_by(EmailVerification.verified_at.desc()).first()
489
490 if not recent_verification:
491 return {
492 'success': False,
493 'message': '验证码未验证或已过期,请重新验证'
494 }
495
496 # 查找用户
497 user = self.session.query(users).filter(users.email == email).first()
498 if not user:
499 return {
500 'success': False,
501 'message': '用户不存在'
502 }
503
504 # 检查账号状态
505 if user.status != 'active':
506 return {
507 'success': False,
508 'message': '账号已被禁用,无法重置密码'
509 }
510
511 # 更新密码(使用安全加密函数避免重复加密)
512 user.password = self.safe_hash_password(new_password)
513 # 使用中国时区时间
514 china_tz = pytz.timezone('Asia/Shanghai')
515 user.updated_at = datetime.now(china_tz).replace(tzinfo=None)
516
517 # 提交更改
518 self.session.commit()
519
520 return {
521 'success': True,
522 'message': '密码重置成功'
523 }
524
525 except Exception as e:
526 self.session.rollback()
527 print(f"Reset password error: {str(e)}")
528 return {
529 'success': False,
530 'message': f'密码重置失败: {str(e)}'
531 }
532
533 def reset_password_with_verification(self, email, new_password, verification_code):
534 """重置用户密码(一步完成验证码验证和密码重置)
535
536 Args:
537 email: 用户邮箱
538 new_password: 新密码
539 verification_code: 验证码
540
541 Returns:
542 dict: 重置结果
543 """
544 try:
545 # 查找用户
546 user = self.session.query(users).filter(users.email == email).first()
547 if not user:
548 return {
549 'success': False,
550 'message': '用户不存在'
551 }
552
553 # 检查账号状态
554 if user.status != 'active':
555 return {
556 'success': False,
557 'message': '账号已被禁用,无法重置密码'
558 }
559
560 # 验证验证码
561 verification = self.session.query(EmailVerification).filter(
562 EmailVerification.email == email,
563 EmailVerification.type == 'reset_password',
564 EmailVerification.is_verified == False
565 ).order_by(EmailVerification.created_at.desc()).first()
566
567 if not verification:
568 return {
569 'success': False,
570 'message': '验证码不存在或已过期'
571 }
572
573 # 验证验证码(检查是否为已加密的验证码)
574 verification_success = False
575 if self.is_password_hashed(verification_code):
576 # 如果是已加密的验证码,直接比较
577 verification_success = verification.verify_hashed(verification_code)
578 else:
579 # 如果是明文验证码,先加密再比较
580 verification_success = verification.verify(verification_code)
581 if not verification_success:
582 return {
583 'success': False,
584 'message': '验证码错误或已过期'
585 }
586 # 如果验证码验证成功,标记为已验证
587 verification.is_verified = True
588 verification.verified_at = datetime.now(pytz.timezone('Asia/Shanghai')).replace(tzinfo=None)
589
590 # 更新密码(使用安全加密函数避免重复加密)
591 user.password = self.safe_hash_password(new_password)
592
593 # 使用中国时区时间更新时间戳
594 china_tz = pytz.timezone('Asia/Shanghai')
595 user.updated_at = datetime.now(china_tz).replace(tzinfo=None)
596
597 # 提交更改
598 self.session.commit()
599
600 return {
601 'success': True,
602 'message': '密码重置成功'
603 }
604
605 except Exception as e:
606 self.session.rollback()
607 print(f"Reset password with verification error: {str(e)}")
608 return {
609 'success': False,
610 'message': f'密码重置失败: {str(e)}'
611 }