| # utils/auth.py |
| import os |
| import jwt |
| from functools import wraps |
| from flask import request, jsonify, current_app |
| from models.user import User |
| from app import db |
| |
| def generate_token(user_id): |
| payload = { |
| 'user_id': user_id, |
| # you can add exp, iat here |
| } |
| token = jwt.encode(payload, current_app.config['SECRET_KEY'], algorithm='HS256') |
| return token |
| |
| def verify_token(token): |
| try: |
| payload = jwt.decode(token, current_app.config['SECRET_KEY'], algorithms=['HS256']) |
| user = User.query.get(payload['user_id']) |
| return user |
| except Exception: |
| return None |
| |
| def login_required(f): |
| @wraps(f) |
| def decorated(*args, **kwargs): |
| auth_header = request.headers.get('Authorization', None) |
| if not auth_header or not auth_header.startswith('Bearer '): |
| return jsonify({'error': 'Authorization header missing or invalid'}), 401 |
| token = auth_header.split()[1] |
| user = verify_token(token) |
| if not user or user.status != 'active': |
| return jsonify({'error': 'Invalid or expired token'}), 401 |
| # attach user to request context if needed |
| request.current_user = user |
| return f(*args, **kwargs) |
| return decorated |