blob: 8a654d054ce98ba491fcd4e0a12552fc7dddbe19 [file] [log] [blame]
package com.pt.service;
import com.pt.entity.PeerInfoEntity;
import com.pt.entity.TorrentMeta;
import com.pt.entity.User;
import com.pt.exception.ResourceNotFoundException;
import com.pt.repository.PeerInfoRepository;
import com.pt.repository.TorrentMetaRepository;
import com.pt.utils.BencodeCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
/**
 * Handles BitTorrent tracker announce requests: validates the request parameters,
 * records the announcing peer, credits the user's traffic totals, refreshes the
 * torrent statistics and returns a bencoded response.
 */
@Service
public class TrackerService {
    private static final Logger logger = LoggerFactory.getLogger(TrackerService.class);

    /**
     * First argument handed to {@code BencodeCodec.buildTrackerResponse}.
     * NOTE(review): presumably the re-announce interval in seconds; confirm against BencodeCodec.
     */
    private static final int ANNOUNCE_INTERVAL = 1800;

    /** Largest port value accepted from a client (upper bound of a 16-bit port number). */
    private static final int MAX_PORT = 65535;

    private static final String STATUS_DOWNLOADING = "downloading";
    private static final String STATUS_COMPLETED = "completed";
    private static final String DEFAULT_EVENT = "update";

    @Autowired
    private TorrentMetaRepository torrentMetaRepository;
    @Autowired
    private PeerInfoRepository peerInfoRepository;
    @Autowired
    private TorrentStatsService statsService;
    @Autowired
    private UserService userService;

    /**
     * Processes one announce request.
     *
     * <p>Required parameters are {@code info_hash}, {@code peer_id}, {@code port} and
     * {@code username}; {@code uploaded}, {@code downloaded} and {@code left} default to 0 and
     * {@code event} defaults to {@code "update"}.
     *
     * @param params    request parameter map (parameter name to its values); only the first
     *                  value of each parameter is used
     * @param ipAddress address stored as the peer's IP
     * @return the bencoded tracker response, or a bencoded {@code failure reason} dictionary
     *         when the request is invalid, the torrent is unknown or processing fails
     */
    @Transactional
    public byte[] handleAnnounce(Map<String, String[]> params, String ipAddress) {
        try {
            // Validate the required parameters. "username" is dereferenced below, so it has to
            // be checked here as well instead of failing later with a NullPointerException.
            String rawInfoHash = firstValue(params, "info_hash");
            String rawPeerId = firstValue(params, "peer_id");
            String rawPort = firstValue(params, "port");
            String rawUsername = firstValue(params, "username");
            if (rawInfoHash == null || rawPeerId == null || rawPort == null || rawUsername == null) {
                return errorResponse("Missing required parameters");
            }

            // Parse the parameters.
            String infoHash = decodeParam(rawInfoHash);
            String peerId = decodeParam(rawPeerId);
            String username = decodeParam(rawUsername);
            int port = Integer.parseInt(rawPort);
            if (port < 0 || port > MAX_PORT) {
                return errorResponse("Invalid port");
            }

            // Event type.
            String event = getEventParam(params);

            // Traffic counters reported by the client.
            long uploaded = getLongParam(params, "uploaded", 0);
            long downloaded = getLongParam(params, "downloaded", 0);
            long left = getLongParam(params, "left", 0);
            if (uploaded < 0 || downloaded < 0 || left < 0) {
                // A negative counter stored now would turn into a huge positive delta
                // (and therefore bogus user credit) on the next announce.
                return errorResponse("Invalid numeric parameter");
            }

            // Make sure the torrent exists.
            TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
            if (meta == null) {
                return errorResponse("Torrent not found: " + infoHash);
            }

            // Create or load the peer record.
            PeerInfoEntity peer = findOrCreatePeer(peerId, infoHash);

            // Traffic since the previous announce of this peer.
            long uploadedDelta = uploaded - peer.getUploaded();
            long downloadedDelta = downloaded - peer.getDownloaded();

            // Credit the user's lifetime totals.
            updateUserTraffic(username, uploadedDelta, downloadedDelta);

            // Refresh the peer record.
            peer.setUsername(username);
            setPeerProperties(peer, ipAddress, port, uploaded, downloaded, left);

            // Apply the event.
            handlePeerEvent(event, peer, left);

            // Persist the peer.
            peerInfoRepository.save(peer);

            // Refresh the torrent statistics.
            statsService.updateTorrentStats(infoHash);

            // Build the peer list for the response.
            List<PeerInfoEntity> activePeers = peerInfoRepository.findActivePeersByInfoHash(infoHash);
            byte[] peerBytes = buildPeerResponse(activePeers);

            return BencodeCodec.buildTrackerResponse(ANNOUNCE_INTERVAL, peerBytes);
        } catch (NumberFormatException e) {
            logger.warn("Rejected announce with a malformed numeric parameter: {}", e.getMessage());
            return errorResponse("Invalid numeric parameter");
        } catch (Exception e) {
            // Boundary catch: the client always receives a bencoded failure, but the cause must
            // not be lost.
            // NOTE(review): because the exception does not leave this method, the surrounding
            // transaction is not rolled back by the exception itself; confirm whether partially
            // applied updates should be rolled back explicitly.
            logger.error("Unexpected error while handling announce", e);
            return errorResponse("Internal server error");
        }
    }

    /**
     * Adds newly reported traffic to the user's lifetime totals and refreshes the user level.
     *
     * <p>Each delta is clamped to zero: a negative delta (the client's counter went backwards)
     * must never reduce what the user has already been credited. Nothing is written when
     * neither direction has new traffic.
     *
     * <p>This method runs inside the caller's transaction. It carries no
     * {@code @Transactional} of its own because the annotation has no effect on a private,
     * self-invoked method when Spring uses proxy-based transaction management.
     *
     * @param username        name of the user to credit
     * @param uploadedDelta   uploaded bytes since the peer's previous announce (may be negative)
     * @param downloadedDelta downloaded bytes since the peer's previous announce (may be negative)
     */
    private void updateUserTraffic(String username, long uploadedDelta, long downloadedDelta) {
        long creditedUpload = Math.max(0L, uploadedDelta);
        long creditedDownload = Math.max(0L, downloadedDelta);
        if (creditedUpload == 0 && creditedDownload == 0) {
            return; // no new traffic, nothing to update
        }

        User user = userService.findByUsername(username);
        if (user == null) {
            logger.warn("尝试更新不存在的用户流量: {}", username);
            return;
        }

        long oldUploaded = user.getUploaded();
        long oldDownloaded = user.getDownloaded();
        user.setUploaded(oldUploaded + creditedUpload);
        user.setDownloaded(oldDownloaded + creditedDownload);
        userService.save(user);

        // Refresh the user level after the totals changed.
        userService.updateUserLevel(user.getUid());

        logger.info("用户 {} 流量更新: 上传 {} -> {} (+{}), 下载 {} -> {} (+{})",
                username,
                oldUploaded, user.getUploaded(), creditedUpload,
                oldDownloaded, user.getDownloaded(), creditedDownload);
    }

    /**
     * Returns the first value of a request parameter.
     *
     * @return the first value, or {@code null} when the map is {@code null}, the parameter is
     *         absent, or it has no values
     */
    private static String firstValue(Map<String, String[]> params, String key) {
        if (params == null) {
            return null;
        }
        String[] values = params.get(key);
        if (values == null || values.length == 0) {
            return null;
        }
        return values[0];
    }

    /** Returns the decoded {@code event} parameter, or {@code "update"} when it is absent. */
    private String getEventParam(Map<String, String[]> params) {
        String raw = firstValue(params, "event");
        return raw != null ? decodeParam(raw) : DEFAULT_EVENT;
    }

    /**
     * Returns a parameter parsed as a {@code long}, or {@code defaultValue} when it is absent.
     *
     * @throws NumberFormatException if the parameter is present but not a valid {@code long}
     */
    private long getLongParam(Map<String, String[]> params, String key, long defaultValue) {
        String raw = firstValue(params, key);
        return raw != null ? Long.parseLong(raw) : defaultValue;
    }

    /**
     * Loads the peer record for this peer id / info hash pair, or builds a new, unsaved one
     * that is active, in status {@code "downloading"} and last seen now.
     */
    private PeerInfoEntity findOrCreatePeer(String peerId, String infoHash) {
        return peerInfoRepository.findByPeerIdAndInfoHash(peerId, infoHash)
                .orElseGet(() -> {
                    PeerInfoEntity newPeer = new PeerInfoEntity();
                    newPeer.setPeerId(peerId);
                    newPeer.setInfoHash(infoHash);
                    newPeer.setActive(true);
                    newPeer.setStatus(STATUS_DOWNLOADING);
                    newPeer.setLastSeen(LocalDateTime.now());
                    return newPeer;
                });
    }

    /** Copies the announced address, port and counters onto the peer and stamps last-seen. */
    private void setPeerProperties(PeerInfoEntity peer, String ip, int port,
                                   long uploaded, long downloaded, long left) {
        peer.setIp(ip);
        peer.setPort(port);
        // Replace a null id / hash with an empty string to guard against NPEs downstream.
        peer.setPeerId(peer.getPeerId() != null ? peer.getPeerId() : "");
        peer.setInfoHash(peer.getInfoHash() != null ? peer.getInfoHash() : "");
        peer.setUploaded(uploaded);
        peer.setDownloaded(downloaded);
        peer.setLeft(left);
        peer.setLastSeen(LocalDateTime.now());
    }

    /**
     * Applies the announce event to the peer's status and active flag. Unknown events are
     * treated like a regular {@code "update"}.
     */
    private void handlePeerEvent(String event, PeerInfoEntity peer, long left) {
        switch (event) {
            case "started":
                // A peer that starts with nothing left already holds the full content. Marking
                // it "downloading" would make the next regular update look like a fresh
                // completion and inflate the completed count on every client restart.
                peer.setStatus(left == 0 ? STATUS_COMPLETED : STATUS_DOWNLOADING);
                peer.setActive(true);
                break;
            case "stopped":
                peer.setActive(false);
                break;
            case "completed":
                handleCompletedEvent(peer);
                break;
            case "update":
            default:
                handleUpdateEvent(peer, left);
                break;
        }
    }

    /**
     * Handles an explicit {@code "completed"} event. The torrent's completed count is bumped
     * only when the peer was not already completed, so a repeated event is not counted twice.
     */
    private void handleCompletedEvent(PeerInfoEntity peer) {
        boolean alreadyCompleted = STATUS_COMPLETED.equals(peer.getStatus());
        peer.setStatus(STATUS_COMPLETED);
        peer.setActive(true);
        peer.setLeft(0);
        if (!alreadyCompleted) {
            incrementCompletedCount(peer.getInfoHash());
        }
    }

    /**
     * Handles a regular update. A downloading peer that reports nothing left is treated as
     * having just finished, even though it sent no explicit {@code "completed"} event.
     */
    private void handleUpdateEvent(PeerInfoEntity peer, long left) {
        if (left == 0 && STATUS_DOWNLOADING.equals(peer.getStatus())) {
            peer.setStatus(STATUS_COMPLETED);
            incrementCompletedCount(peer.getInfoHash());
        }
        peer.setActive(true);
    }

    /** Bumps the completed count of the torrent with this info hash, if the torrent exists. */
    private void incrementCompletedCount(String infoHash) {
        TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
        if (meta != null) {
            statsService.incrementCompletedCount(meta.getId());
        }
    }

    /** Encodes the peers' IPs and ports via {@code BencodeCodec.buildCompactPeers}. */
    private byte[] buildPeerResponse(List<PeerInfoEntity> peers) {
        List<String> ips = peers.stream().map(PeerInfoEntity::getIp).toList();
        List<Integer> ports = peers.stream().map(PeerInfoEntity::getPort).toList();
        return BencodeCodec.buildCompactPeers(ips, ports);
    }

    /** Builds a bencoded dictionary carrying the given {@code failure reason}. */
    private byte[] errorResponse(String reason) {
        return BencodeCodec.encode(Map.of("failure reason", reason));
    }

    /**
     * Re-interprets a parameter whose characters each hold one byte (ISO-8859-1) as UTF-8 text.
     *
     * <p>NOTE(review): {@code info_hash} and {@code peer_id} are presumably raw binary values;
     * byte sequences that are not valid UTF-8 are replaced during decoding, so distinct values
     * could map to the same string. Behaviour is kept as is because stored hashes may rely on
     * it; confirm how info hashes are persisted before changing this.
     */
    private String decodeParam(String raw) {
        return new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
    }
}