增加了种子统计的内容
Change-Id: I139c8cf149c05465d1170baa4c143210bd71c888
diff --git a/src/main/java/com/pt/service/TorrentStatsService.java b/src/main/java/com/pt/service/TorrentStatsService.java
new file mode 100644
index 0000000..ee11689
--- /dev/null
+++ b/src/main/java/com/pt/service/TorrentStatsService.java
@@ -0,0 +1,112 @@
+package com.pt.service;
+
+import com.pt.entity.PeerInfoEntity;
+import com.pt.entity.TorrentStats;
+import com.pt.exception.ResourceNotFoundException;
+import com.pt.repository.PeerInfoRepository;
+import com.pt.repository.TorrentMetaRepository;
+import com.pt.repository.TorrentStatsRepository;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.stereotype.Repository;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.time.LocalDateTime;
+import java.util.List;
+
+@Service
+public class TorrentStatsService {
+
+ @Autowired
+ private PeerInfoRepository peerInfoRepository;
+
+ @Autowired
+ private TorrentStatsRepository statsRepository;
+
+ @Autowired
+ private TorrentMetaRepository torrentMetaRepository;
+
+ @Autowired
+ public TorrentStatsService(TorrentStatsRepository statsRepository) {
+ this.statsRepository = statsRepository;
+ }
+
+ /**
+ * 增加种子完成次数
+ *
+ * @param torrentId 种子ID
+ */
+ @Transactional
+ public void incrementCompletedCount(Long torrentId) {
+ // 1. 检查统计记录是否存在
+ if (!statsRepository.findByTorrentId(torrentId).isPresent()) {
+ // 创建新的统计记录
+ TorrentStats newStats = new TorrentStats();
+ newStats.setTorrentId(torrentId);
+ newStats.setCompletedCount(1);
+ newStats.setLastUpdated(LocalDateTime.now());
+ statsRepository.save(newStats);
+ return;
+ }
+
+ // 2. 原子操作增加完成次数
+ statsRepository.incrementCompletedCount(torrentId);
+
+ // 3. 更新最后更新时间
+ statsRepository.updateLastUpdatedToNow(torrentId);
+ }
+ /**
+ * 创建新的统计记录
+ */
+ private TorrentStats createNewStats(Long torrentId) {
+ TorrentStats stats = new TorrentStats();
+ stats.setTorrentId(torrentId);
+ stats.setSeederCount(0);
+ stats.setLeecherCount(0);
+ stats.setCompletedCount(0);
+ return statsRepository.save(stats);
+ }
+
+ // 每次客户端上报状态时调用
+ @Transactional
+ public void updateTorrentStats(String infoHash) {
+ // 1. 获取当前种子的peer信息
+ List<PeerInfoEntity> peers = peerInfoRepository.findByInfoHash(infoHash);
+
+ // 2. 统计各类人数
+ int seeders = 0;
+ int leechers = 0;
+ int completed = 0;
+
+ for (PeerInfoEntity peer : peers) {
+ if ("seeding".equals(peer.getStatus()) && peer.isActive()) {
+ seeders++;
+ } else if ("downloading".equals(peer.getStatus()) && peer.isActive()) {
+ leechers++;
+ }
+
+ if ("completed".equals(peer.getStatus())) {
+ completed++;
+ }
+ }
+
+ // 3. 更新统计记录
+ TorrentStats stats = statsRepository.findByTorrentId(
+ torrentMetaRepository.findByInfoHash(infoHash).getId()
+ ).orElse(new TorrentStats());
+
+ stats.setTorrentId(torrentMetaRepository.findByInfoHash(infoHash).getId());
+ stats.setSeederCount(seeders);
+ stats.setLeecherCount(leechers);
+ stats.setCompletedCount(completed);
+ stats.setLastUpdated(LocalDateTime.now());
+
+ statsRepository.save(stats);
+ }
+
+ // 获取种子统计信息
+ public TorrentStats getTorrentStats(Long torrentId) {
+ return statsRepository.findByTorrentId(torrentId)
+ .orElseThrow(() -> new ResourceNotFoundException("Stats not found"));
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/pt/service/TrackerService.java b/src/main/java/com/pt/service/TrackerService.java
index 7167ff3..191479b 100644
--- a/src/main/java/com/pt/service/TrackerService.java
+++ b/src/main/java/com/pt/service/TrackerService.java
@@ -1,76 +1,180 @@
package com.pt.service;
+import com.pt.entity.PeerInfoEntity;
import com.pt.entity.TorrentMeta;
+import com.pt.exception.ResourceNotFoundException;
+import com.pt.repository.PeerInfoRepository;
import com.pt.repository.TorrentMetaRepository;
import com.pt.utils.BencodeCodec;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.HashMap;
+import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
@Service
public class TrackerService {
- private final Map<String, List<PeerInfo>> torrentPeers = new ConcurrentHashMap<>();
-
@Autowired
private TorrentMetaRepository torrentMetaRepository;
+ @Autowired
+ private PeerInfoRepository peerInfoRepository;
+
+ @Autowired
+ private TorrentStatsService statsService;
+
+ @Transactional
public byte[] handleAnnounce(Map<String, String[]> params, String ipAddress) {
try {
- if (!params.containsKey("info_hash") || !params.containsKey("peer_id") || !params.containsKey("port")) {
- return BencodeCodec.encode(Map.of("failure reason", "Missing required parameters"));
+ // 验证必要参数
+ if (!params.containsKey("info_hash") || !params.containsKey("peer_id")
+ || !params.containsKey("port")) {
+ return errorResponse("Missing required parameters");
}
+ // 解析参数
String infoHash = decodeParam(params.get("info_hash")[0]);
- TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
- if (meta == null) {
- return BencodeCodec.encode(Map.of("failure reason", "Invalid info_hash"));
- }
-
String peerId = decodeParam(params.get("peer_id")[0]);
int port = Integer.parseInt(params.get("port")[0]);
- PeerInfo peer = new PeerInfo(ipAddress, port, peerId);
+ // 获取事件类型
+ String event = getEventParam(params);
- torrentPeers.computeIfAbsent(infoHash, k -> new CopyOnWriteArrayList<>());
- List<PeerInfo> peers = torrentPeers.get(infoHash);
+ // 获取流量数据
+ long uploaded = getLongParam(params, "uploaded", 0);
+ long downloaded = getLongParam(params, "downloaded", 0);
+ long left = getLongParam(params, "left", 0);
- boolean exists = peers.stream().anyMatch(p -> p.peerId.equals(peerId));
- if (!exists) {
- peers.add(peer);
+ // 验证种子是否存在
+ TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
+ if (meta == null) {
+ return errorResponse("Torrent not found: " + infoHash);
}
- List<String> ips = peers.stream().map(p -> p.ip).toList();
- List<Integer> ports = peers.stream().map(p -> p.port).toList();
- byte[] peerBytes = BencodeCodec.buildCompactPeers(ips, ports);
+ // 创建或更新 peer 信息
+ PeerInfoEntity peer = findOrCreatePeer(peerId, infoHash);
+ // 设置 peer 属性
+ setPeerProperties(peer, ipAddress, port, uploaded, downloaded, left);
+
+ // 处理事件类型
+ handlePeerEvent(event, peer, left);
+
+ // 保存 peer
+ peerInfoRepository.save(peer);
+
+ // 更新种子统计信息
+ statsService.updateTorrentStats(infoHash);
+
+ // 获取 peer 列表响应
+ List<PeerInfoEntity> activePeers = peerInfoRepository.findActivePeersByInfoHash(infoHash);
+ byte[] peerBytes = buildPeerResponse(activePeers);
+
+ // 返回成功响应
return BencodeCodec.buildTrackerResponse(1800, peerBytes);
} catch (Exception e) {
- return BencodeCodec.encode(Map.of("failure reason", "Internal server error"));
+ return errorResponse("Internal server error");
}
}
+ // 辅助方法:获取事件参数
+ private String getEventParam(Map<String, String[]> params) {
+ return params.containsKey("event") ?
+ decodeParam(params.get("event")[0]) : "update";
+ }
+
+ // 辅助方法:获取长整型参数
+ private long getLongParam(Map<String, String[]> params, String key, long defaultValue) {
+ return params.containsKey(key) ?
+ Long.parseLong(params.get(key)[0]) : defaultValue;
+ }
+
+ // 辅助方法:查找或创建 peer
+ private PeerInfoEntity findOrCreatePeer(String peerId, String infoHash) {
+ return peerInfoRepository.findByPeerIdAndInfoHash(peerId, infoHash)
+ .orElseGet(PeerInfoEntity::new);
+ }
+
+ // 辅助方法:设置 peer 属性
+ private void setPeerProperties(PeerInfoEntity peer, String ip, int port,
+ long uploaded, long downloaded, long left) {
+ peer.setIp(ip);
+ peer.setPort(port);
+ peer.setPeerId(peer.getPeerId() != null ? peer.getPeerId() : ""); // 防止 NPE
+ peer.setInfoHash(peer.getInfoHash() != null ? peer.getInfoHash() : "");
+ peer.setUploaded(uploaded);
+ peer.setDownloaded(downloaded);
+ peer.setLeft(left);
+ peer.setLastSeen(LocalDateTime.now());
+ }
+
+ // 辅助方法:处理 peer 事件
+ private void handlePeerEvent(String event, PeerInfoEntity peer, long left) {
+ switch (event) {
+ case "started":
+ peer.setStatus("downloading");
+ peer.setActive(true);
+ break;
+
+ case "stopped":
+ peer.setActive(false);
+ break;
+
+ case "completed":
+ handleCompletedEvent(peer);
+ break;
+
+ case "update":
+ default:
+ handleUpdateEvent(peer, left);
+ break;
+ }
+ }
+
+ // 处理完成事件
+ private void handleCompletedEvent(PeerInfoEntity peer) {
+ peer.setStatus("completed");
+ peer.setActive(true);
+ peer.setLeft(0);
+ incrementCompletedCount(peer.getInfoHash());
+ }
+
+ // 处理更新事件
+ private void handleUpdateEvent(PeerInfoEntity peer, long left) {
+ if (left == 0 && "downloading".equals(peer.getStatus())) {
+ // 检测到下载完成
+ peer.setStatus("completed");
+ incrementCompletedCount(peer.getInfoHash());
+ }
+ peer.setActive(true);
+ }
+
+ // 增加完成次数
+ private void incrementCompletedCount(String infoHash) {
+ TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
+ if (meta != null) {
+ statsService.incrementCompletedCount(meta.getId());
+ }
+ }
+
+ // 构建 peer 响应
+ private byte[] buildPeerResponse(List<PeerInfoEntity> peers) {
+ List<String> ips = peers.stream().map(PeerInfoEntity::getIp).toList();
+ List<Integer> ports = peers.stream().map(PeerInfoEntity::getPort).toList();
+ return BencodeCodec.buildCompactPeers(ips, ports);
+ }
+
+ // 构建错误响应
+ private byte[] errorResponse(String reason) {
+ return BencodeCodec.encode(Map.of("failure reason", reason));
+ }
+
+ // 解码参数
private String decodeParam(String raw) {
return new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
}
-
- private static class PeerInfo {
- String ip;
- int port;
- String peerId;
-
- public PeerInfo(String ip, int port, String peerId) {
- this.ip = ip;
- this.port = port;
- this.peerId = peerId;
- }
- }
-}
-
+}
\ No newline at end of file