Merge "添加更新用户流量的方法,并定时更新到所有用户"
Change-Id: I558ee7de6767ed1b78685883310a268ea51b198a
diff --git a/src/main/java/com/pt/service/TrackerService.java b/src/main/java/com/pt/service/TrackerService.java
index 191479b..ea840e1 100644
--- a/src/main/java/com/pt/service/TrackerService.java
+++ b/src/main/java/com/pt/service/TrackerService.java
@@ -1,180 +1,100 @@
package com.pt.service;
-import com.pt.entity.PeerInfoEntity;
import com.pt.entity.TorrentMeta;
-import com.pt.exception.ResourceNotFoundException;
-import com.pt.repository.PeerInfoRepository;
import com.pt.repository.TorrentMetaRepository;
import com.pt.utils.BencodeCodec;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
@Service
public class TrackerService {
+ private final Map<String, List<PeerInfo>> torrentPeers = new ConcurrentHashMap<>();
+
@Autowired
private TorrentMetaRepository torrentMetaRepository;
- @Autowired
- private PeerInfoRepository peerInfoRepository;
-
- @Autowired
- private TorrentStatsService statsService;
-
- @Transactional
public byte[] handleAnnounce(Map<String, String[]> params, String ipAddress) {
try {
- // 验证必要参数
- if (!params.containsKey("info_hash") || !params.containsKey("peer_id")
- || !params.containsKey("port")) {
- return errorResponse("Missing required parameters");
+ if (!params.containsKey("info_hash") || !params.containsKey("peer_id") || !params.containsKey("port")) {
+ return BencodeCodec.encode(Map.of("failure reason", "Missing required parameters"));
}
- // 解析参数
+ System.out.println("Received announce params: " + params);
+ System.out.println("Client IP: " + ipAddress);
+
+ // 用ISO-8859-1解码,确保二进制数据正确还原
String infoHash = decodeParam(params.get("info_hash")[0]);
- String peerId = decodeParam(params.get("peer_id")[0]);
- int port = Integer.parseInt(params.get("port")[0]);
+ System.out.println("Decoded info_hash (raw bytes as hex): " + bytesToHex(infoHash.getBytes("ISO-8859-1")));
- // 获取事件类型
- String event = getEventParam(params);
-
- // 获取流量数据
- long uploaded = getLongParam(params, "uploaded", 0);
- long downloaded = getLongParam(params, "downloaded", 0);
- long left = getLongParam(params, "left", 0);
-
- // 验证种子是否存在
- TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
- if (meta == null) {
- return errorResponse("Torrent not found: " + infoHash);
+ Optional<TorrentMeta> meta = torrentMetaRepository.findByInfoHash(infoHash);
+ if (!meta.isPresent()) {
+ System.out.println("Invalid info_hash: not found in DB");
+ return BencodeCodec.encode(Map.of("failure reason", "Invalid info_hash"));
}
- // 创建或更新 peer 信息
- PeerInfoEntity peer = findOrCreatePeer(peerId, infoHash);
+ String peerId = decodeParam(params.get("peer_id")[0]);
+ System.out.println("Decoded peer_id: " + peerId);
- // 设置 peer 属性
- setPeerProperties(peer, ipAddress, port, uploaded, downloaded, left);
+ int port = Integer.parseInt(params.get("port")[0]);
+ System.out.println("Port: " + port);
- // 处理事件类型
- handlePeerEvent(event, peer, left);
+ PeerInfo peer = new PeerInfo(ipAddress, port, peerId);
- // 保存 peer
- peerInfoRepository.save(peer);
+ torrentPeers.computeIfAbsent(infoHash, k -> new CopyOnWriteArrayList<>());
+ List<PeerInfo> peers = torrentPeers.get(infoHash);
- // 更新种子统计信息
- statsService.updateTorrentStats(infoHash);
+ boolean exists = peers.stream().anyMatch(p -> p.peerId.equals(peerId));
+ if (!exists) {
+ peers.add(peer);
+ System.out.println("Added new peer: " + peerId);
+ } else {
+ System.out.println("Peer already exists: " + peerId);
+ }
- // 获取 peer 列表响应
- List<PeerInfoEntity> activePeers = peerInfoRepository.findActivePeersByInfoHash(infoHash);
- byte[] peerBytes = buildPeerResponse(activePeers);
+ List<String> ips = peers.stream().map(p -> p.ip).toList();
+ List<Integer> ports = peers.stream().map(p -> p.port).toList();
+ byte[] peerBytes = BencodeCodec.buildCompactPeers(ips, ports);
- // 返回成功响应
return BencodeCodec.buildTrackerResponse(1800, peerBytes);
} catch (Exception e) {
- return errorResponse("Internal server error");
+ e.printStackTrace(); // 打印异常堆栈,方便定位错误
+ return BencodeCodec.encode(Map.of("failure reason", "Internal server error"));
}
}
- // 辅助方法:获取事件参数
- private String getEventParam(Map<String, String[]> params) {
- return params.containsKey("event") ?
- decodeParam(params.get("event")[0]) : "update";
+ // 辅助函数:打印字节转成16进制字符串
+ private String bytesToHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder();
+ for (byte b : bytes) {
+ sb.append(String.format("%02x", b));
+ }
+ return sb.toString();
}
- // 辅助方法:获取长整型参数
- private long getLongParam(Map<String, String[]> params, String key, long defaultValue) {
- return params.containsKey(key) ?
- Long.parseLong(params.get(key)[0]) : defaultValue;
+ // decodeParam 建议用ISO-8859-1解码
+ private String decodeParam(String param) throws UnsupportedEncodingException {
+ return URLDecoder.decode(param, "ISO-8859-1");
}
- // 辅助方法:查找或创建 peer
- private PeerInfoEntity findOrCreatePeer(String peerId, String infoHash) {
- return peerInfoRepository.findByPeerIdAndInfoHash(peerId, infoHash)
- .orElseGet(PeerInfoEntity::new);
- }
- // 辅助方法:设置 peer 属性
- private void setPeerProperties(PeerInfoEntity peer, String ip, int port,
- long uploaded, long downloaded, long left) {
- peer.setIp(ip);
- peer.setPort(port);
- peer.setPeerId(peer.getPeerId() != null ? peer.getPeerId() : ""); // 防止 NPE
- peer.setInfoHash(peer.getInfoHash() != null ? peer.getInfoHash() : "");
- peer.setUploaded(uploaded);
- peer.setDownloaded(downloaded);
- peer.setLeft(left);
- peer.setLastSeen(LocalDateTime.now());
- }
+ private static class PeerInfo {
+ String ip;
+ int port;
+ String peerId;
- // 辅助方法:处理 peer 事件
- private void handlePeerEvent(String event, PeerInfoEntity peer, long left) {
- switch (event) {
- case "started":
- peer.setStatus("downloading");
- peer.setActive(true);
- break;
-
- case "stopped":
- peer.setActive(false);
- break;
-
- case "completed":
- handleCompletedEvent(peer);
- break;
-
- case "update":
- default:
- handleUpdateEvent(peer, left);
- break;
+ public PeerInfo(String ip, int port, String peerId) {
+ this.ip = ip;
+ this.port = port;
+ this.peerId = peerId;
}
}
+}
- // 处理完成事件
- private void handleCompletedEvent(PeerInfoEntity peer) {
- peer.setStatus("completed");
- peer.setActive(true);
- peer.setLeft(0);
- incrementCompletedCount(peer.getInfoHash());
- }
-
- // 处理更新事件
- private void handleUpdateEvent(PeerInfoEntity peer, long left) {
- if (left == 0 && "downloading".equals(peer.getStatus())) {
- // 检测到下载完成
- peer.setStatus("completed");
- incrementCompletedCount(peer.getInfoHash());
- }
- peer.setActive(true);
- }
-
- // 增加完成次数
- private void incrementCompletedCount(String infoHash) {
- TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
- if (meta != null) {
- statsService.incrementCompletedCount(meta.getId());
- }
- }
-
- // 构建 peer 响应
- private byte[] buildPeerResponse(List<PeerInfoEntity> peers) {
- List<String> ips = peers.stream().map(PeerInfoEntity::getIp).toList();
- List<Integer> ports = peers.stream().map(PeerInfoEntity::getPort).toList();
- return BencodeCodec.buildCompactPeers(ips, ports);
- }
-
- // 构建错误响应
- private byte[] errorResponse(String reason) {
- return BencodeCodec.encode(Map.of("failure reason", reason));
- }
-
- // 解码参数
- private String decodeParam(String raw) {
- return new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
- }
-}
\ No newline at end of file