推荐系统
Change-Id: I49b9205568f1ccf88b32b08511aff8b0bea8d1bd
diff --git a/rhj/backend/app/models/recall/usercf_recall.py b/rhj/backend/app/models/recall/usercf_recall.py
new file mode 100644
index 0000000..d75e6d8
--- /dev/null
+++ b/rhj/backend/app/models/recall/usercf_recall.py
@@ -0,0 +1,235 @@
+import pymysql
+from typing import List, Tuple, Dict, Set
+from collections import defaultdict
+import math
+import numpy as np
+
class UserCFRecall:
    """User-based collaborative filtering (UserCF) recall model.

    Users are represented by the set of posts they interacted with strongly
    enough; two users are similar when those sets overlap. Recall recommends
    posts that a user's most similar neighbours interacted with and the user
    has not.

    Attributes:
        db_config: Keyword arguments forwarded to ``pymysql.connect``.
        min_common_items: Minimum number of shared posts required before two
            users get a non-zero similarity.
        user_items: Mapping ``user_id -> set of post ids``.
        item_users: Mapping ``post_id -> set of user ids`` (inverse index).
        user_similarity: Mapping ``user_id -> {other_user_id: similarity}``.
    """

    # Weight applied to each occurrence of a behavior type when scoring a
    # (user, post) pair.
    BEHAVIOR_WEIGHTS = {
        'like': 1.0,
        'favorite': 2.0,
        'comment': 3.0,
        'view': 0.1,
    }

    # Minimum accumulated score for a post to count as an interaction.
    SCORE_THRESHOLD = 1.0

    def __init__(self, db_config: dict, min_common_items: int = 3):
        """Create an untrained model.

        Args:
            db_config: Database connection settings for ``pymysql.connect``.
            min_common_items: Minimum number of common posts two users must
                share for their similarity to be computed (otherwise 0.0).
        """
        self.db_config = db_config
        self.min_common_items = min_common_items
        self.user_items = defaultdict(set)
        self.item_users = defaultdict(set)
        self.user_similarity = {}

    def _get_user_item_interactions(self):
        """Load behavior counts from the database and rebuild the indexes."""
        conn = pymysql.connect(**self.db_config)
        try:
            # The cursor is managed by ``with`` so a failure while creating it
            # cannot trigger a second error during cleanup.
            with conn.cursor() as cursor:
                cursor.execute("""
                    SELECT user_id, post_id, type, COUNT(*) as count
                    FROM behaviors
                    WHERE type IN ('like', 'favorite', 'comment', 'view')
                    GROUP BY user_id, post_id, type
                """)
                interactions = cursor.fetchall()
        finally:
            conn.close()

        self._index_interactions(interactions)

    def _index_interactions(self, interactions):
        """Rebuild ``user_items`` / ``item_users`` from aggregated rows.

        Args:
            interactions: Iterable of ``(user_id, post_id, type, count)``
                rows, one per (user, post, behavior type).
        """
        # Accumulate a weighted score per (user, post) across behavior types.
        user_item_scores = defaultdict(lambda: defaultdict(float))
        for user_id, post_id, behavior_type, count in interactions:
            weight = self.BEHAVIOR_WEIGHTS.get(behavior_type, 1.0)
            user_item_scores[user_id][post_id] += weight * count

        # Build fresh indexes so that retraining does not keep stale entries
        # from a previous run.
        user_items = defaultdict(set)
        item_users = defaultdict(set)
        for user_id, items in user_item_scores.items():
            for item_id, score in items.items():
                if score >= self.SCORE_THRESHOLD:
                    user_items[user_id].add(item_id)
                    item_users[item_id].add(user_id)

        self.user_items = user_items
        self.item_users = item_users

    def _calculate_user_similarity(self):
        """Compute the symmetric user-user similarity matrix.

        Similarity is ``|A & B| / sqrt(|A| * |B|)`` over the users' item
        sets, or 0.0 when they share fewer than ``min_common_items`` items.
        """
        print("开始计算用户相似度...")

        users = list(self.user_items.keys())
        total_pairs = len(users) * (len(users) - 1) // 2
        processed = 0

        # Every row is created up front in a fresh mapping. Creating a row
        # lazily inside the outer loop would discard the symmetric entries
        # already written for that user by earlier iterations.
        similarity_matrix = {user: {} for user in users}

        for i, user_i in enumerate(users):
            items_i = self.user_items[user_i]

            for user_j in users[i + 1:]:
                processed += 1
                if processed % 10000 == 0:
                    print(f"处理进度: {processed}/{total_pairs}")

                items_j = self.user_items[user_j]
                common_count = len(items_i & items_j)

                if common_count < self.min_common_items:
                    similarity = 0.0
                else:
                    denominator = math.sqrt(len(items_i) * len(items_j))
                    similarity = common_count / denominator if denominator > 0 else 0.0

                similarity_matrix[user_i][user_j] = similarity
                similarity_matrix[user_j][user_i] = similarity

        self.user_similarity = similarity_matrix
        print("用户相似度计算完成")

    def train(self):
        """Load interactions from the database and compute similarities."""
        self._get_user_item_interactions()
        self._calculate_user_similarity()

    def recall(self, user_id: int, num_items: int = 50, num_similar_users: int = 50) -> List[Tuple[int, float]]:
        """Recall items that users similar to ``user_id`` interacted with.

        Trains the model first if no similarities have been computed yet.

        Args:
            user_id: Target user id.
            num_items: Maximum number of items to return.
            num_similar_users: Number of most similar users to consider.

        Returns:
            List of ``(item_id, score)`` tuples sorted by descending score,
            where score is the sum of similarities of the neighbours that
            interacted with the item. Empty if the user is unknown.
        """
        if not getattr(self, 'user_similarity', None):
            self.train()

        if user_id not in self.user_similarity or user_id not in self.user_items:
            return []

        similar_users = sorted(
            self.user_similarity[user_id].items(),
            key=lambda pair: pair[1],
            reverse=True,
        )[:num_similar_users]

        already_seen = self.user_items[user_id]
        candidate_scores = defaultdict(float)

        for neighbor_id, similarity in similar_users:
            if similarity <= 0:
                # Neighbours are sorted by descending similarity, so none of
                # the remaining ones can contribute either.
                break

            for item_id in self.user_items[neighbor_id]:
                # Skip items the target user has already interacted with.
                if item_id not in already_seen:
                    candidate_scores[item_id] += similarity

        ranked = sorted(candidate_scores.items(), key=lambda pair: pair[1], reverse=True)
        return ranked[:num_items]

    def get_user_neighbors(self, user_id: int, num_neighbors: int = 10) -> List[Tuple[int, float]]:
        """Return the users most similar to ``user_id``.

        Args:
            user_id: User id.
            num_neighbors: Maximum number of neighbours to return.

        Returns:
            List of ``(neighbor_user_id, similarity)`` tuples sorted by
            descending similarity. Empty if the user is unknown.
        """
        if user_id not in self.user_similarity:
            return []

        return sorted(
            self.user_similarity[user_id].items(),
            key=lambda pair: pair[1],
            reverse=True,
        )[:num_neighbors]

    def get_user_profile(self, user_id: int) -> Dict:
        """Return a profile summary for ``user_id``.

        Args:
            user_id: User id.

        Returns:
            Dict with ``user_id``, ``total_interactions`` (number of indexed
            items), ``tag_preferences`` (tag name -> count over the user's
            items) and ``behavior_stats`` (behavior type -> count). Empty
            dict if the user has no indexed items.
        """
        if user_id not in self.user_items:
            return {}

        user_item_list = list(self.user_items[user_id])
        if not user_item_list:
            # Nothing to query, so do not open a connection at all.
            return {}

        # One placeholder per item id; the values themselves are passed as
        # query parameters, never interpolated into the SQL text.
        format_strings = ','.join(['%s'] * len(user_item_list))

        conn = pymysql.connect(**self.db_config)
        try:
            with conn.cursor() as cursor:
                cursor.execute(f"""
                    SELECT t.name, COUNT(*) as count
                    FROM post_tags pt
                    JOIN tags t ON pt.tag_id = t.id
                    WHERE pt.post_id IN ({format_strings})
                    GROUP BY t.name
                    ORDER BY count DESC
                """, tuple(user_item_list))
                tag_preferences = cursor.fetchall()

                cursor.execute("""
                    SELECT type, COUNT(*) as count
                    FROM behaviors
                    WHERE user_id = %s
                    GROUP BY type
                """, (user_id,))
                behavior_stats = cursor.fetchall()
        finally:
            conn.close()

        return {
            'user_id': user_id,
            'total_interactions': len(self.user_items[user_id]),
            'tag_preferences': dict(tag_preferences),
            'behavior_stats': dict(behavior_stats)
        }