tracker
Change-Id: I8f8ac81f9c4d7c7650cd64d2dade701dc6c11dce
diff --git a/ttorrent-master/ttorrent-tracker/pom.xml b/ttorrent-master/ttorrent-tracker/pom.xml
new file mode 100644
index 0000000..0eb1beb
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/pom.xml
@@ -0,0 +1,40 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>ttorrent/tracker</name>
+ <url>http://turn.github.com/ttorrent/</url>
+ <artifactId>ttorrent-tracker</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-common</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-bencoding</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-test-api</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/AddressChecker.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/AddressChecker.java
new file mode 100644
index 0000000..98241ea
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/AddressChecker.java
@@ -0,0 +1,14 @@
+package com.turn.ttorrent.tracker;
+
+public interface AddressChecker {
+
+ /**
+ * this method must return true if is incorrect ip and other peers can not connect to this peer. If this method return true
+ * tracker doesn't register the peer for current torrent
+ *
+ * 检验传入的peer ip是否是正确的
+ * @param ip specified address
+ */
+ boolean isBadAddress(String ip);
+
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/MultiAnnounceRequestProcessor.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/MultiAnnounceRequestProcessor.java
new file mode 100644
index 0000000..5bcbd4e
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/MultiAnnounceRequestProcessor.java
@@ -0,0 +1,59 @@
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.bcodec.BDecoder;
+import com.turn.ttorrent.bcodec.BEValue;
+import com.turn.ttorrent.bcodec.BEncoder;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.TrackerMessage;
+import com.turn.ttorrent.common.protocol.http.HTTPTrackerErrorMessage;
+import org.simpleframework.http.Status;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MultiAnnounceRequestProcessor {
+
+ private final TrackerRequestProcessor myTrackerRequestProcessor;
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(MultiAnnounceRequestProcessor.class);
+
+ public MultiAnnounceRequestProcessor(TrackerRequestProcessor trackerRequestProcessor) {
+ myTrackerRequestProcessor = trackerRequestProcessor;
+ }
+
+ public void process(final String body, final String url, final String hostAddress, final TrackerRequestProcessor.RequestHandler requestHandler) throws IOException {
+
+ final List<BEValue> responseMessages = new ArrayList<BEValue>();
+ final AtomicBoolean isAnySuccess = new AtomicBoolean(false);
+ for (String s : body.split("\n")) {
+ myTrackerRequestProcessor.process(s, hostAddress, new TrackerRequestProcessor.RequestHandler() {
+ @Override
+ public void serveResponse(int code, String description, ByteBuffer responseData) {
+ isAnySuccess.set(isAnySuccess.get() || (code == Status.OK.getCode()));
+ try {
+ responseMessages.add(BDecoder.bdecode(responseData));
+ } catch (IOException e) {
+ logger.warn("cannot decode message from byte buffer");
+ }
+ }
+ });
+ }
+ if (responseMessages.isEmpty()) {
+ ByteBuffer res;
+ Status status;
+ res = HTTPTrackerErrorMessage.craft("").getData();
+ status = Status.BAD_REQUEST;
+ requestHandler.serveResponse(status.getCode(), "", res);
+ return;
+ }
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ BEncoder.bencode(responseMessages, out);
+ requestHandler.serveResponse(isAnySuccess.get() ? Status.OK.getCode() : Status.BAD_REQUEST.getCode(), "", ByteBuffer.wrap(out.toByteArray()));
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/PeerCollectorThread.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/PeerCollectorThread.java
new file mode 100644
index 0000000..bdbd0e2
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/PeerCollectorThread.java
@@ -0,0 +1,38 @@
+package com.turn.ttorrent.tracker;
+
+/**
+ * The unfresh peer collector thread.
+ * <p>
+ * <p>
+ * Every PEER_COLLECTION_FREQUENCY_SECONDS, this thread will collect
+ * unfresh peers from all announced torrents.
+ * </p>
+ */
+
+// 周期性地清理不再活跃的 peers
+public class PeerCollectorThread extends Thread {
+
+ public static final int COLLECTION_FREQUENCY = 10;
+ private final TorrentsRepository myTorrentsRepository;
+ private volatile int myTorrentExpireTimeoutSec = 20 * 60;
+
+ public PeerCollectorThread(TorrentsRepository torrentsRepository) {
+ myTorrentsRepository = torrentsRepository;
+ }
+
+ public void setTorrentExpireTimeoutSec(int torrentExpireTimeoutSec) {
+ myTorrentExpireTimeoutSec = torrentExpireTimeoutSec;
+ }
+
+ @Override
+ public void run() {
+ while (!isInterrupted()) {
+ myTorrentsRepository.cleanup(myTorrentExpireTimeoutSec);
+ try {
+ Thread.sleep(COLLECTION_FREQUENCY * 1000);
+ } catch (InterruptedException ie) {
+ break;
+ }
+ }
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TorrentsRepository.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TorrentsRepository.java
new file mode 100644
index 0000000..65ae892
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TorrentsRepository.java
@@ -0,0 +1,99 @@
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
+
+// 管理和存储 torrents(种子文件)信息的仓库,它实现了对 torrents 的增、查、更新、清理等操作。
+// 这个类的实现主要用于一个 Tracker 服务器,负责管理 torrent 信息并对参与下载和上传的 peer 进行跟踪。
+
+//torrent存储仓库,能根据infohash的值,获取唯一的torrent,能实现上传
+// torrent 相关信息:整个 BT 种子文件实际上就是一个 bencoding 编码的 Dictionary, 它有两个 Key, 分别是 announce 和 info
+// announce是btTracker的url
+// info是字典,对应资源相关信息,name,piece length(分片长度)
+// piece(处理将数据分片)
+// length / files(这两个键不能同时存在, 有且仅有其中一个,只是一个文件是length,文件夹是files)
+
+public class TorrentsRepository {
+
+ private final ReentrantLock[] myLocks;
+ // ConcurrentMap一种线程安全的map映射,键值对应,torrent 的哈希值--TrackedTorrent
+ private final ConcurrentMap<String, TrackedTorrent> myTorrents;
+
+ // 构造函数
+ public TorrentsRepository(int locksCount) {
+
+ if (locksCount <= 0) {
+ throw new IllegalArgumentException("Lock count must be positive");
+ }
+
+ myLocks = new ReentrantLock[locksCount];
+ for (int i = 0; i < myLocks.length; i++) {
+ myLocks[i] = new ReentrantLock();
+ }
+ myTorrents = new ConcurrentHashMap<String, TrackedTorrent>();
+ }
+
+ // 根据传入的种子的哈希值hexInfoHash,获取TrackedTorrent对象
+ public TrackedTorrent getTorrent(String hexInfoHash) {
+ return myTorrents.get(hexInfoHash);
+ }
+
+ // torrent不存在,进行添加操作
+ public void putIfAbsent(String hexInfoHash, TrackedTorrent torrent) {
+ myTorrents.putIfAbsent(hexInfoHash, torrent);
+ //调用myTorrents类型ConcurrentMap下的添加函数
+ }
+ // 更新torrent数据
+ public TrackedTorrent putIfAbsentAndUpdate(String hexInfoHash, TrackedTorrent torrent,
+ AnnounceRequestMessage.RequestEvent event, ByteBuffer peerId,
+ String hexPeerId, String ip, int port, long uploaded, long downloaded,
+ long left) throws UnsupportedEncodingException {
+ TrackedTorrent actualTorrent;
+ try {
+ lockFor(hexInfoHash).lock();
+ TrackedTorrent oldTorrent = myTorrents.putIfAbsent(hexInfoHash, torrent);
+ actualTorrent = oldTorrent == null ? torrent : oldTorrent;
+ actualTorrent.update(event, peerId, hexPeerId, ip, port, uploaded, downloaded, left);
+ } finally {
+ lockFor(hexInfoHash).unlock();
+ }
+ return actualTorrent;
+ }
+
+ // 锁,避免阻塞其他torrent操作
+ private ReentrantLock lockFor(String torrentHash) {
+ return myLocks[Math.abs(torrentHash.hashCode()) % myLocks.length];
+ }
+
+ @SuppressWarnings("unused")
+ public void clear() {
+ myTorrents.clear();
+ }
+
+ public void cleanup(int torrentExpireTimeoutSec) {
+ for (TrackedTorrent trackedTorrent : myTorrents.values()) {
+ try {
+ lockFor(trackedTorrent.getHexInfoHash()).lock();
+ trackedTorrent.collectUnfreshPeers(torrentExpireTimeoutSec);
+ if (trackedTorrent.getPeers().size() == 0) {
+ myTorrents.remove(trackedTorrent.getHexInfoHash());
+ }
+ } finally {
+ lockFor(trackedTorrent.getHexInfoHash()).unlock();
+ }
+ }
+ }
+
+
+ public Map<String, TrackedTorrent> getTorrents() {
+ return new HashMap<String, TrackedTorrent>(myTorrents);
+ }
+
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackedPeer.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackedPeer.java
new file mode 100644
index 0000000..d8057c5
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackedPeer.java
@@ -0,0 +1,224 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.Constants;
+import com.turn.ttorrent.bcodec.BEValue;
+import com.turn.ttorrent.common.*;
+import org.slf4j.Logger;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * A BitTorrent tracker peer.
+ * <p>
+ * <p>
+ * Represents a peer exchanging on a given torrent. In this implementation,
+ * we don't really care about the status of the peers and how much they
+ * have downloaded / exchanged because we are not a torrent exchange and
+ * don't need to keep track of what peers are doing while they're
+ * downloading. We only care about when they start, and when they are done.
+ * </p>
+ * <p>
+ * <p>
+ * We also never expire peers automatically. Unless peers send a STOPPED
+ * announce request, they remain as long as the torrent object they are a
+ * part of.
+ * </p>
+ */
+
+// 跟踪peer(对等结点,相当于使用该tracker的用户)
+/* 管理一个TrackedTorrent种子的相关的用户对象,能帮助在下载时候找到对象。
+* 新下载用户完成下载后被标记为完成,可以添加
+* */
+// 管理用户登录时间
+public class TrackedPeer extends Peer {
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(TrackedPeer.class);
+
+ private final TimeService myTimeService;
+ private long uploaded;
+ private long downloaded;
+ private long left;
+ private TrackedTorrent torrent;//为什么包含这个实例??
+
+ /**
+ * Represents the state of a peer exchanging on this torrent.
+ * 展示用户和种子间的状态关系
+ * <p>
+ * <p>
+ * Peers can be in the STARTED state, meaning they have announced
+ * themselves to us and are eventually exchanging data with other peers.
+ * Note that a peer starting with a completed file will also be in the
+ * started state and will never notify as being in the completed state.
+ * This information can be inferred from the fact that the peer reports 0
+ * bytes left to download.
+ * </p>
+ * <p>
+ * <p>
+ * Peers enter the COMPLETED state when they announce they have entirely
+ * downloaded the file. As stated above, we may also elect them for this
+ * state if they report 0 bytes left to download.
+ * </p>
+ * <p>
+ * <p>
+ * Peers enter the STOPPED state very briefly before being removed. We
+ * still pass them to the STOPPED state in case someone else kept a
+ * reference on them.
+ * </p>
+ */
+ public enum PeerState {
+ UNKNOWN,
+ STARTED,
+ COMPLETED,
+ STOPPED
+ }
+
+ private PeerState state;
+ private long lastAnnounce; //登录时间记录
+
+ /**
+ * Instantiate a new tracked peer for the given torrent.
+ * 为给定的 torrent 实例化一个新的跟踪对等点。
+ *
+ * @param torrent The torrent this peer exchanges on.
+ * @param ip The peer's IP address.
+ * @param port The peer's port.
+ * @param peerId The byte-encoded peer ID.
+ */
+ public TrackedPeer(TrackedTorrent torrent, String ip, int port, ByteBuffer peerId) {
+ this(torrent, ip, port, peerId, new SystemTimeService());
+ }
+
+ TrackedPeer(TrackedTorrent torrent, String ip, int port,
+ ByteBuffer peerId, TimeService timeService) {
+ super(ip, port, peerId);
+ myTimeService = timeService;
+ this.torrent = torrent;
+
+ // Instantiated peers start in the UNKNOWN state.
+ this.state = PeerState.UNKNOWN;
+ this.lastAnnounce = myTimeService.now();
+
+ this.uploaded = 0;
+ this.downloaded = 0;
+ this.left = 0;
+ }
+
+ /**
+ * Update this peer's state and information.
+ * <p>
+ * <p>
+ * <b>Note:</b> if the peer reports 0 bytes left to download, its state will
+ * be automatically be set to COMPLETED.
+ * </p>
+ *
+ * @param state The peer's state.
+ * @param uploaded Uploaded byte count, as reported by the peer.
+ * @param downloaded Downloaded byte count, as reported by the peer.
+ * @param left Left-to-download byte count, as reported by the peer.
+ */
+ public void update(PeerState state, long uploaded, long downloaded,
+ long left) {
+
+ // 自动检测下载完成
+ if (PeerState.STARTED.equals(state) && left == 0) {
+ state = PeerState.COMPLETED;
+ }
+
+ if (!state.equals(this.state)) {
+ logger.trace("Peer {} {} download of {}.",
+ new Object[]{
+ this,
+ state.name().toLowerCase(),
+ this.torrent,
+ });
+ }
+
+ this.state = state;
+ this.lastAnnounce = myTimeService.now();
+ this.uploaded = uploaded;
+ this.downloaded = downloaded;
+ this.left = left;
+ }
+
+ /**
+ * Tells whether this peer has completed its download and can thus be
+ * considered a seeder.
+ * 告知这个peer是否完成上传,能否作为一个seeder种子持有者
+ */
+ public boolean isCompleted() {
+ return PeerState.COMPLETED.equals(this.state);
+ }
+
+ /**
+ * Returns how many bytes the peer reported it has uploaded so far.
+ */
+ public long getUploaded() {
+ return this.uploaded;
+ }
+
+ /**
+ * Returns how many bytes the peer reported it has downloaded so far.
+ */
+ public long getDownloaded() {
+ return this.downloaded;
+ }
+
+ /**
+ * Returns how many bytes the peer reported it needs to retrieve before
+ * its download is complete.
+ * 目前还剩余多少内容没有下载
+ */
+ public long getLeft() {
+ return this.left;
+ }
+
+ /**
+ * Tells whether this peer has checked in with the tracker recently.
+ * <p>
+ * <p>
+ * Non-fresh peers are automatically terminated and collected by the
+ * Tracker.
+ * 用户是否活跃
+ * </p>
+ */
+ public boolean isFresh(int expireTimeoutSec) {
+ return this.lastAnnounce + expireTimeoutSec * 1000 > myTimeService.now();
+ }
+
+ /**
+ * Returns a BEValue representing this peer for inclusion in an
+ * announce reply from the tracker.
+ * <p>
+ * The returned BEValue is a dictionary containing the peer ID (in its
+ * original byte-encoded form), the peer's IP and the peer's port.
+ */
+ public BEValue toBEValue() throws UnsupportedEncodingException {
+ Map<String, BEValue> peer = new HashMap<String, BEValue>();
+ if (this.hasPeerId()) {
+ peer.put("peer id", new BEValue(this.getPeerIdArray()));
+ }
+ peer.put("ip", new BEValue(this.getIp(), Constants.BYTE_ENCODING));
+ peer.put("port", new BEValue(this.getPort()));
+ return new BEValue(peer);
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackedTorrent.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackedTorrent.java
new file mode 100644
index 0000000..26d8972
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackedTorrent.java
@@ -0,0 +1,303 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.common.*;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage.RequestEvent;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Tracked torrents are torrent for which we don't expect to have data files
+ * for.
+ * <p>
+ * <p>
+ * {@link TrackedTorrent} objects are used by the BitTorrent tracker to
+ * represent a torrent that is announced by the tracker. As such, it is not
+ * expected to point to any valid local data like. It also contains some
+ * additional information used by the tracker to keep track of which peers
+ * exchange on it, etc.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+
+//表示一个被 BitTorrent 跟踪器(tracker)跟踪的种子(torrent)。
+// 这个类并不涉及实际的数据文件,而是用于管理与该种子相关的连接(peers)以及跟踪种子状态的信息。
+// 实现用户获取一个种子的peer值,方便进行分片下载
+
+public class TrackedTorrent implements TorrentHash {
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(TrackedTorrent.class);
+
+ /**
+ * Minimum announce interval requested from peers, in seconds.
+ */
+ public static final int MIN_ANNOUNCE_INTERVAL_SECONDS = 5;
+
+ /**
+ * Default number of peers included in a tracker response.
+ */
+ private static final int DEFAULT_ANSWER_NUM_PEERS = 30;
+
+ /**
+ * Default announce interval requested from peers, in seconds.
+ */
+ private static final int DEFAULT_ANNOUNCE_INTERVAL_SECONDS = 10;
+
+ private int answerPeers;
+ private int announceInterval;
+
+ private final byte[] info_hash;
+ //表示种子的哈希值(通过种子文件的元数据得到)。
+ // info 键对应的值生成的 SHA1 哈希, 该哈希值可作为所要请求的资源的标识符
+
+ /**
+ * Peers currently exchanging on this torrent.
+ * 目前服务器中存在的用户
+ */
+ private ConcurrentMap<PeerUID, TrackedPeer> peers;
+
+ /**
+ * Create a new tracked torrent from meta-info binary data.
+ * 创建空TrackedTorrent
+ *
+ * @param info_hash The meta-info byte data.
+ * encoded and hashed back to create the torrent's SHA-1 hash.
+ * available.
+ */
+ public TrackedTorrent(byte[] info_hash) {
+ this.info_hash = info_hash;
+
+ this.peers = new ConcurrentHashMap<PeerUID, TrackedPeer>();
+ this.answerPeers = TrackedTorrent.DEFAULT_ANSWER_NUM_PEERS;
+ this.announceInterval = TrackedTorrent.DEFAULT_ANNOUNCE_INTERVAL_SECONDS;
+ }
+
+ /**
+ * Returns the map of all peers currently exchanging on this torrent.
+ */
+ public Map<PeerUID, TrackedPeer> getPeers() {
+ return this.peers;
+ }
+
+ /**
+ * Add a peer exchanging on this torrent.
+ *
+ * @param peer The new Peer involved with this torrent.
+ */
+ public void addPeer(TrackedPeer peer) {
+ this.peers.put(new PeerUID(peer.getAddress(), this.getHexInfoHash()), peer);
+ }
+
+ public TrackedPeer getPeer(PeerUID peerUID) {
+ return this.peers.get(peerUID);
+ }
+
+ public TrackedPeer removePeer(PeerUID peerUID) {
+ return this.peers.remove(peerUID);
+ }
+
+ /**
+ * Count the number of seeders (peers in the COMPLETED state) on this
+ * torrent.
+ */
+ public int seeders() {
+ int count = 0;
+ for (TrackedPeer peer : this.peers.values()) {
+ if (peer.isCompleted()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Count the number of leechers (non-COMPLETED peers) on this torrent.
+ */
+ public int leechers() {
+ int count = 0;
+ for (TrackedPeer peer : this.peers.values()) {
+ if (!peer.isCompleted()) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
+ * Remove unfresh peers from this torrent.
+ * <p>
+ * <p>
+ * Collect and remove all non-fresh peers from this torrent. This is
+ * usually called by the periodic peer collector of the BitTorrent tracker.
+ * </p>
+ */
+ public void collectUnfreshPeers(int expireTimeoutSec) {
+ for (TrackedPeer peer : this.peers.values()) {
+ if (!peer.isFresh(expireTimeoutSec)) {
+ this.peers.remove(new PeerUID(peer.getAddress(), this.getHexInfoHash()));
+ }
+ }
+ }
+
+ /**
+ * Get the announce interval for this torrent.
+ */
+ public int getAnnounceInterval() {
+ return this.announceInterval;
+ }
+
+ /**
+ * Set the announce interval for this torrent.
+ *
+ * @param interval New announce interval, in seconds.
+ */
+ public void setAnnounceInterval(int interval) {
+ if (interval <= 0) {
+ throw new IllegalArgumentException("Invalid announce interval");
+ }
+
+ this.announceInterval = interval;
+ }
+
+ /**
+ * Update this torrent's swarm from an announce event.
+ * <p>
+ * <p>
+ * This will automatically create a new peer on a 'started' announce event,
+ * and remove the peer on a 'stopped' announce event.
+ * </p>
+ *
+ * @param event The reported event. If <em>null</em>, means a regular
+ * interval announce event, as defined in the BitTorrent specification.
+ * @param peerId The byte-encoded peer ID.
+ * @param hexPeerId The hexadecimal representation of the peer's ID.
+ * @param ip The peer's IP address.
+ * @param port The peer's inbound port.
+ * @param uploaded The peer's reported uploaded byte count.
+ * @param downloaded The peer's reported downloaded byte count.
+ * @param left The peer's reported left to download byte count.
+ * @return The peer that sent us the announce request.
+ */
+ public TrackedPeer update(RequestEvent event, ByteBuffer peerId,
+ String hexPeerId, String ip, int port, long uploaded, long downloaded,
+ long left) throws UnsupportedEncodingException {
+ logger.trace("event {}, Peer: {}:{}", new Object[]{event.getEventName(), ip, port});
+ TrackedPeer peer = null;
+ TrackedPeer.PeerState state = TrackedPeer.PeerState.UNKNOWN;
+
+ PeerUID peerUID = new PeerUID(new InetSocketAddress(ip, port), getHexInfoHash());
+ if (RequestEvent.STARTED.equals(event)) {
+ state = TrackedPeer.PeerState.STARTED;
+ } else if (RequestEvent.STOPPED.equals(event)) {
+ peer = this.removePeer(peerUID);
+ state = TrackedPeer.PeerState.STOPPED;
+ } else if (RequestEvent.COMPLETED.equals(event)) {
+ peer = this.getPeer(peerUID);
+ state = TrackedPeer.PeerState.COMPLETED;
+ } else if (RequestEvent.NONE.equals(event)) {
+ peer = this.getPeer(peerUID);
+ state = TrackedPeer.PeerState.STARTED;
+ } else {
+ throw new IllegalArgumentException("Unexpected announce event type!");
+ }
+
+ if (peer == null) {
+ peer = new TrackedPeer(this, ip, port, peerId);
+ this.addPeer(peer);
+ }
+ peer.update(state, uploaded, downloaded, left);
+ return peer;
+ }
+
+ /**
+ * Get a list of peers we can return in an announce response for this
+ * torrent.
+ *
+ * @param peer The peer making the request, so we can exclude it from the
+ * list of returned peers.
+ * @return A list of peers we can include in an announce response.
+ */
+ public List<Peer> getSomePeers(Peer peer) {
+ List<Peer> peers = new LinkedList<Peer>();
+
+ // Extract answerPeers random peers
+ List<TrackedPeer> candidates = new LinkedList<TrackedPeer>(this.peers.values());
+ Collections.shuffle(candidates);
+
+ int count = 0;
+ for (TrackedPeer candidate : candidates) {
+ // Don't include the requesting peer in the answer.
+ if (peer != null && peer.looksLike(candidate)) {
+ continue;
+ }
+
+ // Only serve at most ANSWER_NUM_PEERS peers
+ if (count++ > this.answerPeers) {
+ break;
+ }
+
+ peers.add(candidate);
+ }
+
+ return peers;
+ }
+
+ /**
+ * Load a tracked torrent from the given torrent file.
+ *
+ * @param torrent The abstract {@link File} object representing the
+ * <tt>.torrent</tt> file to load.
+ * @throws IOException When the torrent file cannot be read.
+ */
+ public static TrackedTorrent load(File torrent) throws IOException {
+
+ TorrentMetadata torrentMetadata = new TorrentParser().parseFromFile(torrent);
+ return new TrackedTorrent(torrentMetadata.getInfoHash());
+ }
+
+
+ // 返回种子哈希值
+ @Override
+ public byte[] getInfoHash() {
+ return this.info_hash;
+ }
+
+ @Override
+ public String getHexInfoHash() {
+ return TorrentUtils.byteArrayToHexString(this.info_hash);
+ }
+
+ @Override
+ public String toString() {
+ return "TrackedTorrent{" +
+ "info_hash=" + getHexInfoHash() +
+ '}';
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/Tracker.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/Tracker.java
new file mode 100644
index 0000000..fc178f9
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/Tracker.java
@@ -0,0 +1,289 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// tracker客户端
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.simpleframework.http.core.ContainerServer;
+import org.simpleframework.transport.connect.Connection;
+import org.simpleframework.transport.connect.SocketConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * BitTorrent tracker.
+ * <p>
+ * <p>
+ * The tracker usually listens on port 6969 (the standard BitTorrent tracker
+ * port). Torrents must be registered directly to this tracker with the
+ * {@link #announce(TrackedTorrent torrent)}</code> method.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+
+// 翻译:tracker 监听6969端口,种子被TrackedTorrent类表示
+public class Tracker {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(Tracker.class);// 日志记录
+
+ /**
+ * Request path handled by the tracker announce request handler.
+ * 由跟踪器公告请求处理程序处理的请求路径 保存路径端口ip
+ */
+ public static final String ANNOUNCE_URL = "/announce";
+
+ /**
+ * Default tracker listening port (BitTorrent's default is 6969).
+ */
+ public static final int DEFAULT_TRACKER_PORT = 6969;
+
+ /**
+ * Default server name and version announced by the tracker.
+ */
+ public static final String DEFAULT_VERSION_STRING = "BitTorrent Tracker (ttorrent)";
+
+ private Connection connection;
+
+ /**
+ * The in-memory repository of torrents tracked.
+ */
+ // 种子仓库管理
+ private final TorrentsRepository myTorrentsRepository;
+
+ private PeerCollectorThread myPeerCollectorThread;
+ private boolean stop;
+ private String myAnnounceUrl;
+ private final int myPort;
+ private SocketAddress myBoundAddress = null;
+
+ private final TrackerServiceContainer myTrackerServiceContainer;
+
+ /**
+ * Create a new BitTorrent tracker listening at the given address.
+ *
+ * @throws IOException Throws an <em>IOException</em> if the tracker
+ * cannot be initialized.
+ */
+ //新建tracker的声明 port是运行端口
+ public Tracker(int port) throws IOException {
+ this(port,
+ getDefaultAnnounceUrl(new InetSocketAddress(InetAddress.getLocalHost(), port)).toString()
+ );
+ //调用下一个声明 很多数据是自动生成的,创建tracker的时候只要声明默认端口
+ }
+
+ public Tracker(int port, String announceURL) throws IOException {
+ myPort = port;
+ myAnnounceUrl = announceURL;
+ myTorrentsRepository = new TorrentsRepository(10);
+ final TrackerRequestProcessor requestProcessor = new TrackerRequestProcessor(myTorrentsRepository);
+ myTrackerServiceContainer = new TrackerServiceContainer(requestProcessor,
+ new MultiAnnounceRequestProcessor(requestProcessor));
+ myPeerCollectorThread = new PeerCollectorThread(myTorrentsRepository);
+ }
+
+ public Tracker(int port, String announceURL, TrackerRequestProcessor requestProcessor, TorrentsRepository torrentsRepository) throws IOException {
+ myPort = port;
+ myAnnounceUrl = announceURL;
+ myTorrentsRepository = torrentsRepository;
+ myTrackerServiceContainer = new TrackerServiceContainer(requestProcessor, new MultiAnnounceRequestProcessor(requestProcessor));
+ myPeerCollectorThread = new PeerCollectorThread(myTorrentsRepository);
+ }
+
+ /**
+ * Returns the full announce URL served by this tracker.
+ * <p>
+ * <p>
+ * This has the form http://host:port/announce.
+ * </p>
+ */
+ private static URL getDefaultAnnounceUrl(InetSocketAddress address) {
+ try {
+ return new URL("http",
+ address.getAddress().getCanonicalHostName(),
+ address.getPort(),
+ ANNOUNCE_URL);
+ } catch (MalformedURLException mue) {
+ logger.error("Could not build tracker URL: {}!", mue, mue);
+ }
+
+ return null;
+ }
+
+ public String getAnnounceUrl() {
+ return myAnnounceUrl;
+ }
+
+ public URI getAnnounceURI() {
+ try {
+ URL announceURL = new URL(getAnnounceUrl());
+ if (announceURL != null) {
+ return announceURL.toURI();
+ }
+ } catch (URISyntaxException e) {
+ logger.error("Cannot convert announce URL to URI", e);
+ } catch (MalformedURLException e) {
+ logger.error("Cannot create URL from announceURL", e);
+ }
+ return null;
+ }
+
+ /**
+ * Start the tracker thread.
+ */
+ public void start(final boolean startPeerCleaningThread) throws IOException {
+ logger.info("Starting BitTorrent tracker on {}...",
+ getAnnounceUrl());
+ connection = new SocketConnection(new ContainerServer(myTrackerServiceContainer));
+
+ List<SocketAddress> tries = new ArrayList<SocketAddress>() {{
+ try {
+ add(new InetSocketAddress(InetAddress.getByAddress(new byte[4]), myPort));
+ } catch (Exception ex) {
+ }
+ try {
+ add(new InetSocketAddress(InetAddress.getLocalHost(), myPort));
+ } catch (Exception ex) {
+ }
+ try {
+ add(new InetSocketAddress(InetAddress.getByName(new URL(getAnnounceUrl()).getHost()), myPort));
+ } catch (Exception ex) {
+ }
+ }};
+
+ boolean started = false;
+ for (SocketAddress address : tries) {
+ try {
+ if ((myBoundAddress = connection.connect(address)) != null) {
+ logger.info("Started torrent tracker on {}", address);
+ started = true;
+ break;
+ }
+ } catch (IOException ioe) {
+ logger.info("Can't start the tracker using address{} : ", address.toString(), ioe.getMessage());
+ }
+ }
+ if (!started) {
+ logger.error("Cannot start tracker on port {}. Stopping now...", myPort);
+ stop();
+ return;
+ }
+ if (startPeerCleaningThread) {
+ if (myPeerCollectorThread == null || !myPeerCollectorThread.isAlive() || myPeerCollectorThread.getState() != Thread.State.NEW) {
+ myPeerCollectorThread = new PeerCollectorThread(myTorrentsRepository);
+ }
+
+ myPeerCollectorThread.setName("peer-peerCollectorThread:" + myPort);
+ myPeerCollectorThread.start();
+ }
+ }
+
+ /**
+ * Stop the tracker.
+ * <p>
+ * <p>
+ * This effectively closes the listening HTTP connection to terminate
+ * the service, and interrupts the peer myPeerCollectorThread thread as well.
+ * </p>
+ */
+ public void stop() {
+ this.stop = true;
+
+ try {
+ this.connection.close();
+ logger.info("BitTorrent tracker closed.");
+ } catch (IOException ioe) {
+ logger.error("Could not stop the tracker: {}!", ioe.getMessage());
+ }
+
+ if (myPeerCollectorThread != null && myPeerCollectorThread.isAlive()) {
+ myPeerCollectorThread.interrupt();
+ try {
+ myPeerCollectorThread.join();
+ } catch (InterruptedException e) {
+ //
+ }
+ logger.info("Peer collection terminated.");
+ }
+ }
+
+ /**
+ * Announce a new torrent on this tracker.
+ * <p>
+ * <p>
+ * The fact that torrents must be announced here first makes this tracker a
+ * closed BitTorrent tracker: it will only accept clients for torrents it
+ * knows about, and this list of torrents is managed by the program
+ * instrumenting this Tracker class.
+ * </p>
+ *
+ * @param torrent The Torrent object to start tracking.
+ * @return The torrent object for this torrent on this tracker. This may be
+ * different from the supplied Torrent object if the tracker already
+ * contained a torrent with the same hash.
+ */
+
+ //synchronized 确保多线程访问共享资源不会出错
+ public synchronized TrackedTorrent announce(TrackedTorrent torrent) {
+ TrackedTorrent existing = myTorrentsRepository.getTorrent(torrent.getHexInfoHash());
+
+ if (existing != null) {
+ logger.warn("Tracker already announced torrent with hash {}.", existing.getHexInfoHash());
+ return existing;
+ }
+
+ myTorrentsRepository.putIfAbsent(torrent.getHexInfoHash(), torrent);
+ logger.info("Registered new torrent with hash {}.", torrent.getHexInfoHash());
+ return torrent;
+ }
+
+ /**
+ * Set to true to allow this tracker to track external torrents (i.e. those that were not explicitly announced here).
+ *
+ * @param acceptForeignTorrents true to accept foreign torrents (false otherwise)
+ */
+ public void setAcceptForeignTorrents(boolean acceptForeignTorrents) {
+ myTrackerServiceContainer.setAcceptForeignTorrents(acceptForeignTorrents);
+ }
+
+ /**
+ * @return all tracked torrents.
+ */
+ public Collection<TrackedTorrent> getTrackedTorrents() {
+ return Collections.unmodifiableCollection(myTorrentsRepository.getTorrents().values());
+ }
+
+ public TrackedTorrent getTrackedTorrent(String hash) {
+ return myTorrentsRepository.getTorrent(hash);
+ }
+
+ public void setAnnounceInterval(int announceInterval) {
+ myTrackerServiceContainer.setAnnounceInterval(announceInterval);
+ }
+
+ public void setPeerCollectorExpireTimeout(int expireTimeout) {
+ myPeerCollectorThread.setTorrentExpireTimeoutSec(expireTimeout);
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackerRequestProcessor.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackerRequestProcessor.java
new file mode 100644
index 0000000..afb51fb
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackerRequestProcessor.java
@@ -0,0 +1,330 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.Constants;
+import com.turn.ttorrent.bcodec.BEValue;
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.ErrorMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.MessageValidationException;
+import com.turn.ttorrent.common.protocol.http.HTTPAnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.http.HTTPAnnounceResponseMessage;
+import com.turn.ttorrent.common.protocol.http.HTTPTrackerErrorMessage;
+import org.simpleframework.http.Status;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Tracker service to serve the tracker's announce requests.
+ * announce 客户端和tracker服务器进行通信
+ * <p>
+ * <p>
+ * It only serves announce requests on /announce, and only serves torrents the
+ * {@link Tracker} it serves knows about.
+ * </p>
+ * <p>
+ * <p>
+ * The list of torrents {@see #requestHandler.getTorrentsMap()} is a map of torrent hashes to their
+ * corresponding Torrent objects, and is maintained by the {@link Tracker} this
+ * service is part of. The TrackerRequestProcessor only has a reference to this map, and
+ * does not modify it.
+ * </p>
+ *
+ * @author mpetazzoni
+ * @see <a href="http://wiki.theory.org/BitTorrentSpecification">BitTorrent protocol specification</a>
+ */
+public class TrackerRequestProcessor {
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(TrackerRequestProcessor.class);
+
+ /**
+ * The list of announce request URL fields that need to be interpreted as
+ * numeric and thus converted as such in the request message parsing.
+ */
+ private static final String[] NUMERIC_REQUEST_FIELDS =
+ new String[]{
+ "port", "uploaded", "downloaded", "left",
+ "compact", "no_peer_id", "numwant"
+ };
+ private static final int SEEDER_ANNOUNCE_INTERVAL = 150;
+ // seeder通知间隔
+
+ private boolean myAcceptForeignTorrents = true; //default to true
+ private int myAnnounceInterval = 60; //default value
+ private final AddressChecker myAddressChecker; //ip地址检查器
+ private final TorrentsRepository myTorrentsRepository;
+
+
+ /**
+ * Create a new TrackerRequestProcessor serving the given torrents.
+ * 是一个对种子仓库进行管理的类
+ */
+ public TrackerRequestProcessor(TorrentsRepository torrentsRepository) {
+ this(torrentsRepository, new AddressChecker() {
+ @Override
+ public boolean isBadAddress(String ip) {
+ return false;
+ }
+ });
+ }
+
+ public TrackerRequestProcessor(TorrentsRepository torrentsRepository, AddressChecker addressChecker) {
+ myTorrentsRepository = torrentsRepository;
+ myAddressChecker = addressChecker;
+ }
+
+ /**
+ * Process the announce request.
+ * 处理请求
+ * <p>
+ * <p>
+ * This method attemps to read and parse the incoming announce request into
+ * an announce request message, then creates the appropriate announce
+ * response message and sends it back to the client.
+ * </p>
+ */
+ //传入数据uri=request.getAddress().toString(), 是发出请求的客户端地址包含顺便传入的参数,第二个参数是主机地址
+ //该函数的目的是处理种子文件的请求(例如,用户上传或下载种子文件时发送的公告请求)
+ public void process(final String uri, final String hostAddress, RequestHandler requestHandler)
+ throws IOException {
+ // Prepare the response headers.
+
+ /**
+ * Parse the query parameters into an announce request message.
+ *
+ * We need to rely on our own query parsing function because
+ * SimpleHTTP's Query map will contain UTF-8 decoded parameters, which
+ * doesn't work well for the byte-encoded strings we expect.
+ */
+ HTTPAnnounceRequestMessage announceRequest; //包含各种工具操作
+ try {
+ announceRequest = this.parseQuery(uri, hostAddress);
+ } catch (MessageValidationException mve) {
+ LoggerUtils.warnAndDebugDetails(logger, "Unable to parse request message. Request url is {}", uri, mve);
+ serveError(Status.BAD_REQUEST, mve.getMessage(), requestHandler);
+ return;
+ }
+
+ AnnounceRequestMessage.RequestEvent event = announceRequest.getEvent();//获取当前状态,比如完成或者开始,停止
+
+ if (event == null) {
+ event = AnnounceRequestMessage.RequestEvent.NONE;
+ }
+ TrackedTorrent torrent = myTorrentsRepository.getTorrent(announceRequest.getHexInfoHash());
+
+ // The requested torrent must be announced by the tracker if and only if myAcceptForeignTorrents is false
+ if (!myAcceptForeignTorrents && torrent == null) {
+ logger.warn("Requested torrent hash was: {}", announceRequest.getHexInfoHash());
+ serveError(Status.BAD_REQUEST, ErrorMessage.FailureReason.UNKNOWN_TORRENT, requestHandler);
+ return;
+ }
+
+ final boolean isSeeder = (event == AnnounceRequestMessage.RequestEvent.COMPLETED)
+ || (announceRequest.getLeft() == 0);//判断是做中还是下载
+
+ if (myAddressChecker.isBadAddress(announceRequest.getIp())) {//黑名单用户
+ if (torrent == null) {
+ writeEmptyResponse(announceRequest, requestHandler);
+ } else {
+ writeAnnounceResponse(torrent, null, isSeeder, requestHandler);
+ }
+ return;
+ }
+
+ final Peer peer = new Peer(announceRequest.getIp(), announceRequest.getPort());
+
+ try {
+ torrent = myTorrentsRepository.putIfAbsentAndUpdate(announceRequest.getHexInfoHash(),
+ new TrackedTorrent(announceRequest.getInfoHash()),
+ event,
+ ByteBuffer.wrap(announceRequest.getPeerId()),
+ announceRequest.getHexPeerId(),
+ announceRequest.getIp(),
+ announceRequest.getPort(),
+ announceRequest.getUploaded(),
+ announceRequest.getDownloaded(),
+ announceRequest.getLeft());
+ } catch (IllegalArgumentException iae) {
+ LoggerUtils.warnAndDebugDetails(logger, "Unable to update peer torrent. Request url is {}", uri, iae);
+ serveError(Status.BAD_REQUEST, ErrorMessage.FailureReason.INVALID_EVENT, requestHandler);
+ return;
+ }
+
+ // Craft and output the answer
+ writeAnnounceResponse(torrent, peer, isSeeder, requestHandler);
+ }
+
+ private void writeEmptyResponse(HTTPAnnounceRequestMessage announceRequest, RequestHandler requestHandler) throws IOException {
+ HTTPAnnounceResponseMessage announceResponse;
+ try {
+ announceResponse = HTTPAnnounceResponseMessage.craft(
+ myAnnounceInterval,
+ 0,
+ 0,
+ Collections.<Peer>emptyList(),
+ announceRequest.getHexInfoHash());
+ requestHandler.serveResponse(Status.OK.getCode(), Status.OK.getDescription(), announceResponse.getData());
+ } catch (Exception e) {
+ serveError(Status.INTERNAL_SERVER_ERROR, e.getMessage(), requestHandler);
+ }
+ }
+
+ public void setAnnounceInterval(int announceInterval) {
+ myAnnounceInterval = announceInterval;
+ }
+
+ public int getAnnounceInterval() {
+ return myAnnounceInterval;
+ }
+
+ private void writeAnnounceResponse(TrackedTorrent torrent, Peer peer, boolean isSeeder, RequestHandler requestHandler) throws IOException {
+ HTTPAnnounceResponseMessage announceResponse;
+ try {
+ announceResponse = HTTPAnnounceResponseMessage.craft(
+ isSeeder ? SEEDER_ANNOUNCE_INTERVAL : myAnnounceInterval,
+ torrent.seeders(),
+ torrent.leechers(),
+ isSeeder ? Collections.<Peer>emptyList() : torrent.getSomePeers(peer),
+ torrent.getHexInfoHash());
+ requestHandler.serveResponse(Status.OK.getCode(), Status.OK.getDescription(), announceResponse.getData());
+ } catch (Exception e) {
+ serveError(Status.INTERNAL_SERVER_ERROR, e.getMessage(), requestHandler);
+ }
+ }
+
+ /**
+ * Parse the query parameters using our defined BYTE_ENCODING.
+ * <p>
+ * <p>
+ * Because we're expecting byte-encoded strings as query parameters, we
+ * can't rely on SimpleHTTP's QueryParser which uses the wrong encoding for
+ * the job and returns us unparsable byte data. We thus have to implement
+ * our own little parsing method that uses BYTE_ENCODING to decode
+ * parameters from the URI.
+ * </p>
+ * <p>
+ * <p>
+ * <b>Note:</b> array parameters are not supported. If a key is present
+ * multiple times in the URI, the latest value prevails. We don't really
+ * need to implement this functionality as this never happens in the
+ * Tracker HTTP protocol.
+ * </p>
+ *
+ * @param uri
+ * @param hostAddress
+ * @return The {@link AnnounceRequestMessage} representing the client's
+ * announce request.
+ */
+ // 根据客户端传来uri,解析数据,生成一个HTTPAnnounceRequestMessage对象
+ private HTTPAnnounceRequestMessage parseQuery(final String uri, final String hostAddress)
+ throws IOException, MessageValidationException {
+ Map<String, BEValue> params = new HashMap<String, BEValue>(); //客户端传来的参数
+
+ try {
+// String uri = request.getAddress().toString();
+ for (String pair : uri.split("[?]")[1].split("&")) {
+ String[] keyval = pair.split("[=]", 2);
+ if (keyval.length == 1) {
+ this.recordParam(params, keyval[0], null);
+ } else {
+ this.recordParam(params, keyval[0], keyval[1]);
+ }
+ }
+ } catch (ArrayIndexOutOfBoundsException e) {
+ params.clear();
+ }
+
+ // Make sure we have the peer IP, fallbacking on the request's source
+ // address if the peer didn't provide it.
+ if (params.get("ip") == null) {
+ params.put("ip", new BEValue(
+ hostAddress,
+ Constants.BYTE_ENCODING));
+ }
+
+ return HTTPAnnounceRequestMessage.parse(new BEValue(params));
+ }
+
+ private void recordParam(Map<String, BEValue> params, String key, String value) {
+ try {
+ value = URLDecoder.decode(value, Constants.BYTE_ENCODING);
+
+ for (String f : NUMERIC_REQUEST_FIELDS) {
+ if (f.equals(key)) {
+ params.put(key, new BEValue(Long.valueOf(value)));
+ return;
+ }
+ }
+
+ params.put(key, new BEValue(value, Constants.BYTE_ENCODING));
+ } catch (UnsupportedEncodingException uee) {
+ // Ignore, act like parameter was not there
+ }
+ }
+
+ /**
+ * Write a {@link HTTPTrackerErrorMessage} to the response with the given
+ * HTTP status code.
+ *
+ * @param status The HTTP status code to return.
+ * @param error The error reported by the tracker.
+ */
+ private void serveError(Status status, HTTPTrackerErrorMessage error, RequestHandler requestHandler) throws IOException {
+ requestHandler.serveResponse(status.getCode(), status.getDescription(), error.getData());
+ }
+
+ /**
+ * Write an error message to the response with the given HTTP status code.
+ *
+ * @param status The HTTP status code to return.
+ * @param error The error message reported by the tracker.
+ */
+ private void serveError(Status status, String error, RequestHandler requestHandler) throws IOException {
+ this.serveError(status, HTTPTrackerErrorMessage.craft(error), requestHandler);
+ }
+
+ /**
+ * Write a tracker failure reason code to the response with the given HTTP
+ * status code.
+ *
+ * @param status The HTTP status code to return.
+ * @param reason The failure reason reported by the tracker.
+ */
+ private void serveError(Status status, ErrorMessage.FailureReason reason, RequestHandler requestHandler) throws IOException {
+ this.serveError(status, reason.getMessage(), requestHandler);
+ }
+
+ public void setAcceptForeignTorrents(boolean acceptForeignTorrents) {
+ myAcceptForeignTorrents = acceptForeignTorrents;
+ }
+
+ public interface RequestHandler {
+ // 返回http响应
+ void serveResponse(int code, String description, ByteBuffer responseData);
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackerServiceContainer.java b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackerServiceContainer.java
new file mode 100644
index 0000000..6fcecca
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/main/java/com/turn/ttorrent/tracker/TrackerServiceContainer.java
@@ -0,0 +1,113 @@
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.apache.commons.io.IOUtils;
+import org.simpleframework.http.Request;
+import org.simpleframework.http.Response;
+import org.simpleframework.http.core.Container;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+
+/**
+ * @author Sergey.Pak
+ * Date: 8/12/13
+ * Time: 8:25 PM
+ */
+
+// http请求处理器
+public class TrackerServiceContainer implements Container {
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(TrackerServiceContainer.class);
+
+ private TrackerRequestProcessor myRequestProcessor;
+ private final MultiAnnounceRequestProcessor myMultiAnnounceRequestProcessor;
+
+ public TrackerServiceContainer(final TrackerRequestProcessor requestProcessor,
+ final MultiAnnounceRequestProcessor multiAnnounceRequestProcessor) {
+ myRequestProcessor = requestProcessor;
+ myMultiAnnounceRequestProcessor = multiAnnounceRequestProcessor;
+ }
+
+ /**
+ * Handle the incoming request on the tracker service.
+ * <p/>
+ * <p>
+ * This makes sure the request is made to the tracker's announce URL, and
+ * delegates handling of the request to the <em>process()</em> method after
+ * preparing the response object.
+ * </p>
+ *
+ * @param request The incoming HTTP request.
+ * @param response The response object.
+ */
+
+ // 处理单个的http请求 或是多个请求
+ @Override
+ public void handle(Request request, final Response response) {
+ // Reject non-announce requests
+ if (!Tracker.ANNOUNCE_URL.equals(request.getPath().toString())) {
+ response.setCode(404);
+ response.setText("Not Found");
+ return;
+ }
+
+ OutputStream body = null;
+ try {
+ body = response.getOutputStream();
+
+ response.set("Content-Type", "text/plain");
+ response.set("Server", "");
+ response.setDate("Date", System.currentTimeMillis());
+
+ if ("GET".equalsIgnoreCase(request.getMethod())) {//单独请求
+
+ myRequestProcessor.process(request.getAddress().toString(), request.getClientAddress().getAddress().getHostAddress(),
+ getRequestHandler(response));
+ } else {//多请求处理
+ myMultiAnnounceRequestProcessor.process(request.getContent(), request.getAddress().toString(),
+ request.getClientAddress().getAddress().getHostAddress(), getRequestHandler(response));
+ }
+ body.flush();
+ } catch (IOException ioe) {
+ logger.info("Error while writing response: {}!", ioe.getMessage());
+ } catch (Throwable t) {
+ LoggerUtils.errorAndDebugDetails(logger, "error in processing request {}", request, t);
+ } finally {
+ IOUtils.closeQuietly(body);
+ }
+
+ }
+
+ private TrackerRequestProcessor.RequestHandler getRequestHandler(final Response response) {
+ return new TrackerRequestProcessor.RequestHandler() {
+ @Override
+ public void serveResponse(int code, String description, ByteBuffer responseData) {
+ response.setCode(code);
+ response.setText(description);
+ try {
+ responseData.rewind();
+ final WritableByteChannel channel = Channels.newChannel(response.getOutputStream());
+ channel.write(responseData);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ };
+ }
+
+ public void setAcceptForeignTorrents(boolean acceptForeignTorrents) {
+ myRequestProcessor.setAcceptForeignTorrents(acceptForeignTorrents);
+ }
+
+ public void setAnnounceInterval(int announceInterval) {
+ myRequestProcessor.setAnnounceInterval(announceInterval);
+ }
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/test/java/com/turn/ttorrent/tracker/MultiAnnounceRequestProcessorTest.java b/ttorrent-master/ttorrent-tracker/src/test/java/com/turn/ttorrent/tracker/MultiAnnounceRequestProcessorTest.java
new file mode 100644
index 0000000..3d7d216
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/test/java/com/turn/ttorrent/tracker/MultiAnnounceRequestProcessorTest.java
@@ -0,0 +1,108 @@
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.TempFiles;
+import com.turn.ttorrent.Utils;
+import com.turn.ttorrent.bcodec.BDecoder;
+import com.turn.ttorrent.bcodec.BEValue;
+import com.turn.ttorrent.common.protocol.TrackerMessage;
+import com.turn.ttorrent.common.protocol.http.HTTPAnnounceResponseMessage;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+@Test
+public class MultiAnnounceRequestProcessorTest {
+
+ private Tracker tracker;
+ private TempFiles tempFiles;
+
+
+ public MultiAnnounceRequestProcessorTest() {
+ if (Logger.getRootLogger().getAllAppenders().hasMoreElements())
+ return;
+ BasicConfigurator.configure(new ConsoleAppender(new PatternLayout("[%d{MMdd HH:mm:ss,SSS}] %6p - %20.20c - %m %n")));
+ Logger.getRootLogger().setLevel(Utils.getLogLevel());
+ }
+
+ @BeforeMethod
+ protected void setUp() throws Exception {
+ tempFiles = new TempFiles();
+ startTracker();
+ }
+
+ public void processCorrectTest() throws TrackerMessage.MessageValidationException, IOException {
+ final URL url = new URL("http://localhost:6969/announce");
+
+ final String urlTemplate = url.toString() +
+ "?info_hash={hash}" +
+ "&peer_id=ABCDEFGHIJKLMNOPQRST" +
+ "&ip={ip}" +
+ "&port={port}" +
+ "&downloaded=1234" +
+ "&left=0" +
+ "&event=started";
+
+ final HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+ StringBuilder requestString = new StringBuilder();
+ for (int i = 0; i < 5; i++) {
+ if (i != 0) {
+ requestString.append("\n");
+ }
+ requestString.append(getUrlFromTemplate(urlTemplate, "1" + i, "127.0.0.1", 6881));
+ }
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
+ connection.setDoOutput(true);
+ connection.getOutputStream().write(requestString.toString().getBytes("UTF-8"));
+
+ final InputStream inputStream = connection.getInputStream();
+
+ final BEValue bdecode = BDecoder.bdecode(inputStream);
+
+ assertEquals(tracker.getTrackedTorrents().size(), 5);
+ assertEquals(bdecode.getList().size(), 5);
+
+ for (BEValue beValue : bdecode.getList()) {
+
+ final HTTPAnnounceResponseMessage responseMessage = HTTPAnnounceResponseMessage.parse(beValue);
+ assertTrue(responseMessage.getPeers().isEmpty());
+ assertEquals(1, responseMessage.getComplete());
+ assertEquals(0, responseMessage.getIncomplete());
+ }
+ }
+
+ private String getUrlFromTemplate(String template, String hash, String ip, int port) {
+ return template.replace("{hash}", hash).replace("{ip}", ip).replace("{port}", String.valueOf(port));
+ }
+
+
+ private void startTracker() throws IOException {
+ this.tracker = new Tracker(6969);
+ tracker.setAnnounceInterval(5);
+ tracker.setPeerCollectorExpireTimeout(10);
+ this.tracker.start(true);
+ }
+
+ private void stopTracker() {
+ this.tracker.stop();
+ }
+
+ @AfterMethod
+ protected void tearDown() throws Exception {
+ stopTracker();
+ tempFiles.cleanup();
+ }
+
+}
diff --git a/ttorrent-master/ttorrent-tracker/src/test/java/com/turn/ttorrent/tracker/TorrentsRepositoryTest.java b/ttorrent-master/ttorrent-tracker/src/test/java/com/turn/ttorrent/tracker/TorrentsRepositoryTest.java
new file mode 100644
index 0000000..5f5b842
--- /dev/null
+++ b/ttorrent-master/ttorrent-tracker/src/test/java/com/turn/ttorrent/tracker/TorrentsRepositoryTest.java
@@ -0,0 +1,168 @@
+package com.turn.ttorrent.tracker;
+
+import com.turn.ttorrent.MockTimeService;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.testng.Assert.*;
+
+@Test
+public class TorrentsRepositoryTest {
+
+ private TorrentsRepository myTorrentsRepository;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ myTorrentsRepository = new TorrentsRepository(10);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+
+ }
+
+ public void testThatTorrentsStoredInRepository() {
+ assertEquals(myTorrentsRepository.getTorrents().size(), 0);
+ final TrackedTorrent torrent = new TrackedTorrent(new byte[]{1, 2, 3});
+
+ myTorrentsRepository.putIfAbsent(torrent.getHexInfoHash(), torrent);
+ assertTrue(myTorrentsRepository.getTorrent(torrent.getHexInfoHash()) == torrent);
+ final TrackedTorrent torrentCopy = new TrackedTorrent(new byte[]{1, 2, 3});
+
+ myTorrentsRepository.putIfAbsent(torrentCopy.getHexInfoHash(), torrentCopy);
+ assertTrue(myTorrentsRepository.getTorrent(torrent.getHexInfoHash()) == torrent);
+ assertEquals(myTorrentsRepository.getTorrents().size(), 1);
+
+ final TrackedTorrent secondTorrent = new TrackedTorrent(new byte[]{3, 2, 1});
+ myTorrentsRepository.putIfAbsent(secondTorrent.getHexInfoHash(), secondTorrent);
+ assertEquals(myTorrentsRepository.getTorrents().size(), 2);
+ }
+
+ public void testPutIfAbsentAndUpdate() throws UnsupportedEncodingException {
+
+ final AtomicBoolean updateInvoked = new AtomicBoolean();
+ TrackedTorrent torrent = new TrackedTorrent(new byte[]{1, 2, 3}) {
+ @Override
+ public TrackedPeer update(AnnounceRequestMessage.RequestEvent event, ByteBuffer peerId, String hexPeerId, String ip, int port, long uploaded, long downloaded, long left) throws UnsupportedEncodingException {
+ updateInvoked.set(true);
+ return super.update(event, peerId, hexPeerId, ip, port, uploaded, downloaded, left);
+ }
+ };
+ myTorrentsRepository.putIfAbsentAndUpdate(torrent.getHexInfoHash(), torrent,
+ AnnounceRequestMessage.RequestEvent.STARTED, ByteBuffer.allocate(5), "0",
+ "127.0.0.1", 6881, 5, 10, 12);
+ assertTrue(updateInvoked.get());
+ assertEquals(torrent.getPeers().size(), 1);
+ final TrackedPeer trackedPeer = torrent.getPeers().values().iterator().next();
+ assertEquals(trackedPeer.getIp(), "127.0.0.1");
+ assertEquals(trackedPeer.getPort(), 6881);
+ assertEquals(trackedPeer.getLeft(), 12);
+ assertEquals(trackedPeer.getDownloaded(), 10);
+ assertEquals(trackedPeer.getUploaded(), 5);
+ }
+
+ public void testThatCleanupDontLockAllTorrentsAndStorage() throws UnsupportedEncodingException {
+
+ final Semaphore cleanFinishLock = new Semaphore(0);
+ final Semaphore cleanStartLock = new Semaphore(0);
+ final TrackedTorrent torrent = new TrackedTorrent(new byte[]{1, 2, 3}) {
+ @Override
+ public void collectUnfreshPeers(int expireTimeoutSec) {
+ cleanStartLock.release();
+ try {
+ if (!cleanFinishLock.tryAcquire(1, TimeUnit.SECONDS)) {
+ fail("can not acquire semaphore");
+ }
+ } catch (InterruptedException e) {
+ fail("can not finish cleanup", e);
+ }
+ }
+ };
+
+ myTorrentsRepository.putIfAbsent(torrent.getHexInfoHash(), torrent);
+ torrent.addPeer(new TrackedPeer(torrent, "127.0.0.1", 6881, ByteBuffer.allocate(10)));
+ assertEquals(myTorrentsRepository.getTorrents().size(), 1);
+
+ final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ try {
+ final Future<Integer> cleanupFuture = executorService.submit(new Callable<Integer>() {
+ @Override
+ public Integer call() throws Exception {
+ myTorrentsRepository.cleanup(1);
+ return 0;
+ }
+ });
+ try {
+ if (!cleanStartLock.tryAcquire(1, TimeUnit.SECONDS)) {
+ fail("cannot acquire semaphore");
+ }
+ } catch (InterruptedException e) {
+ fail("don't received that cleanup is started", e);
+ }
+
+ final TrackedTorrent secondTorrent = new TrackedTorrent(new byte[]{3, 1, 1});
+
+ myTorrentsRepository.putIfAbsentAndUpdate(secondTorrent.getHexInfoHash(), secondTorrent,
+ AnnounceRequestMessage.RequestEvent.STARTED, ByteBuffer.allocate(5), "0",
+ "127.0.0.1", 6881, 0, 0, 1);
+
+ cleanFinishLock.release();
+ try {
+ cleanupFuture.get(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ fail("cleanup was interrupted", e);
+ } catch (ExecutionException e) {
+ fail("cleanup was failed with execution exception", e);
+ } catch (TimeoutException e) {
+ fail("cannot get result from future", e);
+ }
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ public void testThatTorrentsCanRemovedFromStorage() throws UnsupportedEncodingException {
+ TrackedTorrent torrent = new TrackedTorrent(new byte[]{1, 2, 3});
+
+ MockTimeService timeService = new MockTimeService();
+ timeService.setTime(10000);
+ final TrackedPeer peer = new TrackedPeer(torrent, "127.0.0.1", 6881, ByteBuffer.allocate(5), timeService);
+ torrent.addPeer(peer);
+
+ timeService.setTime(15000);
+ final TrackedPeer secondPeer = new TrackedPeer(torrent, "127.0.0.1", 6882, ByteBuffer.allocate(5), timeService);
+ torrent.addPeer(secondPeer);
+
+ myTorrentsRepository.putIfAbsent(torrent.getHexInfoHash(), torrent);
+
+ assertEquals(myTorrentsRepository.getTorrents().size(), 1);
+ assertEquals(torrent.getPeers().size(), 2);
+
+ timeService.setTime(17000);
+ myTorrentsRepository.cleanup(10);
+
+ assertEquals(myTorrentsRepository.getTorrents().size(), 1);
+ assertEquals(torrent.getPeers().size(), 2);
+
+ timeService.setTime(23000);
+ myTorrentsRepository.cleanup(10);
+
+ assertEquals(myTorrentsRepository.getTorrents().size(), 1);
+ assertEquals(torrent.getPeers().size(), 1);
+
+ timeService.setTime(40000);
+ myTorrentsRepository.cleanup(10);
+
+ assertEquals(myTorrentsRepository.getTorrents().size(), 0);
+ assertEquals(torrent.getPeers().size(), 0);
+
+ }
+}