Merge "添加更新用户流量的方法,并定时更新到所有用户"

Change-Id: I558ee7de6767ed1b78685883310a268ea51b198a
diff --git a/src/main/java/com/pt/utils/BencodeCodec.java b/src/main/java/com/pt/utils/BencodeCodec.java
index 2117d43..e3dfe5a 100644
--- a/src/main/java/com/pt/utils/BencodeCodec.java
+++ b/src/main/java/com/pt/utils/BencodeCodec.java
@@ -76,22 +76,21 @@
     /* ------------- 解码部分 ------------- */
 
     public static Object decode(byte[] data) throws IOException {
-        try (ByteArrayInputStream in = new ByteArrayInputStream(data)) {
-            in.mark(data.length);
+        try (PushbackInputStream in = new PushbackInputStream(new ByteArrayInputStream(data))) { // single-arg constructor gives a 1-byte pushback buffer; unread() replaces the old mark/reset look-ahead
             return decodeNext(in);
         }
     }
 
-    private static Object decodeNext(InputStream in) throws IOException {
+    private static Object decodeNext(PushbackInputStream in) throws IOException {
         int prefix = in.read();
         if (prefix == -1) {
             throw new IOException("Unexpected end of stream");
         }
-
-        in.mark(1024);
+        // Look-ahead uses PushbackInputStream.unread(), so no mark()/reset() calls are needed here
 
         if (prefix >= '0' && prefix <= '9') {
-            in.reset();
+            // String: push this byte back so parseString can read the length itself
+            in.unread(prefix);
             return parseString(in);
         } else if (prefix == 'i') {
             return parseInteger(in);
@@ -104,33 +103,41 @@
         }
     }
 
+    /** Parses one bencoded string, {@code <decimal length>:<bytes>}, returning the bytes decoded as UTF-8.
+     *  Throws IOException on a non-digit length character, a missing ':' or EOF before the bytes are read. */
     private static String parseString(InputStream in) throws IOException {
-        StringBuilder lenStr = new StringBuilder();
-        int b;
-        while ((b = in.read()) != -1 && b != ':') {
-            if (b < '0' || b > '9') {
-                throw new IOException("Invalid string length character: " + (char) b);
-            }
-            lenStr.append((char) b);
-        }
-        if (b == -1) {
-            throw new IOException("Unexpected end of stream reading string length");
-        }
-        int length = Integer.parseInt(lenStr.toString());
+        int ch;
+        StringBuilder lenBuilder = new StringBuilder();
 
-        byte[] buf = new byte[length];
-        int offset = 0;
-        while (offset < length) {
-            int read = in.read(buf, offset, length - offset);
-            if (read == -1) {
-                throw new IOException("Unexpected end of stream reading string data");
+        while ((ch = in.read()) != -1 && ch != ':') { // accumulate the length digits up to the ':' separator
+            if (!Character.isDigit(ch)) {
+                throw new IOException("Invalid string length prefix: " + (char) ch);
             }
-            offset += read;
+            lenBuilder.append((char) ch);
         }
 
-        return new String(buf, StandardCharsets.UTF_8);
+        if (ch != ':') { // the loop only exits on ':' or EOF, so this is the EOF case
+            throw new IOException("Expected ':' after string length");
+        }
+
+        int len = Integer.parseInt(lenBuilder.toString()); // NOTE(review): unchecked NumberFormatException if the length is empty or exceeds int range — confirm callers cope
+        byte[] strBytes = new byte[len];
+
+        int read = 0;
+        while (read < len) { // read() may deliver fewer bytes than requested, so loop until the buffer is full
+            int r = in.read(strBytes, read, len - read);
+            if (r == -1) {
+                throw new IOException("Unexpected end of stream when reading string");
+            }
+            read += r;
+        }
+
+        // Returned as a UTF-8 string, which assumes textual content; for binary payloads this could return byte[] instead
+        return new String(strBytes, StandardCharsets.UTF_8);
     }
 
+
+
     private static long parseInteger(InputStream in) throws IOException {
         StringBuilder intStr = new StringBuilder();
         int b;
@@ -140,41 +147,44 @@
         if (b == -1) {
             throw new IOException("Unexpected end of stream reading integer");
         }
-        return Long.parseLong(intStr.toString());
+
+        String intValue = intStr.toString();
+        System.out.println("Integer parsed raw: " + intValue);  // debug line
+
+        return Long.parseLong(intValue);
     }
 
-    private static List<Object> parseList(InputStream in) throws IOException {
+    private static List<Object> parseList(PushbackInputStream in) throws IOException { // decodes elements until the terminating 'e'
         List<Object> list = new ArrayList<>();
-        int b;
         while (true) {
-            in.mark(1);
-            b = in.read();
-            if (b == -1) {
-                throw new IOException("Unexpected end of stream reading list");
-            }
-            if (b == 'e') {
+            int ch = in.read(); // peek at the next byte
+            if (ch == 'e') { // 'e' ends the list and is consumed here
                 break;
             }
-            in.reset();
+            if (ch == -1) {
+                throw new IOException("Unexpected end of stream in list");
+            }
+            in.unread(ch); // not the terminator: push it back so decodeNext reads the element's own prefix
             list.add(decodeNext(in));
         }
         return list;
     }
 
-    private static Map<String, Object> parseDict(InputStream in) throws IOException {
+    private static Map<String, Object> parseDict(PushbackInputStream in) throws IOException {
         Map<String, Object> map = new LinkedHashMap<>();
-        int b;
+
         while (true) {
-            in.mark(1);
-            b = in.read();
-            if (b == -1) {
-                throw new IOException("Unexpected end of stream reading dictionary");
+            int ch = in.read();
+            if (ch == 'e') {
+                break; // end of dictionary
             }
-            if (b == 'e') {
-                break;
+            if (ch == -1) {
+                throw new IOException("Unexpected end of stream in dict");
             }
-            in.reset();
-            String key = (String) decodeNext(in);
+            // Push the byte read above back so parseString can read the length itself
+            in.unread(ch);
+
+            String key = parseString(in);
             Object value = decodeNext(in);
             map.put(key, value);
         }
@@ -187,8 +197,9 @@
     public static byte[] buildCompactPeer(String ip, int port) {
         try {
             InetAddress addr = InetAddress.getByName(ip);
-            ByteBuffer buffer = ByteBuffer.allocate(6);
-            buffer.put(addr.getAddress());
+            byte[] ipBytes = addr.getAddress();
+            ByteBuffer buffer = ByteBuffer.allocate(ipBytes.length + 2);
+            buffer.put(ipBytes);
             buffer.putShort((short) port);
             return buffer.array();
         } catch (IOException e) {
@@ -196,6 +207,7 @@
         }
     }
 
+
     // 构造多个compact peer的二进制拼接
     public static byte[] buildCompactPeers(List<String> ips, List<Integer> ports) {
         if (ips.size() != ports.size()) throw new IllegalArgumentException("IPs and ports list size mismatch");