| package com.pt.service; |
| |
| import com.pt.entity.PeerInfoEntity; |
| import com.pt.entity.TorrentMeta; |
| import com.pt.entity.User; |
| import com.pt.exception.ResourceNotFoundException; |
| import com.pt.repository.PeerInfoRepository; |
| import com.pt.repository.TorrentMetaRepository; |
| import com.pt.utils.BencodeCodec; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.springframework.beans.factory.annotation.Autowired; |
| import org.springframework.stereotype.Service; |
| import org.springframework.transaction.annotation.Transactional; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.time.LocalDateTime; |
| import java.util.List; |
| import java.util.Map; |
| |
| @Service |
| public class TrackerService { |
| |
| private static final Logger logger = LoggerFactory.getLogger(TrackerService.class); |
| |
| @Autowired |
| private TorrentMetaRepository torrentMetaRepository; |
| |
| @Autowired |
| private PeerInfoRepository peerInfoRepository; |
| |
| @Autowired |
| private TorrentStatsService statsService; |
| |
| @Autowired |
| private UserService userService; |
| |
| @Transactional |
| public byte[] handleAnnounce(Map<String, String[]> params, String ipAddress) { |
| try { |
| // 验证必要参数 |
| if (!params.containsKey("info_hash") || !params.containsKey("peer_id") |
| || !params.containsKey("port")) { |
| return errorResponse("Missing required parameters"); |
| } |
| |
| // 解析参数 |
| String infoHash = decodeParam(params.get("info_hash")[0]); |
| String peerId = decodeParam(params.get("peer_id")[0]); |
| String username = decodeParam(params.get("username")[0]); |
| int port = Integer.parseInt(params.get("port")[0]); |
| |
| // 获取事件类型 |
| String event = getEventParam(params); |
| |
| // 获取流量数据 |
| long uploaded = getLongParam(params, "uploaded", 0); |
| long downloaded = getLongParam(params, "downloaded", 0); |
| long left = getLongParam(params, "left", 0); |
| |
| // 验证种子是否存在 |
| TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash); |
| if (meta == null) { |
| return errorResponse("Torrent not found: " + infoHash); |
| } |
| |
| // 创建或更新 peer 信息 |
| PeerInfoEntity peer = findOrCreatePeer(peerId, infoHash); |
| |
| // 计算流量增量 |
| long uploadedDelta = uploaded - peer.getUploaded(); |
| long downloadedDelta = downloaded - peer.getDownloaded(); |
| |
| // 更新用户总流量 |
| updateUserTraffic(username, uploadedDelta, downloadedDelta); |
| |
| // 设置 peer 属性 |
| peer.setUsername(username); |
| setPeerProperties(peer, ipAddress, port, uploaded, downloaded, left); |
| |
| // 处理事件类型 |
| handlePeerEvent(event, peer, left); |
| |
| // 保存 peer |
| peerInfoRepository.save(peer); |
| |
| // 更新种子统计信息 |
| statsService.updateTorrentStats(infoHash); |
| |
| // 获取 peer 列表响应 |
| List<PeerInfoEntity> activePeers = peerInfoRepository.findActivePeersByInfoHash(infoHash); |
| byte[] peerBytes = buildPeerResponse(activePeers); |
| |
| // 返回成功响应 |
| return BencodeCodec.buildTrackerResponse(1800, peerBytes); |
| } catch (Exception e) { |
| return errorResponse("Internal server error"); |
| } |
| } |
| |
| // 添加更新用户流量的方法 |
| @Transactional |
| private void updateUserTraffic(String username, long uploadedDelta, long downloadedDelta) { |
| if (uploadedDelta <= 0 && downloadedDelta <= 0) { |
| return; // 没有新的流量,不需要更新 |
| } |
| |
| User user = userService.findByUsername(username); |
| if (user != null) { |
| long oldUploaded = user.getUploaded(); |
| long oldDownloaded = user.getDownloaded(); |
| |
| user.setUploaded(oldUploaded + uploadedDelta); |
| user.setDownloaded(oldDownloaded + downloadedDelta); |
| userService.save(user); |
| |
| // 更新用户等级 |
| userService.updateUserLevel(user.getUid()); |
| |
| logger.info("用户 {} 流量更新: 上传 {} -> {} (+{}), 下载 {} -> {} (+{})", |
| username, |
| oldUploaded, user.getUploaded(), uploadedDelta, |
| oldDownloaded, user.getDownloaded(), downloadedDelta); |
| } else { |
| logger.warn("尝试更新不存在的用户流量: {}", username); |
| } |
| } |
| |
| // 辅助方法:获取事件参数 |
| private String getEventParam(Map<String, String[]> params) { |
| return params.containsKey("event") ? |
| decodeParam(params.get("event")[0]) : "update"; |
| } |
| |
| // 辅助方法:获取长整型参数 |
| private long getLongParam(Map<String, String[]> params, String key, long defaultValue) { |
| return params.containsKey(key) ? |
| Long.parseLong(params.get(key)[0]) : defaultValue; |
| } |
| |
| // 辅助方法:查找或创建 peer |
| private PeerInfoEntity findOrCreatePeer(String peerId, String infoHash) { |
| return peerInfoRepository.findByPeerIdAndInfoHash(peerId, infoHash) |
| .orElseGet(() -> { |
| PeerInfoEntity newPeer = new PeerInfoEntity(); |
| newPeer.setPeerId(peerId); |
| newPeer.setInfoHash(infoHash); |
| newPeer.setActive(true); |
| newPeer.setStatus("downloading"); |
| newPeer.setLastSeen(LocalDateTime.now()); |
| return newPeer; |
| }); |
| } |
| |
| // 辅助方法:设置 peer 属性 |
| private void setPeerProperties(PeerInfoEntity peer, String ip, int port, |
| long uploaded, long downloaded, long left) { |
| peer.setIp(ip); |
| peer.setPort(port); |
| peer.setPeerId(peer.getPeerId() != null ? peer.getPeerId() : ""); // 防止 NPE |
| peer.setInfoHash(peer.getInfoHash() != null ? peer.getInfoHash() : ""); |
| peer.setUploaded(uploaded); |
| peer.setDownloaded(downloaded); |
| peer.setLeft(left); |
| peer.setLastSeen(LocalDateTime.now()); |
| } |
| |
| // 辅助方法:处理 peer 事件 |
| private void handlePeerEvent(String event, PeerInfoEntity peer, long left) { |
| switch (event) { |
| case "started": |
| peer.setStatus("downloading"); |
| peer.setActive(true); |
| break; |
| |
| case "stopped": |
| peer.setActive(false); |
| break; |
| |
| case "completed": |
| handleCompletedEvent(peer); |
| break; |
| |
| case "update": |
| default: |
| handleUpdateEvent(peer, left); |
| break; |
| } |
| } |
| |
| // 处理完成事件 |
| private void handleCompletedEvent(PeerInfoEntity peer) { |
| peer.setStatus("completed"); |
| peer.setActive(true); |
| peer.setLeft(0); |
| incrementCompletedCount(peer.getInfoHash()); |
| } |
| |
| // 处理更新事件 |
| private void handleUpdateEvent(PeerInfoEntity peer, long left) { |
| if (left == 0 && "downloading".equals(peer.getStatus())) { |
| // 检测到下载完成 |
| peer.setStatus("completed"); |
| incrementCompletedCount(peer.getInfoHash()); |
| } |
| peer.setActive(true); |
| } |
| |
| // 增加完成次数 |
| private void incrementCompletedCount(String infoHash) { |
| TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash); |
| if (meta != null) { |
| statsService.incrementCompletedCount(meta.getId()); |
| } |
| } |
| |
| // 构建 peer 响应 |
| private byte[] buildPeerResponse(List<PeerInfoEntity> peers) { |
| List<String> ips = peers.stream().map(PeerInfoEntity::getIp).toList(); |
| List<Integer> ports = peers.stream().map(PeerInfoEntity::getPort).toList(); |
| return BencodeCodec.buildCompactPeers(ips, ports); |
| } |
| |
| // 构建错误响应 |
| private byte[] errorResponse(String reason) { |
| return BencodeCodec.encode(Map.of("failure reason", reason)); |
| } |
| |
| // 解码参数 |
| private String decodeParam(String raw) { |
| return new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8); |
| } |
| } |