feat: fully integrate the JWLLL search and recommendation system into the Merge project
New features:
- Complete JWLLL search/recommendation backend service (back_jwlll/)
- Frontend integration of smart search and recommendation features
- HomeFeed component enhancement: data-source toggle (original data ↔ smart recommendations)
- New PostDetailJWLLL and UploadPageJWLLL components
- New search_jwlll.js API client
Technical highlights:
- Tag-based and user-based collaborative-filtering recommendation algorithms
- Chinese word segmentation and Word2Vec semantic search
- 100% backward compatible; existing functionality fully preserved
- Standalone service architecture; deploys without conflicts
Integration scope:
- JWLLL backend service configuration and dependencies
- Frontend route and component updates
- Stylesheets and API integration
- Project documentation and startup tooling
Change-Id: I1d008cf04eee40e7d81bfb9109f933d3447d1760
diff --git a/Merge/back_jwlll/app.py b/Merge/back_jwlll/app.py
new file mode 100644
index 0000000..940c564
--- /dev/null
+++ b/Merge/back_jwlll/app.py
@@ -0,0 +1,1076 @@
+# app.py
+# Main entry point for the search and recommendation service
+
+import json
+import numpy as np
+import difflib
+from flask import Flask, request, jsonify, Response
+import pymysql
+import jieba
+from sklearn.feature_extraction.text import TfidfVectorizer
+from sklearn.metrics.pairwise import cosine_similarity
+import pypinyin
+from flask_cors import CORS
+import re
+import Levenshtein
+import os
+import logging
+
+# Configure logging
+logging.basicConfig(
+ level=logging.INFO,
+ format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger("allpt-search")
+
+# Import the Word2Vec helper module
+try:
+ from word2vec_helper import get_word2vec_helper, expand_query, get_similar_words
+ WORD2VEC_ENABLED = True
+ logger.info("Word2Vec模块已加载")
+except ImportError as e:
+ logger.warning(f"Word2Vec模块加载失败: {e},将使用传统搜索")
+ WORD2VEC_ENABLED = False
+
+# Database configuration
+DB_CONFIG = {
+ "host": "10.126.59.25",
+ "port": 3306,
+ "user": "root",
+ "password": "123456",
+ "database": "redbook",
+ "charset": "utf8mb4"
+}
+
+def get_db_conn():
+ return pymysql.connect(**DB_CONFIG)
+
+def get_pinyin(text):
+    # Return the full pinyin of a string (toneless, lowercase); English input is returned as-is
+    if not text:
+        return ""
+    # All-ASCII-letter input is returned lowercased directly
+    if re.fullmatch(r'[a-zA-Z]+', text):
+        return text.lower()
+ return ''.join([p[0] for p in pypinyin.pinyin(text, style=pypinyin.NORMAL)])
+
+def get_pinyin_initials(text):
+    # Return the pinyin initials of a string (lowercase); English input is returned as-is
+    if not text:
+        return ""
+    if re.fullmatch(r'[a-zA-Z]+', text):
+        return text.lower()
+ return ''.join([p[0][0] for p in pypinyin.pinyin(text, style=pypinyin.NORMAL)])
+
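+# Examples (assuming pypinyin defaults): get_pinyin("电影") -> "dianying",
+# get_pinyin_initials("电影") -> "dy"; ASCII input passes through: get_pinyin("Movie") -> "movie"
+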
+# Word-level similarity
+def word_similarity(word1, word2):
+    """Similarity between two words, with pinyin-aware matching"""
+    # Exact match
+ if word1 == word2:
+ return 1.0
+
+    # Full-pinyin match
+ if get_pinyin(word1) == get_pinyin(word2):
+ return 0.9
+
+    # Pinyin-initials match
+ if get_pinyin_initials(word1) == get_pinyin_initials(word2):
+ return 0.7
+
+    # Fall back to string similarity
+ return difflib.SequenceMatcher(None, word1, word2).ratio()
+
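+# Example for word_similarity (hypothetical pair): "中国" and "钟国" share the full pinyin
+# "zhongguo", scoring 0.9; unrelated words fall back to SequenceMatcher's character ratio.
+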
+def semantic_title_similarity(query, title):
+ """计算查询词与标题的语义相似度"""
+ # 分词
+ query_words = list(jieba.cut(query))
+ title_words = list(jieba.cut(title))
+
+ if not query_words or not title_words:
+ return 0.0
+
+    # For each query token, take its maximum similarity against the title tokens
+    max_similarities = []
+    key_matches = 0  # count of near-exact keyword matches
+
+ for q_word in query_words:
+ if len(q_word.strip()) <= 1: # 忽略单字,减少噪音
+ continue
+
+ word_sims = [word_similarity(q_word, t_word) for t_word in title_words]
+ if word_sims:
+ max_sim = max(word_sims)
+ max_similarities.append(max_sim)
+ if max_sim > 0.85: # 认为是关键词匹配
+ key_matches += 1
+
+ if not max_similarities:
+ return 0.0
+
+    # Average similarity
+ avg_sim = sum(max_similarities) / len(max_similarities)
+
+    # Weighting: 70% average similarity, 30% keyword match ratio
+ key_match_ratio = key_matches / len(query_words) if query_words else 0
+
+    # Bonus when the title contains the full query phrase
+ exact_bonus = 0.3 if query in title else 0
+
+ return 0.7 * avg_sim + 0.3 * key_match_ratio + exact_bonus
+
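+# Example for semantic_title_similarity (hypothetical): if every multi-character query token
+# matches a title token exactly (avg_sim = 1.0, key_match_ratio = 1.0) and the title contains
+# the query verbatim, the score is 0.7 * 1.0 + 0.3 * 1.0 + 0.3 = 1.3.
+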
+# Semantic association mappings used to broaden search
+def load_semantic_mappings():
+    """
+    Load the semantic association mappings that improve semantic understanding in search.
+    Returns a dict of mapping relations.
+    """
+    # Start with an empty dict; all mappings are loaded from the config file
+ mappings = {}
+
+    # Load mappings from the config file
+ try:
+ config_path = os.path.join(os.path.dirname(__file__), "semantic_config.json")
+ if os.path.exists(config_path):
+ with open(config_path, 'r', encoding='utf-8') as f:
+ mappings = json.load(f)
+ logger.info(f"已从配置文件加载 {len(mappings)} 个语义映射")
+ else:
+ logger.warning(f"语义配置文件不存在: {config_path}")
+ except Exception as e:
+ logger.error(f"加载语义配置文件失败: {e}")
+
+ return mappings
+
+# Initialize the semantic mappings
+SEMANTIC_MAPPINGS = load_semantic_mappings()
+
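+# Expected shape of semantic_config.json (hypothetical entries): a JSON object mapping each
+# word to a list of related words, e.g. {"科幻": ["sci-fi", "星际"], "动作": ["武打", "格斗"]}
+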
+def expand_search_keywords(keyword):
+ """
+    Expand a search keyword with semantically related terms
+ """
+ expanded = [keyword]
+
+    # Tokenize
+ words = list(jieba.cut(keyword))
+ logger.info(f"关键词 '{keyword}' 分词结果: {words}") # 记录分词结果
+
+    # Expand each token through the semantic mappings
+ for word in words:
+ if word in SEMANTIC_MAPPINGS:
+            # Add the semantically related terms
+ mapped_words = SEMANTIC_MAPPINGS[word]
+ expanded.extend(mapped_words)
+ logger.info(f"语义映射: '{word}' -> {mapped_words}")
+
+    # No per-keyword special-casing: all expansion comes from the config mappings and Word2Vec
+
+    # Word2Vec expansion: if available, expand the tokenized results with similar words
+ if WORD2VEC_ENABLED:
+ try:
+            # First expand the whole keyword
+ w2v_expanded = set()
+ similar_words = get_similar_words(keyword, topn=3, min_similarity=0.6)
+ w2v_expanded.update(similar_words)
+
+            # Then expand the longer tokens
+ for word in words:
+ if len(word) > 1: # 忽略单字
+ similar_words = get_similar_words(word, topn=2, min_similarity=0.65)
+ w2v_expanded.update(similar_words)
+
+            # Merge the results
+ expanded.extend(w2v_expanded)
+
+            # Log the expansion
+            if w2v_expanded:
+                logger.info(f"Word2Vec expansion: {keyword} -> {list(w2v_expanded)}")
+ except Exception as e:
+            # Log the error but do not interrupt the search flow
+            logger.error(f"Word2Vec expansion failed: {e}")
+            logger.info("Falling back to the config-file semantic mappings only")
+
+    # Deduplicate
+ return list(set(expanded))
+
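+# Example for expand_search_keywords (hypothetical, assuming the config sketched above and
+# Word2Vec enabled): "科幻" could expand to ["科幻", "sci-fi", "星际"] plus a few Word2Vec
+# neighbours, deduplicated in arbitrary set order.
+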
+# Generic relevance scoring between a search keyword and a candidate item
+def calculate_keyword_relevance(keyword, item):
+    """Relevance score between a search keyword and an item"""
+ title = item.get('title', '')
+ description = item.get('description', '') or ''
+ tags = item.get('tags', '') or ''
+    category = item.get('category', '') or ''
+
+    # Base score
+ score = 0
+
+    # 1. Exact match (highest priority)
+    if keyword.lower() == title.lower():
+        return 15.0  # a full match gets the top score
+
+    # 2. Exact word match within the title
+ title_words = re.findall(r'\b\w+\b', title.lower())
+ if keyword.lower() in title_words:
+        score += 10.0  # matched as a standalone word
+
+    # 3. Title contains the keyword (partial match)
+    elif keyword.lower() in title.lower():
+        # Fraction of the title covered by the keyword
+        match_ratio = len(keyword) / len(title)
+        if match_ratio > 0.5:  # the keyword covers most of the title
+ score += 8.0
+ else:
+ score += 5.0
+
+    # 4. Token-level title match
+ keyword_words = list(jieba.cut(keyword))
+ title_jieba_words = list(jieba.cut(title))
+
+ matched_words = 0
+ for k_word in keyword_words:
+ if len(k_word) > 1: # 忽略单字
+ if k_word in title_jieba_words:
+ matched_words += 1
+ else:
+                # pinyin match
+ k_pinyin = get_pinyin(k_word)
+ for t_word in title_jieba_words:
+ if get_pinyin(t_word) == k_pinyin:
+ matched_words += 0.8
+ break
+
+ if len(keyword_words) > 0:
+ word_match_ratio = matched_words / len(keyword_words)
+ score += 3.0 * word_match_ratio
+
+    # 5. Pinyin similarity
+ keyword_pinyin = get_pinyin(keyword)
+ title_pinyin = get_pinyin(title)
+
+ if keyword_pinyin == title_pinyin:
+ score += 3.5
+ elif keyword_pinyin in title_pinyin:
+        # Weight by the position of the pinyin match within the title
+        pos = title_pinyin.find(keyword_pinyin)
+        if pos == 0:  # at the start of the title
+ score += 3.0
+ else:
+ score += 2.0
+
+    # 6. Edit-distance similarity
+ try:
+ edit_distance = Levenshtein.distance(keyword.lower(), title.lower())
+ max_len = max(len(keyword), len(title))
+ if max_len > 0:
+ similarity = 1 - (edit_distance / max_len)
+ if similarity > 0.7:
+ score += 1.5 * similarity
+    except Exception:  # Levenshtein may fail or be unavailable; fall back to difflib
+ similarity = difflib.SequenceMatcher(None, keyword.lower(), title.lower()).ratio()
+ if similarity > 0.7:
+ score += 1.5 * similarity
+
+    # 7. Chinese-character overlap: score only when two or more characters overlap or the ratio exceeds 40%
+ if re.search(r'[\u4e00-\u9fff]', keyword) and re.search(r'[\u4e00-\u9fff]', title):
+ cn_chars_keyword = set(re.findall(r'[\u4e00-\u9fff]', keyword))
+ cn_chars_title = set(re.findall(r'[\u4e00-\u9fff]', title))
+
+        # Set of overlapping Chinese characters
+ overlapped_chars = cn_chars_keyword & cn_chars_title
+
+        # Score only when more than one character overlaps and the ratio clears the threshold
+ if len(overlapped_chars) > 1 and len(cn_chars_keyword) > 0:
+ overlap_ratio = len(overlapped_chars) / len(cn_chars_keyword)
+            # Require a minimum overlap ratio to prevent false matches from a single character
+ if overlap_ratio >= 0.4 or len(overlapped_chars) >= 3:
+ score += 2.0 * overlap_ratio
+            # Very low overlap earns nothing, keeping unrelated content out
+
+        # Debug logging for a known tricky pair
+        if keyword == "明日方舟" and "白日梦想家" in title:
+            logger.info(f"Character overlap between '明日方舟' and '{title}': {overlapped_chars}, ratio: {len(overlapped_chars)/len(cn_chars_keyword) if cn_chars_keyword else 0}")
+
+    # 8. Series detection (e.g. "功夫熊猫2" belongs to the "功夫熊猫" series)
+ base_title_match = re.match(r'(.*?)([0-9]+|[一二三四五六七八九十]|:|\:|\s+[0-9]+)', title)
+ if base_title_match:
+ base_title = base_title_match.group(1).strip()
+ if keyword.lower() == base_title.lower():
+ score += 2.0
+
+    # 9. Tag and description matching (boosted weights)
+ if tags:
+ tags_list = tags.split(',')
+ if keyword in tags_list:
+            score += 1.5  # exact tag match
+ elif any(keyword.lower() in tag.lower() for tag in tags_list):
+            score += 1.0  # partial tag match
+
+    # Description matching
+    if keyword.lower() in description.lower():
+        score += 1.5  # description contains the keyword
+
+        # Position of the keyword within the description
+        pos = description.lower().find(keyword.lower())
+        if pos >= 0 and pos < len(description) / 3:
+            # A keyword in the first third of the description is likely more important
+ score += 0.5
+
+    # Token-level description match
+ keyword_words = list(jieba.cut(keyword))
+ description_words = list(jieba.cut(description))
+ matched_desc_words = 0
+ for k_word in keyword_words:
+ if len(k_word) > 1 and k_word in description_words:
+ matched_desc_words += 1
+
+ if len(keyword_words) > 0:
+ desc_match_ratio = matched_desc_words / len(keyword_words)
+ score += 1.0 * desc_match_ratio
+
+    # Category match
+ if keyword.lower() in category.lower():
+ score += 1.0
+
+    # Semantic-association bonus: match the title against the expanded keywords
+    expanded_keywords = expand_search_keywords(keyword)
+    # Check whether the title contains a semantically related term
+ for exp_keyword in expanded_keywords:
+        if exp_keyword != keyword and exp_keyword in title:  # skip the original keyword to avoid double counting
+            score += 1.5  # generic semantic association
+
+ return score
+
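+# Worked example for calculate_keyword_relevance (hypothetical): keyword "功夫熊猫" against
+# title "功夫熊猫2" earns 8.0 from rule 3 (partial match covering 4/5 of the title), 2.0 from
+# rule 8 (series detection), plus overlap and pinyin contributions, comfortably clearing the
+# 0.1 retention threshold used by /search.
+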
+# Create the Flask app
+app = Flask(__name__)
+CORS(app)  # allow all cross-origin requests
+
+# Word2Vec model initialization
+def init_word2vec():
+    """Initialize the Word2Vec model"""
+ try:
+ helper = get_word2vec_helper()
+ if helper.initialized:
+ logger.info(f"Word2Vec模型已成功加载,词汇量: {len(helper.model.index_to_key)}, 向量维度: {helper.model.vector_size}")
+ else:
+ if helper.load_model():
+ logger.info(f"Word2Vec模型加载成功,词汇量: {len(helper.model.index_to_key)}, 向量维度: {helper.model.vector_size}")
+ else:
+ logger.error("Word2Vec模型加载失败")
+ except Exception as e:
+ logger.error(f"初始化Word2Vec出错: {e}")
+
+# Application initialization (replaces the removed before_first_request hook)
+def initialize_app():
+    """Initialize global state before the app starts serving requests"""
+    global SEMANTIC_MAPPINGS
+    SEMANTIC_MAPPINGS = load_semantic_mappings()  # refresh the global semantic mappings
+
+ if WORD2VEC_ENABLED:
+        init_word2vec()
+
+# Run initialization before the app starts serving
+initialize_app()
+
+# Health-check route
+@app.route('/test', methods=['GET'])
+def test():
+ import datetime
+ return jsonify({"message": "服务器正常运行", "timestamp": str(datetime.datetime.now())})
+
+# Post-detail API
+@app.route('/post/<int:post_id>', methods=['GET'])
+def get_post_detail(post_id):
+    """
+    Fetch the details of a single post
+    """
+ logger.info(f"接收到获取帖子详情请求,post_id: {post_id}")
+ conn = get_db_conn()
+ try:
+ with conn.cursor(pymysql.cursors.DictCursor) as cursor:
+            # Join query for the post details, pulling in the topic name and type
+ query = """
+ SELECT
+ p.id,
+ p.title,
+ p.content,
+ p.heat,
+ p.created_at as create_time,
+ p.updated_at as last_active,
+ p.status,
+ p.type,
+ tp.name as category
+ FROM posts p
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ WHERE p.id = %s
+ """
+ logger.info(f"执行查询: {query} with post_id: {post_id}")
+ cursor.execute(query, (post_id,))
+ post = cursor.fetchone()
+
+ logger.info(f"查询结果: {post}")
+
+ if not post:
+ logger.warning(f"帖子不存在,post_id: {post_id}")
+ return jsonify({"error": "帖子不存在"}), 404
+
+            # Defaults
+ post['tags'] = []
+ post['author'] = '匿名用户'
+ if not post.get('category'):
+ post['category'] = '未分类'
+ if not post.get('type'):
+ post['type'] = 'text'
+            # Format timestamps
+ if post['create_time']:
+ post['create_time'] = post['create_time'].strftime('%Y-%m-%d %H:%M:%S')
+ if post['last_active']:
+ post['last_active'] = post['last_active'].strftime('%Y-%m-%d %H:%M:%S')
+
+ logger.info(f"返回帖子详情: {post}")
+ return Response(json.dumps(post, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ except Exception as e:
+ logger.error(f"获取帖子详情失败: {e}")
+ import traceback
+ traceback.print_exc()
+ return jsonify({"error": "服务器内部错误"}), 500
+ finally:
+ conn.close()
+
+# Search API
+@app.route('/search', methods=['POST'])
+def search():
+ """
+ 搜索功能API
+ 请求格式:{
+ "keyword": "关键词",
+ "sort_by": "downloads" | "downloads_asc" | "newest" | "oldest" | "similarity" | "title_asc" | "title_desc",
+ "category": "可选,分类名",
+ "search_mode": "title" | "title_desc" | "tags" | "all" # 可选,默认"title",
+ "tags": ["标签1", "标签2"] # 可选,支持传递多个标签
+ }
+ """
+ if request.content_type != 'application/json':
+ return jsonify({"error": "Content-Type must be application/json"}), 415
+
+ data = request.get_json()
+ keyword = data.get("keyword", "").strip()
+ sort_by = data.get("sort_by", "similarity") # 默认按相似度排序
+ category = data.get("category", None)
+ search_mode = data.get("search_mode", "title")
+ tags = data.get("tags", None) # 支持传递多个标签
+
+    # Validate: a keyword is required in every mode
+ if not (1 <= len(keyword) <= 20):
+ return jsonify({"error": "请输入1-20个字符"}), 400
+
+    # Stage 1: database query to build the candidate set
+ results = []
+ conn = get_db_conn()
+ try:
+ with conn.cursor(pymysql.cursors.DictCursor) as cursor:
+            # First look for exact title matches
+            exact_query = """
+                SELECT id, title, topic_id, heat, created_at, content
+                FROM posts
+                WHERE title = %s
+            """
+ cursor.execute(exact_query, (keyword,))
+            exact_matches = cursor.fetchall() or []  # guarantee a list rather than a tuple
+
+            # Expand the keyword with semantically related terms
+            expanded_keywords = expand_search_keywords(keyword)
+            logger.info(f"Expanded keywords: {expanded_keywords}")
+
+            # Build the query conditions
+ conditions = []
+ params = []
+
+            # Title match: every search mode matches the title
+ conditions.append("title LIKE %s")
+ params.append(f"%{keyword}%")
+
+            # Add title conditions for the expanded keywords
+            for exp_keyword in expanded_keywords:
+                if exp_keyword != keyword:  # skip the original keyword
+ conditions.append("title LIKE %s")
+ params.append(f"%{exp_keyword}%")
+
+            # Description (content) match
+            if search_mode in ["title_desc", "all"]:
+                # Original keyword against the content
+ conditions.append("content LIKE %s")
+ params.append(f"%{keyword}%")
+
+                # Expanded keywords against the content
+ for exp_keyword in expanded_keywords:
+ if exp_keyword != keyword:
+ conditions.append("content LIKE %s")
+ params.append(f"%{exp_keyword}%")
+
+            # Tag matching is not handled here; the joins below cover it
+
+            # Category match, only in "all" mode: match the topic name rather than the
+            # numeric topic_id (the main query below joins topics as tp)
+            if search_mode == "all":
+                # Original keyword against the category name
+                conditions.append("tp.name LIKE %s")
+                params.append(f"%{keyword}%")
+
+                # Expanded keywords against the category name
+                for exp_keyword in expanded_keywords:
+                    if exp_keyword != keyword:
+                        conditions.append("tp.name LIKE %s")
+                        params.append(f"%{exp_keyword}%")
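+
+            # Example (hypothetical): for keyword "科幻" expanded to ["科幻", "sci-fi"], the
+            # WHERE clause becomes "title LIKE %s OR title LIKE %s" with params
+            # ["%科幻%", "%sci-fi%"] (plus content/category conditions depending on search_mode)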
+
+            # Assemble the SQL query
+ if conditions:
+ where_clause = " OR ".join(conditions)
+ logger.info(f"搜索条件: {where_clause}")
+ logger.info(f"参数列表: {params}")
+
+ if category:
+ where_clause = f"({where_clause}) AND topic_id=%s"
+ params.append(category)
+
+ sql = f"""
+ SELECT p.id, p.title, tp.name as category, p.heat, p.created_at, p.content,
+ GROUP_CONCAT(t.name) as tags
+ FROM posts p
+ LEFT JOIN post_tags pt ON p.id = pt.post_id
+ LEFT JOIN tags t ON pt.tag_id = t.id
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ WHERE {where_clause}
+ GROUP BY p.id
+ LIMIT 500
+ """
+
+ cursor.execute(sql, params)
+ expanded_results = cursor.fetchall()
+ logger.info(f"数据库返回记录数: {len(expanded_results) if expanded_results else 0}")
+ else:
+ expanded_results = []
+
+            # If neither the expanded query nor the exact match returned anything, score all records
+ if not expanded_results and not exact_matches:
+ sql = "SELECT p.id, p.title, tp.name as category, p.heat, p.created_at, p.content, GROUP_CONCAT(t.name) as tags FROM posts p LEFT JOIN post_tags pt ON p.id = pt.post_id LEFT JOIN tags t ON pt.tag_id = t.id LEFT JOIN topics tp ON p.topic_id = tp.id"
+ if category:
+ sql += " WHERE p.topic_id=%s"
+ category_params = [category]
+ cursor.execute(sql + " GROUP BY p.id", category_params)
+ else:
+ cursor.execute(sql + " GROUP BY p.id")
+
+                all_results = cursor.fetchall() or []  # guarantee a list
+ else:
+ if isinstance(exact_matches, tuple):
+ exact_matches = list(exact_matches)
+ if isinstance(expanded_results, tuple):
+ expanded_results = list(expanded_results)
+ all_results = expanded_results + exact_matches
+
+            # Score every candidate with the relevance rules
+ scored_results = []
+ for item in all_results:
+                # Relevance score
+ relevance_score = calculate_keyword_relevance(keyword, item)
+
+                # Keep the relevance threshold low (0.1) so that more results are retained
+ if relevance_score > 0.1:
+ item['relevance_score'] = relevance_score
+ scored_results.append(item)
+ logger.info(f"匹配项: {item['title']}, 相关性得分: {relevance_score}")
+
+            # Sort by relevance score
+ scored_results.sort(key=lambda x: x.get('relevance_score', 0), reverse=True)
+
+            # Pin exact matches to the top
+ if exact_matches:
+ for exact_match in exact_matches:
+                    exact_match['relevance_score'] = 20.0  # very high score keeps them first
+
+                # Drop items from scored_results that already appear in exact_matches
+ exact_ids = {item['id'] for item in exact_matches}
+ scored_results = [item for item in scored_results if item['id'] not in exact_ids]
+
+                # Merge the two result sets
+ results = exact_matches + scored_results
+ else:
+ results = scored_results
+
+            # Cap the number of results
+ results = results[:50]
+
+ except Exception as e:
+ logger.error(f"搜索出错: {e}")
+ import traceback
+ traceback.print_exc()
+ return jsonify({"error": "搜索系统异常,请稍后再试"}), 500
+ finally:
+ conn.close()
+
+    # Stage 2: sort by the requested criterion
+    if results:
+        if sort_by == "similarity" or not sort_by:
+            # Already sorted by relevance score above
+            pass
+ elif sort_by == "downloads":
+ results.sort(key=lambda x: x.get("download_count", 0), reverse=True)
+ elif sort_by == "downloads_asc":
+ results.sort(key=lambda x: x.get("download_count", 0))
+ elif sort_by == "newest":
+ results.sort(key=lambda x: x.get("create_time", ""), reverse=True)
+ elif sort_by == "oldest":
+ results.sort(key=lambda x: x.get("create_time", ""))
+ elif sort_by == "title_asc":
+ results.sort(key=lambda x: x.get("title", ""))
+ elif sort_by == "title_desc":
+ results.sort(key=lambda x: x.get("title", ""), reverse=True)
+
+    # Final pass: drop internal fields and convert datetimes to strings
+ for item in results:
+ item.pop("description", None)
+ item.pop("tags", None)
+ item.pop("relevance_score", None)
+ for k, v in item.items():
+ if hasattr(v, 'isoformat'):
+ item[k] = v.isoformat(sep=' ', timespec='seconds')
+
+ return Response(json.dumps({"results": results}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+
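+# Example /search response shape (hypothetical values): {"results": [{"id": 1, "title": "...",
+# "category": "...", "heat": 42, "created_at": "2024-01-01 00:00:00", "content": "..."}]}
+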
+# Tag-based recommendation API
+@app.route('/recommend_tags', methods=['POST'])
+def recommend_tags():
+ """
+ 推荐功能API
+ 请求格式:{
+ "user_id": "user1",
+ "tags": ["标签1", "标签2"] # 可为空
+ }
+ """
+ if request.content_type != 'application/json':
+ return jsonify({"error": "Content-Type must be application/json"}), 415
+
+ data = request.get_json()
+ user_id = data.get("user_id")
+ tags = set(data.get("tags", []))
+
+    # Fetch the user's saved interest tags
+ user_tags = set()
+ if user_id:
+ conn = get_db_conn()
+ try:
+ with conn.cursor() as cursor:
+ cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,))
+ user_tags = set(row[0] for row in cursor.fetchall())
+ finally:
+ conn.close()
+
+    # Merge the posted tags with the user's saved interest tags
+ all_tags = list(tags | user_tags)
+
+ if not all_tags:
+ return Response(json.dumps({"error": "暂无推荐结果"}, ensure_ascii=False), mimetype='application/json; charset=utf-8'), 200
+
+ conn = get_db_conn()
+ try:
+ with conn.cursor(pymysql.cursors.DictCursor) as cursor:
+            # Prefer matching via the tags tables: resolve all tag ids first
+ tag_ids = []
+ for tag in all_tags:
+ cursor.execute("SELECT id FROM tags WHERE name=%s", (tag,))
+ row = cursor.fetchone()
+ if row:
+ tag_ids.append(row['id'])
+ if not tag_ids:
+ return Response(json.dumps({"error": "暂无推荐结果"}, ensure_ascii=False), mimetype='application/json; charset=utf-8'), 200
+ tag_placeholders = ','.join(['%s'] * len(tag_ids))
+ sql = f"""
+ SELECT p.id, p.title, tp.name as category, p.heat,
+ GROUP_CONCAT(tg.name) as tags
+ FROM posts p
+ LEFT JOIN post_tags pt ON p.id = pt.post_id
+ LEFT JOIN tags tg ON pt.tag_id = tg.id
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ WHERE pt.tag_id IN ({tag_placeholders})
+ GROUP BY p.id
+ LIMIT 50
+ """
+ cursor.execute(sql, tuple(tag_ids))
+ results = cursor.fetchall()
+            # If nothing matched, fall back to fuzzy title/content matching
+ if not results:
+ or_conditions = []
+ params = []
+ for tag in all_tags:
+ or_conditions.append("p.title LIKE %s OR p.content LIKE %s")
+ params.extend(['%' + tag + '%', '%' + tag + '%'])
+ where_clause = ' OR '.join(or_conditions)
+ sql = f"""
+ SELECT p.id, p.title, tp.name as category, p.heat,
+ GROUP_CONCAT(tg.name) as tags
+ FROM posts p
+ LEFT JOIN post_tags pt ON p.id = pt.post_id
+ LEFT JOIN tags tg ON pt.tag_id = tg.id
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ WHERE {where_clause}
+ GROUP BY p.id
+ LIMIT 50
+ """
+ cursor.execute(sql, tuple(params))
+ results = cursor.fetchall()
+ finally:
+ conn.close()
+
+ if not results:
+ return Response(json.dumps({"error": "暂无推荐结果"}, ensure_ascii=False), mimetype='application/json; charset=utf-8'), 200
+
+ return Response(json.dumps({"recommendations": results}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+
+# User interest-tag management API (optional)
+@app.route('/tags', methods=['POST', 'GET', 'DELETE'])
+def user_tags():
+    """
+    POST: add user interest tags
+    GET: query user interest tags
+    DELETE: delete user interest tags
+    """
+ if request.method == 'POST':
+ if request.content_type != 'application/json':
+ return jsonify({"error": "Content-Type must be application/json"}), 415
+ data = request.get_json()
+ user_id = data.get("user_id")
+ tags = data.get("tags", [])
+
+ if not user_id:
+ return jsonify({"error": "用户ID不能为空"}), 400
+
+        # Normalize the tag list
+ if isinstance(tags, str):
+ tags = [tag.strip() for tag in tags.split(',') if tag.strip()]
+
+ if not tags:
+ return jsonify({"error": "标签不能为空"}), 400
+
+ conn = get_db_conn()
+ try:
+ with conn.cursor() as cursor:
+                # Add the user's tags
+                for tag in tags:
+                    # resolve the tag_id first
+ cursor.execute("SELECT id FROM tags WHERE name=%s", (tag,))
+ tag_row = cursor.fetchone()
+ if tag_row:
+ tag_id = tag_row[0]
+ cursor.execute("REPLACE INTO user_tags (user_id, tag_id) VALUES (%s, %s)", (user_id, tag_id))
+ conn.commit()
+                # Return the updated tag list
+ cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,))
+ updated_tags = [row[0] for row in cursor.fetchall()]
+ finally:
+ conn.close()
+ return Response(json.dumps({"msg": "添加成功", "tags": updated_tags}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ elif request.method == 'DELETE':
+ if request.content_type != 'application/json':
+ return jsonify({"error": "Content-Type must be application/json"}), 415
+ data = request.get_json()
+ user_id = data.get("user_id")
+ tags = data.get("tags", [])
+ if not user_id:
+ return jsonify({"error": "用户ID不能为空"}), 400
+ if not tags:
+ return jsonify({"error": "标签不能为空"}), 400
+
+ conn = get_db_conn()
+ try:
+ with conn.cursor() as cursor:
+ for tag in tags:
+ cursor.execute("SELECT id FROM tags WHERE name=%s", (tag,))
+ tag_row = cursor.fetchone()
+ if tag_row:
+ tag_id = tag_row[0]
+ cursor.execute("DELETE FROM user_tags WHERE user_id=%s AND tag_id=%s", (user_id, tag_id))
+ conn.commit()
+ cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,))
+ remaining_tags = [row[0] for row in cursor.fetchall()]
+ finally:
+ conn.close()
+ return Response(json.dumps({"msg": "删除成功", "tags": remaining_tags}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+    else:  # GET request
+ user_id = request.args.get("user_id")
+ if not user_id:
+ return jsonify({"error": "用户ID不能为空"}), 400
+ conn = get_db_conn()
+ try:
+ with conn.cursor() as cursor:
+ cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,))
+ tags = [row[0] for row in cursor.fetchall()]
+ finally:
+ conn.close()
+ return Response(json.dumps({"tags": tags}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+
+# /user_tags route as an alias of /tags
+@app.route('/user_tags', methods=['POST', 'GET', 'DELETE'])
+def user_tags_alias():
+    """
+    /user_tags route, an alias of /tags
+    POST: add user interest tags
+    GET: query user interest tags
+    DELETE: delete user interest tags
+    """
+ return user_tags()
+
+# User-based collaborative-filtering recommendation API
+@app.route('/user_based_recommend', methods=['POST'])
+def user_based_recommend():
+ """
+    User-based collaborative-filtering recommendation API
+    Request format: {
+ "user_id": "user1",
+ "top_n": 5
+ }
+ """
+ if request.content_type != 'application/json':
+ return jsonify({"error": "Content-Type must be application/json"}), 415
+
+ data = request.get_json()
+ user_id = data.get("user_id")
+ top_n = int(data.get("top_n", 5))
+
+ if not user_id:
+ return jsonify({"error": "用户ID不能为空"}), 400
+
+ conn = get_db_conn()
+ try:
+ with conn.cursor(pymysql.cursors.DictCursor) as cursor:
+            # 1. Check whether the user has download-like records (favorites or views)
+ cursor.execute("""
+ SELECT COUNT(*) as count
+ FROM behaviors
+ WHERE user_id = %s AND type IN ('favorite', 'view')
+ """, (user_id,))
+ result = cursor.fetchone()
+ user_download_count = result['count'] if result else 0
+
+ logger.info(f"用户 {user_id} 下载记录数: {user_download_count}")
+
+            # Without enough behavioral data, fall back to popularity-based recommendations
+            if user_download_count < 3:
+                logger.info(f"User {user_id} has too few records; returning popular items")
+ cursor.execute("""
+ SELECT p.id, p.title, tp.name as category, p.heat
+ FROM posts p
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ ORDER BY p.heat DESC
+ LIMIT %s
+ """, (top_n,))
+ popular_seeds = cursor.fetchall()
+ return Response(json.dumps({"recommendations": popular_seeds, "type": "popular"}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+
+            # 2. Posts the user has already downloaded (favorited/viewed)
+ cursor.execute("""
+ SELECT post_id
+ FROM behaviors
+ WHERE user_id = %s AND type IN ('favorite', 'view')
+ """, (user_id,))
+ user_seeds = set(row['post_id'] for row in cursor.fetchall())
+ logger.info(f"用户 {user_id} 已下载种子: {user_seeds}")
+
+            # 3. Recent user-post interactions (favorites/views) from other users
+ cursor.execute("""
+ SELECT user_id, post_id
+ FROM behaviors
+ WHERE created_at > DATE_SUB(NOW(), INTERVAL 3 MONTH)
+ AND user_id <> %s AND type IN ('favorite', 'view')
+ """, (user_id,))
+ download_records = cursor.fetchall()
+
+ if not download_records:
+ logger.info(f"没有其他用户的下载记录,返回热门推荐")
+ cursor.execute("""
+ SELECT p.id, p.title, tp.name as category, p.heat
+ FROM posts p
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ ORDER BY p.heat DESC
+ LIMIT %s
+ """, (top_n,))
+ popular_seeds = cursor.fetchall()
+ return Response(json.dumps({"recommendations": popular_seeds, "type": "popular"}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+
+            # Build the user-item matrix
+ user_item_matrix = {}
+ for record in download_records:
+ uid = record['user_id']
+ sid = record['post_id']
+ if uid not in user_item_matrix:
+ user_item_matrix[uid] = set()
+ user_item_matrix[uid].add(sid)
+
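+            # Step 4 below uses Jaccard similarity over download sets, e.g. for
+            # {1, 2, 3} vs {2, 3, 4}: |{2, 3}| / |{1, 2, 3, 4}| = 2 / 4 = 0.5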
+            # 4. Compute user-user similarity
+ similar_users = []
+ for other_id, other_seeds in user_item_matrix.items():
+ if other_id == user_id:
+ continue
+ intersection = len(user_seeds.intersection(other_seeds))
+ union = len(user_seeds.union(other_seeds))
+ if union > 0 and intersection > 0:
+ similarity = intersection / union
+ similar_users.append((other_id, similarity, other_seeds))
+ logger.info(f"找到 {len(similar_users)} 个相似用户")
+ similar_users.sort(key=lambda x: x[1], reverse=True)
+ similar_users = similar_users[:5]
+            # 5. Recommend posts drawn from similar users
+ candidate_seeds = {}
+ for similar_user, similarity, seeds in similar_users:
+ logger.info(f"相似用户 {similar_user}, 相似度 {similarity}")
+ for post_id in seeds:
+ if post_id not in user_seeds:
+ if post_id not in candidate_seeds:
+ candidate_seeds[post_id] = 0
+ candidate_seeds[post_id] += similarity
+ if not candidate_seeds:
+ logger.info(f"没有找到候选种子,返回热门推荐")
+ cursor.execute("""
+ SELECT p.id, p.title, tp.name as category, p.heat
+ FROM posts p
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ ORDER BY p.heat DESC
+ LIMIT %s
+ """, (top_n,))
+ popular_seeds = cursor.fetchall()
+ return Response(json.dumps({"recommendations": popular_seeds, "type": "popular"}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+            # 6. Fetch details for the recommended posts
+ recommended_seeds = sorted(candidate_seeds.items(), key=lambda x: x[1], reverse=True)[:top_n]
+ post_ids = [post_id for post_id, _ in recommended_seeds]
+ format_strings = ','.join(['%s'] * len(post_ids))
+ cursor.execute(f"""
+ SELECT p.id, p.title, tp.name as category, p.heat
+ FROM posts p
+ LEFT JOIN topics tp ON p.topic_id = tp.id
+ WHERE p.id IN ({format_strings})
+ """, tuple(post_ids))
+ result_seeds = cursor.fetchall()
+ seed_score_map = {post_id: score for post_id, score in recommended_seeds}
+ result_seeds.sort(key=lambda x: seed_score_map.get(x['id'], 0), reverse=True)
+ logger.info(f"返回 {len(result_seeds)} 个基于协同过滤的推荐")
+ return Response(json.dumps({"recommendations": result_seeds, "type": "collaborative"}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ except Exception as e:
+ logger.error(f"推荐系统错误: {e}")
+ import traceback
+ traceback.print_exc()
+ return Response(json.dumps({"error": "推荐系统异常,请稍后再试", "details": str(e)}, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ finally:
+        conn.close()
+
+@app.route('/word2vec_status', methods=['GET'])
+def word2vec_status():
+ """
+    Check the Word2Vec model status.
+    Returns whether the model is loaded, its vocabulary size, and related info.
+ """
+ if not WORD2VEC_ENABLED:
+ return Response(json.dumps({
+ "enabled": False,
+ "message": "Word2Vec功能未启用"
+ }, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ try:
+ helper = get_word2vec_helper()
+ status = {
+ "enabled": WORD2VEC_ENABLED,
+ "initialized": helper.initialized,
+ "vocab_size": len(helper.model.index_to_key) if helper.model else 0,
+ "vector_size": helper.model.vector_size if helper.model else 0
+ }
+
+        # Probe similar words for a few common terms to showcase the model
+ test_results = {}
+ test_words = ["电影", "动作", "科幻", "动漫", "游戏"]
+ for word in test_words:
+ similar_words = helper.get_similar_words(word, topn=5)
+ test_results[word] = similar_words
+
+ status["test_results"] = test_results
+ return Response(json.dumps(status, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ except Exception as e:
+ return Response(json.dumps({
+ "enabled": WORD2VEC_ENABLED,
+ "initialized": False,
+ "error": str(e)
+ }, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+
+# Temporary diagnostic endpoint
+@app.route('/debug_search', methods=['POST'])
+def debug_search():
+ """临时的调试端点,用于检查数据库中的记录"""
+ if request.content_type != 'application/json':
+ return jsonify({"error": "Content-Type must be application/json"}), 415
+
+ data = request.get_json()
+ keyword = data.get("keyword", "").strip()
+
+ conn = get_db_conn()
+ try:
+ with conn.cursor(pymysql.cursors.DictCursor) as cursor:
+            # Look up records containing the keyword (parameterized to avoid SQL injection)
+            queries = [
+                ("title contains keyword", "SELECT seed_id, title, description, tags FROM pt_seed WHERE title LIKE %s LIMIT 10", (f"%{keyword}%",)),
+                ("description contains keyword", "SELECT seed_id, title, description, tags FROM pt_seed WHERE description LIKE %s LIMIT 10", (f"%{keyword}%",)),
+                ("tags contain keyword", "SELECT seed_id, title, description, tags FROM pt_seed WHERE FIND_IN_SET(%s, tags) LIMIT 10", (keyword,)),
+                ("肖申克的救赎", "SELECT seed_id, title, description, tags FROM pt_seed WHERE title = '肖申克的救赎'", ())
+            ]
+
+            results = {}
+            for query_name, query, params in queries:
+                cursor.execute(query, params if params else None)
+                results[query_name] = cursor.fetchall()
+
+ return Response(json.dumps(results, ensure_ascii=False), mimetype='application/json; charset=utf-8')
+ finally:
+ conn.close()
+
+"""
+接口本地测试方法(可直接运行main_online.py后用curl或Postman测试):
+
+1. 搜索接口
+curl -X POST http://127.0.0.1:5000/search -H "Content-Type: application/json" -d '{"keyword":"电影","sort_by":"downloads"}'
+
+2. 标签推荐接口
+curl -X POST http://127.0.0.1:5000/recommend_tags -H "Content-Type: application/json" -d '{"user_id":"1","tags":["动作","科幻"]}'
+
+3. 用户兴趣标签管理(添加标签)
+curl -X POST http://127.0.0.1:5000/user_tags -H "Content-Type: application/json" -d '{"user_id":"1","tags":["动作","科幻"]}'
+
+4. 用户兴趣标签管理(查询标签)
+curl "http://127.0.0.1:5000/user_tags?user_id=1"
+
+5. 用户兴趣标签管理(删除标签)
+curl -X DELETE http://127.0.0.1:5000/user_tags -H "Content-Type: application/json" -d '{"user_id":"1","tags":["动作","科幻"]}'
+
+6. 协同过滤推荐
+curl -X POST http://127.0.0.1:5000/user_based_recommend -H "Content-Type: application/json" -d '{"user_id":"user1","top_n":3}'
+
+7. Word2Vec状态检查
+curl "http://127.0.0.1:5000/word2vec_status"
+
+8. 调试接口(临时)
+curl -X POST http://127.0.0.1:5000/debug_search -H "Content-Type: application/json" -d '{"keyword":"电影"}'
+
+所有接口均可用Postman按上述参数测试。
+"""
+
+if __name__ == "__main__":
+ try:
+ logger.info("搜索推荐服务启动中...")
+ app.run(host="0.0.0.0", port=5000)
+ except Exception as e:
+ logger.error(f"启动异常: {e}")
+ import traceback
+ traceback.print_exc()