Merge "添加更新用户流量的方法,并定时跟新到所有用户"
Change-Id: I558ee7de6767ed1b78685883310a268ea51b198a
diff --git a/src/main/java/com/pt/controller/ResourceController.java b/src/main/java/com/pt/controller/ResourceController.java
index 306851a..2f75af6 100644
--- a/src/main/java/com/pt/controller/ResourceController.java
+++ b/src/main/java/com/pt/controller/ResourceController.java
@@ -77,13 +77,13 @@
Map<String, Object> ans = new HashMap<>();
if (!JWTUtils.checkToken(token, username, Constants.UserRole.USER)) {
- ans.put("message", "Invalid token");
+ ans.put("result", "Invalid token");
return ResponseEntity.badRequest().body(ans);
}
User user = userService.findByUsername(username);
if (user == null || user.getLevel() < 2) {
- ans.put("message", "Insufficient permissions to publish resources");
+ ans.put("result", "Insufficient permissions to publish resources");
return ResponseEntity.status(403).body(ans);
}
@@ -91,11 +91,11 @@
// 传入种子文件字节,同时传入资源其他信息
resourceService.publishResource(name, description, username, size, torrentFile.getBytes());
} catch (Exception e) {
- ans.put("message", "Failed to publish resource: " + e.getMessage());
+ ans.put("result", "Failed to publish resource: " + e.getMessage());
return ResponseEntity.status(500).body(ans);
}
- ans.put("message", "Resource published successfully");
+ ans.put("result", "Resource published successfully");
return ResponseEntity.ok(ans);
}
diff --git a/src/main/java/com/pt/controller/TrackerController.java b/src/main/java/com/pt/controller/TrackerController.java
index 33d8831..56c0867 100644
--- a/src/main/java/com/pt/controller/TrackerController.java
+++ b/src/main/java/com/pt/controller/TrackerController.java
@@ -7,29 +7,34 @@
import org.springframework.web.bind.annotation.*;
import java.io.IOException;
+import java.util.HashMap;
import java.util.Map;
@RestController
-@CrossOrigin(origins = "*")
@RequestMapping("/api/tracker")
+@CrossOrigin(origins = "*")
public class TrackerController {
@Autowired
private TrackerService trackerService;
- @GetMapping("/announce")
- public void announce(HttpServletRequest request, HttpServletResponse response) throws IOException {
+ // Tracker endpoint that responds to a BT client's announce request
+ @PostMapping("/announce")
+ public void announceByPost(
+ @RequestParam("info_hash") String infoHash,
+ @RequestParam("peer_id") String peerId,
+ @RequestParam("port") int port,
+ HttpServletRequest request,
+ HttpServletResponse response
+ ) throws IOException {
try {
String ip = request.getRemoteAddr();
- Map<String, String[]> params = request.getParameterMap();
- // 验证必要参数
- if (!params.containsKey("username")) {
- response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
- response.getWriter().write("Missing required parameter: username");
- return;
- }
-
+ // 将参数封装为 Map 传给服务层(也可以直接传对象)
+ Map<String, String[]> params = new HashMap<>();
+ params.put("info_hash", new String[]{infoHash});
+ params.put("peer_id", new String[]{peerId});
+ params.put("port", new String[]{String.valueOf(port)});
byte[] bencodedResponse = trackerService.handleAnnounce(params, ip);
response.setContentType("application/x-bittorrent");
diff --git a/src/main/java/com/pt/entity/PeerInfoEntity.java b/src/main/java/com/pt/entity/PeerInfoEntity.java
index d232a73..56125cb 100644
--- a/src/main/java/com/pt/entity/PeerInfoEntity.java
+++ b/src/main/java/com/pt/entity/PeerInfoEntity.java
@@ -27,6 +27,22 @@
private String username; // 添加用户名字段
+ public PeerInfoEntity(String ipAddress, int port, String peerId) {
+ this.ip = ipAddress;
+ this.port = port;
+ this.peerId = peerId;
+ this.lastSeen = LocalDateTime.now();
+ this.status = "downloading"; // 默认状态为下载中
+ this.isActive = true; // 默认活跃状态
+ this.uploaded = 0;
+ this.downloaded = 0;
+ this.left = 0;
+ }
+
+ public PeerInfoEntity() {
+
+ }
+
public Long getId() {
return id;
}
diff --git a/src/main/java/com/pt/entity/User.java b/src/main/java/com/pt/entity/User.java
index aef2cd3..bc3a488 100644
--- a/src/main/java/com/pt/entity/User.java
+++ b/src/main/java/com/pt/entity/User.java
@@ -16,6 +16,7 @@
private int points;
private long uploaded;
private long downloaded;
+ private double shareRatio;
public User() {
}
@@ -79,6 +80,17 @@
this.downloaded = downloaded;
}
+ public double getShareRatio() {
+ if (downloaded == 0) {
+ return 0;
+ }
+ return (double) uploaded / downloaded;
+ }
+
+ public void setShareRatio(double shareRatio) {
+ this.shareRatio = shareRatio;
+ }
+
@Override
public String toString() {
return "{" +
diff --git a/src/main/java/com/pt/repository/TorrentMetaRepository.java b/src/main/java/com/pt/repository/TorrentMetaRepository.java
index 839fc01..5135661 100644
--- a/src/main/java/com/pt/repository/TorrentMetaRepository.java
+++ b/src/main/java/com/pt/repository/TorrentMetaRepository.java
@@ -3,8 +3,10 @@
import com.pt.entity.TorrentMeta;
import org.springframework.data.jpa.repository.JpaRepository;
+import java.util.Optional;
+
public interface TorrentMetaRepository extends JpaRepository<TorrentMeta, Long> {
- TorrentMeta findByInfoHash(String infoHash);
+ Optional<TorrentMeta> findByInfoHash(String infoHash);
TorrentMeta findByFilename(String filename);
}
diff --git a/src/main/java/com/pt/service/TorrentService.java b/src/main/java/com/pt/service/TorrentService.java
index ad38622..3c9d28d 100644
--- a/src/main/java/com/pt/service/TorrentService.java
+++ b/src/main/java/com/pt/service/TorrentService.java
@@ -10,6 +10,7 @@
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
@Service
public class TorrentService {
@@ -18,25 +19,17 @@
private TorrentMetaRepository torrentMetaRepository;
public TorrentMeta parseAndSaveTorrent(byte[] torrentBytes) throws Exception {
- System.out.println("111");
- for (byte b : torrentBytes) {
- System.out.println(b);
- }
Map<String, Object> torrentDict = (Map<String, Object>) BencodeCodec.decode(torrentBytes);
if (!torrentDict.containsKey("info")) {
throw new IllegalArgumentException("Invalid torrent file: missing 'info' dictionary");
}
Map<String, Object> infoDict = (Map<String, Object>) torrentDict.get("info");
-
- System.out.println(222);
// 计算 info_hash
byte[] infoEncoded = BencodeCodec.encode(infoDict);
MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
byte[] infoHashBytes = sha1.digest(infoEncoded);
String infoHash = bytesToHex(infoHashBytes);
-
- System.out.println(333);
// 获取文件名,大小等
String name = (String) infoDict.get("name");
long length = 0;
@@ -50,17 +43,23 @@
}
length = totalLength;
}
-
// 保存到数据库
- TorrentMeta meta = new TorrentMeta();
- meta.setFilename(name);
- meta.setInfoHash(infoHash);
- meta.setSize(length);
- meta.setUploadTime(LocalDateTime.now());
- meta.setTorrentData(torrentBytes);
+ Optional<TorrentMeta> existing = torrentMetaRepository.findByInfoHash(infoHash);
- torrentMetaRepository.save(meta);
- return meta;
+ if (existing.isPresent()) {
+ System.out.println("该种子已存在,跳过保存");
+ return existing.get();
+ } else {
+ TorrentMeta meta = new TorrentMeta();
+ meta.setFilename(name);
+ meta.setInfoHash(infoHash);
+ meta.setSize(length);
+ meta.setUploadTime(LocalDateTime.now());
+ meta.setTorrentData(torrentBytes);
+
+ return torrentMetaRepository.save(meta);
+ }
+
}
diff --git a/src/main/java/com/pt/service/TorrentStatsService.java b/src/main/java/com/pt/service/TorrentStatsService.java
index 9596784..baa63e3 100644
--- a/src/main/java/com/pt/service/TorrentStatsService.java
+++ b/src/main/java/com/pt/service/TorrentStatsService.java
@@ -1,6 +1,7 @@
package com.pt.service;
import com.pt.entity.PeerInfoEntity;
+import com.pt.entity.TorrentMeta;
import com.pt.entity.TorrentStats;
import com.pt.exception.ResourceNotFoundException;
import com.pt.repository.PeerInfoRepository;
@@ -13,6 +14,7 @@
import java.time.LocalDateTime;
import java.util.List;
+import java.util.Optional;
@Service
public class TorrentStatsService {
@@ -91,11 +93,18 @@
}
// 3. 更新统计记录
- TorrentStats stats = statsRepository.findByTorrentId(
- torrentMetaRepository.findByInfoHash(infoHash).getId()
- ).orElse(new TorrentStats());
+ Optional<TorrentMeta> optionalMeta = torrentMetaRepository.findByInfoHash(infoHash);
+ if (optionalMeta.isEmpty()) {
+ // 处理找不到 info_hash 的情况
+ throw new IllegalArgumentException("Invalid info_hash: not found in database");
+ }
- stats.setTorrentId(torrentMetaRepository.findByInfoHash(infoHash).getId());
+ TorrentMeta meta = optionalMeta.get();
+
+ TorrentStats stats = statsRepository.findByTorrentId(meta.getId())
+ .orElse(new TorrentStats());
+
+ stats.setTorrentId(meta.getId());
stats.setSeederCount(seeders);
stats.setLeecherCount(leechers);
stats.setCompletedCount(completed);
diff --git a/src/main/java/com/pt/service/TrackerService.java b/src/main/java/com/pt/service/TrackerService.java
index 8a654d0..ea840e1 100644
--- a/src/main/java/com/pt/service/TrackerService.java
+++ b/src/main/java/com/pt/service/TrackerService.java
@@ -1,233 +1,100 @@
package com.pt.service;
-import com.pt.entity.PeerInfoEntity;
import com.pt.entity.TorrentMeta;
-import com.pt.entity.User;
-import com.pt.exception.ResourceNotFoundException;
-import com.pt.repository.PeerInfoRepository;
import com.pt.repository.TorrentMetaRepository;
import com.pt.utils.BencodeCodec;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
-import java.time.LocalDateTime;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
@Service
public class TrackerService {
- private static final Logger logger = LoggerFactory.getLogger(TrackerService.class);
+ private final Map<String, List<PeerInfo>> torrentPeers = new ConcurrentHashMap<>();
@Autowired
private TorrentMetaRepository torrentMetaRepository;
- @Autowired
- private PeerInfoRepository peerInfoRepository;
-
- @Autowired
- private TorrentStatsService statsService;
-
- @Autowired
- private UserService userService;
-
- @Transactional
public byte[] handleAnnounce(Map<String, String[]> params, String ipAddress) {
try {
- // 验证必要参数
- if (!params.containsKey("info_hash") || !params.containsKey("peer_id")
- || !params.containsKey("port")) {
- return errorResponse("Missing required parameters");
+ if (!params.containsKey("info_hash") || !params.containsKey("peer_id") || !params.containsKey("port")) {
+ return BencodeCodec.encode(Map.of("failure reason", "Missing required parameters"));
}
- // 解析参数
+ System.out.println("Received announce params: " + params);
+ System.out.println("Client IP: " + ipAddress);
+
+ // 用ISO-8859-1解码,确保二进制数据正确还原
String infoHash = decodeParam(params.get("info_hash")[0]);
- String peerId = decodeParam(params.get("peer_id")[0]);
- String username = decodeParam(params.get("username")[0]);
- int port = Integer.parseInt(params.get("port")[0]);
+ System.out.println("Decoded info_hash (raw bytes as hex): " + bytesToHex(infoHash.getBytes("ISO-8859-1")));
- // 获取事件类型
- String event = getEventParam(params);
-
- // 获取流量数据
- long uploaded = getLongParam(params, "uploaded", 0);
- long downloaded = getLongParam(params, "downloaded", 0);
- long left = getLongParam(params, "left", 0);
-
- // 验证种子是否存在
- TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
- if (meta == null) {
- return errorResponse("Torrent not found: " + infoHash);
+ Optional<TorrentMeta> meta = torrentMetaRepository.findByInfoHash(infoHash);
+ if (!meta.isPresent()) {
+ System.out.println("Invalid info_hash: not found in DB");
+ return BencodeCodec.encode(Map.of("failure reason", "Invalid info_hash"));
}
- // 创建或更新 peer 信息
- PeerInfoEntity peer = findOrCreatePeer(peerId, infoHash);
-
- // 计算流量增量
- long uploadedDelta = uploaded - peer.getUploaded();
- long downloadedDelta = downloaded - peer.getDownloaded();
-
- // 更新用户总流量
- updateUserTraffic(username, uploadedDelta, downloadedDelta);
+ String peerId = decodeParam(params.get("peer_id")[0]);
+ System.out.println("Decoded peer_id: " + peerId);
- // 设置 peer 属性
- peer.setUsername(username);
- setPeerProperties(peer, ipAddress, port, uploaded, downloaded, left);
+ int port = Integer.parseInt(params.get("port")[0]);
+ System.out.println("Port: " + port);
- // 处理事件类型
- handlePeerEvent(event, peer, left);
+ PeerInfo peer = new PeerInfo(ipAddress, port, peerId);
- // 保存 peer
- peerInfoRepository.save(peer);
+ torrentPeers.computeIfAbsent(infoHash, k -> new CopyOnWriteArrayList<>());
+ List<PeerInfo> peers = torrentPeers.get(infoHash);
- // 更新种子统计信息
- statsService.updateTorrentStats(infoHash);
+ boolean exists = peers.stream().anyMatch(p -> p.peerId.equals(peerId));
+ if (!exists) {
+ peers.add(peer);
+ System.out.println("Added new peer: " + peerId);
+ } else {
+ System.out.println("Peer already exists: " + peerId);
+ }
- // 获取 peer 列表响应
- List<PeerInfoEntity> activePeers = peerInfoRepository.findActivePeersByInfoHash(infoHash);
- byte[] peerBytes = buildPeerResponse(activePeers);
+ List<String> ips = peers.stream().map(p -> p.ip).toList();
+ List<Integer> ports = peers.stream().map(p -> p.port).toList();
+ byte[] peerBytes = BencodeCodec.buildCompactPeers(ips, ports);
- // 返回成功响应
return BencodeCodec.buildTrackerResponse(1800, peerBytes);
} catch (Exception e) {
- return errorResponse("Internal server error");
+ e.printStackTrace(); // 打印异常堆栈,方便定位错误
+ return BencodeCodec.encode(Map.of("failure reason", "Internal server error"));
}
}
- // 添加更新用户流量的方法
- @Transactional
- private void updateUserTraffic(String username, long uploadedDelta, long downloadedDelta) {
- if (uploadedDelta <= 0 && downloadedDelta <= 0) {
- return; // 没有新的流量,不需要更新
+ // Helper: converts a byte array to a lowercase hexadecimal string
+ private String bytesToHex(byte[] bytes) {
+ StringBuilder sb = new StringBuilder();
+ for (byte b : bytes) {
+ sb.append(String.format("%02x", b));
}
+ return sb.toString();
+ }
- User user = userService.findByUsername(username);
- if (user != null) {
- long oldUploaded = user.getUploaded();
- long oldDownloaded = user.getDownloaded();
-
- user.setUploaded(oldUploaded + uploadedDelta);
- user.setDownloaded(oldDownloaded + downloadedDelta);
- userService.save(user);
-
- // 更新用户等级
- userService.updateUserLevel(user.getUid());
-
- logger.info("用户 {} 流量更新: 上传 {} -> {} (+{}), 下载 {} -> {} (+{})",
- username,
- oldUploaded, user.getUploaded(), uploadedDelta,
- oldDownloaded, user.getDownloaded(), downloadedDelta);
- } else {
- logger.warn("尝试更新不存在的用户流量: {}", username);
+ // decodeParam 建议用ISO-8859-1解码
+ private String decodeParam(String param) throws UnsupportedEncodingException {
+ return URLDecoder.decode(param, "ISO-8859-1");
+ }
+
+
+ private static class PeerInfo {
+ String ip;
+ int port;
+ String peerId;
+
+ public PeerInfo(String ip, int port, String peerId) {
+ this.ip = ip;
+ this.port = port;
+ this.peerId = peerId;
}
}
+}
- // 辅助方法:获取事件参数
- private String getEventParam(Map<String, String[]> params) {
- return params.containsKey("event") ?
- decodeParam(params.get("event")[0]) : "update";
- }
-
- // 辅助方法:获取长整型参数
- private long getLongParam(Map<String, String[]> params, String key, long defaultValue) {
- return params.containsKey(key) ?
- Long.parseLong(params.get(key)[0]) : defaultValue;
- }
-
- // 辅助方法:查找或创建 peer
- private PeerInfoEntity findOrCreatePeer(String peerId, String infoHash) {
- return peerInfoRepository.findByPeerIdAndInfoHash(peerId, infoHash)
- .orElseGet(() -> {
- PeerInfoEntity newPeer = new PeerInfoEntity();
- newPeer.setPeerId(peerId);
- newPeer.setInfoHash(infoHash);
- newPeer.setActive(true);
- newPeer.setStatus("downloading");
- newPeer.setLastSeen(LocalDateTime.now());
- return newPeer;
- });
- }
-
- // 辅助方法:设置 peer 属性
- private void setPeerProperties(PeerInfoEntity peer, String ip, int port,
- long uploaded, long downloaded, long left) {
- peer.setIp(ip);
- peer.setPort(port);
- peer.setPeerId(peer.getPeerId() != null ? peer.getPeerId() : ""); // 防止 NPE
- peer.setInfoHash(peer.getInfoHash() != null ? peer.getInfoHash() : "");
- peer.setUploaded(uploaded);
- peer.setDownloaded(downloaded);
- peer.setLeft(left);
- peer.setLastSeen(LocalDateTime.now());
- }
-
- // 辅助方法:处理 peer 事件
- private void handlePeerEvent(String event, PeerInfoEntity peer, long left) {
- switch (event) {
- case "started":
- peer.setStatus("downloading");
- peer.setActive(true);
- break;
-
- case "stopped":
- peer.setActive(false);
- break;
-
- case "completed":
- handleCompletedEvent(peer);
- break;
-
- case "update":
- default:
- handleUpdateEvent(peer, left);
- break;
- }
- }
-
- // 处理完成事件
- private void handleCompletedEvent(PeerInfoEntity peer) {
- peer.setStatus("completed");
- peer.setActive(true);
- peer.setLeft(0);
- incrementCompletedCount(peer.getInfoHash());
- }
-
- // 处理更新事件
- private void handleUpdateEvent(PeerInfoEntity peer, long left) {
- if (left == 0 && "downloading".equals(peer.getStatus())) {
- // 检测到下载完成
- peer.setStatus("completed");
- incrementCompletedCount(peer.getInfoHash());
- }
- peer.setActive(true);
- }
-
- // 增加完成次数
- private void incrementCompletedCount(String infoHash) {
- TorrentMeta meta = torrentMetaRepository.findByInfoHash(infoHash);
- if (meta != null) {
- statsService.incrementCompletedCount(meta.getId());
- }
- }
-
- // 构建 peer 响应
- private byte[] buildPeerResponse(List<PeerInfoEntity> peers) {
- List<String> ips = peers.stream().map(PeerInfoEntity::getIp).toList();
- List<Integer> ports = peers.stream().map(PeerInfoEntity::getPort).toList();
- return BencodeCodec.buildCompactPeers(ips, ports);
- }
-
- // 构建错误响应
- private byte[] errorResponse(String reason) {
- return BencodeCodec.encode(Map.of("failure reason", reason));
- }
-
- // 解码参数
- private String decodeParam(String raw) {
- return new String(raw.getBytes(StandardCharsets.ISO_8859_1), StandardCharsets.UTF_8);
- }
-}
\ No newline at end of file
diff --git a/src/main/java/com/pt/utils/BencodeCodec.java b/src/main/java/com/pt/utils/BencodeCodec.java
index 2117d43..e3dfe5a 100644
--- a/src/main/java/com/pt/utils/BencodeCodec.java
+++ b/src/main/java/com/pt/utils/BencodeCodec.java
@@ -76,22 +76,21 @@
/* ------------- 解码部分 ------------- */
public static Object decode(byte[] data) throws IOException {
- try (ByteArrayInputStream in = new ByteArrayInputStream(data)) {
- in.mark(data.length);
+ try (PushbackInputStream in = new PushbackInputStream(new ByteArrayInputStream(data))) {
return decodeNext(in);
}
}
- private static Object decodeNext(InputStream in) throws IOException {
+ private static Object decodeNext(PushbackInputStream in) throws IOException {
int prefix = in.read();
if (prefix == -1) {
throw new IOException("Unexpected end of stream");
}
-
- in.mark(1024);
+ // no mark/reset calls here
if (prefix >= '0' && prefix <= '9') {
- in.reset();
+ // 字符串,回退这个字节,parseString自行读长度
+ in.unread(prefix);
return parseString(in);
} else if (prefix == 'i') {
return parseInteger(in);
@@ -104,33 +103,41 @@
}
}
+
+
private static String parseString(InputStream in) throws IOException {
- StringBuilder lenStr = new StringBuilder();
- int b;
- while ((b = in.read()) != -1 && b != ':') {
- if (b < '0' || b > '9') {
- throw new IOException("Invalid string length character: " + (char) b);
- }
- lenStr.append((char) b);
- }
- if (b == -1) {
- throw new IOException("Unexpected end of stream reading string length");
- }
- int length = Integer.parseInt(lenStr.toString());
+ int ch;
+ StringBuilder lenBuilder = new StringBuilder();
- byte[] buf = new byte[length];
- int offset = 0;
- while (offset < length) {
- int read = in.read(buf, offset, length - offset);
- if (read == -1) {
- throw new IOException("Unexpected end of stream reading string data");
+ while ((ch = in.read()) != -1 && ch != ':') {
+ if (!Character.isDigit(ch)) {
+ throw new IOException("Invalid string length prefix: " + (char) ch);
}
- offset += read;
+ lenBuilder.append((char) ch);
}
- return new String(buf, StandardCharsets.UTF_8);
+ if (ch != ':') {
+ throw new IOException("Expected ':' after string length");
+ }
+
+ int len = Integer.parseInt(lenBuilder.toString());
+ byte[] strBytes = new byte[len];
+
+ int read = 0;
+ while (read < len) {
+ int r = in.read(strBytes, read, len - read);
+ if (r == -1) {
+ throw new IOException("Unexpected end of stream when reading string");
+ }
+ read += r;
+ }
+
+ // 这里转换为 UTF-8 字符串返回,如果你确定是文本;如果是二进制可以改成返回byte[]
+ return new String(strBytes, StandardCharsets.UTF_8);
}
+
+
private static long parseInteger(InputStream in) throws IOException {
StringBuilder intStr = new StringBuilder();
int b;
@@ -140,41 +147,44 @@
if (b == -1) {
throw new IOException("Unexpected end of stream reading integer");
}
- return Long.parseLong(intStr.toString());
+
+ String intValue = intStr.toString();
+ System.out.println("Integer parsed raw: " + intValue); // debug line
+
+ return Long.parseLong(intValue);
}
- private static List<Object> parseList(InputStream in) throws IOException {
+ private static List<Object> parseList(PushbackInputStream in) throws IOException {
List<Object> list = new ArrayList<>();
- int b;
while (true) {
- in.mark(1);
- b = in.read();
- if (b == -1) {
- throw new IOException("Unexpected end of stream reading list");
- }
- if (b == 'e') {
+ int ch = in.read();
+ if (ch == 'e') {
break;
}
- in.reset();
+ if (ch == -1) {
+ throw new IOException("Unexpected end of stream in list");
+ }
+ in.unread(ch);
list.add(decodeNext(in));
}
return list;
}
- private static Map<String, Object> parseDict(InputStream in) throws IOException {
+ private static Map<String, Object> parseDict(PushbackInputStream in) throws IOException {
Map<String, Object> map = new LinkedHashMap<>();
- int b;
+
while (true) {
- in.mark(1);
- b = in.read();
- if (b == -1) {
- throw new IOException("Unexpected end of stream reading dictionary");
+ int ch = in.read();
+ if (ch == 'e') {
+ break; // 字典结束
}
- if (b == 'e') {
- break;
+ if (ch == -1) {
+ throw new IOException("Unexpected end of stream in dict");
}
- in.reset();
- String key = (String) decodeNext(in);
+ // 回退到上面读的字节,parseString 自己读长度
+ in.unread(ch);
+
+ String key = parseString(in);
Object value = decodeNext(in);
map.put(key, value);
}
@@ -187,8 +197,9 @@
public static byte[] buildCompactPeer(String ip, int port) {
try {
InetAddress addr = InetAddress.getByName(ip);
- ByteBuffer buffer = ByteBuffer.allocate(6);
- buffer.put(addr.getAddress());
+ byte[] ipBytes = addr.getAddress();
+ ByteBuffer buffer = ByteBuffer.allocate(ipBytes.length + 2);
+ buffer.put(ipBytes);
buffer.putShort((short) port);
return buffer.array();
} catch (IOException e) {
@@ -196,6 +207,7 @@
}
}
+
// 构造多个compact peer的二进制拼接
public static byte[] buildCompactPeers(List<String> ips, List<Integer> ports) {
if (ips.size() != ports.size()) throw new IllegalArgumentException("IPs and ports list size mismatch");