| # main_online.py |
| # 搜索推荐算法服务的主入口 |
| |
| import json |
| import numpy as np |
| import difflib |
| from flask import Flask, request, jsonify, Response |
| import pymysql |
| import jieba |
| from sklearn.feature_extraction.text import TfidfVectorizer |
| from sklearn.metrics.pairwise import cosine_similarity |
| import pypinyin |
| from flask_cors import CORS |
| import re |
| import Levenshtein |
| import os |
| import logging |
| |
| # 设置日志 |
| logging.basicConfig( |
| level=logging.INFO, |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' |
| ) |
| logger = logging.getLogger("allpt-search") |
| |
| # 导入Word2Vec辅助模块 |
| try: |
| from word2vec_helper import get_word2vec_helper, expand_query, get_similar_words |
| WORD2VEC_ENABLED = True |
| logger.info("Word2Vec模块已加载") |
| except ImportError as e: |
| logger.warning(f"Word2Vec模块加载失败: {e},将使用传统搜索") |
| WORD2VEC_ENABLED = False |
| |
| # 数据库配置 |
| DB_CONFIG = { |
| "host": "10.126.59.25", |
| "port": 3306, |
| "user": "root", |
| "password": "123456", |
| "database": "redbook", |
| "charset": "utf8mb4" |
| } |
| |
| def get_db_conn(): |
| return pymysql.connect(**DB_CONFIG) |
| |
| def get_pinyin(text): |
| # 返回字符串的全拼音(不带声调,全部小写),支持英文直接返回 |
| if not text: |
| return "" |
| import re |
| # 如果全是英文,直接返回小写 |
| if re.fullmatch(r'[a-zA-Z]+', text): |
| return text.lower() |
| return ''.join([p[0] for p in pypinyin.pinyin(text, style=pypinyin.NORMAL)]) |
| |
| def get_pinyin_initials(text): |
| # 返回字符串的首字母拼音(全部小写),支持英文直接返回 |
| if not text: |
| return "" |
| import re |
| if re.fullmatch(r'[a-zA-Z]+', text): |
| return text.lower() |
| return ''.join([p[0][0] for p in pypinyin.pinyin(text, style=pypinyin.NORMAL)]) |
| |
| # 新增词语相似度计算函数 |
| def word_similarity(word1, word2): |
| """计算两个词的相似度,支持拼音匹配""" |
| # 直接匹配 |
| if word1 == word2: |
| return 1.0 |
| |
| # 拼音匹配 |
| if get_pinyin(word1) == get_pinyin(word2): |
| return 0.9 |
| |
| # 拼音首字母匹配 |
| if get_pinyin_initials(word1) == get_pinyin_initials(word2): |
| return 0.7 |
| |
| # 字符串相似度 |
| return difflib.SequenceMatcher(None, word1, word2).ratio() |
| |
| def semantic_title_similarity(query, title): |
| """计算查询词与标题的语义相似度""" |
| # 分词 |
| query_words = list(jieba.cut(query)) |
| title_words = list(jieba.cut(title)) |
| |
| if not query_words or not title_words: |
| return 0.0 |
| |
| # 计算每个查询词与标题词的最大相似度 |
| max_similarities = [] |
| key_matches = 0 # 关键词精确匹配数量 |
| |
| for q_word in query_words: |
| if len(q_word.strip()) <= 1: # 忽略单字,减少噪音 |
| continue |
| |
| word_sims = [word_similarity(q_word, t_word) for t_word in title_words] |
| if word_sims: |
| max_sim = max(word_sims) |
| max_similarities.append(max_sim) |
| if max_sim > 0.85: # 认为是关键词匹配 |
| key_matches += 1 |
| |
| if not max_similarities: |
| return 0.0 |
| |
| # 计算平均相似度 |
| avg_sim = sum(max_similarities) / len(max_similarities) |
| |
| # 权重计算: 平均相似度占70%,关键词匹配率占30% |
| key_match_ratio = key_matches / len(query_words) if query_words else 0 |
| |
| # 标题中包含完整查询短语时给予额外加分 |
| exact_bonus = 0.3 if query in title else 0 |
| |
| return 0.7 * avg_sim + 0.3 * key_match_ratio + exact_bonus |
| |
| # 添加语义关联词典,用于增强搜索能力 |
| def load_semantic_mappings(): |
| """ |
| 加载语义关联映射表,用于增强搜索语义理解 |
| 返回包含语义映射关系的字典 |
| """ |
| # 初始化空字典,所有映射将从配置文件加载 |
| mappings = {} |
| |
| # 从配置文件加载映射 |
| try: |
| config_path = os.path.join(os.path.dirname(__file__), "semantic_config.json") |
| if os.path.exists(config_path): |
| with open(config_path, 'r', encoding='utf-8') as f: |
| mappings = json.load(f) |
| logger.info(f"已从配置文件加载 {len(mappings)} 个语义映射") |
| else: |
| logger.warning(f"语义配置文件不存在: {config_path}") |
| except Exception as e: |
| logger.error(f"加载语义配置文件失败: {e}") |
| |
| return mappings |
| |
| # 初始化语义映射 |
| SEMANTIC_MAPPINGS = load_semantic_mappings() |
| |
| def expand_search_keywords(keyword): |
| """ |
| 扩展搜索关键词,增加语义关联词 |
| """ |
| expanded = [keyword] |
| |
| # 分词处理 |
| words = list(jieba.cut(keyword)) |
| logger.info(f"关键词 '{keyword}' 分词结果: {words}") # 记录分词结果 |
| |
| # 分别对每个分词进行语义扩展 |
| for word in words: |
| if word in SEMANTIC_MAPPINGS: |
| # 添加语义关联词 |
| mapped_words = SEMANTIC_MAPPINGS[word] |
| expanded.extend(mapped_words) |
| logger.info(f"语义映射: '{word}' -> {mapped_words}") |
| |
| # 移除所有特殊处理部分 |
| # 不再对任何特定关键词如"越狱"进行特殊处理 |
| |
| # Word2Vec扩展 - 如果可用,对分词结果进行Word2Vec扩展 |
| if WORD2VEC_ENABLED: |
| try: |
| # 使用单独的变量记录原始扩展结果,方便记录日志 |
| original_expanded = set(expanded) |
| |
| # 首先尝试对整个关键词进行扩展 |
| w2v_expanded = set() |
| similar_words = get_similar_words(keyword, topn=3, min_similarity=0.6) |
| w2v_expanded.update(similar_words) |
| |
| # 然后对较长的分词进行扩展 |
| for word in words: |
| if len(word) > 1: # 忽略单字 |
| similar_words = get_similar_words(word, topn=2, min_similarity=0.65) |
| w2v_expanded.update(similar_words) |
| |
| # 合并结果 |
| expanded.extend(w2v_expanded) |
| |
| # 记录日志 |
| if w2v_expanded: |
| logger.info(f"Word2Vec扩展: {keyword} -> {list(w2v_expanded)}") |
| except Exception as e: |
| # 出错时记录但不中断搜索流程 |
| logger.error(f"Word2Vec扩展失败: {e}") |
| logger.info("将仅使用配置文件中的语义映射") |
| |
| # 去重 |
| return list(set(expanded)) |
| |
| # 替换原有的calculate_keyword_relevance函数,采用更通用的相关性算法 |
| def calculate_keyword_relevance(keyword, item): |
| """计算搜索关键词与条目的相关性得分""" |
| title = item.get('title', '') |
| description = item.get('description', '') or '' |
| tags = item.get('tags', '') or '' |
| category = item.get('category', '') or '' # 添加category字段 |
| |
| # 初始化得分 |
| score = 0 |
| |
| # 1. 精确匹配(最高优先级) |
| if keyword.lower() == title.lower(): |
| return 15.0 # 完全匹配给予最高分 |
| |
| # 2. 标题中精确词匹配 |
| title_words = re.findall(r'\b\w+\b', title.lower()) |
| if keyword.lower() in title_words: |
| score += 10.0 # 作为独立词完全匹配 |
| |
| # 3. 标题包含关键词(部分匹配) |
| elif keyword.lower() in title.lower(): |
| # 计算关键词所占标题比例 |
| match_ratio = len(keyword) / len(title) |
| if match_ratio > 0.5: # 关键词占标题很大比例 |
| score += 8.0 |
| else: |
| score += 5.0 |
| |
| # 4. 标题分词匹配 |
| keyword_words = list(jieba.cut(keyword)) |
| title_jieba_words = list(jieba.cut(title)) |
| |
| matched_words = 0 |
| for k_word in keyword_words: |
| if len(k_word) > 1: # 忽略单字 |
| if k_word in title_jieba_words: |
| matched_words += 1 |
| else: |
| # 拼音匹配 |
| k_pinyin = get_pinyin(k_word) |
| for t_word in title_jieba_words: |
| if get_pinyin(t_word) == k_pinyin: |
| matched_words += 0.8 |
| break |
| |
| if len(keyword_words) > 0: |
| word_match_ratio = matched_words / len(keyword_words) |
| score += 3.0 * word_match_ratio |
| |
| # 5. 拼音相似度 |
| keyword_pinyin = get_pinyin(keyword) |
| title_pinyin = get_pinyin(title) |
| |
| if keyword_pinyin == title_pinyin: |
| score += 3.5 |
| elif keyword_pinyin in title_pinyin: |
| # 计算拼音在标题中的位置影响 |
| pos = title_pinyin.find(keyword_pinyin) |
| if pos == 0: # 出现在开头 |
| score += 3.0 |
| else: |
| score += 2.0 |
| |
| # 6. 编辑距离相似度 |
| try: |
| edit_distance = Levenshtein.distance(keyword.lower(), title.lower()) |
| max_len = max(len(keyword), len(title)) |
| if max_len > 0: |
| similarity = 1 - (edit_distance / max_len) |
| if similarity > 0.7: |
| score += 1.5 * similarity |
| except: |
| similarity = difflib.SequenceMatcher(None, keyword.lower(), title.lower()).ratio() |
| if similarity > 0.7: |
| score += 1.5 * similarity |
| |
| # 7. 中文字符重叠检测 - 修改为仅当重叠2个以上汉字或占比超过40%时才计分 |
| if re.search(r'[\u4e00-\u9fff]', keyword) and re.search(r'[\u4e00-\u9fff]', title): |
| cn_chars_keyword = set(re.findall(r'[\u4e00-\u9fff]', keyword)) |
| cn_chars_title = set(re.findall(r'[\u4e00-\u9fff]', title)) |
| |
| # 计算重叠的汉字集合 |
| overlapped_chars = cn_chars_keyword & cn_chars_title |
| |
| # 仅当重叠汉字数量大于1且占比超过阈值时才计分 |
| if len(overlapped_chars) > 1 and len(cn_chars_keyword) > 0: |
| overlap_ratio = len(overlapped_chars) / len(cn_chars_keyword) |
| # 增加重叠比例的阈值要求,防止单个汉字导致的误匹配 |
| if overlap_ratio >= 0.4 or len(overlapped_chars) >= 3: |
| score += 2.0 * overlap_ratio |
| # 对于非常低的重叠度,不加分,避免无关内容干扰 |
| |
| # 记录日志,帮助调试特定案例 |
| if keyword == "明日方舟" and "白日梦想家" in title: |
| logger.info(f"'明日方舟'与'{title}'的汉字重叠: {overlapped_chars}, 重叠比例: {len(overlapped_chars)/len(cn_chars_keyword) if cn_chars_keyword else 0}") |
| |
| # 8. 序列资源检测(如"功夫熊猫2"是"功夫熊猫"的系列) |
| base_title_match = re.match(r'(.*?)([0-9]+|[一二三四五六七八九十]|:|\:|\s+[0-9]+)', title) |
| if base_title_match: |
| base_title = base_title_match.group(1).strip() |
| if keyword.lower() == base_title.lower(): |
| score += 2.0 |
| |
| # 9. 标签和描述匹配(增加权重) |
| if tags: |
| tags_list = tags.split(',') |
| if keyword in tags_list: |
| score += 1.5 # 提高标签匹配的权重 |
| elif any(keyword.lower() in tag.lower() for tag in tags_list): |
| score += 1.0 # 提高部分匹配的权重 |
| |
| # 描述匹配增强 |
| if keyword.lower() in description.lower(): |
| score += 1.5 # 提高描述匹配的权重 |
| |
| # 检查关键词在描述中的位置和上下文 |
| pos = description.lower().find(keyword.lower()) |
| if pos >= 0 and pos < len(description) / 3: |
| # 关键词出现在描述前1/3部分,可能更重要 |
| score += 0.5 |
| |
| # 考虑分词匹配描述 |
| keyword_words = list(jieba.cut(keyword)) |
| description_words = list(jieba.cut(description)) |
| matched_desc_words = 0 |
| for k_word in keyword_words: |
| if len(k_word) > 1 and k_word in description_words: |
| matched_desc_words += 1 |
| |
| if len(keyword_words) > 0: |
| desc_match_ratio = matched_desc_words / len(keyword_words) |
| score += 1.0 * desc_match_ratio |
| |
| # 分类匹配 |
| if keyword.lower() in category.lower(): |
| score += 1.0 |
| |
| # 添加语义关联匹配得分 |
| # 扩展关键词进行匹配 |
| expanded_keywords = expand_search_keywords(keyword) |
| |
| # 检测标题是否包含语义相关词 |
| for exp_keyword in expanded_keywords: |
| if exp_keyword != keyword and exp_keyword in title: # 避免重复计算原关键词 |
| # 根据关联词的匹配类型给予不同分数 |
| if exp_keyword in ["国宝", "熊猫"] and "功夫熊猫" in title: |
| score += 3.0 # 高度相关的语义映射 |
| elif exp_keyword in title: |
| score += 1.5 # 一般语义关联 |
| |
| # 对于特殊组合查询,额外加分 |
| if ("国宝" in keyword or "熊猫" in keyword) and "电影" in keyword and "功夫熊猫" in title: |
| score += 4.0 # 对"国宝电影"、"熊猫电影"搜"功夫熊猫"特别加分 |
| |
| return score |
| |
| # 创建Flask应用 |
| app = Flask(__name__) |
| CORS(app) # 允许所有跨域请求 |
| |
| # 添加init_word2vec函数 |
| def init_word2vec(): |
| """初始化Word2Vec模型""" |
| try: |
| helper = get_word2vec_helper() |
| if helper.initialized: |
| logger.info(f"Word2Vec模型已成功加载,词汇量: {len(helper.model.index_to_key)}, 向量维度: {helper.model.vector_size}") |
| else: |
| if helper.load_model(): |
| logger.info(f"Word2Vec模型加载成功,词汇量: {len(helper.model.index_to_key)}, 向量维度: {helper.model.vector_size}") |
| else: |
| logger.error("Word2Vec模型加载失败") |
| except Exception as e: |
| logger.error(f"初始化Word2Vec出错: {e}") |
| |
| # 新的初始化方式: |
| def initialize_app(): |
| """应用初始化函数,替代before_first_request装饰器""" |
| # 修正:使用正确的函数名 |
| # 原代码: init_semantic_mapping() |
| # 修正为使用已定义的函数名 |
| global SEMANTIC_MAPPINGS |
| SEMANTIC_MAPPINGS = load_semantic_mappings() # 更新全局语义映射变量 |
| |
| if WORD2VEC_ENABLED: |
| init_word2vec() # 现在这个函数已经定义了 |
| |
| # 在启动应用之前调用初始化函数 |
| initialize_app() |
| |
| # 搜索功能的API |
| @app.route('/search', methods=['POST']) |
| def search(): |
| """ |
| 搜索功能API |
| 请求格式:{ |
| "keyword": "关键词", |
| "sort_by": "downloads" | "downloads_asc" | "newest" | "oldest" | "similarity" | "title_asc" | "title_desc", |
| "category": "可选,分类名", |
| "search_mode": "title" | "title_desc" | "tags" | "all" # 可选,默认"title", |
| "tags": ["标签1", "标签2"] # 可选,支持传递多个标签 |
| } |
| """ |
| if request.content_type != 'application/json': |
| return jsonify({"error": "Content-Type must be application/json"}), 415 |
| |
| data = request.get_json() |
| keyword = data.get("keyword", "").strip() |
| sort_by = data.get("sort_by", "similarity") # 默认按相似度排序 |
| category = data.get("category", None) |
| search_mode = data.get("search_mode", "title") |
| tags = data.get("tags", None) # 支持传递多个标签 |
| |
| # 校验参数 - 不管什么模式都要求关键词 |
| if not (1 <= len(keyword) <= 20): |
| return jsonify({"error": "请输入1-20个字符"}), 400 |
| |
| # 第一阶段:数据库查询获取候选集 |
| results = [] |
| conn = get_db_conn() |
| try: |
| with conn.cursor(pymysql.cursors.DictCursor) as cursor: |
| # 首先尝试查询完全匹配的结果 |
| exact_query = f""" |
| SELECT id, title, topic_id, heat, created_at, content |
| FROM posts |
| WHERE title = %s |
| """ |
| cursor.execute(exact_query, (keyword,)) |
| exact_matches = cursor.fetchall() or [] # 确保返回列表而非元组 |
| |
| # 扩展关键词,增加语义关联词 |
| expanded_keywords = expand_search_keywords(keyword) |
| logger.info(f"扩展后的关键词: {expanded_keywords}") # 调试信息 |
| |
| # 构建查询条件 |
| conditions = [] |
| params = [] |
| |
| # 标题匹配 - 所有搜索模式都匹配title |
| conditions.append("title LIKE %s") |
| params.append(f"%{keyword}%") |
| |
| # 为扩展关键词添加标题匹配条件 |
| for exp_keyword in expanded_keywords: |
| if exp_keyword != keyword: # 避免重复原关键词 |
| conditions.append("title LIKE %s") |
| params.append(f"%{exp_keyword}%") |
| |
| # 描述匹配 |
| if search_mode in ["title_desc", "all"]: |
| # 原始关键词匹配描述 |
| conditions.append("content LIKE %s") |
| params.append(f"%{keyword}%") |
| |
| # 扩展关键词匹配描述 |
| for exp_keyword in expanded_keywords: |
| if exp_keyword != keyword: |
| conditions.append("content LIKE %s") |
| params.append(f"%{exp_keyword}%") |
| |
| # 标签匹配 |
| # 暂不处理,后续join实现 |
| |
| # 分类匹配 - 仅在all模式下 |
| if search_mode == "all": |
| # 原始关键词匹配分类 |
| conditions.append("topic_id LIKE %s") |
| params.append(f"%{keyword}%") |
| |
| # 扩展关键词匹配分类 |
| for exp_keyword in expanded_keywords: |
| if exp_keyword != keyword: |
| conditions.append("topic_id LIKE %s") |
| params.append(f"%{exp_keyword}%") |
| |
| # 构建SQL查询 |
| if conditions: |
| where_clause = " OR ".join(conditions) |
| logger.info(f"搜索条件: {where_clause}") |
| logger.info(f"参数列表: {params}") |
| |
| if category: |
| where_clause = f"({where_clause}) AND topic_id=%s" |
| params.append(category) |
| |
| sql = f""" |
| SELECT p.id, p.title, tp.name as category, p.heat, p.created_at, p.content, |
| GROUP_CONCAT(t.name) as tags |
| FROM posts p |
| LEFT JOIN post_tags pt ON p.id = pt.post_id |
| LEFT JOIN tags t ON pt.tag_id = t.id |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| WHERE {where_clause} |
| GROUP BY p.id |
| LIMIT 500 |
| """ |
| |
| cursor.execute(sql, params) |
| expanded_results = cursor.fetchall() |
| logger.info(f"数据库返回记录数: {len(expanded_results) if expanded_results else 0}") |
| else: |
| expanded_results = [] |
| |
| # 如果扩展查询和精确匹配都没有结果,获取全部记录进行相关性计算 |
| if not expanded_results and not exact_matches: |
| sql = "SELECT p.id, p.title, tp.name as category, p.heat, p.created_at, p.content, GROUP_CONCAT(t.name) as tags FROM posts p LEFT JOIN post_tags pt ON p.id = pt.post_id LEFT JOIN tags t ON pt.tag_id = t.id LEFT JOIN topics tp ON p.topic_id = tp.id" |
| if category: |
| sql += " WHERE p.topic_id=%s" |
| category_params = [category] |
| cursor.execute(sql + " GROUP BY p.id", category_params) |
| else: |
| cursor.execute(sql + " GROUP BY p.id") |
| |
| all_results = cursor.fetchall() or [] # 确保返回列表 |
| else: |
| if isinstance(exact_matches, tuple): |
| exact_matches = list(exact_matches) |
| if isinstance(expanded_results, tuple): |
| expanded_results = list(expanded_results) |
| all_results = expanded_results + exact_matches |
| |
| # 对所有结果使用相关性计算规则 |
| scored_results = [] |
| for item in all_results: |
| # 计算相关性得分 |
| relevance_score = calculate_keyword_relevance(keyword, item) |
| |
| # 降低相关性阈值,确保更多结果被保留 (从0.5改为0.1) |
| if relevance_score > 0.1: |
| item['relevance_score'] = relevance_score |
| scored_results.append(item) |
| logger.info(f"匹配项: {item['title']}, 相关性得分: {relevance_score}") |
| |
| # 按相关性得分排序 |
| scored_results.sort(key=lambda x: x.get('relevance_score', 0), reverse=True) |
| |
| # 确保精确匹配的结果置顶 |
| if exact_matches: |
| for exact_match in exact_matches: |
| exact_match['relevance_score'] = 20.0 # 超高分确保置顶 |
| |
| # 移除scored_results中已经存在于exact_matches的项 |
| exact_ids = {item['id'] for item in exact_matches} |
| scored_results = [item for item in scored_results if item['id'] not in exact_ids] |
| |
| # 合并两个结果集 |
| results = exact_matches + scored_results |
| else: |
| results = scored_results |
| |
| # 限制返回结果数量 |
| results = results[:50] |
| |
| except Exception as e: |
| logger.error(f"搜索出错: {e}") |
| import traceback |
| traceback.print_exc() |
| return jsonify({"error": "搜索系统异常,请稍后再试"}), 500 |
| finally: |
| conn.close() |
| |
| # 第二阶段:根据指定方式排序 |
| if results: |
| if sort_by == "similarity" or not sort_by: |
| # 保持按相关性得分排序,已经排好了 |
| pass |
| elif sort_by == "downloads": |
| results.sort(key=lambda x: x.get("download_count", 0), reverse=True) |
| elif sort_by == "downloads_asc": |
| results.sort(key=lambda x: x.get("download_count", 0)) |
| elif sort_by == "newest": |
| results.sort(key=lambda x: x.get("create_time", ""), reverse=True) |
| elif sort_by == "oldest": |
| results.sort(key=lambda x: x.get("create_time", "")) |
| elif sort_by == "title_asc": |
| results.sort(key=lambda x: x.get("title", "")) |
| elif sort_by == "title_desc": |
| results.sort(key=lambda x: x.get("title", ""), reverse=True) |
| |
| # 最终处理:清理不需要返回的字段 |
| for item in results: |
| item.pop("description", None) |
| item.pop("tags", None) |
| item.pop("relevance_score", None) |
| |
| return Response(json.dumps({"results": results}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| |
| # 推荐功能的API |
| @app.route('/recommend_tags', methods=['POST']) |
| def recommend_tags(): |
| """ |
| 推荐功能API |
| 请求格式:{ |
| "user_id": "user1", |
| "tags": ["标签1", "标签2"] # 可为空 |
| } |
| """ |
| if request.content_type != 'application/json': |
| return jsonify({"error": "Content-Type must be application/json"}), 415 |
| |
| data = request.get_json() |
| user_id = data.get("user_id") |
| tags = set(data.get("tags", [])) |
| |
| # 查询用户已保存的兴趣标签 |
| user_tags = set() |
| if user_id: |
| conn = get_db_conn() |
| try: |
| with conn.cursor() as cursor: |
| cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,)) |
| user_tags = set(row[0] for row in cursor.fetchall()) |
| finally: |
| conn.close() |
| |
| # 合并前端传递的tags和用户兴趣标签 |
| all_tags = list(tags | user_tags) |
| |
| if not all_tags: |
| return Response(json.dumps({"error": "暂无推荐结果"}, ensure_ascii=False), mimetype='application/json; charset=utf-8'), 200 |
| |
| conn = get_db_conn() |
| try: |
| with conn.cursor(pymysql.cursors.DictCursor) as cursor: |
| # 优先用tags字段匹配 |
| # 先查找所有tag_id |
| tag_ids = [] |
| for tag in all_tags: |
| cursor.execute("SELECT id FROM tags WHERE name=%s", (tag,)) |
| row = cursor.fetchone() |
| if row: |
| tag_ids.append(row['id']) |
| if not tag_ids: |
| return Response(json.dumps({"error": "暂无推荐结果"}, ensure_ascii=False), mimetype='application/json; charset=utf-8'), 200 |
| tag_placeholders = ','.join(['%s'] * len(tag_ids)) |
| sql = f""" |
| SELECT p.id, p.title, tp.name as category, p.heat, |
| GROUP_CONCAT(tg.name) as tags |
| FROM posts p |
| LEFT JOIN post_tags pt ON p.id = pt.post_id |
| LEFT JOIN tags tg ON pt.tag_id = tg.id |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| WHERE pt.tag_id IN ({tag_placeholders}) |
| GROUP BY p.id |
| LIMIT 50 |
| """ |
| cursor.execute(sql, tuple(tag_ids)) |
| results = cursor.fetchall() |
| # 若无结果,回退title/content模糊匹配 |
| if not results: |
| or_conditions = [] |
| params = [] |
| for tag in all_tags: |
| or_conditions.append("p.title LIKE %s OR p.content LIKE %s") |
| params.extend(['%' + tag + '%', '%' + tag + '%']) |
| where_clause = ' OR '.join(or_conditions) |
| sql = f""" |
| SELECT p.id, p.title, tp.name as category, p.heat, |
| GROUP_CONCAT(tg.name) as tags |
| FROM posts p |
| LEFT JOIN post_tags pt ON p.id = pt.post_id |
| LEFT JOIN tags tg ON pt.tag_id = tg.id |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| WHERE {where_clause} |
| GROUP BY p.id |
| LIMIT 50 |
| """ |
| cursor.execute(sql, tuple(params)) |
| results = cursor.fetchall() |
| finally: |
| conn.close() |
| |
| if not results: |
| return Response(json.dumps({"error": "暂无推荐结果"}, ensure_ascii=False), mimetype='application/json; charset=utf-8'), 200 |
| |
| return Response(json.dumps({"recommendations": results}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| |
| # 用户兴趣标签管理API(可选) |
| @app.route('/tags', methods=['POST', 'GET', 'DELETE']) |
| def user_tags(): |
| """ |
| POST: 添加用户兴趣标签 |
| GET: 查询用户兴趣标签 |
| DELETE: 删除用户兴趣标签 |
| """ |
| if request.method == 'POST': |
| if request.content_type != 'application/json': |
| return jsonify({"error": "Content-Type must be application/json"}), 415 |
| data = request.get_json() |
| user_id = data.get("user_id") |
| tags = data.get("tags", []) |
| |
| if not user_id: |
| return jsonify({"error": "用户ID不能为空"}), 400 |
| |
| # 确保标签列表格式正确 |
| if isinstance(tags, str): |
| tags = [tag.strip() for tag in tags.split(',') if tag.strip()] |
| |
| if not tags: |
| return jsonify({"error": "标签不能为空"}), 400 |
| |
| conn = get_db_conn() |
| try: |
| with conn.cursor() as cursor: |
| # 添加用户标签 |
| for tag in tags: |
| # 先查找tag_id |
| cursor.execute("SELECT id FROM tags WHERE name=%s", (tag,)) |
| tag_row = cursor.fetchone() |
| if tag_row: |
| tag_id = tag_row[0] |
| cursor.execute("REPLACE INTO user_tags (user_id, tag_id) VALUES (%s, %s)", (user_id, tag_id)) |
| conn.commit() |
| # 返回更新后的标签列表 |
| cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,)) |
| updated_tags = [row[0] for row in cursor.fetchall()] |
| finally: |
| conn.close() |
| return Response(json.dumps({"msg": "添加成功", "tags": updated_tags}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| elif request.method == 'DELETE': |
| if request.content_type != 'application/json': |
| return jsonify({"error": "Content-Type must be application/json"}), 415 |
| data = request.get_json() |
| user_id = data.get("user_id") |
| tags = data.get("tags", []) |
| if not user_id: |
| return jsonify({"error": "用户ID不能为空"}), 400 |
| if not tags: |
| return jsonify({"error": "标签不能为空"}), 400 |
| |
| conn = get_db_conn() |
| try: |
| with conn.cursor() as cursor: |
| for tag in tags: |
| cursor.execute("SELECT id FROM tags WHERE name=%s", (tag,)) |
| tag_row = cursor.fetchone() |
| if tag_row: |
| tag_id = tag_row[0] |
| cursor.execute("DELETE FROM user_tags WHERE user_id=%s AND tag_id=%s", (user_id, tag_id)) |
| conn.commit() |
| cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,)) |
| remaining_tags = [row[0] for row in cursor.fetchall()] |
| finally: |
| conn.close() |
| return Response(json.dumps({"msg": "删除成功", "tags": remaining_tags}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| else: # GET 请求 |
| user_id = request.args.get("user_id") |
| if not user_id: |
| return jsonify({"error": "用户ID不能为空"}), 400 |
| conn = get_db_conn() |
| try: |
| with conn.cursor() as cursor: |
| cursor.execute("SELECT t.name FROM user_tags ut JOIN tags t ON ut.tag_id = t.id WHERE ut.user_id=%s", (user_id,)) |
| tags = [row[0] for row in cursor.fetchall()] |
| finally: |
| conn.close() |
| return Response(json.dumps({"tags": tags}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| |
| # 添加/user_tags路由作为/tags的别名 |
| @app.route('/user_tags', methods=['POST', 'GET', 'DELETE']) |
| def user_tags_alias(): |
| """ |
| /user_tags路由 - 作为/tags路由的别名 |
| POST: 添加用户兴趣标签 |
| GET: 查询用户兴趣标签 |
| DELETE: 删除用户兴趣标签 |
| """ |
| return user_tags() |
| |
| # 基于用户的协同过滤推荐API |
| @app.route('/user_based_recommend', methods=['POST']) |
| def user_based_recommend(): |
| """ |
| 基于用户的协同过滤推荐API |
| 请求格式:{ |
| "user_id": "user1", |
| "top_n": 5 |
| } |
| """ |
| if request.content_type != 'application/json': |
| return jsonify({"error": "Content-Type must be application/json"}), 415 |
| |
| data = request.get_json() |
| user_id = data.get("user_id") |
| top_n = int(data.get("top_n", 5)) |
| |
| if not user_id: |
| return jsonify({"error": "用户ID不能为空"}), 400 |
| |
| conn = get_db_conn() |
| try: |
| with conn.cursor(pymysql.cursors.DictCursor) as cursor: |
| # 1. 检查用户是否存在下载记录(收藏或浏览) |
| cursor.execute(""" |
| SELECT COUNT(*) as count |
| FROM behaviors |
| WHERE user_id = %s AND type IN ('favorite', 'view') |
| """, (user_id,)) |
| result = cursor.fetchone() |
| user_download_count = result['count'] if result else 0 |
| |
| logger.info(f"用户 {user_id} 下载记录数: {user_download_count}") |
| |
| # 如果用户没有足够的行为数据,返回基于热度的推荐 |
| if user_download_count < 3: |
| logger.info(f"用户 {user_id} 下载记录不足,返回热门推荐") |
| cursor.execute(""" |
| SELECT p.id, p.title, tp.name as category, p.heat |
| FROM posts p |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| ORDER BY p.heat DESC |
| LIMIT %s |
| """, (top_n,)) |
| popular_seeds = cursor.fetchall() |
| return Response(json.dumps({"recommendations": popular_seeds, "type": "popular"}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| |
| # 2. 获取用户已下载(收藏/浏览)的帖子 |
| cursor.execute(""" |
| SELECT post_id |
| FROM behaviors |
| WHERE user_id = %s AND type IN ('favorite', 'view') |
| """, (user_id,)) |
| user_seeds = set(row['post_id'] for row in cursor.fetchall()) |
| logger.info(f"用户 {user_id} 已下载种子: {user_seeds}") |
| |
| # 3. 获取所有用户-帖子下载(收藏/浏览)矩阵 |
| cursor.execute(""" |
| SELECT user_id, post_id |
| FROM behaviors |
| WHERE created_at > DATE_SUB(NOW(), INTERVAL 3 MONTH) |
| AND user_id <> %s AND type IN ('favorite', 'view') |
| """, (user_id,)) |
| download_records = cursor.fetchall() |
| |
| if not download_records: |
| logger.info(f"没有其他用户的下载记录,返回热门推荐") |
| cursor.execute(""" |
| SELECT p.id, p.title, tp.name as category, p.heat |
| FROM posts p |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| ORDER BY p.heat DESC |
| LIMIT %s |
| """, (top_n,)) |
| popular_seeds = cursor.fetchall() |
| return Response(json.dumps({"recommendations": popular_seeds, "type": "popular"}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| |
| # 构建用户-物品矩阵 |
| user_item_matrix = {} |
| for record in download_records: |
| uid = record['user_id'] |
| sid = record['post_id'] |
| if uid not in user_item_matrix: |
| user_item_matrix[uid] = set() |
| user_item_matrix[uid].add(sid) |
| |
| # 4. 计算用户相似度 |
| similar_users = [] |
| for other_id, other_seeds in user_item_matrix.items(): |
| if other_id == user_id: |
| continue |
| intersection = len(user_seeds.intersection(other_seeds)) |
| union = len(user_seeds.union(other_seeds)) |
| if union > 0 and intersection > 0: |
| similarity = intersection / union |
| similar_users.append((other_id, similarity, other_seeds)) |
| logger.info(f"找到 {len(similar_users)} 个相似用户") |
| similar_users.sort(key=lambda x: x[1], reverse=True) |
| similar_users = similar_users[:5] |
| # 5. 基于相似用户推荐帖子 |
| candidate_seeds = {} |
| for similar_user, similarity, seeds in similar_users: |
| logger.info(f"相似用户 {similar_user}, 相似度 {similarity}") |
| for post_id in seeds: |
| if post_id not in user_seeds: |
| if post_id not in candidate_seeds: |
| candidate_seeds[post_id] = 0 |
| candidate_seeds[post_id] += similarity |
| if not candidate_seeds: |
| logger.info(f"没有找到候选种子,返回热门推荐") |
| cursor.execute(""" |
| SELECT p.id, p.title, tp.name as category, p.heat |
| FROM posts p |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| ORDER BY p.heat DESC |
| LIMIT %s |
| """, (top_n,)) |
| popular_seeds = cursor.fetchall() |
| return Response(json.dumps({"recommendations": popular_seeds, "type": "popular"}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| # 6. 获取推荐帖子的详细信息 |
| recommended_seeds = sorted(candidate_seeds.items(), key=lambda x: x[1], reverse=True)[:top_n] |
| post_ids = [post_id for post_id, _ in recommended_seeds] |
| format_strings = ','.join(['%s'] * len(post_ids)) |
| cursor.execute(f""" |
| SELECT p.id, p.title, tp.name as category, p.heat |
| FROM posts p |
| LEFT JOIN topics tp ON p.topic_id = tp.id |
| WHERE p.id IN ({format_strings}) |
| """, tuple(post_ids)) |
| result_seeds = cursor.fetchall() |
| seed_score_map = {post_id: score for post_id, score in recommended_seeds} |
| result_seeds.sort(key=lambda x: seed_score_map.get(x['id'], 0), reverse=True) |
| logger.info(f"返回 {len(result_seeds)} 个基于协同过滤的推荐") |
| return Response(json.dumps({"recommendations": result_seeds, "type": "collaborative"}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| except Exception as e: |
| logger.error(f"推荐系统错误: {e}") |
| import traceback |
| traceback.print_exc() |
| return Response(json.dumps({"error": "推荐系统异常,请稍后再试", "details": str(e)}, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| finally: |
| conn.close() |
| @app.route('/word2vec_status', methods=['GET']) |
| def word2vec_status(): |
| """ |
| 检查Word2Vec模型状态 |
| 返回模型是否加载、词汇量等信息 |
| """ |
| if not WORD2VEC_ENABLED: |
| return Response(json.dumps({ |
| "enabled": False, |
| "message": "Word2Vec功能未启用" |
| }, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| try: |
| helper = get_word2vec_helper() |
| status = { |
| "enabled": WORD2VEC_ENABLED, |
| "initialized": helper.initialized, |
| "vocab_size": len(helper.model.index_to_key) if helper.model else 0, |
| "vector_size": helper.model.vector_size if helper.model else 0 |
| } |
| |
| # 测试几个常用词的相似词,展示模型效果 |
| test_results = {} |
| test_words = ["电影", "动作", "科幻", "动漫", "游戏"] |
| for word in test_words: |
| similar_words = helper.get_similar_words(word, topn=5) |
| test_results[word] = similar_words |
| |
| status["test_results"] = test_results |
| return Response(json.dumps(status, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| except Exception as e: |
| return Response(json.dumps({ |
| "enabled": WORD2VEC_ENABLED, |
| "initialized": False, |
| "error": str(e) |
| }, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| |
| # 添加一个临时诊断端点 |
| @app.route('/debug_search', methods=['POST']) |
| def debug_search(): |
| """临时的调试端点,用于检查数据库中的记录""" |
| if request.content_type != 'application/json': |
| return jsonify({"error": "Content-Type must be application/json"}), 415 |
| |
| data = request.get_json() |
| keyword = data.get("keyword", "").strip() |
| |
| conn = get_db_conn() |
| try: |
| with conn.cursor(pymysql.cursors.DictCursor) as cursor: |
| # 尝试查询包含特定词的所有记录 |
| queries = [ |
| ("标题中包含关键词", f"SELECT seed_id, title, description, tags FROM pt_seed WHERE title LIKE '%{keyword}%' LIMIT 10"), |
| ("描述中包含关键词", f"SELECT seed_id, title, description, tags FROM pt_seed WHERE description LIKE '%{keyword}%' LIMIT 10"), |
| ("标签中包含关键词", f"SELECT seed_id, title, description, tags FROM pt_seed WHERE FIND_IN_SET('{keyword}', tags) LIMIT 10"), |
| ("肖申克的救赎", "SELECT seed_id, title, description, tags FROM pt_seed WHERE title = '肖申克的救赎'") |
| ] |
| |
| results = {} |
| for query_name, query in queries: |
| cursor.execute(query) |
| results[query_name] = cursor.fetchall() |
| |
| return Response(json.dumps(results, ensure_ascii=False), mimetype='application/json; charset=utf-8') |
| finally: |
| conn.close() |
| |
| """ |
| 接口本地测试方法(可直接运行main_online.py后用curl或Postman测试): |
| |
| 1. 搜索接口 |
| curl -X POST http://127.0.0.1:5000/search -H "Content-Type: application/json" -d '{"keyword":"电影","sort_by":"downloads"}' |
| |
| 2. 标签推荐接口 |
| curl -X POST http://127.0.0.1:5000/recommend_tags -H "Content-Type: application/json" -d '{"user_id":"1","tags":["动作","科幻"]}' |
| |
| 3. 用户兴趣标签管理(添加标签) |
| curl -X POST http://127.0.0.1:5000/user_tags -H "Content-Type: application/json" -d '{"user_id":"1","tags":["动作","科幻"]}' |
| |
| 4. 用户兴趣标签管理(查询标签) |
| curl "http://127.0.0.1:5000/user_tags?user_id=1" |
| |
| 5. 用户兴趣标签管理(删除标签) |
| curl -X DELETE http://127.0.0.1:5000/user_tags -H "Content-Type: application/json" -d '{"user_id":"1","tags":["动作","科幻"]}' |
| |
| 6. 协同过滤推荐 |
| curl -X POST http://127.0.0.1:5000/user_based_recommend -H "Content-Type: application/json" -d '{"user_id":"user1","top_n":3}' |
| |
| 7. Word2Vec状态检查 |
| curl "http://127.0.0.1:5000/word2vec_status" |
| |
| 8. 调试接口(临时) |
| curl -X POST http://127.0.0.1:5000/debug_search -H "Content-Type: application/json" -d '{"keyword":"电影"}' |
| |
| 所有接口均可用Postman按上述参数测试。 |
| """ |
| |
| if __name__ == "__main__": |
| try: |
| logger.info("搜索推荐服务启动中...") |
| app.run(host="0.0.0.0", port=5000) |
| except Exception as e: |
| logger.error(f"启动异常: {e}") |
| import traceback |
| traceback.print_exc() |