tracker
Change-Id: I8f8ac81f9c4d7c7650cd64d2dade701dc6c11dce
diff --git a/ttorrent-master/ttorrent-client/pom.xml b/ttorrent-master/ttorrent-client/pom.xml
new file mode 100644
index 0000000..3b55515
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/pom.xml
@@ -0,0 +1,46 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>ttorrent/client</name>
+ <url>http://turn.github.com/ttorrent/</url>
+ <artifactId>ttorrent-client</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-bencoding</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-network</artifactId>
+ <version>1.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-common</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-test-api</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/AnnounceableInformationImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/AnnounceableInformationImpl.java
new file mode 100644
index 0000000..fdf0f60
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/AnnounceableInformationImpl.java
@@ -0,0 +1,70 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.TorrentHash;
+
+import java.util.List;
+
+class AnnounceableInformationImpl implements AnnounceableInformation {
+ // Plain holder of announce data: all fields are final and set once in the constructor. The announceUrls list is stored and returned without a defensive copy.
+ private final long uploaded;
+ private final long downloaded;
+ private final long left;
+ private final TorrentHash torrentHash;
+ private final List<List<String>> announceUrls;
+ private final String announce;
+
+ public AnnounceableInformationImpl(long uploaded,
+ long downloaded,
+ long left,
+ TorrentHash torrentHash,
+ List<List<String>> announceUrls,
+ String announce) {
+ this.uploaded = uploaded;
+ this.downloaded = downloaded;
+ this.left = left;
+ this.torrentHash = torrentHash;
+ this.announceUrls = announceUrls;
+ this.announce = announce;
+ }
+
+ @Override
+ public long getUploaded() {
+ return uploaded;
+ }
+
+ @Override
+ public long getDownloaded() {
+ return downloaded;
+ }
+
+ @Override
+ public long getLeft() {
+ return left;
+ }
+
+ @Override
+ public List<List<String>> getAnnounceList() {
+ return announceUrls;
+ }
+
+ @Override
+ public String getAnnounce() {
+ return announce;
+ }
+
+ @Override
+ public byte[] getInfoHash() {
+ return torrentHash.getInfoHash(); // delegates to the wrapped TorrentHash
+ }
+
+ @Override
+ public String getHexInfoHash() {
+ return torrentHash.getHexInfoHash(); // delegates to the wrapped TorrentHash
+ }
+
+ @Override
+ public String toString() {
+ return "announceable torrent " + torrentHash.getHexInfoHash() + " for trackers " + announceUrls;
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/ClientState.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/ClientState.java
new file mode 100644
index 0000000..526106e
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/ClientState.java
@@ -0,0 +1,10 @@
+package com.turn.ttorrent.client;
+
+public enum ClientState {
+ WAITING,
+ VALIDATING,
+ SHARING,
+ SEEDING,
+ ERROR, // set by CommunicationManager.stop(...) on torrents that are not finished when the client stops
+ DONE // set by CommunicationManager on removeTorrent(...) and, for finished torrents, on stop(...)
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/CommunicationManager.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/CommunicationManager.java
new file mode 100644
index 0000000..4de7ed3
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/CommunicationManager.java
@@ -0,0 +1,865 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ * <p>
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.Constants;
+import com.turn.ttorrent.client.announce.*;
+import com.turn.ttorrent.client.network.CountLimitConnectionAllower;
+import com.turn.ttorrent.client.network.OutgoingConnectionListener;
+import com.turn.ttorrent.client.network.StateChannelListener;
+import com.turn.ttorrent.client.peer.PeerActivityListener;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.client.storage.FairPieceStorageFactory;
+import com.turn.ttorrent.client.storage.FileCollectionStorage;
+import com.turn.ttorrent.client.storage.PieceStorage;
+import com.turn.ttorrent.client.storage.PieceStorageFactory;
+import com.turn.ttorrent.common.*;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.PeerMessage;
+import com.turn.ttorrent.network.*;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.turn.ttorrent.Constants.DEFAULT_SOCKET_CONNECTION_TIMEOUT_MILLIS;
+import static com.turn.ttorrent.common.protocol.AnnounceRequestMessage.RequestEvent.*;
+
+/**
+ * A pure-java BitTorrent client.
+ * <p/>
+ * <p>
+ * A BitTorrent client in its bare essence shares a given torrent. If the
+ * torrent is not complete locally, it will continue to download it. If or
+ * after the torrent is complete, the client may eventually continue to seed it
+ * for other clients.
+ * </p>
+ * <p/>
+ * <p>
+ * This BitTorrent client implementation is made to be simple to embed and
+ * simple to use. First, initialize a SharedTorrent object from a torrent
+ * meta-info source (either a file or a byte array, see
+ * com.turn.ttorrent.SharedTorrent for how to create a SharedTorrent object).
+ * </p>
+ *
+ * @author mpetazzoni
+ *
+ * Supports initializing a SharedTorrent object from a torrent meta-info source (either a file or a byte array).
+ */
+public class CommunicationManager implements AnnounceResponseListener, PeerActivityListener, Context, ConnectionManagerContext {
+
+ protected static final Logger logger = TorrentLoggerFactory.getLogger(CommunicationManager.class);
+
+ public static final String BITTORRENT_ID_PREFIX = "-TO0042-";
+
+ private AtomicBoolean stop = new AtomicBoolean(false);
+
+ private Announce announce;
+
+ private volatile boolean myStarted = false;
+ private final TorrentLoader myTorrentLoader;
+ private final TorrentsStorage torrentsStorage;
+ private final CountLimitConnectionAllower myInConnectionAllower;
+ private final CountLimitConnectionAllower myOutConnectionAllower;
+ private final AtomicInteger mySendBufferSize;
+ private final AtomicInteger myReceiveBufferSize;
+ private final PeersStorage peersStorage;
+ private volatile ConnectionManager myConnectionManager;
+ private final ExecutorService myExecutorService;
+ private final ExecutorService myPieceValidatorExecutor;
+
+ /**
+ * Creates a manager that communicates with trackers through a default {@link TrackerClientFactoryImpl}.
+ * @param workingExecutor executor service that runs the connection worker (establishing connections) and processes
+ * incoming data. Must have a pool size of at least 2
+ * @param pieceValidatorExecutor executor service that calculates the SHA-1 hashes of downloaded pieces
+ */
+ public CommunicationManager(ExecutorService workingExecutor, ExecutorService pieceValidatorExecutor) {
+ this(workingExecutor, pieceValidatorExecutor, new TrackerClientFactoryImpl());
+ }
+
+ /**
+ * @param workingExecutor executor service for run connection worker and process incoming data. Must have a pool size at least 2
+ * @param pieceValidatorExecutor executor service for calculation sha1 hashes of downloaded pieces
+ * @param trackerClientFactory factory which creates instances for communication with tracker
+ */
+ public CommunicationManager(ExecutorService workingExecutor, ExecutorService pieceValidatorExecutor, TrackerClientFactory trackerClientFactory) {
+ this.announce = new Announce(this, trackerClientFactory);// handles communication with the tracker
+ this.torrentsStorage = new TorrentsStorage();// storage of torrents
+ this.peersStorage = new PeersStorage();// storage of peers
+ this.mySendBufferSize = new AtomicInteger();// send buffer size, changed through setSendBufferSize
+ this.myTorrentLoader = new TorrentLoaderImpl(this.torrentsStorage);// turns loaded torrents into shared torrents
+ this.myReceiveBufferSize = new AtomicInteger();// receive buffer size, changed through setReceiveBufferSize
+ this.myInConnectionAllower = new CountLimitConnectionAllower(peersStorage);
+ this.myOutConnectionAllower = new CountLimitConnectionAllower(peersStorage);
+ this.myExecutorService = workingExecutor;
+ myPieceValidatorExecutor = pieceValidatorExecutor;
+ }
+
+ /**
+ * Adds torrent to storage, validate downloaded files and start seeding and leeching the torrent
+ *
+ * @param dotTorrentFilePath path to torrent metadata file
+ * @param downloadDirPath path to directory where downloaded files are placed
+ * @return {@link TorrentManager} instance for monitoring torrent state
+ * @throws IOException if IO error occurs in reading metadata file
+ */
+ // Adds the torrent described by the user-supplied metadata file (dotTorrentFilePath) to storage
+ // and places the downloaded files into the given local directory (downloadDirPath).
+ public TorrentManager addTorrent(String dotTorrentFilePath, String downloadDirPath) throws IOException {
+ return addTorrent(dotTorrentFilePath, downloadDirPath, FairPieceStorageFactory.INSTANCE);
+ }
+
+ /**
+ * Adds torrent to storage with specified listeners, validate downloaded files and start seeding and leeching the torrent
+ *
+ * @param dotTorrentFilePath path to torrent metadata file
+ * @param downloadDirPath path to directory where downloaded files are placed
+ * @param listeners specified listeners
+ * @return {@link TorrentManager} instance for monitoring torrent state
+ * @throws IOException if IO error occurs in reading metadata file
+ */
+ public TorrentManager addTorrent(String dotTorrentFilePath, String downloadDirPath, List<TorrentListener> listeners) throws IOException {
+ return addTorrent(dotTorrentFilePath, downloadDirPath, FairPieceStorageFactory.INSTANCE, listeners);
+ }
+
+ /**
+ * Adds torrent to storage with specified {@link PieceStorageFactory}.
+ * It can be used for skipping initial validation of data
+ *
+ * @param dotTorrentFilePath path to torrent metadata file
+ * @param downloadDirPath path to directory where downloaded files are placed
+ * @param pieceStorageFactory factory for creating {@link PieceStorage}.
+ * @return {@link TorrentManager} instance for monitoring torrent state
+ * @throws IOException if IO error occurs in reading metadata file
+ */
+ public TorrentManager addTorrent(String dotTorrentFilePath,
+ String downloadDirPath,
+ PieceStorageFactory pieceStorageFactory) throws IOException {
+ return addTorrent(dotTorrentFilePath, downloadDirPath, pieceStorageFactory, Collections.<TorrentListener>emptyList());
+ }
+
+ /**
+ * Adds torrent to storage with specified {@link PieceStorageFactory} and listeners.
+ * A custom factory can be used for skipping initial validation of data
+ * @param dotTorrentFilePath path to torrent metadata file
+ * @param downloadDirPath path to directory where downloaded files are placed
+ * @param pieceStorageFactory factory for creating {@link PieceStorage}.
+ * @param listeners specified listeners
+ * @return {@link TorrentManager} instance for monitoring torrent state
+ * @throws IOException if IO error occurs in reading metadata file
+ */
+ public TorrentManager addTorrent(String dotTorrentFilePath,
+ String downloadDirPath,
+ PieceStorageFactory pieceStorageFactory,
+ List<TorrentListener> listeners) throws IOException {
+ FileMetadataProvider metadataProvider = new FileMetadataProvider(dotTorrentFilePath);
+ TorrentMetadata metadata = metadataProvider.getTorrentMetadata(); // the delegate below asks the provider for the metadata again
+ FileCollectionStorage fileCollectionStorage = FileCollectionStorage.create(metadata, new File(downloadDirPath));
+ PieceStorage pieceStorage = pieceStorageFactory.createStorage(metadata, fileCollectionStorage);
+ return addTorrent(metadataProvider, pieceStorage, listeners);
+ }
+
+ /**
+ * Adds torrent to storage with any storage and metadata source
+ *
+ * @param metadataProvider specified metadata source
+ * @param pieceStorage specified storage of pieces
+ * @return {@link TorrentManager} instance for monitoring torrent state
+ * @throws IOException if IO error occurs in reading metadata file
+ */
+ public TorrentManager addTorrent(TorrentMetadataProvider metadataProvider, PieceStorage pieceStorage) throws IOException {
+ return addTorrent(metadataProvider, pieceStorage, Collections.<TorrentListener>emptyList());
+ }
+
+ /**
+ * Adds torrent to storage with any storage, metadata source and specified listeners
+ *
+ * @param metadataProvider specified metadata source
+ * @param pieceStorage specified storage of pieces
+ * @param listeners specified listeners
+ * @return {@link TorrentManager} instance for monitoring torrent state
+ * @throws IOException if IO error occurs in reading metadata file
+ */
+ public TorrentManager addTorrent(TorrentMetadataProvider metadataProvider,
+ PieceStorage pieceStorage,
+ List<TorrentListener> listeners) throws IOException {
+ TorrentMetadata torrentMetadata = metadataProvider.getTorrentMetadata();
+ EventDispatcher eventDispatcher = new EventDispatcher();
+ for (TorrentListener listener : listeners) {
+ eventDispatcher.addListener(listener);
+ }
+ final LoadedTorrentImpl loadedTorrent = new LoadedTorrentImpl(
+ new TorrentStatistic(),
+ metadataProvider,
+ torrentMetadata,
+ pieceStorage,
+ eventDispatcher);
+
+ if (pieceStorage.isFinished()) {
+ loadedTorrent.getTorrentStatistic().setLeft(0);
+ } else {
+ long left = calculateLeft(pieceStorage, torrentMetadata);
+ loadedTorrent.getTorrentStatistic().setLeft(left);
+ }
+ eventDispatcher.multicaster().validationComplete(pieceStorage.getAvailablePieces().cardinality(), torrentMetadata.getPiecesCount());
+
+ this.torrentsStorage.addTorrent(loadedTorrent.getTorrentHash().getHexInfoHash(), loadedTorrent);
+ forceAnnounceAndLogError(loadedTorrent, pieceStorage.isFinished() ? COMPLETED : STARTED); // COMPLETED if the storage is already finished, STARTED otherwise
+ logger.debug(String.format("Added torrent %s (%s)", loadedTorrent, loadedTorrent.getTorrentHash().getHexInfoHash()));
+ return new TorrentManagerImpl(eventDispatcher, loadedTorrent.getTorrentHash());
+ // this manager is what every addTorrent overload ultimately returns, since they all delegate to this method
+ }
+
+ // 计算剩余大小,不用管
+ private long calculateLeft(PieceStorage pieceStorage, TorrentMetadata torrentMetadata) {
+
+ long size = 0;
+ for (TorrentFile torrentFile : torrentMetadata.getFiles()) {
+ size += torrentFile.size;
+ }
+
+ int pieceLength = torrentMetadata.getPieceLength();
+ long result = 0;
+ BitSet availablePieces = pieceStorage.getAvailablePieces();
+ for (int i = 0; i < torrentMetadata.getPiecesCount(); i++) {
+ if (availablePieces.get(i)) {
+ continue;
+ }
+ result += Math.min(pieceLength, size - i * pieceLength);
+ }
+ return result;
+ }
+
+ private void forceAnnounceAndLogError(LoadedTorrent torrent, AnnounceRequestMessage.RequestEvent event) {
+ try {
+ this.announce.forceAnnounce(torrent.createAnnounceableInformation(), this, event);
+ } catch (IOException e) {
+ logger.warn("unable to force announce torrent {}", torrent);
+ logger.debug("", e);
+ }
+ }
+
+ /**
+ * Removes specified torrent from storage.
+ *
+ * @param torrentHash specified torrent hash
+ */
+
+ // Removes the torrent: closes it, unbinds its peers and announces the STOPPED event to the tracker
+ public void removeTorrent(String torrentHash) {
+ logger.debug("Stopping seeding " + torrentHash);
+ final Pair<SharedTorrent, LoadedTorrent> torrents = torrentsStorage.remove(torrentHash);
+
+ SharedTorrent torrent = torrents.first();
+ if (torrent != null) {
+ torrent.setClientState(ClientState.DONE);
+ torrent.closeFully();
+ }
+ List<SharingPeer> peers = getPeersForTorrent(torrentHash);
+ for (SharingPeer peer : peers) {
+ peer.unbind(true);
+ }
+ sendStopEvent(torrents.second(), torrentHash); // only logs when no loaded torrent was removed
+ }
+
+ private void sendStopEvent(LoadedTorrent loadedTorrent, String torrentHash) {
+ if (loadedTorrent != null) { // a loaded torrent was found: announce the STOPPED event for it
+ forceAnnounceAndLogError(loadedTorrent, STOPPED);
+ } else {
+ logger.info("Announceable torrent {} not found in storage after unsuccessful download attempt", torrentHash);
+ }
+ }
+
+ /**
+ * set specified announce interval between requests to the tracker
+ *
+ * @param announceInterval announce interval in seconds
+ */
+ public void setAnnounceInterval(final int announceInterval) {
+ announce.setAnnounceInterval(announceInterval);
+ }
+
+ /**
+ * Return the torrent this client is exchanging on.
+ */
+ public Collection<SharedTorrent> getTorrents() {
+ return this.torrentsStorage.activeTorrents();
+ }
+
+ @SuppressWarnings("unused")
+ public URI getDefaultTrackerURI() {
+ return announce.getDefaultTrackerURI();
+ }
+
+ /**
+ * Returns the set of known peers.
+ */
+ public Set<SharingPeer> getPeers() {
+ return new HashSet<SharingPeer>(this.peersStorage.getSharingPeers());
+ }
+
+ public void setMaxInConnectionsCount(int maxConnectionsCount) {
+ this.myInConnectionAllower.setMyMaxConnectionCount(maxConnectionsCount);
+ }
+
+ /**
+ * set ups new receive buffer size, that will be applied to all new connections.
+ * If value is equal or less, than zero, then method doesn't have effect
+ *
+ * @param newSize new size
+ */
+ public void setReceiveBufferSize(int newSize) {
+ myReceiveBufferSize.set(newSize);
+ }
+
+ /**
+ * set ups new send buffer size, that will be applied to all new connections.
+ * If value is equal or less, than zero, then method doesn't have effect
+ *
+ * @param newSize new size
+ */
+ public void setSendBufferSize(int newSize) {
+ mySendBufferSize.set(newSize);
+ }
+
+ public void setMaxOutConnectionsCount(int maxConnectionsCount) {
+ this.myOutConnectionAllower.setMyMaxConnectionCount(maxConnectionsCount);
+ }
+
+ /**
+ * Runs client instance and starts announcing, seeding and downloading of all torrents from storage
+ *
+ * @param bindAddresses list of addresses which are used for sending to the tracker. Current client
+ * must be available for other peers on the addresses
+ * @throws IOException if any io error occurs
+ */
+ public void start(final InetAddress... bindAddresses) throws IOException {
+ start(bindAddresses, Constants.DEFAULT_ANNOUNCE_INTERVAL_SEC, null, new SelectorFactoryImpl());
+ }
+
+ /**
+ * Runs client instance and starts announcing, seeding and downloading of all torrents from storage
+ *
+ * @param bindAddresses list of addresses which are used for sending to the tracker. Current client
+ * must be available for other peers on the addresses
+ * @param defaultTrackerURI default tracker address.
+ * All torrents will be announced not only on the trackers from metadata file but also to this tracker
+ * @throws IOException if any io error occurs
+ */
+ public void start(final InetAddress[] bindAddresses, final URI defaultTrackerURI) throws IOException {
+ start(bindAddresses, Constants.DEFAULT_ANNOUNCE_INTERVAL_SEC, defaultTrackerURI, new SelectorFactoryImpl());
+ }
+
+ public Peer[] getSelfPeers(final InetAddress[] bindAddresses) throws UnsupportedEncodingException {
+ Peer self = peersStorage.getSelf();
+
+ if (self == null) {
+ return new Peer[0];
+ }
+
+ Peer[] result = new Peer[bindAddresses.length];
+ for (int i = 0; i < bindAddresses.length; i++) {
+ final InetAddress bindAddress = bindAddresses[i];
+ final Peer peer = new Peer(new InetSocketAddress(bindAddress.getHostAddress(), self.getPort()));
+ peer.setTorrentHash(self.getHexInfoHash());
+ //if we have more, that one bind address, then only for first set self peer id. For other generate it
+ if (i == 0) {
+ peer.setPeerId(self.getPeerId());
+ } else {
+ final String id = CommunicationManager.BITTORRENT_ID_PREFIX + UUID.randomUUID().toString().split("-")[4];
+ byte[] idBytes = id.getBytes(Constants.BYTE_ENCODING);
+ peer.setPeerId(ByteBuffer.wrap(idBytes));
+ }
+ result[i] = peer;
+ }
+ return result;
+ }
+
+ /**
+ * Runs client instance and starts announcing, seeding and downloading of all torrents from storage
+ *
+ * @param bindAddresses list of addresses which are used for sending to the tracker. Current client
+ * must be available for other peers on the addresses
+ * @param announceIntervalSec default announce interval. This interval can be override by tracker
+ * @param defaultTrackerURI default tracker address.
+ * All torrents will be announced not only on the trackers from metadata file but also to this tracker
+ * @param selectorFactory factory for creating {@link java.nio.channels.Selector} instance.
+ * @throws IOException if any io error occurs
+ */
+ public void start(final InetAddress[] bindAddresses,
+ final int announceIntervalSec,
+ final URI defaultTrackerURI,
+ final SelectorFactory selectorFactory) throws IOException {
+ start(bindAddresses, announceIntervalSec, defaultTrackerURI, selectorFactory,
+ new FirstAvailableChannel(6881, 6889));
+ }
+
+ public void start(final InetAddress[] bindAddresses,
+ final int announceIntervalSec,
+ final URI defaultTrackerURI,
+ final SelectorFactory selectorFactory,
+ final ServerChannelRegister serverChannelRegister) throws IOException {
+ this.myConnectionManager = new ConnectionManager( // a new connection manager is created on every call
+ this,
+ new SystemTimeService(),
+ myInConnectionAllower,
+ myOutConnectionAllower,
+ selectorFactory,
+ mySendBufferSize,
+ myReceiveBufferSize);
+ this.setSocketConnectionTimeout(DEFAULT_SOCKET_CONNECTION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ try {
+ this.myConnectionManager.initAndRunWorker(serverChannelRegister);
+ } catch (IOException e) {
+ LoggerUtils.errorAndDebugDetails(logger, "error in initialization server channel", e);
+ this.stop(); // if myStarted is still false this only sets the stop flag
+ return; // NOTE(review): the IOException is logged and swallowed here, so the caller is not told that start failed
+ }
+ final String id = CommunicationManager.BITTORRENT_ID_PREFIX + UUID.randomUUID().toString().split("-")[4]; // prefix + last group of a random UUID
+ byte[] idBytes = id.getBytes(Constants.BYTE_ENCODING);
+ Peer self = new Peer(new InetSocketAddress(myConnectionManager.getBindPort()), ByteBuffer.wrap(idBytes));
+ peersStorage.setSelf(self);
+ logger.info("BitTorrent client [{}] started and " +
+ "listening at {}:{}...",
+ new Object[]{
+ self.getShortHexPeerId(),
+ self.getIp(),
+ self.getPort()
+ });
+ // announcing starts with one self peer per bind address (see getSelfPeers)
+ announce.start(defaultTrackerURI, this, getSelfPeers(bindAddresses), announceIntervalSec);
+ this.stop.set(false);
+
+ myStarted = true;
+ }
+
+ /**
+ * Immediately but gracefully stop this client.
+ */
+ public void stop() {
+ this.stop(60, TimeUnit.SECONDS);
+ }
+
+ void stop(int timeout, TimeUnit timeUnit) { // NOTE: timeout and timeUnit are not read anywhere in this method
+ boolean wasStopped = this.stop.getAndSet(true);
+ if (wasStopped) return;
+ // the stop flag is already set at this point, so hasStop() reports true even when the client was never started
+ if (!myStarted)
+ return;
+ // shutdown order: connection manager, announce, active torrents, remaining peers, torrents storage
+ this.myConnectionManager.close();
+
+ logger.trace("try stop announce thread...");
+
+ this.announce.stop();
+
+ logger.trace("announce thread is stopped");
+
+ for (SharedTorrent torrent : this.torrentsStorage.activeTorrents()) {
+ logger.trace("try close torrent {}", torrent);
+ torrent.closeFully();
+ if (torrent.isFinished()) {
+ torrent.setClientState(ClientState.DONE);
+ } else {
+ torrent.setClientState(ClientState.ERROR);
+ }
+ }
+
+ logger.debug("Closing all remaining peer connections...");
+ for (SharingPeer peer : this.peersStorage.getSharingPeers()) {
+ peer.unbind(true);
+ }
+ // NOTE(review): myStarted is not reset here, so isRunning() keeps returning true after stop - confirm this is intended
+ torrentsStorage.clear();
+ logger.info("BitTorrent client signing off.");
+ }
+
+ public void setCleanupTimeout(int timeout, TimeUnit timeUnit) throws IllegalStateException {
+ ConnectionManager connectionManager = this.myConnectionManager;
+ if (connectionManager == null) {
+ throw new IllegalStateException("connection manager is null");
+ }
+ connectionManager.setCleanupTimeout(timeUnit.toMillis(timeout));
+ }
+
+ public void setSocketConnectionTimeout(int timeout, TimeUnit timeUnit) throws IllegalStateException {
+ ConnectionManager connectionManager = this.myConnectionManager;
+ if (connectionManager == null) {
+ throw new IllegalStateException("connection manager is null");
+ }
+ connectionManager.setSocketConnectionTimeout(timeUnit.toMillis(timeout));
+ }
+
+ /**
+ * Tells whether we are a seed for the torrent we're sharing.
+ */
+ public boolean isSeed(String hexInfoHash) {
+ SharedTorrent t = this.torrentsStorage.getTorrent(hexInfoHash);
+ return t != null && t.isComplete();
+ }
+
+ public List<SharingPeer> getPeersForTorrent(String torrentHash) {
+ if (torrentHash == null) return new ArrayList<SharingPeer>();
+
+ List<SharingPeer> result = new ArrayList<SharingPeer>();
+ for (SharingPeer sharingPeer : peersStorage.getSharingPeers()) {
+ if (torrentHash.equals(sharingPeer.getHexInfoHash())) {
+ result.add(sharingPeer);
+ }
+ }
+ return result;
+ }
+
+ public boolean isRunning() {
+ return myStarted;
+ }
+
+ private Collection<SharingPeer> getConnectedPeers() {
+ Set<SharingPeer> result = new HashSet<SharingPeer>();
+ for (SharingPeer peer : this.peersStorage.getSharingPeers()) {
+ if (peer.isConnected()) {
+ result.add(peer);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @param hash specified torrent hash
+ * @return true if storage contains specified torrent. False otherwise
+ * @see TorrentsStorage#hasTorrent
+ */
+ @SuppressWarnings("unused")
+ public boolean containsTorrentWithHash(String hash) {
+ return torrentsStorage.hasTorrent(hash);
+ }
+
+ @Override
+ public PeersStorage getPeersStorage() {
+ return peersStorage;
+ }
+
+ @Override
+ public TorrentsStorage getTorrentsStorage() {
+ return torrentsStorage;
+ }
+
+ @Override
+ public ExecutorService getExecutor() {
+ return myExecutorService;
+ }
+
+ public ExecutorService getPieceValidatorExecutor() {
+ return myPieceValidatorExecutor;
+ }
+
+ @Override
+ public ConnectionListener newChannelListener() {
+ return new StateChannelListener(this);
+ }
+
+ @Override
+ public SharingPeer createSharingPeer(String host,
+ int port,
+ ByteBuffer peerId,
+ SharedTorrent torrent,
+ ByteChannel channel,
+ String clientIdentifier,
+ int clientVersion) {
+ return new SharingPeer(host, port, peerId, torrent, getConnectionManager(), this, channel, clientIdentifier, clientVersion);
+ }
+
+ @Override
+ public TorrentLoader getTorrentLoader() {
+ return myTorrentLoader;
+ }
+
+
+ /** AnnounceResponseListener handler(s). **********************************/
+
+ /**
+ * Handle an announce response event.
+ * @param interval The announce interval requested by the tracker; always applied through setAnnounceInterval.
+ * @param complete The number of seeders on this torrent.
+ * @param incomplete The number of leechers on this torrent (not used by this handler).
+ * @param hexInfoHash Hex info hash of the torrent whose seeders count and last announce time are updated, if it is in storage.
+ */
+ @Override
+ public void handleAnnounceResponse(int interval, int complete, int incomplete, String hexInfoHash) {
+ final SharedTorrent sharedTorrent = this.torrentsStorage.getTorrent(hexInfoHash);
+ if (sharedTorrent != null) {
+ sharedTorrent.setSeedersCount(complete);
+ sharedTorrent.setLastAnnounceTime(System.currentTimeMillis());
+ }
+ setAnnounceInterval(interval);
+ }
+
+ /**
+ * Handle the discovery of new peers.
+ *
+ * @param peers The list of peers discovered (from the announce response or
+ * any other means like DHT/PEX, etc.).
+ */
+ @Override
+ public void handleDiscoveredPeers(List<Peer> peers, String hexInfoHash) {
+
+ if (peers.size() == 0) return;
+
+ SharedTorrent torrent = torrentsStorage.getTorrent(hexInfoHash);
+
+ if (torrent != null && torrent.isFinished()) return;
+
+ final LoadedTorrent announceableTorrent = torrentsStorage.getLoadedTorrent(hexInfoHash);
+ if (announceableTorrent == null) {
+ logger.info("announceable torrent {} is not found in storage. Maybe it was removed", hexInfoHash);
+ return;
+ }
+
+ if (announceableTorrent.getPieceStorage().isFinished()) return;
+
+ logger.debug("Got {} peer(s) ({}) for {} in tracker response", new Object[]{peers.size(),
+ Arrays.toString(peers.toArray()), hexInfoHash});
+
+ Map<PeerUID, Peer> uniquePeers = new HashMap<PeerUID, Peer>();
+ for (Peer peer : peers) {
+ final PeerUID peerUID = new PeerUID(peer.getAddress(), hexInfoHash);
+ if (uniquePeers.containsKey(peerUID)) continue;
+ uniquePeers.put(peerUID, peer);
+ }
+
+ for (Map.Entry<PeerUID, Peer> e : uniquePeers.entrySet()) {
+
+ PeerUID peerUID = e.getKey();
+ Peer peer = e.getValue();
+ boolean alreadyConnectedToThisPeer = peersStorage.getSharingPeer(peerUID) != null;
+
+ if (alreadyConnectedToThisPeer) {
+ logger.debug("skipping peer {}, because we already connected to this peer", peer);
+ continue;
+ }
+
+ ConnectionListener connectionListener = new OutgoingConnectionListener(
+ this,
+ announceableTorrent.getTorrentHash(),
+ peer.getIp(),
+ peer.getPort());
+
+ logger.debug("trying to connect to the peer {}", peer);
+
+ boolean connectTaskAdded = this.myConnectionManager.offerConnect(
+ new ConnectTask(peer.getIp(),
+ peer.getPort(),
+ connectionListener,
+ new SystemTimeService().now(),
+ Constants.DEFAULT_CONNECTION_TIMEOUT_MILLIS), 1, TimeUnit.SECONDS);
+ if (!connectTaskAdded) {
+ logger.info("can not connect to peer {}. Unable to add connect task to connection manager", peer);
+ }
+ }
+ }
+
+ /**
+ * PeerActivityListener handler(s). *************************************
+ */
+
+ @Override
+ public void handlePeerChoked(SharingPeer peer) { /* Do nothing */ }
+
+ @Override
+ public void handlePeerReady(SharingPeer peer) { /* Do nothing */ }
+
+ @Override
+ public void handlePieceAvailability(SharingPeer peer,
+ Piece piece) { /* Do nothing */ }
+
+ @Override
+ public void handleBitfieldAvailability(SharingPeer peer,
+ BitSet availablePieces) { /* Do nothing */ }
+
+ @Override
+ public void handlePieceSent(SharingPeer peer,
+ Piece piece) { /* Do nothing */ }
+
+ /**
+ * Piece download completion handler.
+ * <p/>
+ * <p>
+ * When a piece is completed, and valid, we announce to all connected peers
+ * that we now have this piece.
+ * </p>
+ * <p/>
+ * <p>
+ * We use this handler to identify when all of the pieces have been
+ * downloaded. When that's the case, we can start the seeding period, if
+ * any.
+ * </p>
+ *
+ * @param peer The peer we got the piece from.
+ * @param piece The piece in question.
+ */
+ @Override
+ public void handlePieceCompleted(final SharingPeer peer, final Piece piece)
+ throws IOException {
+ final SharedTorrent torrent = peer.getTorrent();
+ final String torrentHash = torrent.getHexInfoHash();
+ try {
+ final Future<?> validationFuture = myPieceValidatorExecutor.submit(new Runnable() {
+ @Override
+ public void run() {
+ validatePieceAsync(torrent, piece, torrentHash, peer);
+ }
+ });
+ torrent.markCompletedAndAddValidationFuture(piece, validationFuture);
+ } catch (RejectedExecutionException e) {
+ torrent.markUncompleted(piece);
+ LoggerUtils.warnWithMessageAndDebugDetails(logger, "Unable to submit validation task for torrent {}", torrentHash, e);
+ }
+ }
+
+ private void validatePieceAsync(final SharedTorrent torrent, final Piece piece, String torrentHash, SharingPeer peer) {
+ try {
+ synchronized (piece) {
+ // Validates the piece; a valid piece is announced to peers and may complete the torrent, an invalid one is marked uncompleted.
+ if (piece.isValid()) return;
+
+ piece.validate(torrent, piece);
+ if (piece.isValid()) {
+ torrent.notifyPieceDownloaded(piece, peer);
+ piece.finish();
+ // Send a HAVE message to all connected peers, which don't have the piece
+ PeerMessage have = PeerMessage.HaveMessage.craft(piece.getIndex());
+ for (SharingPeer remote : getConnectedPeers()) {
+ if (remote.getTorrent().getHexInfoHash().equals(torrentHash) &&
+ !remote.getAvailablePieces().get(piece.getIndex()))
+ remote.send(have);
+ }
+ peer.pieceDownloaded();
+
+ final boolean isTorrentComplete;
+ synchronized (torrent) {
+ torrent.removeValidationFuture(piece);
+
+ boolean isCurrentPeerSeeder = peer.getAvailablePieces().cardinality() == torrent.getPieceCount();
+ //if it's seeder we will send not interested message when we download full file
+ if (!isCurrentPeerSeeder) {
+ if (torrent.isAllPiecesOfPeerCompletedAndValidated(peer)) {
+ peer.notInteresting();
+ }
+ }
+
+ isTorrentComplete = torrent.isComplete();
+
+ if (isTorrentComplete) {
+ logger.info("Download of {} complete.", torrent.getDirectoryName());
+
+ torrent.finish();
+ }
+ }
+ // the COMPLETED announce and the not-interested notifications happen outside the torrent lock
+ if (isTorrentComplete) {
+
+ LoadedTorrent announceableTorrent = torrentsStorage.getLoadedTorrent(torrentHash);
+ // NOTE(review): this return, like the one at the top, leaves the method without calling torrent.handlePeerReady(peer)
+ if (announceableTorrent == null) return;
+
+ AnnounceableInformation announceableInformation = announceableTorrent.createAnnounceableInformation();
+
+ if (!TorrentUtils.isTrackerLessInfo(announceableInformation)) {
+ try {
+ announce.getCurrentTrackerClient(announceableInformation)
+ .announceAllInterfaces(COMPLETED, true, announceableInformation);
+ } catch (AnnounceException e) {
+ logger.debug("unable to announce torrent {} on tracker {}", torrent, torrent.getAnnounce());
+ }
+ }
+
+ for (SharingPeer remote : getPeersForTorrent(torrentHash)) {
+ remote.notInteresting();
+ }
+
+ }
+ } else {
+ torrent.markUncompleted(piece);
+ logger.info("Downloaded piece #{} from {} was not valid ;-(. Trying another peer", piece.getIndex(), peer);
+ peer.getPoorlyAvailablePieces().set(piece.getIndex());
+ }
+ }
+ } catch (Throwable e) {
+ torrent.markUncompleted(piece);
+ logger.warn("unhandled exception in piece {} validation task", piece.getIndex(), e); // index fills the placeholder, e is logged as the exception
+ }
+ torrent.handlePeerReady(peer);
+ }
+
+ @Override
+ public void handlePeerDisconnected(SharingPeer peer) {
+ Peer p = new Peer(peer.getIp(), peer.getPort());
+ p.setPeerId(peer.getPeerId());
+ p.setTorrentHash(peer.getHexInfoHash());
+ logger.trace("Peer {} disconnected, [{}/{}].",
+ new Object[]{
+ peer,
+ getConnectedPeers().size(),
+ this.peersStorage.getSharingPeers().size()
+ });
+ PeerUID peerUID = new PeerUID(peer.getAddress(), peer.getHexInfoHash());
+ peersStorage.removeSharingPeer(peerUID);
+ }
+
+ @Override
+ public void afterPeerRemoved(SharingPeer peer) {
+ logger.trace("disconnected peer " + peer);
+ torrentsStorage.peerDisconnected(peer.getHexInfoHash());
+ }
+
+ @Override
+ public void handleIOException(SharingPeer peer, IOException ioe) {
+ logger.debug("I/O problem occured when reading or writing piece data for peer {}: {}.", peer, ioe.getMessage());
+
+ peer.unbind(true);
+ }
+
+ @Override
+ public void handleNewPeerConnected(SharingPeer peer) {
+ //do nothing
+ }
+
+ public ConnectionManager getConnectionManager() throws IllegalStateException {
+ ConnectionManager connectionManager = this.myConnectionManager;
+ if (connectionManager == null) {
+ throw new IllegalStateException("connection manager is null");
+ }
+ return connectionManager;
+ }
+
+ public boolean hasStop(){
+ return stop.get();
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Context.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Context.java
new file mode 100644
index 0000000..24170e5
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Context.java
@@ -0,0 +1,29 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.network.ChannelListenerFactory;
+
+import java.util.concurrent.ExecutorService;
+
+public interface Context extends SharingPeerFactory, ChannelListenerFactory {
+
+ /**
+ * @return the single {@link PeersStorage} instance provided by this context
+ */
+ PeersStorage getPeersStorage();
+
+ /**
+ * @return the single {@link TorrentsStorage} instance provided by this context
+ */
+ TorrentsStorage getTorrentsStorage();
+
+ /**
+ * @return the {@link ExecutorService} used for handling incoming messages
+ */
+ ExecutorService getExecutor();
+
+ /**
+ * @return the single {@link TorrentLoader} instance used for loading torrents
+ */
+ TorrentLoader getTorrentLoader();
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/EventDispatcher.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/EventDispatcher.java
new file mode 100644
index 0000000..f031219
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/EventDispatcher.java
@@ -0,0 +1,80 @@
+package com.turn.ttorrent.client;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class EventDispatcher {
+
+  // registered listeners; a copy-on-write list, so each loop below iterates over a snapshot
+  private final List<TorrentListener> listeners = new CopyOnWriteArrayList<TorrentListener>();
+  // single facade that forwards every event to all registered listeners
+  private final TorrentListener notifyer = new Multicaster();
+
+  /** @return the listener that re-broadcasts each event it receives to every registered listener */
+  TorrentListener multicaster() {
+    return notifyer;
+  }
+
+  /** Registers {@code listener} so that it receives the events dispatched from now on. */
+  public void addListener(TorrentListener listener) {
+    listeners.add(listener);
+  }
+
+  /** @return {@code true} if {@code listener} was registered and has been removed */
+  public boolean removeListener(TorrentListener listener) {
+    return listeners.remove(listener);
+  }
+
+  /** Fans every {@link TorrentListener} callback out to the registered listeners, in list order. */
+  private final class Multicaster implements TorrentListener {
+
+    @Override
+    public void peerConnected(PeerInformation peerInformation) {
+      for (TorrentListener each : listeners) {
+        each.peerConnected(peerInformation);
+      }
+    }
+
+    @Override
+    public void peerDisconnected(PeerInformation peerInformation) {
+      for (TorrentListener each : listeners) {
+        each.peerDisconnected(peerInformation);
+      }
+    }
+
+    @Override
+    public void pieceDownloaded(PieceInformation pieceInformation, PeerInformation peerInformation) {
+      for (TorrentListener each : listeners) {
+        each.pieceDownloaded(pieceInformation, peerInformation);
+      }
+    }
+
+    @Override
+    public void downloadComplete() {
+      for (TorrentListener each : listeners) {
+        each.downloadComplete();
+      }
+    }
+
+    @Override
+    public void pieceReceived(PieceInformation pieceInformation, PeerInformation peerInformation) {
+      for (TorrentListener each : listeners) {
+        each.pieceReceived(pieceInformation, peerInformation);
+      }
+    }
+
+    @Override
+    public void downloadFailed(Throwable cause) {
+      for (TorrentListener each : listeners) {
+        each.downloadFailed(cause);
+      }
+    }
+
+    @Override
+    public void validationComplete(int validpieces, int totalpieces) {
+      for (TorrentListener each : listeners) {
+        each.validationComplete(validpieces, totalpieces);
+      }
+    }
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/FileMetadataProvider.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/FileMetadataProvider.java
new file mode 100644
index 0000000..10b24dd
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/FileMetadataProvider.java
@@ -0,0 +1,24 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.TorrentMetadata;
+import com.turn.ttorrent.common.TorrentParser;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.File;
+import java.io.IOException;
+
+public class FileMetadataProvider implements TorrentMetadataProvider {
+
+  // path of the file that is parsed again on every metadata request
+  private final String filePath;
+
+  public FileMetadataProvider(String filePath) {
+    this.filePath = filePath;
+  }
+
+  @NotNull
+  @Override
+  public TorrentMetadata getTorrentMetadata() throws IOException {
+    return new TorrentParser().parseFromFile(new File(filePath));
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Handshake.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Handshake.java
new file mode 100644
index 0000000..229681c
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Handshake.java
@@ -0,0 +1,190 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.Constants;
+import com.turn.ttorrent.common.TorrentHash;
+import com.turn.ttorrent.common.TorrentUtils;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
+
+
+/**
+ * Represents a BitTorrent handshake message.
+ *
+ * <p>
+ * This class encapsulates the structure and parsing logic for the handshake
+ * that is exchanged between peers when establishing a connection in the
+ * BitTorrent protocol. As read and written below, a handshake consists of
+ * one length byte ({@code pstrlen}), the protocol identifier, 8 reserved
+ * bytes, the 20-byte info hash and the 20-byte peer ID.
+ * </p>
+ *
+ * @author mpetazzoni (comments by rpilyushin)
+ */
+public class Handshake implements TorrentHash {
+
+  /** BitTorrent protocol identifier as specified by the BitTorrent specification. */
+  public static final String BITTORRENT_PROTOCOL_IDENTIFIER = "BitTorrent protocol";
+  /** Length of a handshake message without the protocol identifier string. */
+  public static final int BASE_HANDSHAKE_LENGTH = 49;
+
+  // The full handshake message, rewound to its start.
+  private final ByteBuffer data;
+  // The torrent info hash carried by the handshake.
+  private final ByteBuffer infoHash;
+  // The peer ID carried by the handshake.
+  private final ByteBuffer peerId;
+
+  // Identifier for the torrent; not part of the handshake message itself.
+  private String torrentIdentifier;
+
+  // Protocol identifier length; only assigned by parse(ByteBuffer, int).
+  private int myPstrlen;
+
+  // Internal constructor: instances come from parse(...) or craft(...).
+  private Handshake(ByteBuffer data, ByteBuffer infoHash, ByteBuffer peerId) {
+    this.data = data;
+    this.data.rewind(); // Rewind the buffer to the start for reading.
+    this.infoHash = infoHash;
+    this.peerId = peerId;
+  }
+
+  /** @return the raw handshake message (the internal buffer, not a copy) */
+  public ByteBuffer getData() {
+    return this.data;
+  }
+
+  /** @return the info hash bytes (the backing array itself, not a copy) */
+  public byte[] getInfoHash() {
+    return this.infoHash.array();
+  }
+
+  /** @return the info hash as a hexadecimal string */
+  public String getHexInfoHash() {
+    return TorrentUtils.byteArrayToHexString(getInfoHash());
+  }
+
+  /** @return the peer ID bytes (the backing array itself, not a copy) */
+  public byte[] getPeerId() {
+    return this.peerId.array();
+  }
+
+  /**
+   * Parses a handshake message, validating its structure.
+   *
+   * @param buffer buffer positioned at the length byte of exactly one handshake; it is kept (rewound) as the data
+   * @throws ParseException if the length does not match {@code pstrlen} or the protocol identifier is wrong
+   */
+  public static Handshake parse(ByteBuffer buffer)
+          throws ParseException, UnsupportedEncodingException {
+    // pstrlen is one unsigned byte: mask it so values above 127 do not become negative.
+    int pstrlen = buffer.get() & 0xFF;
+    // Check that the length is correct given the remaining data.
+    if (buffer.remaining() != BASE_HANDSHAKE_LENGTH + pstrlen - 1) {
+      throw new ParseException("Incorrect handshake message length " + "(pstrlen=" + pstrlen + ") !", 0);
+    }
+
+    // Parse the protocol identifier and validate it.
+    byte[] pstr = new byte[pstrlen];
+    buffer.get(pstr);
+    if (!Handshake.BITTORRENT_PROTOCOL_IDENTIFIER.equals(new String(pstr, Constants.BYTE_ENCODING))) {
+      throw new ParseException("Invalid protocol identifier!", 1);
+    }
+
+    // Skip over the reserved bytes, which are not interpreted here.
+    buffer.get(new byte[8]);
+
+    // Parse the info hash and peer ID from the buffer.
+    byte[] infoHash = new byte[20];
+    buffer.get(infoHash);
+    byte[] peerId = new byte[20];
+    buffer.get(peerId);
+    return new Handshake(buffer, ByteBuffer.wrap(infoHash), ByteBuffer.wrap(peerId));
+  }
+
+  /** Parses as {@link #parse(ByteBuffer)} does, then attaches the torrent identifier. */
+  public static Handshake parse(ByteBuffer buffer, String torrentIdentifier) throws UnsupportedEncodingException, ParseException {
+    Handshake hs = Handshake.parse(buffer);
+    hs.setTorrentIdentifier(torrentIdentifier);
+    return hs;
+  }
+
+  /** Parses as {@link #parse(ByteBuffer)} does, then records {@code pstrlen} for {@link #getPstrlen()}. */
+  public static Handshake parse(ByteBuffer buffer, int pstrlen) throws UnsupportedEncodingException, ParseException {
+    Handshake hs = Handshake.parse(buffer);
+    hs.myPstrlen = pstrlen;
+    return hs;
+  }
+
+  /**
+   * Crafts a new handshake message from a torrent info hash and a peer ID.
+   *
+   * @return the handshake, or {@code null} if {@link Constants#BYTE_ENCODING} is not supported
+   */
+  public static Handshake craft(byte[] torrentInfoHash, byte[] clientPeerId) {
+    try {
+      ByteBuffer buffer = ByteBuffer.allocate(
+              Handshake.BASE_HANDSHAKE_LENGTH + Handshake.BITTORRENT_PROTOCOL_IDENTIFIER.length());
+
+      byte[] reserved = new byte[8]; // Reserved bytes, all zero.
+      ByteBuffer infoHash = ByteBuffer.wrap(torrentInfoHash);
+      ByteBuffer peerId = ByteBuffer.wrap(clientPeerId);
+
+      // Construct the handshake message in the buffer.
+      buffer.put((byte) Handshake.BITTORRENT_PROTOCOL_IDENTIFIER.length());
+      buffer.put(Handshake.BITTORRENT_PROTOCOL_IDENTIFIER.getBytes(Constants.BYTE_ENCODING));
+      buffer.put(reserved);
+      buffer.put(infoHash);
+      buffer.put(peerId);
+
+      return new Handshake(buffer, infoHash, peerId);
+    } catch (UnsupportedEncodingException uee) {
+      return null; // In case the encoding is not supported, return null.
+    }
+  }
+
+  /**
+   * Crafts (despite the name) a handshake via {@link #craft(byte[], byte[])} and attaches the torrent identifier.
+   *
+   * @throws UnsupportedEncodingException if crafting failed, rather than a {@link NullPointerException}
+   */
+  public static Handshake parse(byte[] torrentInfoHash, byte[] clientPeerId, String torrentIdentifier) throws UnsupportedEncodingException, ParseException {
+    Handshake hs = Handshake.craft(torrentInfoHash, clientPeerId);
+    if (hs == null) {
+      throw new UnsupportedEncodingException(Constants.BYTE_ENCODING);
+    }
+    hs.setTorrentIdentifier(torrentIdentifier);
+    return hs;
+  }
+
+  /** Sets the torrent identifier for this handshake. */
+  public void setTorrentIdentifier(String torrentIdentifier) {
+    this.torrentIdentifier = torrentIdentifier;
+  }
+
+  /** @return the protocol identifier length recorded for this handshake */
+  public int getPstrlen() {
+    return myPstrlen;
+  }
+
+  /** @return the torrent identifier */
+  public String getTorrentIdentifier() {
+    return torrentIdentifier;
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/LoadedTorrent.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/LoadedTorrent.java
new file mode 100644
index 0000000..765a56c
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/LoadedTorrent.java
@@ -0,0 +1,45 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.storage.PieceStorage;
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.TorrentHash;
+import com.turn.ttorrent.common.TorrentMetadata;
+import com.turn.ttorrent.common.TorrentStatistic;
+import org.jetbrains.annotations.NotNull;
+
+public interface LoadedTorrent {
+
+ /**
+ * @return the {@link PieceStorage} in which the available pieces are stored
+ */
+ PieceStorage getPieceStorage();
+
+ /**
+ * @return the {@link TorrentMetadata} instance
+ * @throws IllegalStateException if the metadata cannot be fetched from its source
+ * (e.g. the source is a .torrent file and it was deleted manually)
+ */
+ TorrentMetadata getMetadata() throws IllegalStateException;
+
+ /**
+ * @return a new instance of {@link AnnounceableInformation} for announcing this torrent to the tracker
+ */
+ @NotNull
+ AnnounceableInformation createAnnounceableInformation();
+
+ /**
+ * @return the {@link TorrentStatistic} instance related to this torrent
+ */
+ TorrentStatistic getTorrentStatistic();
+
+ /**
+ * @return the hash of this torrent
+ */
+ TorrentHash getTorrentHash();
+
+ /**
+ * @return the related {@link EventDispatcher}
+ */
+ EventDispatcher getEventDispatcher();
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/LoadedTorrentImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/LoadedTorrentImpl.java
new file mode 100644
index 0000000..37a6a7a
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/LoadedTorrentImpl.java
@@ -0,0 +1,88 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.storage.PieceStorage;
+import com.turn.ttorrent.common.*;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+public class LoadedTorrentImpl implements LoadedTorrent {
+
+  private final TorrentStatistic torrentStatistic;
+  private final TorrentHash torrentHash;
+  private final List<List<String>> announceUrls;
+  private final String announce;
+  private final PieceStorage pieceStorage;
+  private final TorrentMetadataProvider metadataProvider;
+  private final EventDispatcher eventDispatcher;
+
+  // Copies the info hash and announce URLs out of torrentMetadata; the metadata object itself is not kept.
+  LoadedTorrentImpl(TorrentStatistic torrentStatistic,
+                    TorrentMetadataProvider metadataProvider,
+                    TorrentMetadata torrentMetadata,
+                    PieceStorage pieceStorage,
+                    EventDispatcher eventDispatcher) {
+    this.torrentStatistic = torrentStatistic;
+    this.metadataProvider = metadataProvider;
+    this.eventDispatcher = eventDispatcher;
+    this.pieceStorage = pieceStorage;
+    this.torrentHash = new ImmutableTorrentHash(torrentMetadata.getInfoHash());
+    if (torrentMetadata.getAnnounceList() == null) {
+      this.announceUrls = Collections.singletonList(Collections.singletonList(torrentMetadata.getAnnounce()));
+    } else {
+      this.announceUrls = Collections.unmodifiableList(torrentMetadata.getAnnounceList());
+    }
+    this.announce = torrentMetadata.getAnnounce();
+  }
+
+  @Override
+  public PieceStorage getPieceStorage() {
+    return pieceStorage;
+  }
+
+  // Queries the metadata provider on every call; its IOException is rethrown as IllegalStateException.
+  @Override
+  public TorrentMetadata getMetadata() {
+    try {
+      return metadataProvider.getTorrentMetadata();
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to fetch torrent metadata from metadata provider: " + metadataProvider, e);
+    }
+  }
+
+  @Override
+  public TorrentStatistic getTorrentStatistic() {
+    return torrentStatistic;
+  }
+
+  // Builds a new object from the statistic's uploaded/downloaded/left byte counts on each call.
+  @Override
+  @NotNull
+  public AnnounceableInformation createAnnounceableInformation() {
+    return new AnnounceableInformationImpl(
+            torrentStatistic.getUploadedBytes(),
+            torrentStatistic.getDownloadedBytes(),
+            torrentStatistic.getLeftBytes(),
+            torrentHash, announceUrls, announce);
+  }
+
+  @Override
+  public TorrentHash getTorrentHash() {
+    return torrentHash;
+  }
+
+  @Override
+  public EventDispatcher getEventDispatcher() {
+    return eventDispatcher;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("LoadedTorrentImpl{");
+    text.append("piece storage='").append(pieceStorage).append('\'');
+    text.append(", metadata provider='").append(metadataProvider).append('\'');
+    return text.append('}').toString();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PeerInformation.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PeerInformation.java
new file mode 100644
index 0000000..5b02c8d
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PeerInformation.java
@@ -0,0 +1,27 @@
+package com.turn.ttorrent.client;
+
+import java.net.InetSocketAddress;
+
+public interface PeerInformation {
+
+ /**
+ * @return the {@link InetSocketAddress} of the remote peer
+ */
+ InetSocketAddress getAddress();
+
+ /**
+ * @return the id of the current peer, as that peer sent it in the handshake
+ */
+ byte[] getId();
+
+ /**
+ * @return the client identifier of the current peer
+ */
+ String getClientIdentifier();
+
+ /**
+ * @return the client version of the current peer
+ */
+ int getClientVersion();
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PeersStorage.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PeersStorage.java
new file mode 100644
index 0000000..cc6239a
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PeersStorage.java
@@ -0,0 +1,49 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.PeerUID;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class PeersStorage {
+
+  private volatile Peer self; // stays null until setSelf is called
+  private final ConcurrentHashMap<PeerUID, SharingPeer> connectedSharingPeers = new ConcurrentHashMap<PeerUID, SharingPeer>();
+
+  public Peer getSelf() {
+    return self;
+  }
+
+  public void setSelf(Peer self) {
+    this.self = self;
+  }
+
+  /** @return the peer already stored under {@code peerId}, or {@code null} if {@code sharingPeer} was stored */
+  public SharingPeer putIfAbsent(PeerUID peerId, SharingPeer sharingPeer) {
+    return connectedSharingPeers.putIfAbsent(peerId, sharingPeer);
+  }
+
+  public SharingPeer getSharingPeer(PeerUID peerId) {
+    return connectedSharingPeers.get(peerId);
+  }
+
+  /** @return the removed peer, or {@code null} if nothing was stored under {@code peerId} */
+  public SharingPeer removeSharingPeer(PeerUID peerId) {
+    return connectedSharingPeers.remove(peerId);
+  }
+
+  /** Removes one mapping whose value equals {@code peer}, whatever its key. */
+  public void removeSharingPeer(SharingPeer peer) {
+    connectedSharingPeers.values().remove(peer);
+  }
+
+  /** @return a copy of the stored peers, detached from later changes to this storage */
+  public Collection<SharingPeer> getSharingPeers() {
+    return new ArrayList<SharingPeer>(connectedSharingPeers.values());
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Piece.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Piece.java
new file mode 100644
index 0000000..d966813
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/Piece.java
@@ -0,0 +1,294 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.client.storage.PieceStorage;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.TorrentUtils;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+
+/**
+ * A torrent piece.
+ *
+ * <p>
+ * This class represents a torrent piece. Torrents are made of pieces, which
+ * are in turn made of blocks that are exchanged using the peer protocol.
+ * The piece length is defined at the torrent level, but the last piece that
+ * makes the torrent might be smaller.
+ * </p>
+ *
+ * <p>
+ * If the torrent has multiple files, pieces can spread across file boundaries.
+ * The TorrentByteStorage abstracts this problem to give Piece objects the
+ * impression of a contiguous, linear byte storage.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+public class Piece implements Comparable<Piece>, PieceInformation {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(Piece.class);
+
+  private final PieceStorage pieceStorage;
+  private final int index;
+  private final long length;
+  private final byte[] hash;
+
+  private volatile boolean valid;
+  private int seen;
+  private ByteBuffer data;
+
+  /**
+   * Initialize a new piece in the byte bucket.
+   *
+   * @param pieceStorage The underlying piece storage bucket.
+   * @param index        This piece index in the torrent.
+   * @param length       This piece length, in bytes.
+   * @param hash         This piece 20-byte SHA1 hash sum.
+   */
+  public Piece(PieceStorage pieceStorage, int index, long length, byte[] hash) {
+    this.pieceStorage = pieceStorage;
+    this.index = index;
+    this.length = length;
+    this.hash = hash;
+    // A piece starts out invalid (until first check), unseen and without buffered data.
+    this.valid = false;
+    this.seen = 0;
+    this.data = null;
+  }
+
+  /** Returns the size of this piece, narrowed to an {@code int}. */
+  @Override
+  public int getSize() {
+    return (int) length;
+  }
+
+  /** Tells whether this piece's data is valid or not. */
+  public boolean isValid() {
+    return this.valid;
+  }
+
+  /** Returns the index of this piece in the torrent. */
+  public int getIndex() {
+    return this.index;
+  }
+
+  /**
+   * Returns the size, in bytes, of this piece.
+   *
+   * <p>
+   * All pieces, except the last one, are expected to have the same size.
+   * </p>
+   */
+  public long size() {
+    return this.length;
+  }
+
+  /** Tells whether this piece is available in the current connected peer swarm. */
+  public boolean available() {
+    return this.seen > 0;
+  }
+
+  /**
+   * Mark this piece as being seen at the given peer.
+   *
+   * @param peer The sharing peer this piece has been seen available at.
+   */
+  public void seenAt(SharingPeer peer) {
+    this.seen++;
+  }
+
+  /**
+   * Mark this piece as no longer being available at the given peer.
+   *
+   * @param peer The sharing peer from which the piece is no longer available.
+   */
+  public void noLongerAt(SharingPeer peer) {
+    this.seen--;
+  }
+
+  void setValid(boolean valid) {
+    this.valid = valid;
+  }
+
+  /**
+   * Validates the data recorded so far for this piece.
+   *
+   * <p>
+   * The SHA1 sum of the in-memory buffer filled by {@link #record(ByteBuffer, int)} is compared with the
+   * expected hash and the outcome becomes this piece's validity. The buffer must still be present
+   * (it is released by {@link #finish()}); neither parameter is used.
+   * </p>
+   * @return true if the recorded data's SHA1 sum matches this piece's expected hash.
+   */
+  public boolean validate(SharedTorrent torrent, Piece piece) throws IOException {
+    logger.trace("Validating {}...", this);
+
+    byte[] pieceBytes = data.array();
+    final byte[] calculatedHash = TorrentUtils.calculateSha1Hash(pieceBytes);
+    this.valid = Arrays.equals(calculatedHash, this.hash);
+    logger.trace("validating result of piece {} is {}", this.index, this.valid);
+
+    return this.isValid();
+  }
+
+  /**
+   * Internal piece data read function.
+   *
+   * <p>
+   * This function will read the piece data without checking if the piece has
+   * been validated. It is simply meant at factoring-in the common read code
+   * from the validate and read functions.
+   * </p>
+   *
+   * @param offset Offset inside this piece where to start reading.
+   * @param length Number of bytes to read from the piece.
+   * @param buffer Destination buffer; the data is put at its current position.
+   * @return The given buffer, rewound, with its limit just past the data read.
+   * @throws IllegalArgumentException If <em>offset + length</em> goes over
+   * the piece boundary.
+   * @throws IOException If the read can't be completed (I/O error, or EOF
+   * reached, which can happen if the piece is not complete).
+   */
+  private ByteBuffer _read(long offset, long length, ByteBuffer buffer) throws IOException {
+    if (offset + length > this.length) {
+      throw new IllegalArgumentException("Piece#" + this.index +
+              " overrun (" + offset + " + " + length + " > " +
+              this.length + ") !");
+    }
+
+    // TODO: remove cast to int when large ByteBuffer support is
+    // implemented in Java.
+    int position = buffer.position();
+    byte[] bytes = this.pieceStorage.readPiecePart(this.index, (int) offset, (int) length);
+    buffer.put(bytes);
+    buffer.rewind();
+    buffer.limit(bytes.length + position);
+    return buffer;
+  }
+
+  /**
+   * Read a piece block from the underlying byte storage.
+   *
+   * <p>
+   * This is the public method for reading this piece's data, and it will
+   * only succeed if the piece is complete and valid on disk, thus ensuring
+   * any data that comes out of this function is valid piece data we can send
+   * to other peers.
+   * </p>
+   *
+   * @param offset Offset inside this piece where to start reading.
+   * @param length Number of bytes to read from the piece.
+   * @param block  Destination buffer that receives the data and is returned.
+   * @return A byte buffer containing the piece data.
+   * @throws IllegalArgumentException If <em>offset + length</em> goes over
+   * the piece boundary.
+   * @throws IllegalStateException If the piece is not valid when attempting
+   * to read it.
+   * @throws IOException If the read can't be completed (I/O error, or EOF
+   * reached, which can happen if the piece is not complete).
+   */
+  public ByteBuffer read(long offset, int length, ByteBuffer block)
+          throws IllegalArgumentException, IllegalStateException, IOException {
+    if (!this.valid) {
+      throw new IllegalStateException("Attempting to read an " +
+              "known-to-be invalid piece!");
+    }
+
+    return this._read(offset, length, block);
+  }
+
+  /**
+   * Record the given block at the given offset in this piece.
+   *
+   * @param block The ByteBuffer containing the block data.
+   * @param offset The block offset in this piece.
+   */
+  public void record(ByteBuffer block, int offset) {
+    if (this.data == null) {
+      // TODO: remove cast to int when large ByteBuffer support is
+      // implemented in Java.
+      this.data = ByteBuffer.allocate((int) this.length);
+    }
+
+    int pos = block.position();
+    this.data.position(offset);
+    this.data.put(block);
+    block.position(pos);
+  }
+
+  /** Saves the recorded data to the piece storage and drops the in-memory buffer, even if saving fails. */
+  public void finish() throws IOException {
+    this.data.rewind();
+    logger.trace("Recording {}...", this);
+    try {
+      pieceStorage.savePiece(index, this.data.array());
+    } finally {
+      this.data = null;
+    }
+  }
+
+  /** Return a human-readable representation of this piece. */
+  @Override
+  public String toString() {
+    return String.format("piece#%4d%s",
+            this.index,
+            this.isValid() ? "+" : "-");
+  }
+
+  /** Two pieces are equal when they have the same index. */
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof Piece) {
+      return this.index == ((Piece) obj).index;
+    } else {
+      return false;
+    }
+  }
+
+  /** Hash code derived from the index only, so that it agrees with {@link #equals(Object)}. */
+  @Override
+  public int hashCode() {
+    return this.index;
+  }
+
+  /**
+   * Piece comparison function for ordering pieces based on their
+   * availability.
+   *
+   * @param other The piece to compare with, should not be <em>null</em>.
+   */
+  @Override
+  public int compareTo(Piece other) {
+    // equal pieces compare as 0; otherwise order by times seen, then by index (which differs here)
+    if (this.equals(other)) {
+      return 0;
+    } else if (this.seen == other.seen) {
+      return this.index < other.index ? -1 : 1;
+    } else if (this.seen < other.seen) {
+      return -1;
+    } else {
+      return 1;
+    }
+  }
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PieceInformation.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PieceInformation.java
new file mode 100644
index 0000000..7cadaa3
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/PieceInformation.java
@@ -0,0 +1,15 @@
+package com.turn.ttorrent.client;
+
+public interface PieceInformation {
+
+ /**
+ * @return the piece index; indexing starts from zero
+ */
+ int getIndex();
+
+ /**
+ * @return the piece size; for every piece except the last one it must equal the piece size specified by the metadata
+ */
+ int getSize();
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SelectorFactoryImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SelectorFactoryImpl.java
new file mode 100644
index 0000000..e97133e
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SelectorFactoryImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.network.SelectorFactory;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+
+public class SelectorFactoryImpl implements SelectorFactory {
+ // SelectorFactory implementation that simply delegates to the JDK's Selector.open().
+ @Override
+ public Selector newSelector() throws IOException {
+ return Selector.open();
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SharedTorrent.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SharedTorrent.java
new file mode 100644
index 0000000..a1b6035
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SharedTorrent.java
@@ -0,0 +1,851 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.Constants;
+import com.turn.ttorrent.client.peer.PeerActivityListener;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.client.storage.PieceStorage;
+import com.turn.ttorrent.client.storage.TorrentByteStorage;
+import com.turn.ttorrent.client.strategy.*;
+import com.turn.ttorrent.common.Optional;
+import com.turn.ttorrent.common.*;
+import org.apache.commons.io.FileUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+
+
+/**
+ * A torrent shared by the BitTorrent client.
+ * <p/>
+ * <p>
+ * The {@link SharedTorrent} class extends the Torrent class with all the data
+ * and logic required by the BitTorrent client implementation.
+ * </p>
+ * <p/>
+ * <p>
+ * <em>Note:</em> this implementation currently only supports single-file
+ * torrents.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+
+// Creates the torrent object.
+// The inherited interfaces need little attention; their implementation lives in this file anyway.
+public class SharedTorrent implements PeerActivityListener, TorrentMetadata, TorrentInfo {
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(SharedTorrent.class);
+
+ private final static RequestStrategy DEFAULT_REQUEST_STRATEGY = new RequestStrategyImplAnyInteresting();
+
+ /**
+ * End-game trigger ratio.
+ *
+ * <p>
+ * End-game behavior (requesting already requested pieces from available
+ * and ready peers to try to speed-up the end of the transfer) will only be
+ * enabled when the ratio of completed pieces over total pieces in the
+ * torrent is over this value.
+ * </p>
+ */
+ private static final float ENG_GAME_COMPLETION_RATIO = 0.95f;
+ private static final int END_GAME_STATIC_PIECES_COUNT = 20;
+ private static final long END_GAME_INVOCATION_PERIOD_MS = 2000;
+
+ private final TorrentStatistic myTorrentStatistic;
+
+ private long myLastAnnounceTime = -1;
+ private int mySeedersCount = 0;
+
+ private final PieceStorage pieceStorage;
+ private boolean isFileChannelOpen = false;
+ private final Map<Integer, Future<?>> myValidationFutures;
+ private final TorrentMetadata myTorrentMetadata;
+ private final long myTorrentTotalSize;
+
+ private final int pieceLength;
+ private final ByteBuffer piecesHashes;
+
+ private boolean initialized;
+ private Piece[] pieces;
+ private final BitSet completedPieces;
+ private final BitSet requestedPieces;
+ private final RequestStrategy myRequestStrategy;
+ private final EventDispatcher eventDispatcher;
+
+ private final List<SharingPeer> myDownloaders = new CopyOnWriteArrayList<SharingPeer>();
+ private final EndGameStrategy endGameStrategy = new EndGameStrategyImpl(2);
+ private volatile long endGameEnabledOn = -1;
+
+ private volatile ClientState clientState = ClientState.WAITING;
+ private static final int MAX_VALIDATION_TASK_COUNT = 200;
+ private static final int MAX_REQUESTED_PIECES_PER_TORRENT = 100;
+
+ /**
+ * Create a new shared torrent from meta-info. The pieces array stays empty until {@link #init()} is called.
+ * @param torrentMetadata The meta-info
+ * @param eventDispatcher The dispatcher whose multicaster receives this torrent's events
+ * @throws IllegalArgumentException if the piece hashes cannot cover the total size of the torrent's files
+ */
+ public SharedTorrent(TorrentMetadata torrentMetadata, PieceStorage pieceStorage, RequestStrategy requestStrategy,
+ TorrentStatistic torrentStatistic, EventDispatcher eventDispatcher) {
+ myTorrentMetadata = torrentMetadata;
+ this.pieceStorage = pieceStorage;
+ this.eventDispatcher = eventDispatcher;
+ myTorrentStatistic = torrentStatistic;
+ myValidationFutures = new HashMap<Integer, Future<?>>();
+ long totalSize = 0;
+ for (TorrentFile torrentFile : myTorrentMetadata.getFiles()) {
+ totalSize += torrentFile.size;
+ }
+ myTorrentTotalSize = totalSize;
+ this.myRequestStrategy = requestStrategy;
+
+ this.pieceLength = myTorrentMetadata.getPieceLength();
+ this.piecesHashes = ByteBuffer.wrap(myTorrentMetadata.getPiecesHashes());
+ // (number of hashes) * (piece length) must be at least the summed file sizes.
+ if (this.piecesHashes.capacity() / Constants.PIECE_HASH_SIZE *
+ (long) this.pieceLength < myTorrentTotalSize) {
+ throw new IllegalArgumentException("Torrent size does not " +
+ "match the number of pieces and the piece size!");
+ }
+
+ this.initialized = false;
+ this.pieces = new Piece[0];
+ this.completedPieces = new BitSet(torrentMetadata.getPiecesCount());
+ this.requestedPieces = new BitSet();
+ }
+
+ public static SharedTorrent fromFile(File source, PieceStorage pieceStorage, TorrentStatistic torrentStatistic)
+ throws IOException {
+ byte[] data = FileUtils.readFileToByteArray(source);
+ TorrentMetadata torrentMetadata = new TorrentParser().parse(data);
+ return new SharedTorrent(torrentMetadata, pieceStorage, DEFAULT_REQUEST_STRATEGY, torrentStatistic, new EventDispatcher());
+ }
+
+ private synchronized void closeFileChannelIfNecessary() throws IOException {
+   // Nothing to release while the channel is closed or a downloader is still registered.
+   if (!isFileChannelOpen || !myDownloaders.isEmpty()) return;
+   logger.debug("Closing file channel for {} if necessary. Downloaders: {}", getHexInfoHash(), myDownloaders.size());
+   this.pieceStorage.close();
+   isFileChannelOpen = false;
+ }
+
+ /**
+ * Get the number of bytes uploaded for this torrent.
+ */
+ public long getUploaded() {
+ return myTorrentStatistic.getUploadedBytes();
+ }
+
+ /**
+ * Get the number of bytes downloaded for this torrent.
+ * <p/>
+ * <p>
+ * <b>Note:</b> this could be more than the torrent's length, and should
+ * not be used to determine a completion percentage.
+ * </p>
+ */
+ public long getDownloaded() {
+ return myTorrentStatistic.getDownloadedBytes();
+ }
+
+ /**
+ * Get the number of bytes left to download for this torrent.
+ */
+ public long getLeft() {
+ return myTorrentStatistic.getLeftBytes();
+ }
+
+ public int getSeedersCount() {
+ return mySeedersCount;
+ }
+
+ public void setSeedersCount(int seedersCount) {
+ mySeedersCount = seedersCount;
+ }
+
+ public long getLastAnnounceTime() {
+ return myLastAnnounceTime;
+ }
+
+ public void setLastAnnounceTime(long lastAnnounceTime) {
+ myLastAnnounceTime = lastAnnounceTime;
+ }
+
+ /**
+ * Tells whether this torrent has been fully initialized yet.
+ */
+ public boolean isInitialized() {
+ return this.initialized;
+ }
+
+ /**
+ * Stop the torrent initialization as soon as possible.
+ */
+ public void stop() {
+ }
+
+ /**
+  * Build this torrent's pieces array.
+  * <p>
+  * Creates this torrent's pieces array from the hashes provided in the
+  * torrent meta-info and marks as valid the pieces that the piece storage
+  * already reports as available.
+  * </p>
+  * <p>
+  * This function should be called soon after the constructor to initialize
+  * the pieces array.
+  * </p>
+  *
+  * @throws IllegalStateException if the torrent was already initialized; the client state is left untouched
+  */
+ public synchronized void init() throws InterruptedException, IOException {
+   // Reject double initialization before touching the client state, so a
+   // redundant call cannot knock an initialized torrent back to VALIDATING.
+   if (this.isInitialized()) {
+     throw new IllegalStateException("Torrent was already initialized!");
+   }
+
+   setClientState(ClientState.VALIDATING);
+   hashSingleThread();
+   this.initialized = true;
+ }
+
+ private void initPieces() {
+ int nPieces = (int) (Math.ceil(
+ (double) myTorrentTotalSize / this.pieceLength));
+ this.pieces = new Piece[nPieces];
+ this.piecesHashes.clear();
+ }
+
+ private void hashSingleThread() {
+ initPieces();
+
+ logger.debug("Analyzing local data for {} with {} threads...",
+ myTorrentMetadata.getDirectoryName(), TorrentCreator.HASHING_THREADS_COUNT);
+ for (int idx = 0; idx < this.pieces.length; idx++) {
+ byte[] hash = new byte[Constants.PIECE_HASH_SIZE];
+ this.piecesHashes.get(hash);
+
+ // The last piece may be shorter than the torrent's global piece
+ // length. Let's make sure we get the right piece length in any
+ // situation.
+ long off = ((long) idx) * this.pieceLength;
+ long len = Math.min(
+ myTorrentTotalSize - off,
+ this.pieceLength);
+
+ Piece piece = new Piece(this.pieceStorage, idx, len, hash
+ );
+ this.pieces[idx] = piece;
+ piece.setValid(pieceStorage.getAvailablePieces().get(idx));
+
+ if (piece.isValid()) {
+ this.completedPieces.set(piece.getIndex());
+ }
+ }
+ }
+
+ public synchronized void close() {
+   logger.trace("Closing torrent {}", myTorrentMetadata.getDirectoryName());
+   try {
+     this.pieceStorage.close();
+     isFileChannelOpen = false;
+   } catch (IOException ioe) {
+     // Pass the exception itself so the cause and stack trace are logged, not only the message.
+     logger.error("Error closing torrent byte storage", ioe);
+   }
+ }
+
+ public synchronized void closeFully() {
+   logger.trace("Closing torrent {}", myTorrentMetadata.getDirectoryName());
+   try {
+     this.pieceStorage.closeFully();
+     isFileChannelOpen = false;
+   } catch (IOException ioe) {
+     // Pass the exception itself so the cause and stack trace are logged, not only the message.
+     logger.error("Error closing torrent byte storage", ioe);
+   }
+ }
+
+ /**
+  * Retrieve a piece object by index.
+  *
+  * @param index The zero-based index of the piece in this torrent.
+  * @throws IllegalArgumentException if the index is negative or not below the number of pieces
+  */
+ public Piece getPiece(int index) {
+   if (this.pieces == null) {
+     throw new IllegalStateException("Torrent not initialized yet.");
+   }
+   // Reject negative indices too, instead of leaking an ArrayIndexOutOfBoundsException.
+   if (index < 0 || index >= this.pieces.length) {
+     throw new IllegalArgumentException("Invalid piece index!");
+   }
+   return this.pieces[index];
+ }
+
+ /**
+ * Return a copy of the bit field of available pieces for this torrent.
+ * <p/>
+ * <p>
+ * Available pieces are pieces available in the swarm, and it does not
+ * include our own pieces.
+ * </p>
+ */
+ public BitSet getAvailablePieces() {
+ if (!this.isInitialized()) {
+ throw new IllegalStateException("Torrent not yet initialized!");
+ }
+
+ BitSet availablePieces = new BitSet(this.pieces.length);
+
+ synchronized (this.pieces) {
+ for (Piece piece : this.pieces) {
+ if (piece.available()) {
+ availablePieces.set(piece.getIndex());
+ }
+ }
+ }
+
+ return availablePieces;
+ }
+
+ /**
+ * Return a copy of the completed pieces bitset.
+ */
+ public BitSet getCompletedPieces() {
+ if (!this.isInitialized()) {
+ throw new IllegalStateException("Torrent not yet initialized!");
+ }
+
+ return pieceStorage.getAvailablePieces();
+ }
+
+ /**
+ * Tells whether this torrent has been fully downloaded, or is fully
+ * available locally.
+ */
+ public synchronized boolean isComplete() {
+ return this.pieces.length > 0
+ && pieceStorage.getAvailablePieces().cardinality() == myTorrentMetadata.getPiecesCount();
+ }
+
+ /**
+ * Finalize the download of this torrent.
+ * <p/>
+ * <p>
+ * This realizes the final, pre-seeding phase actions on this torrent,
+ * which usually consists in putting the torrent data in their final form
+ * and at their target location.
+ * </p>
+ *
+ * @see TorrentByteStorage#finish
+ */
+ public synchronized void finish() {
+ if (!this.isInitialized()) {
+ throw new IllegalStateException("Torrent not yet initialized!");
+ }
+
+ if (!this.isComplete()) {
+ throw new IllegalStateException("Torrent download is not complete!");
+ }
+
+ eventDispatcher.multicaster().downloadComplete();
+ setClientState(ClientState.SEEDING);
+ }
+
+ public boolean isFinished() {
+ return pieceStorage.getAvailablePieces().cardinality() == myTorrentMetadata.getPiecesCount();
+ }
+
+ public ClientState getClientState() {
+ return this.clientState;
+ }
+
+ public void setClientState(ClientState clientState) {
+ this.clientState = clientState;
+ }
+
+ /**
+ * Mark a piece as completed, subtracting the piece size in bytes from our
+ * left-bytes-to-download counter. Does nothing if the piece is already marked.
+ */
+ public synchronized void markCompleted(Piece piece) {
+ if (this.completedPieces.get(piece.getIndex())) {
+ return;
+ }
+
+ // A completed piece means that much less data is left to download for
+ // this torrent.
+ myTorrentStatistic.addLeft(-piece.size());
+ this.completedPieces.set(piece.getIndex());
+ if (completedPieces.cardinality() == getPiecesCount()) {
+ logger.info("all pieces are received for torrent {}. Validating...", this);
+ }
+ }
+
+ public synchronized void markUncompleted(Piece piece) {
+ if (!this.completedPieces.get(piece.getIndex())) {
+ return;
+ }
+
+ removeValidationFuture(piece);
+ myTorrentStatistic.addLeft(piece.size());
+ this.completedPieces.clear(piece.getIndex());
+ }
+
+ public synchronized void removeValidationFuture(Piece piece) {
+ myValidationFutures.remove(piece.getIndex());
+ }
+
+ public void notifyPieceDownloaded(Piece piece, SharingPeer peer) {
+ eventDispatcher.multicaster().pieceDownloaded(piece, peer);
+ }
+
+ /** PeerActivityListener handler(s). *************************************/
+
+ /**
+ * Peer choked handler.
+ * <p/>
+ * <p>
+ * When a peer chokes, the requests made to it are canceled and we need to
+ * mark the eventually piece we requested from it as available again for
+ * download tentative from another peer.
+ * </p>
+ *
+ * @param peer The peer that choked.
+ */
+ @Override
+ public synchronized void handlePeerChoked(SharingPeer peer) {
+   // Requests made to a peer are canceled when it chokes, so clear the
+   // "requested" flag of every piece it was asked for.
+   final Set<Piece> abandoned = peer.getRequestedPieces();
+   for (Piece abandonedPiece : abandoned) {
+     this.requestedPieces.clear(abandonedPiece.getIndex());
+   }
+
+   final Object[] logArgs = {
+       peer,
+       this.requestedPieces.cardinality(),
+       this.requestedPieces
+   };
+   logger.trace("Peer {} choked, we now have {} outstanding " +
+           "request(s): {}.",
+       logArgs);
+ }
+
+ /**
+ * Peer ready handler.
+ * <p/>
+ * <p>
+ * When a peer becomes ready to accept piece block requests, select a piece
+ * to download and go for it.
+ * </p>
+ *
+ * @param peer The peer that became ready.
+ */
+ @Override
+ public void handlePeerReady(SharingPeer peer) {
+ initIfNecessary(peer);
+
+ RequestsCollection requestsCollection = getRequestsCollection(peer);
+ requestsCollection.sendAllRequests();
+ }
+
+ @NotNull
+ private synchronized RequestsCollection getRequestsCollection(final SharingPeer peer) {
+   if (myValidationFutures.size() > MAX_VALIDATION_TASK_COUNT) return RequestsCollection.Empty.INSTANCE;
+
+   if (this.requestedPieces.cardinality() > MAX_REQUESTED_PIECES_PER_TORRENT) return RequestsCollection.Empty.INSTANCE;
+
+   int completedAndValidated = pieceStorage.getAvailablePieces().cardinality();
+   boolean turnOnEndGame = completedAndValidated > getPiecesCount() * ENG_GAME_COMPLETION_RATIO ||
+           completedAndValidated > getPiecesCount() - END_GAME_STATIC_PIECES_COUNT;
+   if (turnOnEndGame) {
+     // End-game requests are collected at most once per END_GAME_INVOCATION_PERIOD_MS.
+     long now = System.currentTimeMillis();
+     if (now - END_GAME_INVOCATION_PERIOD_MS > endGameEnabledOn) {
+       endGameEnabledOn = now;
+       logger.info("Running end-game mode, currently available {}/{} pieces",
+               completedAndValidated, getPieceCount());
+       return endGameStrategy.collectRequests(pieces, myDownloaders);
+     }
+     return RequestsCollection.Empty.INSTANCE;
+   }
+
+   // Pieces the peer has that we have neither completed nor already requested.
+   final BitSet interesting = peer.getAvailablePieces();
+   interesting.andNot(this.completedPieces);
+   interesting.andNot(this.requestedPieces);
+
+   int maxRequestingPieces = Math.min(10, interesting.cardinality());
+   int currentlyDownloading = peer.getDownloadingPiecesCount();
+   Map<Piece, List<SharingPeer>> toRequest = new HashMap<Piece, List<SharingPeer>>();
+   while (currentlyDownloading < maxRequestingPieces) {
+     if (!peer.isConnected()) {
+       break;
+     }
+     // Stop, rather than return an empty collection: pieces picked on earlier
+     // iterations are already flagged in requestedPieces and must still be sent.
+     if (interesting.cardinality() == 0) {
+       break;
+     }
+
+     Piece chosen = myRequestStrategy.choosePiece(interesting, pieces);
+     if (chosen == null) {
+       logger.info("chosen piece is null");
+       break;
+     }
+     this.requestedPieces.set(chosen.getIndex());
+     currentlyDownloading++;
+     toRequest.put(chosen, Collections.singletonList(peer));
+     interesting.clear(chosen.getIndex());
+   }
+
+   return new RequestsCollectionImpl(toRequest);
+ }
+
+ /** Initializes the torrent on first use; if initialization fails, the given peer is unbound. */
+ public synchronized void initIfNecessary(SharingPeer peer) {
+   if (isInitialized()) return;
+   try {
+     init();
+   } catch (InterruptedException e) {
+     logger.info("Interrupted init", e);
+     peer.unbind(true);
+     // Restore the interrupt flag (after the unbind) so callers up the stack can still observe it.
+     Thread.currentThread().interrupt();
+   } catch (IOException e) {
+     logger.info("IOE during init", e);
+     peer.unbind(true);
+   }
+ }
+
+ /**
+ * Piece availability handler.
+ * <p/>
+ * <p>
+ * Handle updates in piece availability from a peer's HAVE message. When
+ * this happens, we need to mark that piece as available from the peer.
+ * </p>
+ *
+ * @param peer The peer we got the update from.
+ * @param piece The piece that became available.
+ */
+ @Override
+ public void handlePieceAvailability(SharingPeer peer, Piece piece) {
+ boolean isPeerInteresting = !this.completedPieces.get(piece.getIndex()) &&
+ !this.requestedPieces.get(piece.getIndex());
+ if (isPeerInteresting) {
+ peer.interesting();
+ }
+
+ piece.seenAt(peer);
+
+ logger.trace("Peer {} contributes {} piece(s) [{}/{}/{}].",
+ new Object[]{
+ peer,
+ peer.getAvailablePieces().cardinality(),
+ this.completedPieces.cardinality(),
+ this.getAvailablePieces().cardinality(),
+ this.pieces.length
+ });
+
+ if (!peer.isChoked() &&
+ peer.isInteresting() &&
+ !peer.isDownloading()) {
+ this.handlePeerReady(peer);
+ }
+ }
+
+ /**
+ * Bit field availability handler.
+ * <p/>
+ * <p>
+ * Handle updates in piece availability from a peer's BITFIELD message.
+ * When this happens, we need to mark in all the pieces the peer has that
+ * they can be reached through this peer, thus augmenting the global
+ * availability of pieces.
+ * </p>
+ *
+ * @param peer The peer we got the update from.
+ * @param availablePieces The pieces availability bit field of the peer.
+ */
+ @Override
+ public void handleBitfieldAvailability(SharingPeer peer,
+ BitSet availablePieces) {
+ // Determine if the peer is interesting for us or not, and notify it.
+ BitSet interesting = (BitSet) availablePieces.clone();
+ synchronized (this) {
+ interesting.andNot(this.completedPieces);
+ interesting.andNot(this.requestedPieces);
+ }
+ // Record the peer has all the pieces it told us it had.
+ for (int i = availablePieces.nextSetBit(0); i >= 0;
+ i = availablePieces.nextSetBit(i + 1)) {
+ this.pieces[i].seenAt(peer);
+ }
+
+ if (interesting.cardinality() == 0) {
+ peer.notInteresting();
+ } else {
+ peer.interesting();
+ }
+
+ logger.debug("Peer {} contributes {} piece(s), total pieces count: {}.",
+ new Object[]{
+ peer,
+ availablePieces.cardinality(),
+ myTorrentMetadata.getPiecesCount()
+ });
+ }
+
+ public int getDownloadersCount() {
+ return myDownloaders.size();
+ }
+
+ @Override
+ public void afterPeerRemoved(SharingPeer peer) {
+
+ }
+
+ /**
+ * Piece upload completion handler.
+ * <p/>
+ * <p>
+ * When a piece has been sent to a peer, we just record that we sent that
+ * many bytes. If the piece is valid on the peer's side, it will send us a
+ * HAVE message and we'll record that the piece is available on the peer at
+ * that moment (see <code>handlePieceAvailability()</code>).
+ * </p>
+ *
+ * @param peer The peer we got this piece from.
+ * @param piece The piece in question.
+ */
+ @Override
+ public void handlePieceSent(SharingPeer peer, Piece piece) {
+ logger.trace("Completed upload of {} to {}.", piece, peer);
+ myTorrentStatistic.addUploaded(piece.size());
+ }
+
+ /**
+  * Piece download completion handler.
+  * <p>
+  * Records the downloaded bytes and clears the piece's "requested" flag.
+  * The completedPieces bit field is not updated here; see
+  * {@link #markCompleted(Piece)}.
+  * </p>
+  *
+  * @param peer The peer we got this piece from.
+  * @param piece The piece in question.
+  */
+ @Override
+ public synchronized void handlePieceCompleted(SharingPeer peer,
+                                               Piece piece) throws IOException {
+   // Regardless of validity, record the number of bytes downloaded and
+   // mark the piece as not requested anymore. Synchronized because the other
+   // handlers touch requestedPieces while holding this torrent's monitor.
+   myTorrentStatistic.addDownloaded(piece.size());
+   this.requestedPieces.set(piece.getIndex(), false);
+   logger.trace("We now have {} piece(s) and {} outstanding request(s): {}",
+           new Object[]{
+                   this.completedPieces.cardinality(),
+                   this.requestedPieces.cardinality(),
+                   this.requestedPieces
+           });
+ }
+
+ /**
+ * Peer disconnection handler.
+ * <p/>
+ * <p>
+ * When a peer disconnects, we need to mark in all of the pieces it had
+ * available that they can't be reached through this peer anymore.
+ * </p>
+ *
+ * @param peer The peer we got this piece from.
+ */
+ @Override
+ public synchronized void handlePeerDisconnected(SharingPeer peer) {
+ BitSet availablePieces = peer.getAvailablePieces();
+
+ for (int i = availablePieces.nextSetBit(0); i >= 0;
+ i = availablePieces.nextSetBit(i + 1)) {
+ this.pieces[i].noLongerAt(peer);
+ }
+
+ Set<Piece> requested = peer.getRequestedPieces();
+ if (requested != null) {
+ for (Piece piece : requested) {
+ this.requestedPieces.set(piece.getIndex(), false);
+ }
+ }
+
+ myDownloaders.remove(peer);
+
+ try {
+ closeFileChannelIfNecessary();
+ } catch (IOException e) {
+ logger.info("I/O error on attempt to close file storage: " + e.toString());
+ }
+
+ logger.debug("Peer {} went away with {} piece(s) [{}/{}].",
+ new Object[]{
+ peer,
+ availablePieces.cardinality(),
+ this.completedPieces.cardinality(),
+ this.pieces.length
+ });
+ logger.trace("We now have {} piece(s) and {} outstanding request(s): {}",
+ new Object[]{
+ this.completedPieces.cardinality(),
+ this.requestedPieces.cardinality(),
+ this.requestedPieces
+ });
+ eventDispatcher.multicaster().peerDisconnected(peer);
+ }
+
+ @Override
+ public synchronized void handleIOException(SharingPeer peer,
+ IOException ioe) {
+ eventDispatcher.multicaster().downloadFailed(ioe);
+ }
+
+ @Override
+ public synchronized void handleNewPeerConnected(SharingPeer peer) {
+ initIfNecessary(peer);
+ eventDispatcher.multicaster().peerConnected(peer);
+ }
+
+ @Override
+ public String toString() {
+ return "SharedTorrent{" +
+ Arrays.toString(TorrentUtils.getTorrentFileNames(myTorrentMetadata).toArray()) +
+ "}";
+ }
+
+ @Override
+ public String getDirectoryName() {
+ return myTorrentMetadata.getDirectoryName();
+ }
+
+ @Override
+ public List<TorrentFile> getFiles() {
+ return myTorrentMetadata.getFiles();
+ }
+
+ @Nullable
+ @Override
+ public List<List<String>> getAnnounceList() {
+ return myTorrentMetadata.getAnnounceList();
+ }
+
+ @Nullable
+ @Override
+ public String getAnnounce() {
+ return myTorrentMetadata.getAnnounce();
+ }
+
+ @Override
+ public Optional<Long> getCreationDate() {
+ return myTorrentMetadata.getCreationDate();
+ }
+
+ @Override
+ public Optional<String> getComment() {
+ return myTorrentMetadata.getComment();
+ }
+
+ @Override
+ public Optional<String> getCreatedBy() {
+ return myTorrentMetadata.getCreatedBy();
+ }
+
+ @Override
+ public int getPieceLength() {
+ return myTorrentMetadata.getPieceLength();
+ }
+
+ @Override
+ public byte[] getPiecesHashes() {
+ return myTorrentMetadata.getPiecesHashes();
+ }
+
+ @Override
+ public boolean isPrivate() {
+ return myTorrentMetadata.isPrivate();
+ }
+
+ @Override
+ public int getPiecesCount() {
+ return myTorrentMetadata.getPiecesCount();
+ }
+
+ @Override
+ public byte[] getInfoHash() {
+ return myTorrentMetadata.getInfoHash();
+ }
+
+ @Override
+ public String getHexInfoHash() {
+ return myTorrentMetadata.getHexInfoHash();
+ }
+
+ @Override
+ public int getPieceCount() {
+ return getPiecesCount();
+ }
+
+ @Override
+ public long getPieceSize(int pieceIdx) {
+ return getPieceLength();
+ }
+
+ public synchronized void savePieceAndValidate(Piece p) throws IOException {
+// p.finish();
+ }
+
+ public synchronized void markCompletedAndAddValidationFuture(Piece piece, Future<?> validationFuture) {
+ this.markCompleted(piece);
+ myValidationFutures.put(piece.getIndex(), validationFuture);
+ }
+
+ public synchronized boolean isAllPiecesOfPeerCompletedAndValidated(SharingPeer peer) {
+   final BitSet peerBitfield = peer.getAvailablePieces();
+   for (Piece candidate : pieces) {
+     final int idx = candidate.getIndex();
+     // Only pieces the peer has matter; each must be completed with no validation future registered.
+     if (!peerBitfield.get(idx)) continue;
+     if (!completedPieces.get(idx) || myValidationFutures.get(idx) != null) return false;
+   }
+   return true;
+ }
+
+ public void addConnectedPeer(SharingPeer sharingPeer) {
+ myDownloaders.add(sharingPeer);
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SharingPeerFactory.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SharingPeerFactory.java
new file mode 100644
index 0000000..f5263e1
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SharingPeerFactory.java
@@ -0,0 +1,18 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.peer.SharingPeer;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+public interface SharingPeerFactory {
+
+ SharingPeer createSharingPeer(String host,
+ int port,
+ ByteBuffer peerId,
+ SharedTorrent torrent,
+ ByteChannel channel,
+ String clientIdentifier,
+ int clientVersion);
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SimpleClient.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SimpleClient.java
new file mode 100644
index 0000000..cc43ccd
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/SimpleClient.java
@@ -0,0 +1,122 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.TorrentMetadata;
+import com.turn.ttorrent.common.TorrentStatistic;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+// Client facade: wraps a CommunicationManager to start, await and stop torrent downloads.
+public class SimpleClient {
+
+  private final static int DEFAULT_EXECUTOR_SIZE = 10;
+  private final CommunicationManager communicationManager;
+
+  public SimpleClient() {
+    this(DEFAULT_EXECUTOR_SIZE, DEFAULT_EXECUTOR_SIZE); // overload: delegates to the two-argument constructor below
+  }
+
+  public SimpleClient(int workingExecutorSize, int validatorExecutorSize) {
+    communicationManager = new CommunicationManager(Executors.newFixedThreadPool(workingExecutorSize), Executors.newFixedThreadPool(validatorExecutorSize));
+  }
+
+  public void stop() {
+    stop(60, TimeUnit.SECONDS);
+  }
+
+  public void stop(int timeout, TimeUnit timeUnit) {
+    communicationManager.stop(timeout, timeUnit);
+    Exception interruptedException = null;
+    boolean anyFailedByTimeout = false;
+    for (ExecutorService executorService : Arrays.asList(
+            communicationManager.getExecutor(),
+            communicationManager.getPieceValidatorExecutor())) {
+      executorService.shutdown();
+
+      //if the thread is already interrupted don't try to await termination
+      if (Thread.currentThread().isInterrupted()) continue;
+
+      try {
+        if (!executorService.awaitTermination(timeout, timeUnit)) {
+          anyFailedByTimeout = true;
+        }
+      } catch (InterruptedException e) {
+        interruptedException = e;
+        // Restore the flag: the check above then skips waiting on the remaining executor, and callers still see it.
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (interruptedException != null) {
+      throw new RuntimeException("Thread was interrupted, " +
+              "shutdown methods are invoked but maybe tasks are not finished yet", interruptedException);
+    }
+    if (anyFailedByTimeout)
+      throw new RuntimeException("At least one executor was not fully shutdown because timeout was elapsed");
+  }
+
+  /**
+   * Starts the client and blocks until the torrent is downloaded or validation finds every piece already valid.
+   * @param torrentFile path to the torrent file
+   */
+  public void downloadTorrent(String torrentFile, String downloadDir, InetAddress iPv4Address) throws IOException, InterruptedException {
+    communicationManager.start(iPv4Address);
+    final Semaphore semaphore = new Semaphore(0);
+    List<TorrentListener> listeners = Collections.<TorrentListener>singletonList(
+            new TorrentListenerWrapper() {
+              @Override
+              public void validationComplete(int validpieces, int totalpieces) {
+                if (validpieces == totalpieces) semaphore.release();
+              }
+
+              @Override
+              public void downloadComplete() {
+                semaphore.release();
+              }
+            }
+    );
+    communicationManager.addTorrent(torrentFile, downloadDir, listeners);
+    semaphore.acquire(); // NOTE(review): downloadFailed releases no permit, so a failed download appears to block here - confirm
+  }
+
+  private TorrentManager startDownloading(String torrentFile, String downloadDir, InetAddress iPv4Address) throws IOException {
+    communicationManager.start(iPv4Address);
+    return communicationManager.addTorrent(torrentFile, downloadDir);
+  }
+
+  public TorrentManager downloadTorrentAsync(String torrentFile,
+                                             String downloadDir,
+                                             InetAddress iPv4Address) throws IOException {
+    return startDownloading(torrentFile, downloadDir, iPv4Address);
+  }
+
+  /**
+   * Get statistics for a given torrent file
+   * @param dotTorrentFilePath path to the torrent file whose metadata identifies the loaded torrent
+   * @return a new TorrentStatistic built from the loaded torrent's statistics
+   * @throws IOException If unable to get torrent metadata
+   * @throws IllegalStateException If the torrent has not been loaded
+   */
+  public TorrentStatistic getStatistics(String dotTorrentFilePath) throws IOException {
+    FileMetadataProvider metadataProvider = new FileMetadataProvider(dotTorrentFilePath);
+    TorrentMetadata metadata = metadataProvider.getTorrentMetadata();
+    LoadedTorrent loadedTorrent = communicationManager.getTorrentsStorage().getLoadedTorrent(metadata.getHexInfoHash());
+    if (loadedTorrent != null) {
+      return new TorrentStatistic(loadedTorrent.getTorrentStatistic());
+    }
+
+    throw new IllegalStateException("Torrent has not been loaded yet");
+  }
+
+  public boolean hasStop() {
+    return communicationManager.hasStop();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentConnectionListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentConnectionListener.java
new file mode 100644
index 0000000..2c346ce
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentConnectionListener.java
@@ -0,0 +1,17 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.TorrentHash;
+
+import java.nio.channels.SocketChannel;
+
+/**
+ * @author Sergey.Pak
+ * Date: 9/9/13
+ * Time: 7:46 PM
+ */
+public interface TorrentConnectionListener {
+
+ boolean hasTorrent(TorrentHash torrentHash);
+
+ void handleNewPeerConnection(SocketChannel s, byte[] peerId, String hexInfoHash);
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentListener.java
new file mode 100644
index 0000000..dcab6dc
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentListener.java
@@ -0,0 +1,57 @@
+package com.turn.ttorrent.client;
+
+public interface TorrentListener {
+
+ /**
+ * Invoked when a connection with a peer is established
+ *
+ * @param peerInformation specified information about peer
+ */
+ void peerConnected(PeerInformation peerInformation);
+
+ /**
+ * Invoked when a connection with a peer is closed.
+ *
+ * @param peerInformation specified information about peer
+ */
+ void peerDisconnected(PeerInformation peerInformation);
+
+ /**
+ * Invoked when a piece is downloaded and validated
+ *
+ * @param pieceInformation specified information about piece
+ * @param peerInformation specified information about peer
+ */
+ void pieceDownloaded(PieceInformation pieceInformation, PeerInformation peerInformation);
+
+ /**
+ * Invoked when the torrent is fully downloaded (last piece is received and validated)
+ */
+ void downloadComplete();
+
+ /**
+ * Invoked when a piece is downloaded but not validated yet
+ *
+ * @param pieceInformation specified information about piece
+ * @param peerInformation specified information about peer
+ */
+ void pieceReceived(PieceInformation pieceInformation, PeerInformation peerInformation);
+
+ /**
+ * Invoked when the download has failed with any exception (e.g. some runtime exception or i/o exception in file operation).
+ *
+ * @param cause specified exception
+ */
+ void downloadFailed(Throwable cause);
+
+ /**
+ * Invoked when validation of torrent is done.
+ * If the total pieces count and the valid pieces count are equal, the torrent is fully downloaded.
+ * {@link #downloadComplete()} listener will not be invoked in this case
+ *
+ * @param validpieces count of valid pieces. Must not be greater than {@code totalpieces}
+ * @param totalpieces total pieces count in torrent
+ */
+ void validationComplete(int validpieces, int totalpieces);
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentListenerWrapper.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentListenerWrapper.java
new file mode 100644
index 0000000..57356fb
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentListenerWrapper.java
@@ -0,0 +1,39 @@
+package com.turn.ttorrent.client;
+
+/** No-op {@link TorrentListener}: every callback is empty, so subclasses override only the events they need. */
+public class TorrentListenerWrapper implements TorrentListener {
+  @Override
+  public void peerConnected(PeerInformation peerInformation) {
+    // no-op
+  }
+
+  @Override
+  public void peerDisconnected(PeerInformation peerInformation) {
+    // no-op
+  }
+
+  @Override
+  public void pieceDownloaded(PieceInformation pieceInformation, PeerInformation peerInformation) {
+    // no-op
+  }
+
+  @Override
+  public void downloadComplete() {
+    // no-op
+  }
+
+  @Override
+  public void pieceReceived(PieceInformation pieceInformation, PeerInformation peerInformation) {
+    // no-op
+  }
+
+  @Override
+  public void downloadFailed(Throwable cause) {
+    // no-op
+  }
+
+  @Override
+  public void validationComplete(int validpieces, int totalpieces) {
+    // no-op
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentLoader.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentLoader.java
new file mode 100644
index 0000000..4b15f7d
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentLoader.java
@@ -0,0 +1,19 @@
+package com.turn.ttorrent.client;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+
+public interface TorrentLoader {
+
+ /**
+ * Finds the shared torrent already associated with the specified loaded torrent, or creates one, and returns it.
+ *
+ * @param loadedTorrent the loaded torrent to resolve
+ * @return the shared torrent instance associated with {@code loadedTorrent}
+ * @throws IOException if an I/O error occurs
+ */
+ @NotNull
+ SharedTorrent loadTorrent(@NotNull LoadedTorrent loadedTorrent) throws IOException;
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentLoaderImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentLoaderImpl.java
new file mode 100644
index 0000000..15bfbe3
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentLoaderImpl.java
@@ -0,0 +1,47 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.strategy.RequestStrategyImplAnyInteresting;
+import com.turn.ttorrent.common.TorrentMetadata;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+
+/** {@link TorrentLoader} that resolves a {@link LoadedTorrent} to the {@link SharedTorrent} kept in a {@link TorrentsStorage}. */
+public class TorrentLoaderImpl implements TorrentLoader {
+
+  @NotNull
+  private final TorrentsStorage myTorrentsStorage;
+
+  public TorrentLoaderImpl(@NotNull TorrentsStorage torrentsStorage) {
+    myTorrentsStorage = torrentsStorage;
+  }
+
+  @Override
+  @NotNull
+  public SharedTorrent loadTorrent(@NotNull LoadedTorrent loadedTorrent) throws IOException {
+    final String infoHashHex = loadedTorrent.getTorrentHash().getHexInfoHash();
+
+    // Fast path: an active torrent is already registered under this hash.
+    final SharedTorrent active = myTorrentsStorage.getTorrent(infoHashHex);
+    if (active != null) {
+      return active;
+    }
+
+    final TorrentMetadata metadata;
+    try {
+      metadata = loadedTorrent.getMetadata();
+    } catch (IllegalStateException e) {
+      // On failure, drop the torrent from the storage before rethrowing.
+      myTorrentsStorage.remove(infoHashHex);
+      throw e;
+    }
+
+    final SharedTorrent created = new SharedTorrent(metadata, loadedTorrent.getPieceStorage(),
+            new RequestStrategyImplAnyInteresting(), loadedTorrent.getTorrentStatistic(),
+            loadedTorrent.getEventDispatcher());
+
+    // If an instance was registered since the lookup above, the storage returns it and we use it instead.
+    final SharedTorrent existing = myTorrentsStorage.putIfAbsentActiveTorrent(infoHashHex, created);
+    return existing != null ? existing : created;
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentManager.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentManager.java
new file mode 100644
index 0000000..08c4913
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentManager.java
@@ -0,0 +1,35 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.TorrentHash;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public interface TorrentManager extends TorrentHash {
+
+ /**
+ * Adds the specified listener, which will be notified of new events.
+ *
+ * @param listener the listener to add
+ */
+ void addListener(TorrentListener listener);
+
+ /**
+ * Removes the specified listener, previously added with {@link TorrentManager#addListener}.
+ * The listener may still receive events after this method returns if their notification was started before the removal.
+ *
+ * @param listener the listener to remove
+ * @return true if the listener was removed, otherwise false (e.g. the listener was not found)
+ */
+ boolean removeListener(TorrentListener listener);
+
+ /**
+ * Waits until the download is finished.
+ *
+ * @param timeout the maximum time to wait
+ * @param timeUnit the time unit of the timeout argument
+ * @throws InterruptedException if this thread was interrupted
+ * @throws TimeoutException if the timeout elapsed
+ */
+ void awaitDownloadComplete(int timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException;
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentManagerImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentManagerImpl.java
new file mode 100644
index 0000000..de9cf57
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentManagerImpl.java
@@ -0,0 +1,83 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.TorrentHash;
+
+import java.util.List;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * {@link TorrentManager} that forwards listener registration to an {@link EventDispatcher}
+ * and info-hash queries to the wrapped {@link TorrentHash}.
+ */
+class TorrentManagerImpl implements TorrentManager {
+
+  private final EventDispatcher eventDispatcher;
+  private final TorrentHash hash;
+
+  TorrentManagerImpl(EventDispatcher eventDispatcher, TorrentHash hash) {
+    this.eventDispatcher = eventDispatcher;
+    this.hash = hash;
+  }
+
+  @Override
+  public void addListener(TorrentListener listener) {
+    eventDispatcher.addListener(listener);
+  }
+
+  @Override
+  public boolean removeListener(TorrentListener listener) {
+    return eventDispatcher.removeListener(listener);
+  }
+
+  @Override
+  public byte[] getInfoHash() {
+    return hash.getInfoHash();
+  }
+
+  @Override
+  public String getHexInfoHash() {
+    return hash.getHexInfoHash();
+  }
+
+  /**
+   * Blocks until the torrent is reported complete or the timeout elapses.
+   * <p>
+   * Completion is signalled either by {@code downloadComplete()} or by {@code validationComplete}
+   * reporting that every piece is valid; per the {@link TorrentListener} contract,
+   * {@code downloadComplete()} is not fired in the latter case.
+   *
+   * @param timeout the maximum time to wait
+   * @param timeUnit the time unit of {@code timeout}
+   * @throws InterruptedException if the waiting thread is interrupted
+   * @throws TimeoutException if completion is not observed within the timeout
+   */
+  @Override
+  public void awaitDownloadComplete(int timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+    final Semaphore completed = new Semaphore(0);
+    TorrentListenerWrapper listener = new TorrentListenerWrapper() {
+      @Override
+      public void downloadComplete() {
+        completed.release();
+      }
+
+      @Override
+      public void validationComplete(int validpieces, int totalpieces) {
+        // A fully valid torrent does not trigger downloadComplete(), so treat it as completion here.
+        if (validpieces == totalpieces) {
+          completed.release();
+        }
+      }
+    };
+    try {
+      addListener(listener);
+      if (!completed.tryAcquire(timeout, timeUnit)) {
+        throw new TimeoutException("Unable to download torrent in specified timeout");
+      }
+    } finally {
+      // Always unregister the temporary listener: on success, timeout or interruption.
+      removeListener(listener);
+    }
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentMetadataProvider.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentMetadataProvider.java
new file mode 100644
index 0000000..a389ec0
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentMetadataProvider.java
@@ -0,0 +1,21 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.bcodec.InvalidBEncodingException;
+import com.turn.ttorrent.common.TorrentMetadata;
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+
+public interface TorrentMetadataProvider {
+
+ /**
+ * Loads and returns a new {@link TorrentMetadata} instance from any source.
+ *
+ * @return new torrent metadata instance
+ * @throws IOException if any I/O error occurs
+ * @throws InvalidBEncodingException if the specified source has an invalid BEP format or is missing required fields
+ */
+ @NotNull
+ TorrentMetadata getTorrentMetadata() throws IOException;
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentsStorage.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentsStorage.java
new file mode 100644
index 0000000..d055856
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/TorrentsStorage.java
@@ -0,0 +1,177 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.Pair;
+import com.turn.ttorrent.common.TorrentUtils;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+// Client-side torrent registry.
+// Manages two kinds of torrents: active ones (currently downloading or uploading) and loaded ones (parsed and held in memory, but possibly not actively used).
+// Offers methods to add, look up, remove and check these torrents; every access to the maps is guarded by a read/write lock.
+public class TorrentsStorage {
+
+  private final ReadWriteLock readWriteLock;
+  private final Map<String, SharedTorrent> activeTorrents;
+  private final Map<String, LoadedTorrent> loadedTorrents;
+
+  public TorrentsStorage() {
+    readWriteLock = new ReentrantReadWriteLock();
+    activeTorrents = new HashMap<String, SharedTorrent>();
+    loadedTorrents = new HashMap<String, LoadedTorrent>();
+  }
+
+  // Returns whether a loaded torrent is registered under the given hash.
+  public boolean hasTorrent(String hash) {
+    readWriteLock.readLock().lock();
+    try {
+      return loadedTorrents.containsKey(hash);
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  // Returns the loaded torrent registered under the given hash, or null if there is none.
+  public LoadedTorrent getLoadedTorrent(String hash) {
+    readWriteLock.readLock().lock();
+    try {
+      return loadedTorrents.get(hash);
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  // Handles a peer disconnect: a finished torrent with no downloaders left is removed from the active set and closed.
+  public void peerDisconnected(String torrentHash) {
+    final SharedTorrent torrent;
+    readWriteLock.writeLock().lock();
+    try {
+      torrent = activeTorrents.get(torrentHash);
+      if (torrent == null) return;
+
+      boolean isTorrentFinished = torrent.isFinished();
+      if (torrent.getDownloadersCount() == 0 && isTorrentFinished) {
+        activeTorrents.remove(torrentHash);
+      } else {
+        return;
+      }
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+    torrent.close(); // closed outside the lock
+  }
+
+  // Returns the active torrent registered under the given hash, or null if there is none.
+  public SharedTorrent getTorrent(String hash) {
+    readWriteLock.readLock().lock();
+    try {
+      return activeTorrents.get(hash);
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  // Registers an already loaded torrent under the given hash, replacing any previous entry.
+  public void addTorrent(String hash, LoadedTorrent torrent) {
+    readWriteLock.writeLock().lock();
+    try {
+      loadedTorrents.put(hash, torrent);
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  public SharedTorrent putIfAbsentActiveTorrent(String hash, SharedTorrent torrent) {
+    readWriteLock.writeLock().lock();
+    try {
+      final SharedTorrent old = activeTorrents.get(hash);
+      if (old != null) return old;
+
+      return activeTorrents.put(hash, torrent); // no previous mapping here, so this returns null
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  public Pair<SharedTorrent, LoadedTorrent> remove(String hash) {
+    final Pair<SharedTorrent, LoadedTorrent> result;
+    readWriteLock.writeLock().lock();
+    try {
+      final SharedTorrent sharedTorrent = activeTorrents.remove(hash);
+      final LoadedTorrent loadedTorrent = loadedTorrents.remove(hash);
+      result = new Pair<SharedTorrent, LoadedTorrent>(sharedTorrent, loadedTorrent);
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+    if (result.second() != null) {
+      try {
+        result.second().getPieceStorage().close();
+      } catch (IOException ignored) { // close failure is deliberately ignored
+      }
+    }
+    if (result.first() != null) {
+      result.first().closeFully();
+    }
+    return result;
+  }
+
+  // Returns a snapshot list of the active torrents.
+  public List<SharedTorrent> activeTorrents() {
+    readWriteLock.readLock().lock();
+    try {
+      return new ArrayList<SharedTorrent>(activeTorrents.values());
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  public List<AnnounceableInformation> announceableTorrents() {
+    List<AnnounceableInformation> result = new ArrayList<AnnounceableInformation>();
+    readWriteLock.readLock().lock();
+    try {
+      for (LoadedTorrent loadedTorrent : loadedTorrents.values()) {
+        AnnounceableInformation announceableInformation = loadedTorrent.createAnnounceableInformation();
+        if (TorrentUtils.isTrackerLessInfo(announceableInformation)) continue;
+        result.add(announceableInformation);
+      }
+      return result;
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  public List<LoadedTorrent> getLoadedTorrents() {
+    readWriteLock.readLock().lock();
+    try {
+      return new ArrayList<LoadedTorrent>(loadedTorrents.values());
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  public void clear() {
+    final Collection<SharedTorrent> sharedTorrents;
+    final Collection<LoadedTorrent> loadedTorrents;
+    readWriteLock.writeLock().lock();
+    try {
+      sharedTorrents = new ArrayList<SharedTorrent>(activeTorrents.values());
+      loadedTorrents = new ArrayList<LoadedTorrent>(this.loadedTorrents.values());
+      this.loadedTorrents.clear();
+      activeTorrents.clear();
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+    for (SharedTorrent sharedTorrent : sharedTorrents) {
+      sharedTorrent.closeFully();
+    }
+    for (LoadedTorrent loadedTorrent : loadedTorrents) {
+      try {
+        loadedTorrent.getPieceStorage().close();
+      } catch (IOException ignored) { // close failure is deliberately ignored
+      }
+    }
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/Announce.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/Announce.java
new file mode 100644
index 0000000..ad38700
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/Announce.java
@@ -0,0 +1,315 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import org.slf4j.Logger;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.net.UnknownServiceException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * BitTorrent announce sub-system.
+ * <p/>
+ * <p>
+ * A BitTorrent client must check-in to the torrent's tracker(s) to get peers
+ * and to report certain events.
+ * </p>
+ * <p/>
+ * <p>
+ * This Announce class implements a periodic announce request thread that will
+ * notify announce request event listeners for each tracker response.
+ * </p>
+ *
+ * @author mpetazzoni
+ * @see com.turn.ttorrent.common.protocol.TrackerMessage
+ */
+public class Announce implements Runnable {
+
+  protected static final Logger logger =
+          TorrentLoggerFactory.getLogger(Announce.class);
+
+  private List<Peer> myPeers;
+  private final TrackerClientFactory myTrackerClientFactory;
+
+  /**
+   * Tracker clients keyed by the string form of their tracker URI.
+   * Entries are added by {@link #start} and {@link #forceAnnounce}.
+   */
+  private final ConcurrentMap<String, TrackerClient> clients;
+  private final Context myContext;
+
+  /**
+   * Announce thread and control.
+   */
+  private Thread thread;
+  private volatile boolean stop;
+  private boolean forceStop;
+
+  /**
+   * Announce interval, in seconds.
+   */
+  private int myAnnounceInterval;
+  private TrackerClient myDefaultTracker;
+
+  /**
+   * Initialize the base announce class members for the announcer.
+   */
+  public Announce(Context context, TrackerClientFactory trackerClientFactory) {
+    this.clients = new ConcurrentHashMap<String, TrackerClient>();
+    this.thread = null;
+    myTrackerClientFactory = trackerClientFactory;
+    myContext = context;
+    myPeers = new CopyOnWriteArrayList<Peer>();
+  }
+
+  public void forceAnnounce(AnnounceableInformation torrent, AnnounceResponseListener listener, AnnounceRequestMessage.RequestEvent event) throws UnknownServiceException, UnknownHostException {
+    URI trackerUrl = URI.create(torrent.getAnnounce());
+    TrackerClient client = this.clients.get(trackerUrl.toString());
+    try {
+      if (client == null) {
+        client = myTrackerClientFactory.createTrackerClient(myPeers, trackerUrl);
+        client.register(listener);
+        this.clients.put(trackerUrl.toString(), client);
+      }
+      client.announceAllInterfaces(event, false, torrent);
+    } catch (AnnounceException e) {
+      logger.info(String.format("Unable to force announce torrent %s on tracker %s.", torrent.getHexInfoHash(), String.valueOf(trackerUrl)));
+      logger.debug(String.format("Unable to force announce torrent %s on tracker %s.", torrent.getHexInfoHash(), String.valueOf(trackerUrl)), e);
+    }
+  }
+
+  /**
+   * Start the announce request thread.
+   */
+  public void start(final URI defaultTrackerURI, final AnnounceResponseListener listener, final Peer[] peers, final int announceInterval) {
+    myAnnounceInterval = announceInterval;
+    myPeers.addAll(Arrays.asList(peers));
+    if (defaultTrackerURI != null) {
+      try {
+        myDefaultTracker = myTrackerClientFactory.createTrackerClient(myPeers, defaultTrackerURI);
+        myDefaultTracker.register(listener);
+        this.clients.put(defaultTrackerURI.toString(), myDefaultTracker);
+      } catch (Exception e) {
+        logger.warn("Unable to create tracker client for default tracker {}", defaultTrackerURI, e);
+      }
+    } else {
+      myDefaultTracker = null;
+    }
+
+    this.stop = false;
+    this.forceStop = false;
+
+    if (this.thread == null || !this.thread.isAlive()) {
+      this.thread = new Thread(this);
+      this.thread.setName("torrent tracker announce thread");
+      this.thread.start();
+    }
+  }
+
+  /**
+   * Set the announce interval. A non-positive value hard-stops the announce thread.
+   */
+  public void setAnnounceInterval(int announceInterval) {
+    if (announceInterval <= 0) {
+      this.stop(true);
+      return;
+    }
+
+    if (this.myAnnounceInterval == announceInterval) {
+      return;
+    }
+
+    logger.trace("Setting announce interval to {}s per tracker request.",
+            announceInterval);
+    this.myAnnounceInterval = announceInterval;
+  }
+
+  /**
+   * Stop the announce thread.
+   * <p/>
+   * <p>
+   * One last 'stopped' announce event might be sent to the tracker to
+   * announce we're going away, depending on the implementation.
+   * </p>
+   */
+  public void stop() {
+    this.stop = true;
+
+    if (this.thread != null && this.thread.isAlive()) {
+      this.thread.interrupt();
+
+      for (TrackerClient client : this.clients.values()) {
+        client.close();
+      }
+
+      try {
+        this.thread.join();
+      } catch (InterruptedException ie) {
+        // Ignore
+      }
+    }
+    this.myPeers.clear();
+
+    this.thread = null;
+  }
+
+  /**
+   * Main announce loop.
+   * <p/>
+   * <p>
+   * The announce thread starts by making the initial 'started' announce
+   * request to register on the tracker and get the announce interval value.
+   * Subsequent announce requests are ordinary, event-less, periodic requests
+   * for peers.
+   * </p>
+   * <p/>
+   * <p>
+   * Unless forcefully stopped, the announce thread will terminate by sending
+   * a 'stopped' announce request before stopping.
+   * </p>
+   */
+  @Override
+  public void run() {
+    logger.info("Starting announce loop...");
+
+    while (!this.stop && !Thread.currentThread().isInterrupted()) {
+      final List<AnnounceableInformation> announceableInformationList = myContext.getTorrentsStorage().announceableTorrents();
+      logger.debug("Starting announce for {} torrents", announceableInformationList.size());
+      announceAllTorrents(announceableInformationList, AnnounceRequestMessage.RequestEvent.NONE);
+      try {
+        Thread.sleep(this.myAnnounceInterval * 1000L); // long arithmetic: seconds -> millis without int overflow
+      } catch (InterruptedException ie) {
+        break;
+      }
+    }
+
+    // A hard stop (see stop(boolean)) skips the final 'stopped' announce.
+    if (!this.forceStop) {
+      announceAllTorrents(myContext.getTorrentsStorage().announceableTorrents(), AnnounceRequestMessage.RequestEvent.STOPPED);
+    }
+
+    logger.info("Exited announce loop.");
+  }
+
+  private void defaultAnnounce(List<AnnounceableInformation> torrentsForAnnounce) {
+    for (AnnounceableInformation torrent : torrentsForAnnounce) {
+      if (this.stop || Thread.currentThread().isInterrupted()) {
+        break;
+      }
+      try {
+        TrackerClient trackerClient = this.getCurrentTrackerClient(torrent);
+        if (trackerClient != null) {
+          trackerClient.announceAllInterfaces(AnnounceRequestMessage.RequestEvent.NONE, false, torrent);
+        } else {
+          logger.warn("Tracker client for {} is null. Torrent is not announced on tracker", torrent.getHexInfoHash());
+        }
+      } catch (Exception e) {
+        logger.info(e.getMessage());
+        logger.debug(e.getMessage(), e);
+      }
+    }
+  }
+
+  private void announceAllTorrents(List<AnnounceableInformation> announceableInformationList, AnnounceRequestMessage.RequestEvent event) {
+    logger.debug("Started multi announce. Event {}, torrents {}", event, announceableInformationList);
+    final Map<String, List<AnnounceableInformation>> torrentsGroupingByAnnounceUrl = new HashMap<String, List<AnnounceableInformation>>();
+
+    for (AnnounceableInformation torrent : announceableInformationList) {
+      final URI uriForTorrent = getURIForTorrent(torrent);
+      if (uriForTorrent == null) continue;
+      String torrentURI = uriForTorrent.toString();
+      List<AnnounceableInformation> sharedTorrents = torrentsGroupingByAnnounceUrl.get(torrentURI);
+      if (sharedTorrents == null) {
+        sharedTorrents = new ArrayList<AnnounceableInformation>();
+        torrentsGroupingByAnnounceUrl.put(torrentURI, sharedTorrents);
+      }
+      sharedTorrents.add(torrent);
+    }
+
+    List<AnnounceableInformation> unannouncedTorrents = new ArrayList<AnnounceableInformation>();
+    for (Map.Entry<String, List<AnnounceableInformation>> e : torrentsGroupingByAnnounceUrl.entrySet()) {
+      TrackerClient trackerClient = this.clients.get(e.getKey());
+      if (trackerClient != null) {
+        try {
+          trackerClient.multiAnnounce(event, false, e.getValue(), myPeers);
+        } catch (AnnounceException t) {
+          LoggerUtils.warnAndDebugDetails(logger, "problem in multi announce {}", t.getMessage(), t);
+          unannouncedTorrents.addAll(e.getValue());
+        } catch (ConnectException t) {
+          LoggerUtils.warnWithMessageAndDebugDetails(logger, "Cannot connect to the tracker {}", e.getKey(), t);
+          logger.debug("next torrents contain {} in tracker list. {}", e.getKey(), e.getValue());
+        }
+      } else {
+        logger.warn("Tracker client for {} is null. Torrents are not announced on tracker", e.getKey());
+        if (e.getKey() == null || e.getKey().isEmpty()) {
+          for (AnnounceableInformation announceableInformation : e.getValue()) {
+            myContext.getTorrentsStorage().remove(announceableInformation.getHexInfoHash());
+          }
+        }
+      }
+    }
+    if (unannouncedTorrents.size() > 0) {
+      defaultAnnounce(unannouncedTorrents);
+    }
+  }
+
+  /**
+   * Returns the tracker client registered for the torrent's first announce URI, or null if there is none.
+   */
+  public TrackerClient getCurrentTrackerClient(AnnounceableInformation torrent) {
+    final URI uri = getURIForTorrent(torrent);
+    if (uri == null) return null;
+    return this.clients.get(uri.toString());
+  }
+
+  private URI getURIForTorrent(AnnounceableInformation torrent) {
+    List<List<String>> announceList = torrent.getAnnounceList();
+    if (announceList.size() == 0) return null;
+    List<String> uris = announceList.get(0);
+    if (uris.size() == 0) return null;
+    return URI.create(uris.get(0));
+  }
+
+  public URI getDefaultTrackerURI() {
+    if (myDefaultTracker == null) {
+      return null;
+    }
+    return myDefaultTracker.getTrackerURI();
+  }
+
+  /**
+   * Stop the announce thread.
+   *
+   * @param hard Whether to force stop the announce thread or not, i.e. not
+   *             send the final 'stopped' announce request or not.
+   */
+  private void stop(boolean hard) {
+    this.forceStop = hard;
+    this.stop();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/AnnounceException.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/AnnounceException.java
new file mode 100644
index 0000000..716877f
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/AnnounceException.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.announce;
+
+
+/**
+ * Exception thrown when an announce request fails.
+ *
+ * @author mpetazzoni
+ */
+public class AnnounceException extends Exception {
+
+  private static final long serialVersionUID = -1;
+  /** Creates the exception with both a description and the underlying cause. */
+  public AnnounceException(String message, Throwable cause) {
+    super(message, cause);
+  }
+  /** Creates the exception with a description only. */
+  public AnnounceException(String message) {
+    super(message);
+  }
+  /** Creates the exception wrapping the underlying cause only. */
+  public AnnounceException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/AnnounceResponseListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/AnnounceResponseListener.java
new file mode 100644
index 0000000..6d83cbd
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/AnnounceResponseListener.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.common.Peer;
+
+import java.util.EventListener;
+import java.util.List;
+
+
+/**
+ * EventListener interface for objects that want to receive tracker responses.
+ *
+ * @author mpetazzoni
+ */
+public interface AnnounceResponseListener extends EventListener {
+ /**
+ * Handle an announce response event.
+ *
+ * @param interval The announce interval requested by the tracker.
+ * @param complete The number of seeders on this torrent.
+ * @param incomplete The number of leechers on this torrent.
+ * @param hexInfoHash Presumably the hex-encoded info hash of the torrent the response is for (inferred from the name; confirm with callers).
+ */
+ void handleAnnounceResponse(int interval, int complete, int incomplete, String hexInfoHash);
+
+ /**
+ * Handle the discovery of new peers.
+ *
+ * @param peers The list of peers discovered (from the announce response or any other means like DHT/PEX, etc.).
+ * @param hexInfoHash Presumably the hex-encoded info hash of the torrent the peers belong to (inferred from the name; confirm with callers).
+ */
+ void handleDiscoveredPeers(List<Peer> peers, String hexInfoHash);
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/HTTPTrackerClient.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/HTTPTrackerClient.java
new file mode 100644
index 0000000..43b5b5d
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/HTTPTrackerClient.java
@@ -0,0 +1,335 @@
+/**
+ * Copyright (C) 2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.bcodec.BDecoder;
+import com.turn.ttorrent.bcodec.BEValue;
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.MessageValidationException;
+import com.turn.ttorrent.common.protocol.http.HTTPAnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.http.HTTPAnnounceResponseMessage;
+import com.turn.ttorrent.common.protocol.http.HTTPTrackerMessage;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.net.*;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Announcer for HTTP trackers.
+ *
+ * @author mpetazzoni
+ * @see <a href="http://wiki.theory.org/BitTorrentSpecification#Tracker_Request_Parameters">BitTorrent tracker request specification</a>
+ */
+public class HTTPTrackerClient extends TrackerClient {
+
+ protected static final Logger logger =
+ TorrentLoggerFactory.getLogger(HTTPTrackerClient.class);
+
+ /**
+ * Create a new HTTP announcer for the given tracker.
+ * @param peers Our own peer specifications.
+ * @param tracker The URI of the tracker to announce to.
+ */
+ public HTTPTrackerClient(List<Peer> peers, URI tracker) {
+ super(peers, tracker);
+ }
+
+ /**
+ * Build, send and process a tracker announce request.
+ *
+ * <p>
+ * One GET announce request is built and sent for each entry of
+ * <code>adresses</code>. A response whose status is not 200 is logged and
+ * dropped; any other response is parsed and collected.
+ * </p>
+ *
+ * <p>
+ * Only the first collected response is passed on to
+ * {@link #handleTrackerAnnounceResponse}; later ones are ignored.
+ * </p>
+ *
+ * @param event The announce event type (RequestEvent.NONE for periodic updates).
+ * @param inhibitEvents Forwarded to the response handler to limit listener notification.
+ * @param torrentInfo The torrent being announced.
+ * @param adresses Our own peers; one request is sent on behalf of each of them.
+ */
+ public void announce(final AnnounceRequestMessage.RequestEvent event,
+ boolean inhibitEvents, final AnnounceableInformation torrentInfo, final List<Peer> adresses) throws AnnounceException {
+ logAnnounceRequest(event, torrentInfo);
+
+ final List<HTTPTrackerMessage> trackerResponses = new ArrayList<HTTPTrackerMessage>();
+ for (final Peer address : adresses) {
+ final URL target = encodeAnnounceToURL(event, torrentInfo, address);
+ try {
+ sendAnnounce(target, "GET", new ResponseParser() {
+ @Override
+ public void parse(InputStream inputStream, int responseCode) throws IOException, MessageValidationException {
+ if (responseCode != 200) {
+ logger.info("received not http 200 code from tracker for request " + target);
+ return;
+ }
+ trackerResponses.add(HTTPTrackerMessage.parse(inputStream));
+ }
+ });
+ } catch (ConnectException e) {
+ throw new AnnounceException(e.getMessage(), e);
+ }
+ }
+ // Only the first collected response is processed:
+ if (trackerResponses.size() > 0) {
+ final HTTPTrackerMessage message = trackerResponses.get(0);
+ this.handleTrackerAnnounceResponse(message, inhibitEvents, torrentInfo.getHexInfoHash());
+ }
+ }
+
+  /**
+   * Announce several torrents at once: every address gets one POST whose body
+   * lists one announce URL per torrent. Only the responses of the first address
+   * that yielded any are processed; nothing is sent when torrents is empty.
+   */
+  @Override
+  protected void multiAnnounce(AnnounceRequestMessage.RequestEvent event, boolean inhibitEvent,
+          final List<? extends AnnounceableInformation> torrents, List<Peer> addresses) throws AnnounceException, ConnectException {
+    if (torrents.isEmpty()) {
+      // Without torrents the body would be empty and trimming its trailing newline below would fail.
+      return;
+    }
+    List<List<HTTPTrackerMessage>> trackerResponses = new ArrayList<List<HTTPTrackerMessage>>();
+    URL trackerUrl;
+    try {
+      trackerUrl = this.tracker.toURL();
+    } catch (MalformedURLException e) {
+      throw new AnnounceException("Invalid tracker URL " + this.tracker, e);
+    }
+    for (final Peer address : addresses) {
+      StringBuilder body = new StringBuilder();
+      for (final AnnounceableInformation torrentInfo : torrents) {
+        body.append(encodeAnnounceToURL(event, torrentInfo, address)).append("\n");
+      }
+      final List<HTTPTrackerMessage> responsesForCurrentIp = new ArrayList<HTTPTrackerMessage>();
+      final String bodyStr = body.substring(0, body.length() - 1);
+      sendAnnounce(trackerUrl, bodyStr, "POST", new ResponseParser() {
+        @Override
+        public void parse(InputStream inputStream, int responseCode) throws IOException, MessageValidationException {
+          if (responseCode != 200) {
+            logger.info("received {} code from tracker for multi announce request.", responseCode);
+            logger.debug(bodyStr);
+            return;
+          }
+          final BEValue bdecode = BDecoder.bdecode(inputStream);
+          if (bdecode == null) {
+            logger.info("tracker sent bad response for multi announce message.");
+            logger.debug(bodyStr);
+            return;
+          }
+          for (BEValue value : bdecode.getList()) {
+            responsesForCurrentIp.add(HTTPTrackerMessage.parse(value));
+          }
+        }
+      });
+      if (!responsesForCurrentIp.isEmpty()) {
+        trackerResponses.add(responsesForCurrentIp);
+      }
+    }
+    // we process only first request:
+    if (trackerResponses.isEmpty()) {
+      return;
+    }
+    for (HTTPTrackerMessage message : trackerResponses.get(0)) {
+      if (!(message instanceof HTTPAnnounceResponseMessage)) {
+        logger.info("Incorrect instance of message {}. Skipping...", message);
+        continue;
+      }
+      final String hexInfoHash = ((HTTPAnnounceResponseMessage) message).getHexInfoHash();
+      try {
+        this.handleTrackerAnnounceResponse(message, inhibitEvent, hexInfoHash);
+      } catch (AnnounceException e) {
+        LoggerUtils.errorAndDebugDetails(logger, "Unable to process tracker response {}", message, e);
+      }
+    }
+  }
+
+  /** Builds the announce request for one torrent/peer pair and encodes it as a URL
+   * on this tracker; every failure is rethrown as an AnnounceException. */
+  private URL encodeAnnounceToURL(AnnounceRequestMessage.RequestEvent event, AnnounceableInformation torrentInfo, Peer peer) throws AnnounceException {
+    try {
+      return this.buildAnnounceRequest(event, torrentInfo, peer)
+              .buildAnnounceURL(this.tracker.toURL());
+    } catch (MalformedURLException badUrl) {
+      throw new AnnounceException("Invalid announce URL (" +
+              badUrl.getMessage() + ")", badUrl);
+    } catch (MessageValidationException invalid) {
+      throw new AnnounceException("Announce request creation violated " +
+              "expected protocol (" + invalid.getMessage() + ")", invalid);
+    } catch (IOException failure) {
+      throw new AnnounceException("Error building announce request (" +
+              failure.getMessage() + ")", failure);
+    }
+  }
+
+ private void sendAnnounce(final URL url, final String method, ResponseParser parser)
+ throws AnnounceException, ConnectException {
+ sendAnnounce(url, "", method, parser); // empty body: no request payload is written
+ }
+
+  /**
+   * Sends the request and passes the response stream (the error stream if the
+   * request failed) with the status code to <code>parser</code>, then closes the streams.
+   * Throws ConnectException, caused by the I/O failure if any, when no stream is available.
+   */
+  private void sendAnnounce(final URL url, final String body, final String method, ResponseParser parser)
+          throws AnnounceException, ConnectException {
+    HttpURLConnection conn = null;
+    InputStream in = null;
+    IOException failure = null;
+    try {
+      conn = (HttpURLConnection) openConnectionCheckRedirects(url, body, method);
+      in = conn.getInputStream();
+    } catch (IOException ioe) {
+      failure = ioe;
+      if (conn != null) {
+        in = conn.getErrorStream();
+      }
+    }
+    // Neither a response body nor an error stream: no point in going any further.
+    if (in == null) {
+      throw (ConnectException) new ConnectException("No response or unreachable tracker!").initCause(failure);
+    }
+    try {
+      parser.parse(in, conn.getResponseCode());
+    } catch (IOException ioe) {
+      throw new AnnounceException("Error reading tracker response!", ioe);
+    } catch (MessageValidationException mve) {
+      throw new AnnounceException("Tracker message violates expected " +
+              "protocol (" + mve.getMessage() + ")", mve);
+    } finally {
+      // Close everything down to avoid resource leaks; "in" may itself be the error stream.
+      closeQuietly(in);
+      closeQuietly(conn.getErrorStream());
+    }
+  }
+
+  /** Closes the stream when there is one, logging a close failure instead of propagating it. */
+  private static void closeQuietly(InputStream stream) {
+    if (stream == null) {
+      return;
+    }
+    try {
+      stream.close();
+    } catch (IOException ioe) {
+      logger.info("Problem ensuring error stream closed!");
+      logger.debug("Problem ensuring error stream closed!", ioe);
+    }
+  }
+
+ private URLConnection openConnectionCheckRedirects(URL url, String body, String method) throws IOException {
+ // Opens the connection and follows HTTP redirects by hand (at most 5), returning the final connection.
+ boolean needRedirect;
+ int redirects = 0;
+ URLConnection connection = url.openConnection();
+ boolean firstIteration = true;
+ do {
+ needRedirect = false;
+ connection.setConnectTimeout(10000);
+ connection.setReadTimeout(10000);
+ HttpURLConnection http = null;
+ if (connection instanceof HttpURLConnection) {
+ http = (HttpURLConnection) connection;
+ http.setInstanceFollowRedirects(false); // redirects are handled by this loop instead
+ }
+ if (http != null) {
+ if (firstIteration) { // method, content type and body are only applied to the initial request
+ firstIteration = false;
+ http.setRequestProperty("Content-Type", "text/plain; charset=UTF-8");
+ http.setRequestMethod(method);
+ if (!body.isEmpty()) {
+ connection.setDoOutput(true);
+ connection.getOutputStream().write(body.getBytes("UTF-8"));
+ }
+ }
+
+ int stat = http.getResponseCode();
+ if (stat >= 300 && stat <= 307 && stat != 306 &&
+ stat != HttpURLConnection.HTTP_NOT_MODIFIED) {
+ URL base = http.getURL();
+ String newLocation = http.getHeaderField("Location");
+ URL target = newLocation == null ? null : new URL(base, newLocation);
+ http.disconnect();
+ // Redirection should be allowed only for HTTP and HTTPS
+ // and should be limited to 5 redirections at most.
+ if (redirects >= 5) {
+ throw new IOException("too many redirects");
+ }
+ if (target == null || !(target.getProtocol().equals("http")
+ || target.getProtocol().equals("https"))) {
+ throw new IOException("illegal URL redirect or protocol");
+ }
+ needRedirect = true;
+ connection = target.openConnection();
+ redirects++;
+ }
+ }
+ }
+ while (needRedirect);
+ return connection;
+ }
+
+ /**
+ * Build the announce request tracker message.
+ * @param event The announce event (can be <tt>NONE</tt> or <em>null</em>)
+ * @param torrentInfo The torrent whose info hash and uploaded/downloaded/left counters are sent.
+ * @param peer Our own peer, supplying the peer ID, port and IP of the request.
+ * @return Returns an instance of a {@link HTTPAnnounceRequestMessage}
+ * that can be used to generate the fully qualified announce URL, with
+ * parameters, to make the announce request.
+ * @throws IOException If crafting the request message fails with an I/O error.
+ * @throws MessageValidationException If crafting the request message reports a validation error.
+ */
+ private HTTPAnnounceRequestMessage buildAnnounceRequest(
+ AnnounceRequestMessage.RequestEvent event, AnnounceableInformation torrentInfo, Peer peer)
+ throws IOException,
+ MessageValidationException {
+ // Build announce request message
+ final long uploaded = torrentInfo.getUploaded();
+ final long downloaded = torrentInfo.getDownloaded();
+ final long left = torrentInfo.getLeft();
+ return HTTPAnnounceRequestMessage.craft(
+ torrentInfo.getInfoHash(),
+ peer.getPeerIdArray(),
+ peer.getPort(),
+ uploaded,
+ downloaded,
+ left,
+ true, false, event, // NOTE(review): the two flags are presumably compact / no_peer_id - confirm against craft()
+ peer.getIp(),
+ AnnounceRequestMessage.DEFAULT_NUM_WANT);
+ }
+
+ private interface ResponseParser {
+ // Callback receiving the tracker's response stream and HTTP status code from sendAnnounce().
+ void parse(InputStream inputStream, int responseCode) throws IOException, MessageValidationException;
+
+ }
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClient.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClient.java
new file mode 100644
index 0000000..1baddf6
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClient.java
@@ -0,0 +1,215 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.AnnounceResponseMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.ErrorMessage;
+import org.slf4j.Logger;
+
+import java.net.ConnectException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public abstract class TrackerClient {
+
+ private static final Logger logger =
+ TorrentLoggerFactory.getLogger(TrackerClient.class);
+
+
+ /**
+ * The set of listeners to announce request answers.
+ */
+ private final Set<AnnounceResponseListener> listeners;
+
+ protected final List<Peer> myAddress;
+ protected final URI tracker;
+
+ public TrackerClient(final List<Peer> peers, final URI tracker) {
+ this.listeners = new HashSet<AnnounceResponseListener>();
+ myAddress = peers; // kept by reference, not copied
+ this.tracker = tracker;
+ }
+
+ /**
+ * Register a new announce response listener.
+ * Listeners are kept in a set, so adding an equal listener again has no further effect.
+ * @param listener The listener to register on this announcer events.
+ */
+ public void register(AnnounceResponseListener listener) {
+ this.listeners.add(listener);
+ }
+
+ /**
+ * Returns the URI this tracker client connects to.
+ */
+ public URI getTrackerURI() {
+ return this.tracker;
+ }
+
+  /** Announces the torrent for all our addresses; a failure is rethrown with tracker, event, torrent and peer details. */
+  public void announceAllInterfaces(final AnnounceRequestMessage.RequestEvent event, boolean inhibitEvent, final AnnounceableInformation torrent) throws AnnounceException {
+    try {
+      announce(event, inhibitEvent, torrent, myAddress);
+    } catch (AnnounceException e) {
+      final String details = String.format("Unable to announce tracker %s event %s for torrent %s and peers %s. Reason %s", getTrackerURI(), event.getEventName(), torrent.getHexInfoHash(), Arrays.toString(myAddress.toArray()), e.getMessage());
+      throw new AnnounceException(details, e);
+    }
+  }
+
+ /**
+ * Build, send and process a tracker announce request.
+ *
+ * <p>
+ * This function first builds an announce request for the specified event
+ * with all the required parameters. Then, the request is made to the
+ * tracker and the response analyzed.
+ * </p>
+ *
+ * <p>
+ * All registered {@link AnnounceResponseListener} objects are then fired
+ * with the decoded payload.
+ * </p>
+ *
+ * @param event The announce event type (RequestEvent.NONE for periodic updates).
+ * @param inhibitEvent Asks the implementation to limit listener notification.
+ * @param torrent The torrent to announce.
+ * @param peer Our own peers, on whose behalf the announce is made.
+ */
+ protected abstract void announce(final AnnounceRequestMessage.RequestEvent event,
+ boolean inhibitEvent, final AnnounceableInformation torrent, final List<Peer> peer) throws AnnounceException;
+
+ protected abstract void multiAnnounce(final AnnounceRequestMessage.RequestEvent event,
+ boolean inhibitEvent,
+ final List<? extends AnnounceableInformation> torrents,
+ final List<Peer> peer) throws AnnounceException, ConnectException; // batch variant of announce() for several torrents
+
+  /**
+   * Logs, at debug level, the announce about to be made together with the
+   * torrent's uploaded/downloaded/left byte counters.
+   */
+  protected void logAnnounceRequest(AnnounceRequestMessage.RequestEvent event, AnnounceableInformation torrent) {
+    if (event == AnnounceRequestMessage.RequestEvent.NONE) {
+      final Object[] counters = {torrent.getUploaded(), torrent.getDownloaded(), torrent.getLeft()};
+      logger.debug("Simply announcing to tracker with {}U/{}D/{}L bytes...", counters);
+      return;
+    }
+    final Object[] details = {
+            this.formatAnnounceEvent(event),
+            torrent.getUploaded(),
+            torrent.getDownloaded(),
+            torrent.getLeft()
+    };
+    logger.debug("Announcing {} to tracker with {}U/{}D/{}L bytes...", details);
+  }
+
+ /**
+ * Close any opened announce connection.
+ *
+ * <p>
+ * This method is called to make sure all connections
+ * are correctly closed when the announce thread is asked to stop.
+ * </p>
+ */
+ protected void close() {
+ // Do nothing by default, but can be overridden.
+ }
+
+  /**
+   * Formats an announce event into a usable string: empty for NONE, otherwise a space followed by the event name.
+   */
+  protected String formatAnnounceEvent(AnnounceRequestMessage.RequestEvent event) {
+    if (AnnounceRequestMessage.RequestEvent.NONE.equals(event)) {
+      return "";
+    }
+    return String.format(" %s", event.name());
+  }
+
+ /**
+ * Handle the announce response from the tracker.
+ * <p>
+ * Analyzes the response from the tracker and acts on it. An error message
+ * or a message of an unexpected type results in an AnnounceException.
+ * Otherwise the announce response event is fired to all listeners, followed
+ * by the discovered peers event unless <code>inhibitEvents</code> is set.
+ * </p>
+ *
+ * @param message The incoming {@link TrackerMessage}.
+ * @param inhibitEvents Whether to skip firing the discovered peers event.
+ * @param hexInfoHash The info hash passed along to the listeners.
+ */
+ protected void handleTrackerAnnounceResponse(TrackerMessage message,
+ boolean inhibitEvents, String hexInfoHash) throws AnnounceException {
+ if (message instanceof ErrorMessage) {
+ ErrorMessage error = (ErrorMessage) message;
+ throw new AnnounceException(error.getReason());
+ }
+
+ if (!(message instanceof AnnounceResponseMessage)) {
+ throw new AnnounceException("Unexpected tracker message type " +
+ message.getType().name() + "!");
+ }
+
+
+ AnnounceResponseMessage response =
+ (AnnounceResponseMessage) message;
+
+ this.fireAnnounceResponseEvent(
+ response.getComplete(),
+ response.getIncomplete(),
+ response.getInterval(),
+ hexInfoHash);
+
+ if (inhibitEvents) {
+ return;
+ }
+
+ this.fireDiscoveredPeersEvent(
+ response.getPeers(),
+ hexInfoHash);
+ }
+
+ /**
+ * Fire the announce response event to all listeners.
+ * @param complete The number of seeders on this torrent.
+ * @param incomplete The number of leechers on this torrent.
+ * @param interval The announce interval requested by the tracker.
+ * @param hexInfoHash The info hash passed on to each listener.
+ */
+ protected void fireAnnounceResponseEvent(int complete, int incomplete, int interval, String hexInfoHash) {
+ for (AnnounceResponseListener listener : this.listeners) {
+ listener.handleAnnounceResponse(interval, complete, incomplete, hexInfoHash); // the listener takes interval first
+ }
+ }
+
+ /**
+ * Fire the new peer discovery event to all listeners.
+ * @param peers The list of peers discovered.
+ * @param hexInfoHash The info hash passed on to each listener.
+ */
+ protected void fireDiscoveredPeersEvent(List<Peer> peers, String hexInfoHash) {
+ for (AnnounceResponseListener listener : this.listeners) {
+ listener.handleDiscoveredPeers(peers, hexInfoHash);
+ }
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClientFactory.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClientFactory.java
new file mode 100644
index 0000000..5e61e36
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClientFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.common.Peer;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.net.UnknownServiceException;
+import java.util.List;
+
+public interface TrackerClientFactory {
+
+ /**
+ * Create a {@link TrackerClient} announcing to the given tracker address.
+ * @param peers The list of peers the tracker client will announce on behalf of.
+ * @param tracker The tracker address as a {@link java.net.URI}.
+ * @return The tracker client to use for that tracker.
+ * @throws UnknownHostException If the tracker address is invalid.
+ * @throws UnknownServiceException If the tracker protocol is not supported.
+ */
+ TrackerClient createTrackerClient(List<Peer> peers, URI tracker) throws UnknownHostException, UnknownServiceException;
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClientFactoryImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClientFactoryImpl.java
new file mode 100644
index 0000000..542bda0
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/TrackerClientFactoryImpl.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.common.Peer;
+
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.net.UnknownServiceException;
+import java.util.List;
+
+public class TrackerClientFactoryImpl implements TrackerClientFactory {
+
+  /**
+   * Picks the client from the tracker URI scheme (http/https or udp), compared case-insensitively as RFC 3986 requires.
+   */
+  @Override
+  public TrackerClient createTrackerClient(List<Peer> peers, URI tracker) throws UnknownHostException, UnknownServiceException {
+    String scheme = tracker.getScheme();
+    if ("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme)) {
+      return new HTTPTrackerClient(peers, tracker);
+    } else if ("udp".equalsIgnoreCase(scheme)) {
+      return new UDPTrackerClient(peers, tracker);
+    }
+    throw new UnknownServiceException("Unsupported announce scheme: " + scheme + "!");
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java
new file mode 100644
index 0000000..0549908
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/announce/UDPTrackerClient.java
@@ -0,0 +1,373 @@
+/**
+ * Copyright (C) 2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.announce;
+
+import com.turn.ttorrent.common.AnnounceableInformation;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.AnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.ConnectionResponseMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.ErrorMessage;
+import com.turn.ttorrent.common.protocol.TrackerMessage.MessageValidationException;
+import com.turn.ttorrent.common.protocol.udp.UDPAnnounceRequestMessage;
+import com.turn.ttorrent.common.protocol.udp.UDPConnectRequestMessage;
+import com.turn.ttorrent.common.protocol.udp.UDPConnectResponseMessage;
+import com.turn.ttorrent.common.protocol.udp.UDPTrackerMessage;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.nio.channels.UnsupportedAddressTypeException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Announcer for UDP trackers.
+ *
+ * <p>
+ * The UDP tracker protocol requires a two-step announce request/response
+ * exchange where the peer is first required to establish a "connection"
+ * with the tracker by sending a connection request message and retrieving
+ * a connection ID from the tracker to use in the following announce
+ * request messages (valid for 2 minutes).
+ * </p>
+ *
+ * <p>
+ * It also contains a backing-off retry mechanism (on a 15*2^n seconds
+ * scheme), in which if the announce request times-out for more than the
+ * connection ID validity period, another connection request/response
+ * exchange must be made before attempting to retransmit the announce
+ * request.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+public class UDPTrackerClient extends TrackerClient {
+
+ protected static final Logger logger =
+ TorrentLoggerFactory.getLogger(UDPTrackerClient.class);
+
+ /**
+ * Back-off timeout uses 15 * 2 ^ n formula.
+ */
+ private static final int UDP_BASE_TIMEOUT_SECONDS = 15;
+
+ /**
+ * We don't try more than 8 times (3840 seconds, as per the formula defined
+ * for the backing-off timeout).
+ *
+ * @see #UDP_BASE_TIMEOUT_SECONDS
+ */
+ private static final int UDP_MAX_TRIES = 8;
+
+ /**
+ * For STOPPED announce event, we don't want to be bothered with waiting
+ * that long. We'll try once and bail-out early.
+ */
+ private static final int UDP_MAX_TRIES_ON_STOPPED = 1;
+
+ /**
+ * Maximum UDP packet size expected, in bytes.
+ *
+ * The biggest packet in the exchange is the announce response, which is 20
+ * bytes + 6 bytes per peer. Common numWant is 50, so 20 + 6 * 50 = 320.
+ * With headroom, we'll ask for 512 bytes.
+ */
+ private static final int UDP_PACKET_LENGTH = 512;
+
+ private final InetSocketAddress address;
+ private final Random random;
+
+ private DatagramSocket socket;
+ private Date connectionExpiration;
+ private long connectionId;
+ private int transactionId;
+ private boolean stop;
+
+ private enum State {
+ CONNECT_REQUEST,
+ ANNOUNCE_REQUEST
+ }
+
+ /**
+ * Create a UDP announcer; throws UnsupportedAddressTypeException if a peer's IP is not an IPv4 address.
+ */
+ protected UDPTrackerClient(List<Peer> peers, URI tracker)
+ throws UnknownHostException {
+ super(peers, tracker);
+
+ /**
+ * The UDP announce request protocol only supports IPv4
+ *
+ * @see http://bittorrent.org/beps/bep_0015.html#ipv6
+ */
+ for (Peer peer : peers) {
+ if (!(InetAddress.getByName(peer.getIp()) instanceof Inet4Address)) {
+ throw new UnsupportedAddressTypeException();
+ }
+ }
+
+ this.address = new InetSocketAddress(
+ tracker.getHost(),
+ tracker.getPort());
+
+ this.socket = null;
+ this.random = new Random();
+ this.connectionExpiration = null;
+ this.stop = false;
+ }
+
+ @Override
+ protected void multiAnnounce(AnnounceRequestMessage.RequestEvent event, boolean inhibitEvent, List<? extends AnnounceableInformation> torrents, List<Peer> peer) throws AnnounceException {
+ throw new AnnounceException("Not implemented"); // batch announces are not available over UDP
+ }
+
+  /**
+   * Announce to the UDP tracker: obtain a connection ID when none is currently valid,
+   * then send the announce request for every peer and process the first response,
+   * retrying with growing receive timeouts. The socket opened here is closed on exit.
+   */
+  @Override
+  public void announce(final AnnounceRequestMessage.RequestEvent event,
+                       boolean inhibitEvents, final AnnounceableInformation torrent, final List<Peer> peers) throws AnnounceException {
+    logAnnounceRequest(event, torrent);
+
+    State state = State.CONNECT_REQUEST;
+    int maxAttempts = AnnounceRequestMessage.RequestEvent.STOPPED.equals(event)
+            ? UDP_MAX_TRIES_ON_STOPPED
+            : UDP_MAX_TRIES;
+    int attempts = -1;
+
+    try {
+      this.socket = new DatagramSocket();
+      this.socket.connect(this.address);
+
+      while (++attempts <= maxAttempts) {
+        // Transaction ID is randomized for each exchange.
+        this.transactionId = this.random.nextInt();
+
+        // Immediately decide if we can send the announce request directly
+        // or not. For this, we need a valid, non-expired connection ID.
+        if (this.connectionExpiration != null) {
+          if (new Date().before(this.connectionExpiration)) {
+            state = State.ANNOUNCE_REQUEST;
+          } else {
+            // NOTE(review): state is not switched back here, so a retry already in
+            // ANNOUNCE_REQUEST keeps using the expired connection ID; confirm intent.
+            logger.debug("Announce connection ID expired, reconnecting with tracker...");
+          }
+        }
+
+        switch (state) {
+          case CONNECT_REQUEST:
+            this.send(UDPConnectRequestMessage.craft(this.transactionId).getData());
+
+            try {
+              this.handleTrackerConnectResponse(
+                      UDPTrackerMessage.UDPTrackerResponseMessage.parse(this.recv(attempts)));
+              attempts = -1;
+            } catch (SocketTimeoutException ste) {
+              // Silently ignore the timeout and retry with a longer
+              // timeout, unless announce stop was requested in which
+              // case we need to exit right away.
+              if (stop) {
+                return;
+              }
+            }
+            break;
+
+          case ANNOUNCE_REQUEST:
+            for (Peer peer : peers) {
+              this.send(this.buildAnnounceRequest(event, torrent, peer).getData());
+            }
+
+            try {
+              this.handleTrackerAnnounceResponse(
+                      UDPTrackerMessage.UDPTrackerResponseMessage.parse(this.recv(attempts)), inhibitEvents, torrent.getHexInfoHash());
+              // If we got here, we successfully completed this announce
+              // exchange and can simply return to exit the loop.
+              return;
+            } catch (SocketTimeoutException ste) {
+              // Same handling as for the connect request above.
+              if (stop) {
+                return;
+              }
+            }
+            break;
+          default:
+            throw new IllegalStateException("Invalid announce state!");
+        }
+      }
+
+      // When the maximum number of attempts was reached, the announce
+      // really timed-out. We'll try again in the next announce loop.
+      throw new AnnounceException("Timeout while announcing" +
+              this.formatAnnounceEvent(event) + " to tracker!");
+    } catch (IOException ioe) {
+      throw new AnnounceException("Error while announcing" +
+              this.formatAnnounceEvent(event) + " to tracker: " + ioe.getMessage(), ioe);
+    } catch (MessageValidationException mve) {
+      throw new AnnounceException("Tracker message violates expected " +
+              "protocol (" + mve.getMessage() + ")", mve);
+    } finally {
+      // A new socket is opened by every announce; release it so they do not pile up.
+      if (this.socket != null && !this.socket.isClosed()) {
+        this.socket.close();
+      }
+    }
+  }
+
+ /**
+ * Handles the tracker announce response message.
+ *
+ * <p>
+ * Validates the message (error check and transaction ID) before passing it
+ * over to {@link TrackerClient#handleTrackerAnnounceResponse}.
+ * </p>
+ *
+ * @param message The message received from the tracker in response to the
+ * announce request.
+ */
+ @Override
+ protected void handleTrackerAnnounceResponse(TrackerMessage message,
+ boolean inhibitEvents, String hexInfoHash) throws AnnounceException {
+ this.validateTrackerResponse(message);
+ super.handleTrackerAnnounceResponse(message, inhibitEvents, hexInfoHash);
+ }
+
+ /**
+ * Close this announce connection: flag the stop request and close the socket if it is open.
+ */
+ @Override
+ protected void close() {
+ this.stop = true; // NOTE(review): stop is a plain field; visibility to a thread running announce() is not guaranteed
+
+ // Close the socket to force blocking operations to return.
+ if (this.socket != null && !this.socket.isClosed()) {
+ this.socket.close();
+ }
+ }
+
+ private UDPAnnounceRequestMessage buildAnnounceRequest(
+ final AnnounceRequestMessage.RequestEvent event, final AnnounceableInformation torrent, final Peer peer) {
+ return UDPAnnounceRequestMessage.craft(
+ this.connectionId,
+ transactionId,
+ torrent.getInfoHash(),
+ peer.getPeerIdArray(),
+ torrent.getDownloaded(), // downloaded is passed before uploaded here (the HTTP request passes uploaded first)
+ torrent.getUploaded(),
+ torrent.getLeft(),
+ event,
+ peer.getAddress().getAddress(),
+ 0, // NOTE(review): presumably the announce "key" field - confirm against craft()
+ AnnounceRequestMessage.DEFAULT_NUM_WANT,
+ peer.getPort());
+ }
+
+ /**
+ * Validates an incoming tracker message.
+ *
+ * <p>
+ * Verifies that the message is not an error message (throws an exception
+ * with the error message if it is) and that the transaction ID matches the
+ * current one.
+ * </p>
+ * @param message The incoming tracker message.
+ * @throws AnnounceException If the message is an error or a UDP message with another transaction ID.
+ */
+ private void validateTrackerResponse(TrackerMessage message)
+ throws AnnounceException {
+ if (message instanceof ErrorMessage) {
+ throw new AnnounceException(((ErrorMessage) message).getReason());
+ }
+
+ if (message instanceof UDPTrackerMessage &&
+ (((UDPTrackerMessage) message).getTransactionId() != this.transactionId)) {
+ throw new AnnounceException("Invalid transaction ID!");
+ }
+ }
+
+ /**
+ * Handles the tracker connect response message.
+ * @param message The message received from the tracker in response to the
+ * connection request.
+ * @throws AnnounceException If the message fails validation or is not a connection response.
+ */
+ private void handleTrackerConnectResponse(TrackerMessage message)
+ throws AnnounceException {
+ this.validateTrackerResponse(message);
+
+ if (!(message instanceof ConnectionResponseMessage)) {
+ throw new AnnounceException("Unexpected tracker message type " +
+ message.getType().name() + "!");
+ }
+
+ UDPConnectResponseMessage connectResponse =
+ (UDPConnectResponseMessage) message;
+
+ this.connectionId = connectResponse.getConnectionId();
+ Calendar now = Calendar.getInstance();
+ now.add(Calendar.MINUTE, 1); // the connection ID is treated as usable for one minute from now
+ this.connectionExpiration = now.getTime();
+ }
+
+ /**
+ * Send a UDP packet to the tracker; a send failure is only logged, not propagated.
+ *
+ * @param data The {@link ByteBuffer} to send in a datagram packet to the
+ * tracker.
+ */
+ private void send(ByteBuffer data) {
+ try {
+ this.socket.send(new DatagramPacket(
+ data.array(),
+ data.capacity(),
+ this.address));
+ } catch (IOException ioe) {
+ logger.info("Error sending datagram packet to tracker at {}: {}.", this.address, ioe.getMessage());
+ }
+ }
+
+  /**
+   * Receive a UDP packet from the tracker, waiting at most
+   * UDP_BASE_TIMEOUT_SECONDS * 2^attempt seconds for it.
+   *
+   * @param attempt The attempt number, used to calculate the timeout for the
+   *                receive operation.
+   * @return Returns a {@link ByteBuffer} containing the packet data.
+   * @throws SocketTimeoutException If no packet arrived within the timeout.
+   * @throws IOException If the socket fails for any other reason.
+   */
+  private ByteBuffer recv(int attempt)
+          throws IOException, SocketException, SocketTimeoutException {
+    final int timeoutSeconds = UDP_BASE_TIMEOUT_SECONDS * (int) Math.pow(2, attempt);
+    logger.trace("Setting receive timeout to {}s for attempt {}...",
+            timeoutSeconds, attempt);
+    this.socket.setSoTimeout(timeoutSeconds * 1000);
+
+    // A SocketTimeoutException raised by receive() reaches the caller unchanged.
+    final byte[] buffer = new byte[UDP_PACKET_LENGTH];
+    final DatagramPacket packet = new DatagramPacket(buffer, UDP_PACKET_LENGTH);
+    this.socket.receive(packet);
+
+    return ByteBuffer.wrap(packet.getData(), 0, packet.getLength());
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/CountLimitConnectionAllower.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/CountLimitConnectionAllower.java
new file mode 100644
index 0000000..d3aee8b
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/CountLimitConnectionAllower.java
@@ -0,0 +1,44 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.PeersStorage;
+import com.turn.ttorrent.network.NewConnectionAllower;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.turn.ttorrent.Constants.DEFAULT_MAX_CONNECTION_COUNT;
+
+/**
+ * {@link NewConnectionAllower} that limits the number of simultaneously open
+ * connections: a new connection is allowed only while the number of sharing
+ * peers in the storage is below a configurable maximum.
+ */
+public class CountLimitConnectionAllower implements NewConnectionAllower {
+
+  private final PeersStorage myPeersStorage;
+
+  // Current limit; starts at DEFAULT_MAX_CONNECTION_COUNT and can be changed at runtime.
+  private final AtomicInteger myMaxConnectionCount =
+      new AtomicInteger(DEFAULT_MAX_CONNECTION_COUNT);
+
+  /**
+   * @param peersStorage storage whose sharing peers are counted against the limit
+   */
+  public CountLimitConnectionAllower(PeersStorage peersStorage) {
+    myPeersStorage = peersStorage;
+  }
+
+  /**
+   * Replaces the connection limit.
+   *
+   * @param newMaxCount new maximum number of sharing peers
+   */
+  public void setMyMaxConnectionCount(int newMaxCount) {
+    myMaxConnectionCount.set(newMaxCount);
+  }
+
+  @Override
+  public boolean isNewConnectionAllowed() {
+    final int connected = myPeersStorage.getSharingPeers().size();
+    return connected < myMaxConnectionCount.get();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/DataProcessor.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/DataProcessor.java
new file mode 100644
index 0000000..eb9645a
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/DataProcessor.java
@@ -0,0 +1,27 @@
+package com.turn.ttorrent.client.network;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+
+public interface DataProcessor {
+
+ /**
+ * Reads the data available on the channel and processes it.
+ *
+ * @param socketChannel channel that has data to read
+ * @return the processor for the next data; implementations that close the channel (e.g. ShutdownProcessor) return null
+ * @throws IOException if an I/O error occurs
+ */
+ DataProcessor processAndGetNext(ByteChannel socketChannel) throws IOException;
+
+ /**
+ * Handles an error on the channel and releases the resources held for it.
+ *
+ * @param socketChannel channel the error occurred on
+ * @param e the error to handle
+ * @return the processor for the next event. Can be null
+ * @throws IOException if an I/O error occurs
+ */
+ DataProcessor handleError(ByteChannel socketChannel, Throwable e) throws IOException;
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/DataProcessorUtil.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/DataProcessorUtil.java
new file mode 100644
index 0000000..f3a6399
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/DataProcessorUtil.java
@@ -0,0 +1,34 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+
+/**
+ * Static helpers shared by the {@link DataProcessor} implementations.
+ */
+public final class DataProcessorUtil {
+
+  /**
+   * Closes the given channel unless it is already closed.
+   *
+   * <p>A failure to close is reported through {@link LoggerUtils} and is not
+   * propagated to the caller.</p>
+   *
+   * @param logger  logger that receives the trace and error output
+   * @param channel channel to close
+   */
+  public static void closeChannelIfOpen(Logger logger, ByteChannel channel) {
+    if (!channel.isOpen()) {
+      return;
+    }
+    logger.trace("close channel {}", channel);
+    try {
+      channel.close();
+    } catch (IOException e) {
+      LoggerUtils.errorAndDebugDetails(logger, "unable to close channel {}", channel, e);
+    }
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/HandshakeReceiver.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/HandshakeReceiver.java
new file mode 100644
index 0000000..f3feb79
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/HandshakeReceiver.java
@@ -0,0 +1,226 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.client.Handshake;
+import com.turn.ttorrent.client.LoadedTorrent;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.PeerUID;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.charset.Charset;
+import java.text.ParseException;
+import java.util.Arrays;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Reads the remote peer's handshake from a channel and, when it is valid,
+ * registers the peer and hands the connection over to {@link WorkingReceiver}.
+ *
+ * <p>The handshake may arrive in several chunks: the processor keeps the bytes
+ * read so far and returns itself until the whole message is available. Failures
+ * are answered by running one of the shutdown processors on the channel.</p>
+ */
+public class HandshakeReceiver implements DataProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(HandshakeReceiver.class);
+
+  // One byte per character, so the first 8 bytes of a peer id always decode to 8 characters.
+  private static final Charset PEER_ID_CHARSET = Charset.forName("ISO-8859-1");
+
+  private final Context myContext;
+  private final String myHostAddress;
+  private final int myPort;
+  private final boolean myIsOutgoingConnection;
+  // Handshake bytes received so far, starting with the length byte.
+  private ByteBuffer messageBytes;
+  // Value of the handshake's leading length byte, or -1 while it has not been read yet.
+  private int pstrLength;
+
+  /**
+   * @param context            client context (storages, torrent loader, executor)
+   * @param hostAddress        address of the remote peer
+   * @param port               port of the remote peer
+   * @param isOutgoingListener {@code true} when this side opened the connection; in that case no
+   *                           handshake is sent back after the remote one has been received
+   */
+  HandshakeReceiver(Context context,
+                    String hostAddress,
+                    int port,
+                    boolean isOutgoingListener) {
+    myContext = context;
+    myHostAddress = hostAddress;
+    myPort = port;
+    this.pstrLength = -1;
+    this.myIsOutgoingConnection = isOutgoingListener;
+  }
+
+  /**
+   * Reads as much of the handshake as is available and, once it is complete, validates it,
+   * creates the {@link SharingPeer} and schedules its connection-established callback.
+   *
+   * @param socketChannel channel the handshake is read from
+   * @return this processor while the handshake is incomplete, a {@link WorkingReceiver} after a
+   *         successful handshake, or the result of a shutdown processor on failure
+   * @throws IOException if an I/O error occurs while processing the handshake
+   */
+  @Override
+  public DataProcessor processAndGetNext(ByteChannel socketChannel) throws IOException {
+
+    if (pstrLength == -1) {
+      ByteBuffer len = ByteBuffer.allocate(1);
+      int readBytes = -1;
+      try {
+        readBytes = socketChannel.read(len);
+      } catch (IOException e) {
+        // readBytes stays -1, so the failure is handled like end-of-stream below
+        logger.debug("unable to read handshake length from " + socketChannel, e);
+      }
+      if (readBytes == -1) {
+        return new ShutdownProcessor().processAndGetNext(socketChannel);
+      }
+      if (readBytes == 0) {
+        return this;
+      }
+      len.rewind();
+      byte pstrLen = len.get();
+      if (pstrLen < 0) {
+        // A negative value cannot size the buffer, and -1 would collide with the "not read yet" marker
+        logger.debug("incorrect handshake length {} received from {}", pstrLen, socketChannel);
+        return new ShutdownProcessor().processAndGetNext(socketChannel);
+      }
+      this.pstrLength = pstrLen;
+      messageBytes = ByteBuffer.allocate(this.pstrLength + Handshake.BASE_HANDSHAKE_LENGTH);
+      messageBytes.put(pstrLen);
+    }
+    int readBytes = -1;
+    try {
+      readBytes = socketChannel.read(messageBytes);
+    } catch (IOException e) {
+      LoggerUtils.warnAndDebugDetails(logger, "unable to read data from {}", socketChannel, e);
+    }
+    if (readBytes == -1) {
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    }
+    if (messageBytes.remaining() != 0) {
+      return this;
+    }
+    Handshake hs = parseHandshake(socketChannel.toString());
+
+    if (hs == null) {
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    }
+
+    final LoadedTorrent announceableTorrent = myContext.getTorrentsStorage().getLoadedTorrent(hs.getHexInfoHash());
+
+    if (announceableTorrent == null) {
+      logger.debug("Announceable torrent {} is not found in storage", hs.getHexInfoHash());
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    }
+
+    SharedTorrent torrent;
+    try {
+      torrent = myContext.getTorrentLoader().loadTorrent(announceableTorrent);
+    } catch (IllegalStateException e) {
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    } catch (Exception e) {
+      LoggerUtils.warnWithMessageAndDebugDetails(logger, "cannot load torrent {}", hs.getHexInfoHash(), e);
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    }
+
+    logger.trace("got handshake {} from {}", Arrays.toString(messageBytes.array()), socketChannel);
+
+    // Characters 1-2 of the peer id are taken as the client type, characters 3-6 as its numeric version
+    String clientTypeVersion = new String(Arrays.copyOf(hs.getPeerId(), 8), PEER_ID_CHARSET);
+    String clientType = clientTypeVersion.substring(1, 3);
+    int clientVersion = 0;
+    try {
+      clientVersion = Integer.parseInt(clientTypeVersion.substring(3, 7));
+    } catch (NumberFormatException ignored) {
+      // non-numeric version: keep the default of 0
+    }
+    final SharingPeer sharingPeer =
+        myContext.createSharingPeer(myHostAddress,
+            myPort,
+            ByteBuffer.wrap(hs.getPeerId()),
+            torrent,
+            socketChannel,
+            clientType,
+            clientVersion);
+    PeerUID peerUID = new PeerUID(sharingPeer.getAddress(), hs.getHexInfoHash());
+
+    SharingPeer old = myContext.getPeersStorage().putIfAbsent(peerUID, sharingPeer);
+    if (old != null) {
+      logger.debug("Already connected to old peer {}, close current connection with {}", old, sharingPeer);
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    }
+
+    // Connection was not opened by this side: answer the accepted handshake with our own
+    if (!myIsOutgoingConnection) {
+      logger.trace("send handshake to {}", socketChannel);
+      try {
+        final Handshake craft = Handshake.craft(hs.getInfoHash(), myContext.getPeersStorage().getSelf().getPeerIdArray());
+        final ByteBuffer reply = craft.getData();
+        // a single write may be partial, so write until the whole handshake is sent
+        while (reply.hasRemaining()) {
+          socketChannel.write(reply);
+        }
+      } catch (IOException e) {
+        LoggerUtils.warnAndDebugDetails(logger, "error in sending handshake to {}", socketChannel, e);
+        // close the channel and unregister the peer right away, like the other failure paths
+        return new ShutdownAndRemovePeerProcessor(peerUID, myContext).processAndGetNext(socketChannel);
+      }
+    }
+
+    logger.debug("setup new connection with {}", sharingPeer);
+
+    try {
+      myContext.getExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            sharingPeer.onConnectionEstablished();
+          } catch (Throwable e) {
+            LoggerUtils.warnAndDebugDetails(logger, "unhandled exception {} in executor task (onConnectionEstablished)", e.toString(), e);
+          }
+        }
+      });
+      torrent.addConnectedPeer(sharingPeer);
+    } catch (RejectedExecutionException e) {
+      LoggerUtils.warnAndDebugDetails(logger, "task 'onConnectionEstablished' submit is failed. Reason: {}", e.getMessage(), e);
+      return new ShutdownAndRemovePeerProcessor(peerUID, myContext).processAndGetNext(socketChannel);
+    }
+
+    return new WorkingReceiver(peerUID, myContext);
+  }
+
+  /**
+   * Parses the fully received handshake.
+   *
+   * @param socketChannelForLog textual form of the channel, used only in the log message
+   * @return the parsed handshake, or {@code null} when the message is malformed
+   * @throws IOException if parsing fails with an I/O error
+   */
+  private Handshake parseHandshake(String socketChannelForLog) throws IOException {
+    try {
+      messageBytes.rewind();
+      return Handshake.parse(messageBytes, pstrLength);
+    } catch (ParseException e) {
+      logger.info("incorrect handshake message from " + socketChannelForLog, e);
+    }
+    return null;
+  }
+
+  /**
+   * Closes the channel if it is still open.
+   */
+  @Override
+  public DataProcessor handleError(ByteChannel socketChannel, Throwable e) throws IOException {
+    return new ShutdownProcessor().processAndGetNext(socketChannel);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/HandshakeSender.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/HandshakeSender.java
new file mode 100644
index 0000000..9040efb
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/HandshakeSender.java
@@ -0,0 +1,85 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.client.Handshake;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentHash;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.Arrays;
+
+/**
+ * Sends this client's handshake for a torrent over the channel and then hands
+ * the connection over to a {@link HandshakeReceiver} that reads the remote
+ * peer's handshake.
+ */
+public class HandshakeSender implements DataProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(HandshakeSender.class);
+
+  private final TorrentHash myTorrentHash;
+  private final String myRemotePeerIp;
+  private final int myRemotePeerPort;
+  private final Context myContext;
+
+  /**
+   * @param torrentHash    torrent the handshake is crafted for
+   * @param remotePeerIp   address of the remote peer, passed on to the receiver
+   * @param remotePeerPort port of the remote peer, passed on to the receiver
+   * @param context        client context that provides the peers storage
+   */
+  public HandshakeSender(TorrentHash torrentHash,
+                         String remotePeerIp,
+                         int remotePeerPort,
+                         Context context) {
+    myTorrentHash = torrentHash;
+    myRemotePeerIp = remotePeerIp;
+    myRemotePeerPort = remotePeerPort;
+    myContext = context;
+  }
+
+  /**
+   * Crafts the handshake and writes it to the channel completely.
+   *
+   * @param socketChannel channel the handshake is written to
+   * @return a {@link HandshakeReceiver} for the remote answer, or the result of
+   *         {@link ShutdownProcessor} when no handshake could be crafted
+   * @throws IOException if writing to the channel fails
+   */
+  @Override
+  public DataProcessor processAndGetNext(ByteChannel socketChannel) throws IOException {
+
+    Peer self = myContext.getPeersStorage().getSelf();
+    Handshake handshake = Handshake.craft(myTorrentHash.getInfoHash(), self.getPeerIdArray());
+    if (handshake == null) {
+      logger.warn("can not craft handshake message. Self peer id is {}, torrent hash is {}",
+          Arrays.toString(self.getPeerIdArray()),
+          Arrays.toString(myTorrentHash.getInfoHash()));
+      // Close the channel now, like handleError does, instead of waiting for the next event
+      return new ShutdownProcessor().processAndGetNext(socketChannel);
+    }
+    ByteBuffer messageToSend = ByteBuffer.wrap(handshake.getData().array());
+    logger.trace("try send handshake {} to {}", handshake, socketChannel);
+    // A single write may be partial, so repeat until the buffer is drained
+    while (messageToSend.hasRemaining()) {
+      socketChannel.write(messageToSend);
+    }
+    return new HandshakeReceiver(
+        myContext,
+        myRemotePeerIp,
+        myRemotePeerPort,
+        true);
+  }
+
+  /**
+   * Closes the channel if it is still open.
+   */
+  @Override
+  public DataProcessor handleError(ByteChannel socketChannel, Throwable e) throws IOException {
+    return new ShutdownProcessor().processAndGetNext(socketChannel);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/OutgoingConnectionListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/OutgoingConnectionListener.java
new file mode 100644
index 0000000..d0cd4c6
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/OutgoingConnectionListener.java
@@ -0,0 +1,73 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.common.TorrentHash;
+import com.turn.ttorrent.network.ConnectionListener;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+/**
+ * {@link ConnectionListener} for connections opened by this client: once the
+ * connection is established it sends the handshake, afterwards every event
+ * is forwarded to the current {@link DataProcessor}.
+ */
+public class OutgoingConnectionListener implements ConnectionListener {
+
+  // Processor for the next event; null once a processor has shut the connection down.
+  private volatile DataProcessor myNext;
+  private final TorrentHash torrentHash;
+  private final String myRemotePeerIp;
+  private final int myRemotePeerPort;
+  private final Context myContext;
+
+  /**
+   * @param context        client context handed to the processors
+   * @param torrentHash    torrent the handshake is sent for
+   * @param remotePeerIp   address of the remote peer
+   * @param remotePeerPort port of the remote peer
+   */
+  public OutgoingConnectionListener(Context context,
+                                    TorrentHash torrentHash,
+                                    String remotePeerIp,
+                                    int remotePeerPort) {
+    this.torrentHash = torrentHash;
+    myRemotePeerIp = remotePeerIp;
+    myRemotePeerPort = remotePeerPort;
+    myNext = new ShutdownProcessor();
+    myContext = context;
+  }
+
+  @Override
+  public void onNewDataAvailable(SocketChannel socketChannel) throws IOException {
+    this.myNext = currentProcessor().processAndGetNext(socketChannel);
+  }
+
+  @Override
+  public void onConnectionEstablished(SocketChannel socketChannel) throws IOException {
+    HandshakeSender handshakeSender = new HandshakeSender(
+        torrentHash,
+        myRemotePeerIp,
+        myRemotePeerPort,
+        myContext);
+    this.myNext = handshakeSender.processAndGetNext(socketChannel);
+  }
+
+  @Override
+  public void onError(SocketChannel socketChannel, Throwable ex) throws IOException {
+    // Keep the processor returned by the error handler, as StateChannelListener does
+    this.myNext = currentProcessor().handleError(socketChannel, ex);
+  }
+
+  /**
+   * Returns the processor that handles the next event.
+   *
+   * <p>Processors return {@code null} after they have closed the channel; a fresh
+   * {@link ShutdownProcessor} stands in then, so a late event cannot trigger a
+   * {@link NullPointerException}.</p>
+   */
+  private DataProcessor currentProcessor() {
+    final DataProcessor next = this.myNext;
+    return next != null ? next : new ShutdownProcessor();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/ShutdownAndRemovePeerProcessor.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/ShutdownAndRemovePeerProcessor.java
new file mode 100644
index 0000000..2db3676
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/ShutdownAndRemovePeerProcessor.java
@@ -0,0 +1,62 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.client.PeersStorage;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.common.PeerUID;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+
+/**
+ * Terminal {@link DataProcessor}: closes the channel, removes the peer from
+ * the peers storage and unbinds it.
+ */
+public class ShutdownAndRemovePeerProcessor implements DataProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(ShutdownAndRemovePeerProcessor.class);
+
+  private final PeerUID myPeerUID;
+  private final Context myContext;
+
+  /**
+   * @param peerId  identifier of the peer to remove
+   * @param context client context that provides the peers storage
+   */
+  public ShutdownAndRemovePeerProcessor(PeerUID peerId, Context context) {
+    myPeerUID = peerId;
+    myContext = context;
+  }
+
+  /**
+   * Closes the channel if it is open, then removes and unbinds the peer.
+   *
+   * @return always {@code null}: no processor follows a shutdown
+   */
+  @Override
+  public DataProcessor processAndGetNext(ByteChannel socketChannel) throws IOException {
+    DataProcessorUtil.closeChannelIfOpen(logger, socketChannel);
+    logger.trace("try remove and unbind peer. Peer UID - {}", myPeerUID);
+    removePeer();
+    return null;
+  }
+
+  /** Drops the peer from the storage and unbinds it; only logs when the peer is not stored. */
+  private void removePeer() {
+    final PeersStorage storage = myContext.getPeersStorage();
+    final SharingPeer removedPeer = storage.removeSharingPeer(myPeerUID);
+    if (removedPeer != null) {
+      removedPeer.unbind(true);
+      return;
+    }
+    logger.info("try to shutdown peer with id {}, but it is not found in storage", myPeerUID);
+  }
+
+  /** Errors are handled exactly like a regular shutdown. */
+  @Override
+  public DataProcessor handleError(ByteChannel socketChannel, Throwable e) throws IOException {
+    return processAndGetNext(socketChannel);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/ShutdownProcessor.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/ShutdownProcessor.java
new file mode 100644
index 0000000..b16a883
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/ShutdownProcessor.java
@@ -0,0 +1,35 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+
+/**
+ * Terminal {@link DataProcessor} that only closes the channel.
+ */
+public class ShutdownProcessor implements DataProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(ShutdownProcessor.class);
+
+  /**
+   * Closes the channel if it is still open.
+   *
+   * @param socketChannel channel to close
+   * @return always {@code null}: no processor follows a shutdown
+   */
+  @Override
+  public DataProcessor processAndGetNext(ByteChannel socketChannel) throws IOException {
+    DataProcessorUtil.closeChannelIfOpen(logger, socketChannel);
+    return null;
+  }
+
+  /**
+   * Handles any error the same way as a regular shutdown.
+   */
+  @Override
+  public DataProcessor handleError(ByteChannel socketChannel, Throwable e) throws IOException {
+    return processAndGetNext(socketChannel);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/StateChannelListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/StateChannelListener.java
new file mode 100644
index 0000000..cde66f1
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/StateChannelListener.java
@@ -0,0 +1,58 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.network.ConnectionListener;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+/**
+ * {@link ConnectionListener} that waits for the remote peer's handshake once a
+ * connection is established and forwards every later event to the current
+ * {@link DataProcessor}.
+ */
+public class StateChannelListener implements ConnectionListener {
+
+  // Processor for the next event; null once a processor has shut the connection down.
+  private volatile DataProcessor myNext;
+  private final Context myContext;
+
+  /**
+   * @param context client context handed to the processors
+   */
+  public StateChannelListener(Context context) {
+    myContext = context;
+    myNext = new ShutdownProcessor();
+  }
+
+  @Override
+  public void onNewDataAvailable(SocketChannel socketChannel) throws IOException {
+    this.myNext = currentProcessor().processAndGetNext(socketChannel);
+  }
+
+  @Override
+  public void onConnectionEstablished(SocketChannel socketChannel) throws IOException {
+    this.myNext = new HandshakeReceiver(
+        myContext,
+        socketChannel.socket().getInetAddress().getHostAddress(),
+        socketChannel.socket().getPort(),
+        false);
+  }
+
+  @Override
+  public void onError(SocketChannel socketChannel, Throwable ex) throws IOException {
+    this.myNext = currentProcessor().handleError(socketChannel, ex);
+  }
+
+  /**
+   * Returns the processor that handles the next event.
+   *
+   * <p>Processors return {@code null} after they have closed the channel; a fresh
+   * {@link ShutdownProcessor} stands in then, so a late event cannot trigger a
+   * {@link NullPointerException}.</p>
+   */
+  private DataProcessor currentProcessor() {
+    final DataProcessor next = this.myNext;
+    return next != null ? next : new ShutdownProcessor();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/WorkingReceiver.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/WorkingReceiver.java
new file mode 100644
index 0000000..ba46de5
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/network/WorkingReceiver.java
@@ -0,0 +1,198 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.client.Context;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.PeerUID;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.protocol.PeerMessage;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.text.ParseException;
+import java.util.Arrays;
+import java.util.concurrent.RejectedExecutionException;
+
+/**
+ * Reads length-prefixed peer messages from an established connection and
+ * submits each complete message to the context's executor for handling.
+ *
+ * <p>A message may arrive in several chunks; the processor keeps the partial
+ * data and returns itself until the message is complete. Any failure closes
+ * the channel and removes the peer via {@link ShutdownAndRemovePeerProcessor}.</p>
+ */
+public class WorkingReceiver implements DataProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(WorkingReceiver.class);
+  //16 bytes is sufficient for all torrents messages except bitfield and piece.
+  //So piece and bitfield have dynamic size because bytebuffer for this messages will be allocated after get message length
+  private static final int DEF_BUFFER_SIZE = 16;
+  // Messages that announce a larger length are rejected and the connection is closed.
+  private static final int MAX_MESSAGE_SIZE = 2 * 1024 * 1024;
+
+  private final PeerUID myPeerUID;
+  private final Context myContext;
+  // Bytes of the message being received, starting with its length field.
+  @NotNull
+  private ByteBuffer messageBytes;
+  // Length announced by the current message, or -1 while the length field is still being read.
+  private int pstrLength;
+
+  /**
+   * @param peerId  identifier of the peer this connection belongs to
+   * @param context client context (storages and executor)
+   */
+  WorkingReceiver(PeerUID peerId,
+                  Context context) {
+    myPeerUID = peerId;
+    myContext = context;
+
+    this.messageBytes = ByteBuffer.allocate(DEF_BUFFER_SIZE);
+    this.pstrLength = -1;
+  }
+
+  /**
+   * Reads the next part of the current message and, when the message is complete, parses it and
+   * schedules {@link SharingPeer#handleMessage} on the executor.
+   *
+   * @param socketChannel channel the message is read from
+   * @return this processor to keep reading, or the result of
+   *         {@link ShutdownAndRemovePeerProcessor} when the connection has to be closed
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public DataProcessor processAndGetNext(ByteChannel socketChannel) throws IOException {
+    logger.trace("received data from channel {}", socketChannel);
+    if (pstrLength == -1) {
+      messageBytes.limit(PeerMessage.MESSAGE_LENGTH_FIELD_SIZE);
+      final int read;
+      try {
+        read = socketChannel.read(messageBytes);
+      } catch (IOException e) {
+        //Some clients close connection so that java throws IOException "An existing connection was forcibly closed by the remote host"
+        logger.debug("unable to read data from channel " + socketChannel, e);
+        return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+      }
+      if (read < 0) {
+        logger.debug("channel {} is closed by other peer", socketChannel);
+        return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+      }
+      if (messageBytes.hasRemaining()) {
+        return this;
+      }
+      this.pstrLength = messageBytes.getInt(0);
+      logger.trace("read of message length finished, Message length is {}", this.pstrLength);
+
+      if (this.pstrLength < 0) {
+        // A negative length would produce an invalid buffer limit (and -1 is the "no length yet" marker)
+        logger.warn("Negative message length {} received from peer {}", this.pstrLength, myPeerUID);
+        logger.warn("Close connection with peer {}", myPeerUID);
+        return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+      }
+      if (this.pstrLength > MAX_MESSAGE_SIZE) {
+        logger.warn("Proposed limit of {} is larger than max message size {}",
+            PeerMessage.MESSAGE_LENGTH_FIELD_SIZE + this.pstrLength, MAX_MESSAGE_SIZE);
+        logger.warn("current bytes in buffer is {}", Arrays.toString(messageBytes.array()));
+        logger.warn("Close connection with peer {}", myPeerUID);
+        return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+      }
+    }
+
+    // Grow the buffer when the whole message does not fit; the bytes read so far are copied over
+    if (PeerMessage.MESSAGE_LENGTH_FIELD_SIZE + this.pstrLength > messageBytes.capacity()) {
+      ByteBuffer old = messageBytes;
+      old.rewind();
+      messageBytes = ByteBuffer.allocate(PeerMessage.MESSAGE_LENGTH_FIELD_SIZE + this.pstrLength);
+      messageBytes.put(old);
+    }
+
+    messageBytes.limit(PeerMessage.MESSAGE_LENGTH_FIELD_SIZE + this.pstrLength);
+
+    logger.trace("try read data from {}", socketChannel);
+    int readBytes;
+    try {
+      readBytes = socketChannel.read(messageBytes);
+    } catch (IOException e) {
+      logger.debug("unable to read data from channel " + socketChannel, e);
+      return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+    }
+    if (readBytes < 0) {
+      logger.debug("channel {} is closed by other peer", socketChannel);
+      return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+    }
+    if (messageBytes.hasRemaining()) {
+      logger.trace("buffer is not full, continue reading...");
+      return this;
+    }
+    logger.trace("finished read data from {}", socketChannel);
+
+    messageBytes.rewind();
+    this.pstrLength = -1;
+
+    final SharingPeer peer = myContext.getPeersStorage().getSharingPeer(myPeerUID);
+    if (peer == null) {
+      // NOTE(review): assumes the storage returns null for a peer it does not hold - avoids an NPE below
+      logger.debug("peer {} is not found in storage, close connection", myPeerUID);
+      return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+    }
+
+    final String hexInfoHash = peer.getHexInfoHash();
+    SharedTorrent torrent = myContext.getTorrentsStorage().getTorrent(hexInfoHash);
+    if (torrent == null || !myContext.getTorrentsStorage().hasTorrent(hexInfoHash)) {
+      logger.debug("torrent with hash {} for peer {} doesn't found in storage. Maybe somebody deletes it manually", hexInfoHash, peer);
+      return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+    }
+
+    logger.trace("try parse message from {}. Torrent {}", peer, torrent);
+    // Parse a private copy so the receive buffer can be reset for the next message right away
+    ByteBuffer bufferCopy = ByteBuffer.wrap(Arrays.copyOf(messageBytes.array(), messageBytes.limit()));
+
+    this.messageBytes = ByteBuffer.allocate(DEF_BUFFER_SIZE);
+    final PeerMessage message;
+
+    try {
+      message = PeerMessage.parse(bufferCopy, torrent);
+    } catch (ParseException e) {
+      LoggerUtils.warnAndDebugDetails(logger, "incorrect message was received from peer {}", peer, e);
+      return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+    }
+
+    logger.trace("get message {} from {}", message, socketChannel);
+
+    try {
+      myContext.getExecutor().submit(new Runnable() {
+        @Override
+        public void run() {
+          final Thread currentThread = Thread.currentThread();
+          final String oldName = currentThread.getName();
+          try {
+            // The worker thread is renamed for the duration of the call and restored afterwards
+            currentThread.setName(oldName + " handle message for torrent " + myPeerUID.getTorrentHash() + " peer: " + peer.getHostIdentifier());
+            peer.handleMessage(message);
+          } catch (Throwable e) {
+            LoggerUtils.warnAndDebugDetails(logger, "unhandled exception {} in executor task (handleMessage)", e.toString(), e);
+          } finally {
+            currentThread.setName(oldName);
+          }
+
+        }
+      });
+    } catch (RejectedExecutionException e) {
+      LoggerUtils.warnAndDebugDetails(logger, "task submit is failed. Reason: {}", e.getMessage(), e);
+      return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+    }
+    return this;
+  }
+
+  /**
+   * Closes the channel and removes the peer, whatever the error was.
+   */
+  @Override
+  public DataProcessor handleError(ByteChannel socketChannel, Throwable e) throws IOException {
+    return new ShutdownAndRemovePeerProcessor(myPeerUID, myContext).processAndGetNext(socketChannel);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/MessageListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/MessageListener.java
new file mode 100644
index 0000000..898ca33
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/MessageListener.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.peer;
+
+import com.turn.ttorrent.common.protocol.PeerMessage;
+
+import java.util.EventListener;
+
+
+/**
+ * EventListener interface for objects that want to receive incoming messages
+ * from peers.
+ *
+ * @author mpetazzoni
+ */
+public interface MessageListener extends EventListener {
+ /** Delivers one incoming peer message to the listener. */
+ void handleMessage(PeerMessage msg);
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/PeerActivityListener.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/PeerActivityListener.java
new file mode 100644
index 0000000..786e889
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/PeerActivityListener.java
@@ -0,0 +1,142 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.peer;
+
+import com.turn.ttorrent.client.Piece;
+
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.EventListener;
+
+
+/**
+ * EventListener interface for objects that want to handle peer activity
+ * events like piece availability, or piece completion events, and more.
+ *
+ * @author mpetazzoni
+ */
+public interface PeerActivityListener extends EventListener {
+
+ /**
+ * Peer choked handler.
+ *
+ * <p>
+ * This handler is fired when a peer choked and now refuses to send data to
+ * us. This means we should not try to request or expect anything from it
+ * until it becomes ready again.
+ * </p>
+ *
+ * @param peer The peer that choked.
+ */
+ void handlePeerChoked(SharingPeer peer);
+
+ /**
+ * Peer ready handler.
+ *
+ * <p>
+ * This handler is fired when a peer notified that it is no longer choked.
+ * This means we can send piece block requests to it and start downloading.
+ * </p>
+ *
+ * @param peer The peer that became ready.
+ */
+ void handlePeerReady(SharingPeer peer);
+
+ /**
+ * Piece availability handler.
+ *
+ * <p>
+ * This handler is fired when an update in piece availability is received
+ * from a peer's HAVE message.
+ * </p>
+ *
+ * @param peer The peer we got the update from.
+ * @param piece The piece that became available from this peer.
+ */
+ void handlePieceAvailability(SharingPeer peer, Piece piece);
+
+ /**
+ * Bit field availability handler.
+ *
+ * <p>
+ * This handler is fired when an update in piece availability is received
+ * from a peer's BITFIELD message.
+ * </p>
+ *
+ * @param peer The peer we got the update from.
+ * @param availablePieces The pieces availability bit field of the peer.
+ */
+ void handleBitfieldAvailability(SharingPeer peer,
+ BitSet availablePieces);
+
+ /**
+ * Piece upload completion handler.
+ *
+ * <p>
+ * This handler is fired when a piece has been uploaded entirely to a peer.
+ * </p>
+ *
+ * @param peer The peer the piece was sent to.
+ * @param piece The piece in question.
+ */
+ void handlePieceSent(SharingPeer peer, Piece piece);
+
+ /**
+ * Piece download completion handler.
+ *
+ * <p>
+ * This handler is fired when a piece has been downloaded entirely and the
+ * piece data has been revalidated.
+ * </p>
+ *
+ * <p>
+ * <b>Note:</b> the piece may <em>not</em> be valid after it has been
+ * downloaded, in which case appropriate action should be taken to
+ * redownload the piece.
+ * </p>
+ *
+ * @param peer The peer we got this piece from.
+ * @param piece The piece in question.
+ */
+ void handlePieceCompleted(SharingPeer peer, Piece piece)
+ throws IOException;
+
+ /**
+ * Peer disconnection handler.
+ *
+ * <p>
+ * This handler is fired when a peer disconnects, or is disconnected due to
+ * protocol violation.
+ * </p>
+ *
+ * @param peer The peer that disconnected.
+ */
+ void handlePeerDisconnected(SharingPeer peer);
+
+ /**
+ * Handler for IOException during peer operation.
+ *
+ * @param peer The peer whose activity triggered the exception.
+ * @param ioe The IOException object, for reporting.
+ */
+ void handleIOException(SharingPeer peer, IOException ioe);
+
+ /** NOTE(review): no contract is documented; presumably fired once a new peer connection is set up - confirm against callers. */
+ void handleNewPeerConnected(SharingPeer peer);
+ /** NOTE(review): no contract is documented; presumably fired after a peer has been removed - confirm against callers. */
+ void afterPeerRemoved(SharingPeer peer);
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/Rate.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/Rate.java
new file mode 100644
index 0000000..9db88f3
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/Rate.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.peer;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+
+/**
+ * A data exchange rate representation.
+ *
+ * <p>
+ * This is a utility class to keep track of, and compare, the data exchange
+ * rate (either download or upload) with a peer.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+public class Rate implements Comparable<Rate> {
+
+ public static final Comparator<Rate> RATE_COMPARATOR =
+ new RateComparator();
+
+ private long bytes = 0;
+ private long reset = 0;
+ private long last = 0;
+
+ /**
+ * Add a byte count to the current measurement.
+ *
+ * @param count The number of bytes exchanged since the last reset.
+ */
+ public synchronized void add(long count) {
+ this.bytes += count;
+ if (this.reset == 0) {
+ this.reset = System.currentTimeMillis();
+ }
+ this.last = System.currentTimeMillis();
+ }
+
+ /**
+ * Get the current rate.
+ *
+ * <p>
+ * The exchange rate is the number of bytes exchanged since the last
+ * reset and the last input.
+ * </p>
+ */
+ public synchronized float get() {
+ if (this.last - this.reset == 0) {
+ return 0;
+ }
+
+ return this.bytes / ((this.last - this.reset) / 1000.0f);
+ }
+
+ /**
+ * Reset the measurement.
+ */
+ public synchronized void reset() {
+ this.bytes = 0;
+ this.reset = System.currentTimeMillis();
+ this.last = this.reset;
+ }
+
+ @Override
+ public int compareTo(Rate other) {
+ return RATE_COMPARATOR.compare(this, other);
+ }
+
+ /**
+ * A rate comparator.
+ *
+ * <p>
+ * This class provides a comparator to sort peers by an exchange rate,
+ * comparing two rates and returning an ascending ordering.
+ * </p>
+ *
+ * <p>
+ * <b>Note:</b> we need to make sure here that we don't return 0, which
+ * would provide an ordering that is inconsistent with
+ * <code>equals()</code>'s behavior, and result in unpredictable behavior
+ * for sorted collections using this comparator.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+ private static class RateComparator
+ implements Comparator<Rate>, Serializable {
+
+ private static final long serialVersionUID = 72460233003600L;
+
+ /**
+ * Compare two rates together.
+ *
+ * <p>
+ * This method compares float, but we don't care too much about
+ * rounding errors. It's just to order peers so super-strict rate based
+ * order is not required.
+ * </p>
+ *
+ * @param a
+ * @param b
+ */
+ @Override
+ public int compare(Rate a, Rate b) {
+ if (a.get() > b.get()) {
+ return 1;
+ }
+
+ return -1;
+ }
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java
new file mode 100644
index 0000000..1cb3b34
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/SharingPeer.java
@@ -0,0 +1,807 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.peer;
+
+import com.turn.ttorrent.client.PeerInformation;
+import com.turn.ttorrent.client.Piece;
+import com.turn.ttorrent.client.SharedTorrent;
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.TorrentUtils;
+import com.turn.ttorrent.common.protocol.PeerMessage;
+import com.turn.ttorrent.network.ConnectionClosedException;
+import com.turn.ttorrent.network.ConnectionManager;
+import com.turn.ttorrent.network.WriteListener;
+import com.turn.ttorrent.network.WriteTask;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+/**
+ * A peer exchanging on a torrent with the BitTorrent client.
+ * (Data exchange.)
+ * <p/>
+ * <p>
+ * A SharingPeer extends the base Peer class with all the data and logic needed
+ * by the BitTorrent client to interact with a peer exchanging on the same
+ * torrent.
+ * </p>
+ * <p/>
+ * <p>
+ * Peers are defined by their peer ID, IP address and port number, just like
+ * base peers. Peers we exchange with also contain four crucial attributes:
+ * </p>
+ * <p/>
+ * <ul>
+ * <li><code>choking</code>, which means we are choking this peer and we're
+ * not willing to send him anything for now;</li>
+ * <li><code>interesting</code>, which means we are interested in a piece
+ * this peer has;</li>
+ * <li><code>choked</code>, if this peer is choking and won't send us
+ * anything right now;</li>
+ * <li><code>interested</code>, if this peer is interested in something we
+ * have.</li>
+ * </ul>
+ * <p/>
+ * <p>
+ * Peers start choked and uninterested.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+public class SharingPeer extends Peer implements MessageListener, PeerInformation {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(SharingPeer.class);
+
+ private final Object availablePiecesLock;
+ private volatile boolean choking;
+ private volatile boolean interesting;
+ private volatile boolean choked;
+ private volatile boolean interested;
+ private final SharedTorrent torrent;
+ private final BitSet availablePieces;
+ private BitSet poorlyAvailablePieces;
+ private final Map<Piece, Integer> myRequestedPieces;
+
+ private volatile boolean downloading;
+
+ private final Rate download;
+ private final Rate upload;
+ private final AtomicInteger downloadedPiecesCount;
+ private final List<PeerActivityListener> listeners;
+
+ private final Object requestsLock;
+
+ private final AtomicBoolean isStopped;
+
+ private final ConnectionManager connectionManager;
+ private final ByteChannel socketChannel;
+
+ private final String clientIdentifier;
+ private final int clientVersion;
+
+ /**
+ * Create a new sharing peer on a given torrent.
+ * @param ip The peer's IP address.
+ * @param port The peer's port.
+ * @param peerId The byte-encoded peer ID.
+ * @param torrent The torrent this peer exchanges with us on.
+ * @param clientIdentifier
+ * @param clientVersion
+ */
+ public SharingPeer(String ip,
+ int port,
+ ByteBuffer peerId,
+ SharedTorrent torrent,
+ ConnectionManager connectionManager,
+ PeerActivityListener client,
+ ByteChannel channel,
+ String clientIdentifier,
+ int clientVersion) {
+ super(ip, port, peerId);
+
+ this.torrent = torrent;
+ this.clientIdentifier = clientIdentifier;
+ this.clientVersion = clientVersion;
+ this.listeners = Arrays.asList(client, torrent);
+ this.availablePieces = new BitSet(torrent.getPieceCount());
+ this.poorlyAvailablePieces = new BitSet(torrent.getPieceCount());
+
+ this.requestsLock = new Object();
+ this.socketChannel = channel;
+ this.isStopped = new AtomicBoolean(false);
+ this.availablePiecesLock = new Object();
+ this.myRequestedPieces = new HashMap<Piece, Integer>();
+ this.connectionManager = connectionManager;
+ this.download = new Rate();
+ this.upload = new Rate();
+ this.setTorrentHash(torrent.getHexInfoHash());
+ this.choking = true;
+ this.interesting = false;
+ this.choked = true;
+ this.interested = false;
+ this.downloading = false;
+ this.downloadedPiecesCount = new AtomicInteger();
+ }
+
+ public Rate getDLRate() {
+ return this.download;
+ }
+
+ public Rate getULRate() {
+ return this.upload;
+ }
+
+ /**
+ * Choke this peer.
+ * <p/>
+ * <p>
+ * We don't want to upload to this peer anymore, so mark that we're choking
+ * from this peer.
+ * </p>
+ */
+ public void choke() {
+ if (!this.choking) {
+ logger.trace("Choking {}", this);
+ this.send(PeerMessage.ChokeMessage.craft());
+ this.choking = true;
+ }
+ }
+
+ @Override
+ public byte[] getId() {
+ return getPeerIdArray();
+ }
+
+ @Override
+ public String getClientIdentifier() {
+ return clientIdentifier;
+ }
+
+ @Override
+ public int getClientVersion() {
+ return clientVersion;
+ }
+
+ public void onConnectionEstablished() {
+ firePeerConnected();
+ BitSet pieces = this.torrent.getCompletedPieces();
+ if (pieces.cardinality() > 0) {
+ this.send(PeerMessage.BitfieldMessage.craft(pieces));
+ }
+ resetRates();
+ }
+
+ /**
+ * Unchoke this peer.
+ * <p/>
+ * <p>
+ * Mark that we are no longer choking from this peer and can resume
+ * uploading to it.
+ * </p>
+ */
+ public void unchoke() {
+ logger.trace("Unchoking {}", this);
+ this.send(PeerMessage.UnchokeMessage.craft());
+ this.choking = false;
+ }
+
+ public boolean isChoking() {
+ return this.choking;
+ }
+
+ public void interesting() {
+ if (!this.interesting) {
+ logger.trace("Telling {} we're interested.", this);
+ this.send(PeerMessage.InterestedMessage.craft());
+ this.interesting = true;
+ }
+ }
+
+ public void notInteresting() {
+ if (this.interesting) {
+ logger.trace("Telling {} we're no longer interested.", this);
+ this.send(PeerMessage.NotInterestedMessage.craft());
+ this.interesting = false;
+ }
+ }
+
+ public boolean isInteresting() {
+ return this.interesting;
+ }
+
+ public boolean isChoked() {
+ return this.choked;
+ }
+
+ public boolean isInterested() {
+ return this.interested;
+ }
+
+ public BitSet getPoorlyAvailablePieces() {
+ return poorlyAvailablePieces;
+ }
+
+ /**
+ * Returns the available pieces from this peer.
+ *
+ * @return A clone of the available pieces bit field from this peer.
+ */
+ public BitSet getAvailablePieces() {
+ synchronized (this.availablePiecesLock) {
+ return (BitSet) this.availablePieces.clone();
+ }
+ }
+
+ /**
+  * Returns a snapshot copy of the pieces currently requested from this peer.
+  */
+ public Set<Piece> getRequestedPieces() {
+   synchronized (requestsLock) {
+     return new HashSet<Piece>(myRequestedPieces.keySet());
+   }
+ }
+
+ public void resetRates() {
+ this.download.reset();
+ this.upload.reset();
+ }
+
+ public void pieceDownloaded() {
+ downloadedPiecesCount.incrementAndGet();
+ }
+
+ public int getDownloadedPiecesCount() {
+ return downloadedPiecesCount.get();
+ }
+
+ /**
+ * Tells whether this peer's socket channel is still open.
+ */
+ public boolean isConnected() {
+ return this.socketChannel.isOpen();
+ }
+
+ /**
+  * Unbind and disconnect this peer.
+  * <p>
+  * Closes the peer's channel, fires the peer disconnected event, drops all
+  * pending piece requests and finally fires the peer removed event. Only the
+  * first call has any effect; later calls return immediately.
+  * </p>
+  *
+  * @param force Currently ignored; kept so existing callers keep compiling.
+  */
+ public void unbind(boolean force) {
+   if (isStopped.getAndSet(true)) {
+     return;
+   }
+   try {
+     connectionManager.closeChannel(socketChannel);
+   } catch (IOException e) {
+     LoggerUtils.errorAndDebugDetails(logger, "cannot close socket channel. Peer {}", this, e);
+   }
+
+   this.firePeerDisconnected();
+
+   synchronized (requestsLock) {
+     myRequestedPieces.clear();
+     // Nothing is requested any more, so the peer is no longer downloading.
+     this.downloading = false;
+   }
+
+   this.afterPeerDisconnected();
+ }
+
+ /**
+ * Send a message to the peer.
+ * <p/>
+ * <p>
+ * Delivery of the message can only happen if the peer is connected.
+ * </p>
+ *
+ * @param message The message to send to the remote peer through our peer
+ * exchange.
+ */
+ public void send(PeerMessage message) throws IllegalStateException {
+ logger.trace("Sending msg {} to {}", message.getType(), this);
+ if (this.isConnected()) {
+ ByteBuffer data = message.getData();
+ data.rewind();
+ connectionManager.offerWrite(new WriteTask(socketChannel, data, new WriteListener() {
+ @Override
+ public void onWriteFailed(String message, Throwable e) {
+ if (e == null) {
+ logger.info(message);
+ } else if (e instanceof ConnectionClosedException){
+ logger.debug(message, e);
+ unbind(true);
+ } else {
+ LoggerUtils.warnAndDebugDetails(logger, message, e);
+ }
+
+ }
+
+ @Override
+ public void onWriteDone() {
+ }
+ }), 1, TimeUnit.SECONDS);
+ } else {
+ logger.trace("Attempting to send a message to non-connected peer {}!", this);
+ unbind(true);
+ }
+ }
+
+ /**
+ * Download the given piece from this peer.
+ * <p/>
+ * <p>
+ * Splits the piece into block requests of at most DEFAULT_REQUEST_SIZE
+ * bytes and sends all of them to the peer right away.
+ * </p>
+ * <p/>
+ * <p>
+ * The number of blocks requested is recorded so incoming PIECE messages can
+ * be counted down; a piece that is already requested is not requested again.
+ * </p>
+ *
+ * @param piece The piece chosen to be downloaded from this peer.
+ */
+ public void downloadPiece(final Piece piece)
+ throws IllegalStateException {
+ List<PeerMessage.RequestMessage> toSend = new ArrayList<PeerMessage.RequestMessage>();
+ synchronized (this.requestsLock) {
+ if (myRequestedPieces.containsKey(piece)) {
+ //already requested
+ return;
+ }
+ int requestedBlocksCount = 0;
+ int lastRequestedOffset = 0;
+ while (lastRequestedOffset < piece.size()) {
+ PeerMessage.RequestMessage request = PeerMessage.RequestMessage
+ .craft(piece.getIndex(), lastRequestedOffset,
+ Math.min((int) (piece.size() - lastRequestedOffset),
+ PeerMessage.RequestMessage.DEFAULT_REQUEST_SIZE));
+ toSend.add(request);
+ requestedBlocksCount++;
+ lastRequestedOffset = request.getLength() + lastRequestedOffset;
+ }
+ myRequestedPieces.put(piece, requestedBlocksCount);
+ this.downloading = myRequestedPieces.size() > 0;
+ }
+ for (PeerMessage.RequestMessage requestMessage : toSend) {
+ this.send(requestMessage);
+ }
+ }
+
+ public boolean isDownloading() {
+ return this.downloading;
+ }
+
+ /**
+ * Remove the REQUEST message from the request pipeline matching this
+ * PIECE message.
+ * <p/>
+ * <p>
+ * Upon reception of a piece block with a PIECE message, remove the
+ * corresponding request from the pipeline to make room for the next block
+ * requests.
+ * </p>
+ *
+ * @param piece The piece of PIECE message received.
+ */
+ private void removeBlockRequest(final Piece piece) {
+ synchronized (this.requestsLock) {
+ Integer requestedBlocksCount = myRequestedPieces.get(piece);
+ if (requestedBlocksCount == null) {
+ return;
+ }
+ if (requestedBlocksCount <= 1) {
+ //it's last block
+ myRequestedPieces.remove(piece);
+ } else {
+ myRequestedPieces.put(piece, requestedBlocksCount - 1);
+ }
+ this.downloading = myRequestedPieces.size() > 0;
+ }
+ }
+
+ /**
+ * Cancel all pending requests.
+ * <p/>
+ * <p>
+ * This forgets every piece requested from this peer and updates the
+ * downloading flag accordingly.
+ * </p>
+ * <p/>
+ * <p>
+ * No CANCEL message is sent to the remote peer, and nothing is returned;
+ * the requests are only dropped from the local bookkeeping.
+ * </p>
+ */
+ public void cancelPendingRequests() {
+ cancelPendingRequests(null);
+ }
+
+ public void cancelPendingRequests(@Nullable final Piece piece) {
+   synchronized (this.requestsLock) {
+     if (piece == null) {
+       myRequestedPieces.clear();
+     } else {
+       myRequestedPieces.remove(piece);
+     }
+     this.downloading = !myRequestedPieces.isEmpty();
+   }
+ }
+
+ public int getRemainingRequestedPieces(final Piece piece) {
+   final Integer blocksLeft;
+   synchronized (this.requestsLock) {
+     blocksLeft = myRequestedPieces.get(piece);
+   }
+   return blocksLeft == null ? 0 : blocksLeft;
+ }
+
+ /**
+ * Dispatch an incoming message from this peer according to its type.
+ *
+ * @param msg The incoming, parsed message.
+ */
+ @Override
+ public void handleMessage(PeerMessage msg) {
+ // Once this peer has been stopped (unbound), incoming messages are dropped.
+ if (isStopped.get())
+ return;
+ if (!torrent.isInitialized()) {
+ torrent.initIfNecessary(this);
+ }
+ switch (msg.getType()) {
+ case KEEP_ALIVE:
+ // Nothing to do, we're keeping the connection open anyways.
+ break;
+ case CHOKE:
+ this.choked = true;
+ this.firePeerChoked();
+ this.cancelPendingRequests();
+ break;
+ case UNCHOKE:
+ this.choked = false;
+ logger.trace("Peer {} is now accepting requests.", this);
+ this.firePeerReady();
+ break;
+ case INTERESTED:
+ this.interested = true;
+ if (this.choking) {
+ unchoke();
+ }
+ break;
+ case NOT_INTERESTED:
+ this.interested = false;
+ if (!interesting) {
+ unbind(true);
+ }
+ break;
+ case HAVE:
+ // Record this peer has the given piece
+ PeerMessage.HaveMessage have = (PeerMessage.HaveMessage) msg;
+ Piece havePiece = this.torrent.getPiece(have.getPieceIndex());
+
+ synchronized (this.availablePiecesLock) {
+ this.availablePieces.set(havePiece.getIndex());
+ logger.trace("Peer {} now has {} [{}/{}].",
+ new Object[]{
+ this,
+ havePiece,
+ this.availablePieces.cardinality(),
+ this.torrent.getPieceCount()
+ });
+ }
+
+ this.firePieceAvailabity(havePiece);
+ break;
+ case BITFIELD:
+ // Merge the pieces advertised by this BITFIELD message into availablePieces
+ PeerMessage.BitfieldMessage bitfield =
+ (PeerMessage.BitfieldMessage) msg;
+
+ synchronized (this.availablePiecesLock) {
+ this.availablePieces.or(bitfield.getBitfield());
+ logger.trace("Recorded bitfield from {} with {} " +
+ "pieces(s) [{}/{}].",
+ new Object[]{
+ this,
+ bitfield.getBitfield().cardinality(),
+ this.availablePieces.cardinality(),
+ this.torrent.getPieceCount()
+ });
+ }
+
+ this.fireBitfieldAvailabity();
+ break;
+ case REQUEST:
+ PeerMessage.RequestMessage request =
+ (PeerMessage.RequestMessage) msg;
+ logger.trace("Got request message for {} ({} {}@{}) from {}", new Object[]{
+ Arrays.toString(TorrentUtils.getTorrentFileNames(torrent).toArray()),
+ request.getPiece(),
+ request.getLength(),
+ request.getOffset(),
+ this
+ });
+ Piece rp = this.torrent.getPiece(request.getPiece());
+
+ // If the peer requests a piece that is not valid locally (we do not
+ // have it), that is a violation of the BitTorrent protocol, so the
+ // connection is terminated. NOTE(review): requests received while we
+ // are choking the peer are NOT rejected here, and offset/length are
+ // only checked against MAX_REQUEST_SIZE below -- confirm intended.
+ if (!rp.isValid()) {
+ logger.warn("Peer {} violated protocol, terminating exchange: " + this.isChoking() + " " + rp.isValid(), this);
+ this.unbind(true);
+ break;
+ }
+
+ if (request.getLength() >
+ PeerMessage.RequestMessage.MAX_REQUEST_SIZE) {
+ logger.warn("Peer {} requested a block too big, terminating exchange.", this);
+ this.unbind(true);
+ break;
+ }
+
+ // At this point we agree to send the requested piece block to
+ // the remote peer, so let's queue a message with that block
+ try {
+
+ ByteBuffer bufferForMessage = PeerMessage.PieceMessage.createBufferWithHeaderForMessage(
+ request.getPiece(), request.getOffset(), request.getLength());
+
+ rp.read(request.getOffset(), request.getLength(), bufferForMessage);
+
+ this.send(PeerMessage.PieceMessage.craft(request.getPiece(),
+ request.getOffset(), bufferForMessage));
+ this.upload.add(request.getLength());
+
+ if (request.getOffset() + request.getLength() == rp.size()) {
+ this.firePieceSent(rp);
+ }
+ } catch (IOException ioe) {
+ logger.debug("error", ioe);
+ this.fireIOException(new IOException(
+ "Error while sending piece block request!", ioe));
+ }
+
+ break;
+ case PIECE:
+ // Record the incoming piece block.
+
+ // Should we keep track of the requested pieces and act when we
+ // get a piece we didn't ask for, or should we just stay
+ // greedy?
+ PeerMessage.PieceMessage piece = (PeerMessage.PieceMessage) msg;
+ Piece p = this.torrent.getPiece(piece.getPiece());
+
+ logger.trace("Got piece ({} {}@{}) from {}", new Object[]{
+ p.getIndex(),
+ p.size(),
+ piece.getOffset(),
+ this
+ });
+
+ this.download.add(piece.getBlock().capacity());
+
+ try {
+ boolean isPieceDownloaded = false;
+ synchronized (p) {
+ // Count this block off the blocks still expected for the piece
+ // (the piece is forgotten once its last block has arrived).
+ this.removeBlockRequest(p);
+ if (p.isValid()) {
+ this.cancelPendingRequests(p);
+ this.firePeerReady();
+ logger.trace("Discarding block for already completed " + p);
+ break;
+ }
+ //TODO add proper catch for IOException
+ p.record(piece.getBlock(), piece.getOffset());
+
+ // Once no requested block of this piece remains outstanding,
+ // the piece has been entirely downloaded from this peer: fire
+ // the completion event here, and signal readiness after the
+ // piece lock has been released (see below).
+ if (getRemainingRequestedPieces(p) == 0) {
+ this.firePieceCompleted(p);
+ isPieceDownloaded = true;
+ }
+ }
+ if (isPieceDownloaded) {
+ firePeerReady();
+ }
+ } catch (IOException ioe) {
+ logger.error(ioe.getMessage(), ioe);
+ this.fireIOException(new IOException(
+ "Error while storing received piece block!", ioe));
+ break;
+ }
+ break;
+ case CANCEL:
+ // No need to support
+ break;
+ }
+ }
+
+ /**
+ * Fire the peer choked event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer that choked us (this).
+ * </p>
+ */
+ private void firePeerChoked() {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handlePeerChoked(this);
+ }
+ }
+
+ /**
+ * Fire the peer ready event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer that unchoked or became ready.
+ * </p>
+ */
+ private void firePeerReady() {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handlePeerReady(this);
+ }
+ }
+
+ /**
+ * Fire the piece availability event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer (this), and the piece that became available.
+ * </p>
+ */
+ private void firePieceAvailabity(Piece piece) {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handlePieceAvailability(this, piece);
+ }
+ }
+
+ /**
+ * Fire the bit field availability event to all registered listeners.
+ * <p/>
+ * The event contains the peer (this), and the bit field of available pieces
+ * from this peer.
+ */
+ private void fireBitfieldAvailabity() {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handleBitfieldAvailability(this,
+ this.getAvailablePieces());
+ }
+ }
+
+ /**
+ * Fire the piece sent event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer (this), and the piece number that was
+ * sent to the peer.
+ * </p>
+ *
+ * @param piece The completed piece.
+ */
+ private void firePieceSent(Piece piece) {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handlePieceSent(this, piece);
+ }
+ }
+
+ /**
+ * Fire the piece completion event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer (this), and the piece number that was
+ * completed.
+ * </p>
+ *
+ * @param piece The completed piece.
+ */
+ private void firePieceCompleted(Piece piece) throws IOException {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handlePieceCompleted(this, piece);
+ }
+ }
+
+ /**
+ * Fire the peer disconnected event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer that disconnected (this).
+ * </p>
+ */
+ private void firePeerDisconnected() {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handlePeerDisconnected(this);
+ }
+ }
+
+ private void afterPeerDisconnected() {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.afterPeerRemoved(this);
+ }
+ }
+
+ private void firePeerConnected() {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handleNewPeerConnected(this);
+ }
+ }
+
+ /**
+ * Fire the IOException event to all registered listeners.
+ * <p/>
+ * <p>
+ * The event contains the peer that triggered the problem, and the
+ * exception object.
+ * </p>
+ */
+ private void fireIOException(IOException ioe) {
+ for (PeerActivityListener listener : this.listeners) {
+ listener.handleIOException(this, ioe);
+ }
+ }
+
+ public SharedTorrent getTorrent() {
+ return this.torrent;
+ }
+
+ public int getDownloadingPiecesCount() {
+ synchronized (requestsLock) {
+ return myRequestedPieces.size();
+ }
+ }
+
+ /**
+ * Download rate comparator.
+ * <p/>
+ * <p>
+ * Compares sharing peers based on their current download rate.
+ * </p>
+ *
+ * @author mpetazzoni
+ * @see Rate#RATE_COMPARATOR
+ */
+ public static class DLRateComparator
+ implements Comparator<SharingPeer>, Serializable {
+
+ private static final long serialVersionUID = 96307229964730L;
+
+ public int compare(SharingPeer a, SharingPeer b) {
+ return Rate.RATE_COMPARATOR.compare(a.getDLRate(), b.getDLRate());
+ }
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/SharingPeerInfo.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/SharingPeerInfo.java
new file mode 100644
index 0000000..05d29d7
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/peer/SharingPeerInfo.java
@@ -0,0 +1,22 @@
+package com.turn.ttorrent.client.peer;
+
+import com.turn.ttorrent.common.TorrentHash;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Read-only view of a sharing peer: its IP, port, torrent hash and peer ID.
+ *
+ * @author Sergey.Pak (8/9/13, 6:40 PM)
+ */
+public interface SharingPeerInfo {
+
+ String getIp();
+
+ int getPort();
+
+ TorrentHash getTorrentHash();
+
+ ByteBuffer getPeerId();
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/EmptyPieceStorageFactory.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/EmptyPieceStorageFactory.java
new file mode 100644
index 0000000..271a32c
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/EmptyPieceStorageFactory.java
@@ -0,0 +1,23 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.common.TorrentMetadata;
+
+import java.util.BitSet;
+
+public class EmptyPieceStorageFactory implements PieceStorageFactory {
+
+  public static final EmptyPieceStorageFactory INSTANCE = new EmptyPieceStorageFactory();
+
+  private EmptyPieceStorageFactory() {
+  }
+
+  /**
+   * Creates a piece storage over the given bytes with an empty piece bit set.
+   */
+  @Override
+  public PieceStorage createStorage(TorrentMetadata metadata, TorrentByteStorage byteStorage) {
+    final BitSet noPieces = new BitSet(metadata.getPiecesCount());
+    return new PieceStorageImpl(
+        byteStorage, noPieces, metadata.getPiecesCount(), metadata.getPieceLength());
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FairPieceStorageFactory.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FairPieceStorageFactory.java
new file mode 100644
index 0000000..3c700be
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FairPieceStorageFactory.java
@@ -0,0 +1,66 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.Constants;
+import com.turn.ttorrent.common.TorrentFile;
+import com.turn.ttorrent.common.TorrentMetadata;
+import com.turn.ttorrent.common.TorrentUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.BitSet;
+
+/**
+ * This implementation reads every piece from storage and compares its hash with the expected
+ * piece hash taken from the metadata.
+ */
+public class FairPieceStorageFactory implements PieceStorageFactory {
+
+  public final static FairPieceStorageFactory INSTANCE = new FairPieceStorageFactory();
+
+  private FairPieceStorageFactory() {
+  }
+
+  /**
+   * Opens the byte storage, hashes every non-blank piece and compares the
+   * result with the hash recorded in the metadata; pieces whose hashes match
+   * are flagged in the bit set handed to the returned piece storage.
+   */
+  @Override
+  public PieceStorage createStorage(TorrentMetadata metadata, TorrentByteStorage byteStorage) throws IOException {
+    long bytesTotal = 0;
+    for (TorrentFile torrentFile : metadata.getFiles()) {
+      bytesTotal += torrentFile.size;
+    }
+
+    byteStorage.open(false);
+    final BitSet verified = new BitSet(metadata.getPiecesCount());
+    try {
+      if (!byteStorage.isBlank()) {
+        final int pieceLength = metadata.getPieceLength();
+        for (int index = 0; index < metadata.getPiecesCount(); index++) {
+          final long start = (long) index * pieceLength;
+          // every piece is pieceLength bytes long except, possibly, the last
+          final long remaining = bytesTotal - start;
+          final int length = remaining > pieceLength ? pieceLength : (int) remaining;
+          if (byteStorage.isBlank(start, length)) {
+            continue;
+          }
+          final ByteBuffer pieceData = ByteBuffer.allocate(length);
+          byteStorage.read(pieceData, start);
+          final int hashFrom = index * Constants.PIECE_HASH_SIZE;
+          final byte[] expected = Arrays.copyOfRange(
+              metadata.getPiecesHashes(), hashFrom, hashFrom + Constants.PIECE_HASH_SIZE);
+          if (Arrays.equals(expected, TorrentUtils.calculateSha1Hash(pieceData.array()))) {
+            verified.set(index);
+          }
+        }
+      }
+    } finally {
+      byteStorage.close();
+    }
+
+    return new PieceStorageImpl(
+        byteStorage, verified, metadata.getPiecesCount(), metadata.getPieceLength());
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FileCollectionStorage.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FileCollectionStorage.java
new file mode 100644
index 0000000..083e3e4
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FileCollectionStorage.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.common.TorrentFile;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.common.TorrentMetadata;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ * Multi-file torrent byte storage.
+ *
+ * <p>
+ * This implementation of the torrent byte storage provides support for
+ * multi-file torrents and completely abstracts the read/write operations from
+ * the notion of different files. The byte storage is represented as one
+ * continuous byte storage, directly accessible by offset regardless of which
+ * file this offset lands.
+ * </p>
+ *
+ * @author mpetazzoni
+ * @author dgiffin
+ */
+public class FileCollectionStorage implements TorrentByteStorage {
+
+  private static final Logger logger =
+          TorrentLoggerFactory.getLogger(FileCollectionStorage.class);
+
+  private final List<FileStorage> files;
+  private final long size;
+  private volatile boolean myIsOpen;
+
+  /**
+   * Initialize a new multi-file torrent byte storage.
+   *
+   * @param files The list of individual {@link FileStorage}
+   *              objects making up the torrent.
+   * @param size  The total size of the torrent data, in bytes.
+   */
+  public FileCollectionStorage(List<FileStorage> files,
+                               long size) {
+    this.files = files;
+    this.size = size;
+
+    logger.debug("Initialized torrent byte storage on {} file(s) " +
+            "({} total byte(s)).", files.size(), size);
+  }
+
+  /** Builds the storage for the files of {@code metadata} below {@code parent}; escaping paths are rejected. */
+  public static FileCollectionStorage create(TorrentMetadata metadata, File parent) throws IOException {
+    if (!parent.isDirectory()) {
+      throw new IllegalArgumentException("Invalid parent directory!");
+    }
+    // Compare against "<parent>/" rather than "<parent>": a plain prefix test would also accept
+    // sibling directories such as "<parent>-other".
+    String jail = parent.getCanonicalPath();
+    if (!jail.endsWith(File.separator)) jail += File.separator;
+    List<FileStorage> files = new LinkedList<FileStorage>();
+    long offset = 0L;
+    for (TorrentFile file : metadata.getFiles()) {
+      File actual = new File(parent, file.getRelativePathAsString());
+      if (!actual.getCanonicalPath().startsWith(jail)) {
+        throw new SecurityException("Torrent file path attempted to break directory jail!");
+      }
+      if (!actual.getParentFile().exists() && !actual.getParentFile().mkdirs()) {
+        throw new IOException("Unable to create directories " + actual.getParent() + " for storing torrent file " + actual.getName());
+      }
+      files.add(new FileStorage(actual, offset, file.size));
+      offset += file.size;
+    }
+    return new FileCollectionStorage(files, offset);
+  }
+
+  @Override
+  public synchronized void open(final boolean seeder) throws IOException {
+    for (FileStorage file : files) {
+      if (!file.isOpen()) file.open(seeder);
+    }
+    myIsOpen = true;
+  }
+
+  @Override
+  public int read(ByteBuffer buffer, long position) throws IOException {
+    int requested = buffer.remaining();
+    int bytes = 0;
+
+    for (FileOffset fo : this.select(position, requested)) {
+      // TODO: remove cast to int when large ByteBuffer support is
+      // implemented in Java.
+      buffer.limit((int) (buffer.position() + fo.length));
+      bytes += fo.file.read(buffer, fo.offset);
+    }
+
+    if (bytes < requested) {
+      throw new IOException("Storage collection read underrun!");
+    }
+
+    return bytes;
+  }
+
+  @Override
+  public int write(ByteBuffer buffer, long position) throws IOException {
+    int requested = buffer.remaining();
+    int bytes = 0;
+    for (FileOffset fo : this.select(position, requested)) {
+      // Bound the slice relative to the buffer's current position, as read() does; the former
+      // "bytes + length" limit was only right for buffers whose position started at 0.
+      buffer.limit(buffer.position() + (int) fo.length);
+      bytes += fo.file.write(buffer, fo.offset);
+    }
+
+    if (bytes < requested) {
+      throw new IOException("Storage collection write underrun!");
+    }
+
+    return bytes;
+  }
+
+  @Override
+  public boolean isBlank(long position, long size) {
+    for (FileOffset fo : this.select(position, size)) {
+      if (!fo.file.isBlank(fo.offset, fo.length)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public boolean isBlank() {
+    for (FileStorage file : this.files) {
+      if (!file.isBlank()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    for (FileStorage file : this.files) {
+      file.close();
+    }
+    myIsOpen = false;
+  }
+
+  @Override
+  public synchronized void finish() throws IOException {
+    for (FileStorage file : this.files) {
+      file.finish();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    for (FileStorage file : this.files) {
+      if (!file.isFinished()) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public void delete() throws IOException {
+    for (FileStorage file : files) {
+      file.delete();
+    }
+  }
+
+  /**
+   * File operation details holder.
+   *
+   * <p>
+   * This simple inner class holds the details for a read or write operation
+   * on one of the underlying {@link FileStorage}s.
+   * </p>
+   *
+   * @author dgiffin
+   * @author mpetazzoni
+   */
+  private static class FileOffset {
+
+    public final FileStorage file;
+    public final long offset;
+    public final long length;
+
+    FileOffset(FileStorage file, long offset, long length) {
+      this.file = file;
+      this.offset = offset;
+      this.length = length;
+    }
+  }
+
+  /**
+   * Select the group of files impacted by an operation.
+   *
+   * <p>
+   * This function selects which files are impacted by a read or write
+   * operation, with their respective relative offset and chunk length.
+   * </p>
+   *
+   * @param offset The offset of the operation, in bytes, relative to the
+   *               complete byte storage.
+   * @param length The number of bytes to read or write.
+   * @return A list of {@link FileOffset} objects representing the {@link
+   * FileStorage}s impacted by the operation, bundled with their
+   * respective relative offset and number of bytes to read or write.
+   * @throws IllegalArgumentException If the offset and length go over the
+   *                                  byte storage size.
+   * @throws IllegalStateException    If the files registered with this byte
+   *                                  storage can't accommodate the request (should not happen, really).
+   */
+  private List<FileOffset> select(long offset, long length) {
+    if (offset + length > this.size) {
+      throw new IllegalArgumentException("Buffer overrun (" +
+              offset + " + " + length + " > " + this.size + ") !");
+    }
+
+    List<FileOffset> selected = new LinkedList<FileOffset>();
+    long bytes = 0;
+
+    for (FileStorage file : this.files) {
+      if (file.offset() >= offset + length) {
+        break;
+      }
+
+      if (file.offset() + file.size() < offset) {
+        continue;
+      }
+
+      long position = offset - file.offset();
+      position = position > 0 ? position : 0;
+      long size = Math.min(
+              file.size() - position,
+              length - bytes);
+      selected.add(new FileOffset(file, position, size));
+      bytes += size;
+    }
+
+    if (selected.size() == 0 || bytes < length) {
+      throw new IllegalStateException("Buffer underrun (only got " +
+              bytes + " out of " + length + " byte(s) requested)!");
+    }
+
+    return selected;
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FileStorage.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FileStorage.java
new file mode 100644
index 0000000..5f0ddde
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FileStorage.java
@@ -0,0 +1,260 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+
+/**
+ * Single-file torrent byte data storage.
+ *
+ * <p>
+ * This implementation of TorrentByteStorageFile provides a torrent byte data
+ * storage relying on a single underlying file and uses a RandomAccessFile
+ * FileChannel to expose thread-safe read/write methods.
+ * </p>
+ *
+ * @author mpetazzoni
+ */
+public class FileStorage implements TorrentByteStorage {
+
+  private static final String PARTIAL_FILE_NAME_SUFFIX = ".part";
+
+  private static final Logger logger =
+          TorrentLoggerFactory.getLogger(FileStorage.class);
+
+  private final File target;
+  private File partial;
+  private final long offset;
+  private final long size;
+
+  private RandomAccessFile raf;
+  private FileChannel channel;
+  private File current;
+  private boolean myIsOpen = false;
+  private boolean isBlank;
+
+  private final ReadWriteLock myLock = new ReentrantReadWriteLock();
+
+  public FileStorage(File file, long offset, long size) {
+    this.target = file;
+    this.offset = offset;
+    this.size = size;
+  }
+
+  /** Opens the target read-only when {@code seeder}, else the partial (or existing target) file read-write. */
+  @Override
+  public void open(final boolean seeder) throws IOException {
+    myLock.writeLock().lock();
+    try {
+      if (seeder) {
+        if (!target.exists()) {
+          throw new IOException("Target file " + target.getAbsolutePath() + " doesn't exist.");
+        }
+        this.current = this.target;
+        this.raf = new RandomAccessFile(this.current, "r");
+      } else {
+        this.partial = new File(this.target.getAbsolutePath() + PARTIAL_FILE_NAME_SUFFIX);
+        if (this.partial.exists()) {
+          logger.debug("Partial download found at {}. Continuing...",
+                  this.partial.getAbsolutePath());
+          this.current = this.partial;
+          this.isBlank = false;
+        } else if (!this.target.exists()) {
+          logger.debug("Downloading new file to {}...",
+                  this.partial.getAbsolutePath());
+          this.current = this.partial;
+          this.isBlank = true;
+        } else {
+          logger.debug("Using existing file {}.",
+                  this.target.getAbsolutePath());
+          this.current = this.target;
+          this.isBlank = false;
+        }
+        this.raf = new RandomAccessFile(this.current, "rw");
+        // Set the file length to the appropriate size, eventually truncating
+        // or extending the file if it already exists with a different size.
+        this.raf.setLength(this.size);
+      }
+
+      myIsOpen = true;
+      this.channel = raf.getChannel();
+
+      logger.debug("Opened byte storage file at {} ({}+{} byte(s)).",
+              new Object[]{this.current.getAbsolutePath(), this.offset, this.size});
+    } finally {
+      myLock.writeLock().unlock();
+    }
+  }
+
+  protected long offset() {
+    return this.offset;
+  }
+
+  public long size() {
+    return this.size;
+  }
+
+  @Override
+  public int read(ByteBuffer buffer, long position) throws IOException {
+    myLock.readLock().lock();
+    try {
+      int requested = buffer.remaining();
+      if (position + requested > this.size) {
+        throw new IllegalArgumentException("Invalid storage read request!");
+      }
+      // One channel read may return fewer bytes than requested: loop until the buffer is full or EOF.
+      int bytes = 0;
+      int n;
+      while (buffer.hasRemaining() && (n = this.channel.read(buffer, position + bytes)) > 0) {
+        bytes += n;
+      }
+      if (bytes < requested) throw new IOException("Storage underrun!");
+      return bytes;
+    } finally {
+      myLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public int write(ByteBuffer buffer, long position) throws IOException {
+    myLock.writeLock().lock();
+    try {
+      this.isBlank = false;
+      if (position + buffer.remaining() > this.size) {
+        throw new IllegalArgumentException("Invalid storage write request!");
+      }
+      // One channel write may consume only part of the buffer: loop until it is drained.
+      int bytes = 0;
+      int n;
+      while (buffer.hasRemaining() && (n = this.channel.write(buffer, position + bytes)) > 0) {
+        bytes += n;
+      }
+      return bytes;
+    } finally {
+      myLock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    myLock.writeLock().lock();
+    try {
+      if (!myIsOpen) return;
+      logger.debug("Closing file channel to {}. Channel open: {}", current.getName(), channel.isOpen());
+      if (this.channel.isOpen()) {
+        try {
+          this.channel.force(true);
+        } catch (ClosedByInterruptException ignored) {
+          // The interrupt has already closed the channel; raf.close() below releases the rest.
+        }
+      }
+      this.raf.close();
+      myIsOpen = false;
+    } finally {
+      myLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Move the partial file to its final location.
+   */
+  @Override
+  public void finish() throws IOException {
+    myLock.writeLock().lock();
+    try {
+      logger.debug("Closing file channel to " + this.current.getName() +
+              " (download complete).");
+      if (this.channel.isOpen()) {
+        this.channel.force(true);
+      }
+
+      // Nothing more to do if we're already on the target file.
+      if (this.isFinished()) {
+        return;
+      }
+
+      try {
+        FileUtils.deleteQuietly(this.target);
+        this.raf.close();
+        FileUtils.moveFile(this.current, this.target);
+      } catch (Exception ex) {
+        logger.error("An error occurred while moving file to its final location", ex);
+        if (this.target.exists()) {
+          throw new IOException("Was unable to delete existing file " + target.getAbsolutePath(), ex);
+        }
+        FileUtils.copyFile(this.current, this.target);
+      }
+
+      this.current = this.target;
+
+      FileUtils.deleteQuietly(this.partial);
+      myIsOpen = false;
+      logger.debug("Moved torrent data from {} to {}.",
+              this.partial.getName(),
+              this.target.getName());
+    } finally {
+      myLock.writeLock().unlock();
+    }
+  }
+
+  public boolean isOpen() {
+    myLock.readLock().lock();
+    try {
+      return myIsOpen;
+    } finally {
+      myLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isBlank(long position, long size) {
+    return isBlank();
+  }
+
+  @Override
+  public boolean isBlank() {
+    myLock.readLock().lock();
+    try {
+      return isBlank;
+    } finally {
+      myLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    return this.target.equals(this.current); // null-safe: current stays unset until open() is called
+  }
+
+  @Override
+  public void delete() throws IOException {
+    close();
+    final File local = this.current;
+    if (local != null) local.delete();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FullyPieceStorageFactory.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FullyPieceStorageFactory.java
new file mode 100644
index 0000000..02763fb
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/FullyPieceStorageFactory.java
@@ -0,0 +1,26 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.common.TorrentMetadata;
+
+import java.util.BitSet;
+
+/** {@link PieceStorageFactory} that reports every piece as available without checking the data. */
+public class FullyPieceStorageFactory implements PieceStorageFactory {
+
+  public final static FullyPieceStorageFactory INSTANCE = new FullyPieceStorageFactory();
+
+  private FullyPieceStorageFactory() {
+  }
+
+  /**
+   * Reports every piece as available. The byte storage is only handed over to the resulting
+   * {@link PieceStorageImpl}; it is neither opened nor read here.
+   */
+  @Override
+  public PieceStorage createStorage(TorrentMetadata metadata, TorrentByteStorage byteStorage) {
+    final BitSet everyPiece = new BitSet(metadata.getPiecesCount());
+    everyPiece.set(0, metadata.getPiecesCount());
+    return new PieceStorageImpl(
+            byteStorage, everyPiece, metadata.getPiecesCount(), metadata.getPieceLength());
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorage.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorage.java
new file mode 100644
index 0000000..66318fd
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorage.java
@@ -0,0 +1,19 @@
+package com.turn.ttorrent.client.storage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.BitSet;
+
+public interface PieceStorage extends Closeable {
+
+  /** Stores the data of the piece with the given index. */
+  void savePiece(int pieceIndex, byte[] pieceData) throws IOException;
+  /** Reads {@code length} bytes of the given piece, starting {@code offset} bytes into the piece. */
+  byte[] readPiecePart(int pieceIndex, int offset, int length) throws IOException;
+  /** Returns a bit set with one bit per piece index; set bits mark the pieces held by the storage. */
+  BitSet getAvailablePieces();
+  /** Tells whether every piece is available. */
+  boolean isFinished();
+  /** Closes the storage for good; NOTE(review): PieceStorageImpl then rejects reads and writes - confirm for other implementations. */
+  void closeFully() throws IOException;
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorageFactory.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorageFactory.java
new file mode 100644
index 0000000..d00d44e
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorageFactory.java
@@ -0,0 +1,18 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.common.TorrentMetadata;
+
+import java.io.IOException;
+
+public interface PieceStorageFactory {
+
+  /**
+   * Creates a new {@link PieceStorage} for the given torrent on top of the given byte storage.
+   *
+   * @param metadata    metadata of the torrent the storage is created for
+   * @param byteStorage byte storage in which the pieces are stored
+   * @return the new {@link PieceStorage}
+   * @throws IOException if the implementation fails while accessing the byte storage
+   */
+  PieceStorage createStorage(TorrentMetadata metadata, TorrentByteStorage byteStorage) throws IOException;
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorageImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorageImpl.java
new file mode 100644
index 0000000..189d749
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/PieceStorageImpl.java
@@ -0,0 +1,173 @@
+package com.turn.ttorrent.client.storage;
+
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class PieceStorageImpl implements PieceStorage {
+
+  private final TorrentByteStorage fileCollectionStorage;
+  private final ReadWriteLock readWriteLock;
+
+  private final Object openStorageLock = new Object();
+  // availablePieces is null once every piece is available (see the constructor and savePiece).
+  @Nullable
+  private volatile BitSet availablePieces;
+  private final int piecesCount;
+  private final int pieceSize;
+  private volatile boolean isOpen;
+  private volatile boolean closedFully = false;
+
+  public PieceStorageImpl(TorrentByteStorage fileCollectionStorage,
+                          BitSet availablePieces,
+                          int piecesCount,
+                          int pieceSize) {
+    this.fileCollectionStorage = fileCollectionStorage;
+    this.readWriteLock = new ReentrantReadWriteLock();
+    this.piecesCount = piecesCount;
+    this.pieceSize = pieceSize;
+    BitSet bitSet = new BitSet(piecesCount);
+    bitSet.or(availablePieces);
+    if (bitSet.cardinality() != piecesCount) {
+      this.availablePieces = bitSet;
+    }
+    isOpen = false;
+  }
+
+  private void checkPieceIndex(int pieceIndex) {
+    if (pieceIndex < 0 || pieceIndex >= piecesCount) {
+      throw new IllegalArgumentException("Incorrect piece index " + pieceIndex + ". Piece index must be non-negative and less than " + piecesCount);
+    }
+  }
+
+  @Override
+  public void savePiece(int pieceIndex, byte[] pieceData) throws IOException {
+    checkPieceIndex(pieceIndex);
+    // Lock before try: finally must only release a lock that was really acquired.
+    readWriteLock.writeLock().lock();
+    try {
+      if (closedFully) throw new IOException("Storage is closed");
+
+      BitSet availablePieces = this.availablePieces;
+      boolean isFullyDownloaded = availablePieces == null;
+      if (isFullyDownloaded) return;
+
+      // The piece is already stored: nothing to write.
+      if (availablePieces.get(pieceIndex)) return;
+
+      openStorageIsNecessary(false);
+
+      long pos = pieceIndex;
+      pos = pos * pieceSize;
+      ByteBuffer buffer = ByteBuffer.wrap(pieceData);
+      fileCollectionStorage.write(buffer, pos);
+
+      availablePieces.set(pieceIndex);
+      boolean isFullyNow = availablePieces.cardinality() == piecesCount;
+      if (isFullyNow) {
+        // That was the last missing piece: finalize the byte storage and reopen it in seeder mode.
+        this.availablePieces = null;
+        fileCollectionStorage.finish();
+        fileCollectionStorage.close();
+        fileCollectionStorage.open(true);
+      }
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void openStorageIsNecessary(boolean onlyRead) throws IOException {
+    if (!isOpen) {
+      fileCollectionStorage.open(onlyRead);
+      isOpen = true;
+    }
+  }
+
+  @Override
+  public byte[] readPiecePart(int pieceIndex, int offset, int length) throws IOException {
+    checkPieceIndex(pieceIndex);
+    readWriteLock.readLock().lock();
+    try {
+
+      if (closedFully) throw new IOException("Storage is closed");
+
+      BitSet availablePieces = this.availablePieces;
+      if (availablePieces != null && !availablePieces.get(pieceIndex)) {
+        throw new IllegalArgumentException("trying reading part of not available piece");
+      }
+      // Readers only hold the shared lock, so opening the storage is serialized separately.
+      synchronized (openStorageLock) {
+        openStorageIsNecessary(availablePieces == null);
+      }
+
+      ByteBuffer buffer = ByteBuffer.allocate(length);
+      long pos = pieceIndex;
+      pos = pos * pieceSize + offset;
+      fileCollectionStorage.read(buffer, pos);
+      return buffer.array();
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean isFinished() {
+    readWriteLock.readLock().lock();
+    try {
+      return availablePieces == null;
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void closeFully() throws IOException {
+    readWriteLock.writeLock().lock();
+    try {
+      close0();
+      closedFully = true;
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public BitSet getAvailablePieces() {
+    readWriteLock.readLock().lock();
+    try {
+      BitSet result = new BitSet(piecesCount);
+
+      BitSet availablePieces = this.availablePieces;
+      boolean isFullyDownloaded = availablePieces == null;
+
+      if (isFullyDownloaded) {
+        result.set(0, piecesCount);
+        return result;
+      }
+      result.or(availablePieces);
+      return result;
+    } finally {
+      readWriteLock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    readWriteLock.writeLock().lock();
+    try {
+      close0();
+    } finally {
+      readWriteLock.writeLock().unlock();
+    }
+  }
+
+  private void close0() throws IOException {
+    if (!isOpen) return;
+    fileCollectionStorage.close();
+    isOpen = false;
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/TorrentByteStorage.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/TorrentByteStorage.java
new file mode 100644
index 0000000..15ba9bc
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/storage/TorrentByteStorage.java
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2011-2012 Turn, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.turn.ttorrent.client.storage;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+
+/**
+ * Abstract torrent byte storage.
+ *
+ * <p>
+ * This interface defines the methods for accessing an abstracted torrent byte
+ * storage. A torrent, especially when it contains multiple files, needs to be
+ * seen as one single continuous stream of bytes. Torrent pieces will most
+ * likely span across file boundaries. This abstracted byte storage aims at
+ * providing a simple interface for read/write access to the torrent data,
+ * regardless of how it is composed underneath the piece structure.
+ * </p>
+ *
+ * @author mpetazzoni
+ * @author dgiffin
+ */
+public interface TorrentByteStorage extends Closeable {
+  /** Opens the storage; NOTE(review): FileStorage opens its target read-only when {@code seeder} is true. */
+  void open(boolean seeder) throws IOException;
+
+  /**
+   * Read from the byte storage.
+   *
+   * <p>
+   * Read bytes starting at position {@code position} of the underlying byte
+   * storage into {@code buffer}.
+   * </p>
+   *
+   * @param buffer   The buffer to read the bytes into. The buffer's limit will
+   *                 control how many bytes are read from the storage.
+   * @param position The position, in bytes, to read from. This must be within
+   *                 the storage boundary.
+   * @return The number of bytes read from the storage.
+   * @throws IOException If an I/O error occurs while reading from the
+   *                     byte storage.
+   */
+  int read(ByteBuffer buffer, long position) throws IOException;
+
+  /**
+   * Write bytes to the byte storage.
+   *
+   * The remaining bytes of {@code block} are written to the storage starting
+   * at {@code position}.
+   *
+   * @param block    A {@link ByteBuffer} containing the bytes to write to the
+   *                 storage. The buffer limit is expected to be set correctly: all bytes
+   *                 from the buffer will be used.
+   * @param position Position in the underlying byte storage to write the block
+   *                 at.
+   * @return The number of bytes written to the storage.
+   * @throws IOException If an I/O error occurs while writing to the byte
+   *                     storage.
+   */
+  int write(ByteBuffer block, long position) throws IOException;
+
+  /**
+   * Finalize the byte storage when the download is complete.
+   *
+   * <p>
+   * This gives the byte storage the opportunity to perform finalization
+   * operations when the download completes, like moving the files from a
+   * temporary location to their destination.
+   * </p>
+   *
+   * @throws IOException If the finalization failed.
+   */
+  void finish() throws IOException;
+
+  /**
+   * Tells whether this byte storage has been finalized.
+   */
+  boolean isFinished();
+
+  /**
+   * @param position Position in the underlying byte storage where the region starts.
+   * @param size     Size of the region to check, in bytes.
+   * @return true if the region of {@code size} bytes starting at {@code position} only contains zeros
+   */
+  boolean isBlank(long position, long size);
+
+  /**
+   * NOTE(review): FileStorage answers from a "blank" flag rather than by inspecting the bytes.
+   * @return true if the entire storage only contains zeros
+   */
+  boolean isBlank();
+
+  /**
+   * Delete byte storage information
+   */
+  void delete() throws IOException;
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/EndGameStrategy.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/EndGameStrategy.java
new file mode 100644
index 0000000..eb1c86d
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/EndGameStrategy.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+import com.turn.ttorrent.client.peer.SharingPeer;
+
+import java.util.List;
+
+public interface EndGameStrategy {
+
+  /** Collects, for the given pieces, the requests to send to the given connected peers. */
+  RequestsCollection collectRequests(Piece[] allPieces, List<SharingPeer> connectedPeers);
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/EndGameStrategyImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/EndGameStrategyImpl.java
new file mode 100644
index 0000000..edc0589
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/EndGameStrategyImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+import com.turn.ttorrent.client.peer.SharingPeer;
+
+import java.util.*;
+
+public class EndGameStrategyImpl implements EndGameStrategy {
+
+  private static final Random RANDOM = new Random();
+
+  private final int peersPerPiece;
+
+  public EndGameStrategyImpl(int peersPerPiece) {
+    this.peersPerPiece = peersPerPiece;
+  }
+
+  @Override
+  public RequestsCollection collectRequests(Piece[] allPieces, List<SharingPeer> connectedPeers) {
+    List<SharingPeer> ranked = new ArrayList<SharingPeer>(connectedPeers);
+    Collections.sort(ranked, new Comparator<SharingPeer>() {
+      @Override
+      public int compare(SharingPeer left, SharingPeer right) {
+        int leftCount = left.getDownloadedPiecesCount();
+        int rightCount = right.getDownloadedPiecesCount();
+        return leftCount < rightCount ? -1 : (leftCount == rightCount ? 0 : 1);
+      }
+    });
+    // Each piece that is not valid yet gets up to peersPerPiece peers; peers that sort first
+    // are the ones selectPeer() favours.
+    Map<Piece, List<SharingPeer>> requests = new HashMap<Piece, List<SharingPeer>>();
+    for (Piece piece : allPieces) {
+      if (!piece.isValid()) {
+        requests.put(piece, selectGoodPeers(piece, peersPerPiece, ranked));
+      }
+    }
+    return new RequestsCollectionImpl(requests);
+  }
+
+  /**
+   * Picks up to {@code count} peers that have {@code piece} and were not yet asked for it.
+   */
+  private List<SharingPeer> selectGoodPeers(Piece piece, int count, List<SharingPeer> sortedPeers) {
+    List<SharingPeer> candidates = new ArrayList<SharingPeer>();
+    for (SharingPeer peer : sortedPeers) {
+      boolean hasPiece = peer.getAvailablePieces().get(piece.getIndex());
+      boolean requested = peer.getRequestedPieces().contains(piece);
+      if (hasPiece && !requested) candidates.add(peer);
+    }
+    if (candidates.size() <= count) return candidates;
+
+    List<SharingPeer> chosen = new ArrayList<SharingPeer>();
+    for (int picked = 0; picked < count; picked++) {
+      // selectPeer() cannot yield null here: candidates always holds more peers than are still needed.
+      SharingPeer peer = selectPeer(candidates);
+      candidates.remove(peer);
+      chosen.add(peer);
+    }
+    return chosen;
+  }
+
+  /** Walks the list in order, taking each peer with probability 0.8; otherwise picks a random index. */
+  private SharingPeer selectPeer(List<SharingPeer> notSelected) {
+    for (int i = 0; i < notSelected.size(); i++) {
+      if (RANDOM.nextDouble() < 0.8) return notSelected.get(i);
+    }
+    int fallback = RANDOM.nextInt(notSelected.size());
+    return notSelected.get(fallback);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategy.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategy.java
new file mode 100644
index 0000000..5313b86
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategy.java
@@ -0,0 +1,22 @@
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+
+import java.util.BitSet;
+
+/**
+ * Interface for a piece request strategy provider.
+ *
+ * @author cjmalloy
+ */
+public interface RequestStrategy {
+
+  /**
+   * Choose the next piece to request among the interesting ones.
+   *
+   * @param interesting Bit set whose set bits are the indices of the interesting pieces
+   * @param pieces      The complete array of pieces
+   * @return The chosen piece, or <code>null</code> if no piece is interesting
+   */
+  Piece choosePiece(BitSet interesting, Piece[] pieces);
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategyImplAnyInteresting.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategyImplAnyInteresting.java
new file mode 100644
index 0000000..5ba8ae4
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategyImplAnyInteresting.java
@@ -0,0 +1,23 @@
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Random;
+
+public class RequestStrategyImplAnyInteresting implements RequestStrategy {
+
+ private final Random myRandom = new Random();
+
+ @Override
+ public Piece choosePiece(BitSet interesting, Piece[] pieces) {
+ List<Piece> onlyInterestingPieces = new ArrayList<Piece>();
+ for (Piece p : pieces) {
+ if (interesting.get(p.getIndex())) onlyInterestingPieces.add(p);
+ }
+ if (onlyInterestingPieces.isEmpty()) return null;
+ return onlyInterestingPieces.get(myRandom.nextInt(onlyInterestingPieces.size()));
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategyImplSequential.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategyImplSequential.java
new file mode 100644
index 0000000..eabc24d
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestStrategyImplSequential.java
@@ -0,0 +1,22 @@
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+
+import java.util.BitSet;
+
+/**
+ * A sequential request strategy implementation.
+ *
+ * @author cjmalloy
+ */
+public class RequestStrategyImplSequential implements RequestStrategy {
+
+ @Override
+ public Piece choosePiece(BitSet interesting, Piece[] pieces) {
+
+ for (Piece p : pieces) {
+ if (interesting.get(p.getIndex())) return p;
+ }
+ return null;
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestsCollection.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestsCollection.java
new file mode 100644
index 0000000..9b41849
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestsCollection.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.strategy;
+
+public interface RequestsCollection {
+  /** Sends all the requests of this collection. */
+  void sendAllRequests();
+
+  /** Collection without any requests; sending them does nothing. */
+  final class Empty implements RequestsCollection {
+    public static final Empty INSTANCE = new Empty();
+
+    // private: INSTANCE is the only instance that can be obtained from outside
+    private Empty() {}
+
+    @Override
+    public void sendAllRequests() {
+      // intentionally empty: there is nothing to send
+    }
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestsCollectionImpl.java b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestsCollectionImpl.java
new file mode 100644
index 0000000..6d18e3a
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/main/java/com/turn/ttorrent/client/strategy/RequestsCollectionImpl.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+import com.turn.ttorrent.client.peer.SharingPeer;
+
+import java.util.List;
+import java.util.Map;
+
+public class RequestsCollectionImpl implements RequestsCollection {
+
+  private final Map<Piece, List<SharingPeer>> selectedPieces;
+
+  public RequestsCollectionImpl(Map<Piece, List<SharingPeer>> selectedPieces) {
+    this.selectedPieces = selectedPieces;
+  }
+
+  /** Asks each peer that is mapped to a piece to download that piece. */
+  @Override
+  public void sendAllRequests() {
+    for (Map.Entry<Piece, List<SharingPeer>> pieceWithPeers : selectedPieces.entrySet()) {
+      for (SharingPeer peer : pieceWithPeers.getValue()) {
+        peer.downloadPiece(pieceWithPeers.getKey());
+      }
+    }
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/ByteArrayStorage.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/ByteArrayStorage.java
new file mode 100644
index 0000000..17831f9
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/ByteArrayStorage.java
@@ -0,0 +1,77 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.storage.TorrentByteStorage;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/** {@link TorrentByteStorage} for tests that keeps all the bytes in one in-memory array. */
+public class ByteArrayStorage implements TorrentByteStorage {
+
+  private final byte[] array;
+  private boolean finished = false;
+  private boolean isBlank;
+
+  public ByteArrayStorage(int maxSize) {
+    array = new byte[maxSize];
+    isBlank = true;
+  }
+
+  @Override
+  public void open(boolean seeder) {}
+
+  /** Narrows a storage position to an array index; negative or larger-than-int values are rejected. */
+  private int intPosition(long position) {
+    if (position > Integer.MAX_VALUE || position < 0) {
+      throw new IllegalArgumentException("Position must be in [0, " + Integer.MAX_VALUE + "], got " + position);
+    }
+    return (int) position;
+  }
+
+  /** Fills the rest of {@code buffer} from {@code position}; copyOfRange zero-pads past the array end. */
+  @Override
+  public int read(ByteBuffer buffer, long position) {
+    int pos = intPosition(position);
+    int bytesCount = buffer.remaining();
+    buffer.put(Arrays.copyOfRange(array, pos, pos + bytesCount));
+    return bytesCount;
+  }
+
+  @Override
+  public int write(ByteBuffer block, long position) {
+    int pos = intPosition(position);
+    int bytesCount = block.remaining();
+    byte[] toWrite = new byte[bytesCount];
+    block.get(toWrite);
+    System.arraycopy(toWrite, 0, array, pos, toWrite.length);
+    isBlank = false;
+    return bytesCount;
+  }
+
+  @Override
+  public void finish() {
+    finished = true;
+  }
+
+  @Override
+  public boolean isFinished() {
+    return finished;
+  }
+
+  /** Ignores the requested range: the whole storage counts as blank until the first write. */
+  @Override
+  public boolean isBlank(long position, long size) {
+    return isBlank;
+  }
+
+  @Override
+  public boolean isBlank() {
+    return isBlank;
+  }
+
+  @Override
+  public void delete() {}
+
+  @Override
+  public void close() {}
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/EventDispatcherTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/EventDispatcherTest.java
new file mode 100644
index 0000000..01de8eb
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/EventDispatcherTest.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.client;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+
+/** Checks how {@link EventDispatcher} forwards events through its multicaster. */
+@Test
+public class EventDispatcherTest {
+
+  private EventDispatcher myDispatcher;
+  private PeerInformation myPeer;
+  private PieceInformation myPiece;
+
+  @BeforeMethod
+  public void setUp() {
+    myPeer = mock(PeerInformation.class);
+    myPiece = mock(PieceInformation.class);
+
+    myDispatcher = new EventDispatcher();
+  }
+
+  public void testWithoutListeners() {
+    // with no listener registered, every notification still has to complete without throwing
+    myDispatcher.multicaster().downloadFailed(new RuntimeException());
+    myDispatcher.multicaster().peerConnected(myPeer);
+    myDispatcher.multicaster().validationComplete(1, 4);
+    myDispatcher.multicaster().pieceDownloaded(myPiece, myPeer);
+    myDispatcher.multicaster().downloadComplete();
+    myDispatcher.multicaster().pieceReceived(myPiece, myPeer);
+    myDispatcher.multicaster().peerDisconnected(myPeer);
+  }
+
+  public void testInvocation() {
+    final int listenersCount = 5;
+    final AtomicInteger completions = new AtomicInteger();
+    // every listener bumps the shared counter when it is told that the download completed
+    for (int registered = 0; registered < listenersCount; registered++) {
+      myDispatcher.addListener(new TorrentListenerWrapper() {
+        @Override
+        public void downloadComplete() {
+          completions.incrementAndGet();
+        }
+      });
+    }
+
+    // an event of another kind must not be counted as a completion
+    myDispatcher.multicaster().peerConnected(myPeer);
+    assertEquals(completions.get(), 0);
+
+    // the completion event has to reach each registered listener once
+    myDispatcher.multicaster().downloadComplete();
+    assertEquals(completions.get(), listenersCount);
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/PeersStorageTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/PeersStorageTest.java
new file mode 100644
index 0000000..af55e8f
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/PeersStorageTest.java
@@ -0,0 +1,71 @@
+package com.turn.ttorrent.client;
+
+import com.turn.ttorrent.client.peer.PeerActivityListener;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.PeerUID;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ByteChannel;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNull;
+
+@Test
+public class PeersStorageTest {
+
+  private PeersStorage myPeersStorage;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    myPeersStorage = new PeersStorage();
+  }
+
+  public void getSetSelfTest() {
+    // a new storage has no self peer until one is set
+    assertNull(myPeersStorage.getSelf());
+    Peer self = new Peer("", 1);
+    myPeersStorage.setSelf(self);
+    assertEquals(myPeersStorage.getSelf(), self);
+  }
+
+  public void testThatPeersStorageReturnNewCollection() {
+    SharingPeer sharingPeer = getMockSharingPeer();
+    myPeersStorage.putIfAbsent(new PeerUID(new InetSocketAddress("127.0.0.1", 6881), ""), sharingPeer);
+    Collection<SharingPeer> sharingPeers = myPeersStorage.getSharingPeers();
+
+    assertEquals(myPeersStorage.getSharingPeers().size(), 1);
+    assertEquals(sharingPeers.size(), 1);
+
+    // changing the returned collection must leave the storage itself untouched
+    sharingPeers.add(sharingPeer);
+    assertEquals(myPeersStorage.getSharingPeers().size(), 1);
+    assertEquals(sharingPeers.size(), 2);
+  }
+
+  private SharingPeer getMockSharingPeer() {
+    return new SharingPeer("1",
+            1,
+            null,
+            mock(SharedTorrent.class),
+            null,
+            mock(PeerActivityListener.class),
+            mock(ByteChannel.class), "TO", 1234);
+  }
+
+  public void getAndRemoveSharingPeersTest() {
+    SharingPeer sharingPeer = getMockSharingPeer();
+    PeerUID peerUid = new PeerUID(new InetSocketAddress("127.0.0.1", 6881), "");
+    SharingPeer oldPeer = myPeersStorage.putIfAbsent(peerUid, sharingPeer);
+    // nothing was stored under this uid before, so no previous peer is reported
+    assertNull(oldPeer);
+    assertEquals(myPeersStorage.getSharingPeer(peerUid), sharingPeer);
+    // the first removal hands back the stored peer, the second one finds nothing
+    assertEquals(myPeersStorage.removeSharingPeer(peerUid), sharingPeer);
+    assertNull(myPeersStorage.removeSharingPeer(peerUid));
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/network/HandshakeReceiverTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/network/HandshakeReceiverTest.java
new file mode 100644
index 0000000..2b3849a
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/network/HandshakeReceiverTest.java
@@ -0,0 +1,187 @@
+package com.turn.ttorrent.client.network;
+
+import com.turn.ttorrent.TempFiles;
+import com.turn.ttorrent.Utils;
+import com.turn.ttorrent.client.*;
+import com.turn.ttorrent.client.peer.PeerActivityListener;
+import com.turn.ttorrent.client.peer.SharingPeer;
+import com.turn.ttorrent.client.storage.FairPieceStorageFactory;
+import com.turn.ttorrent.client.storage.FileCollectionStorage;
+import com.turn.ttorrent.common.Peer;
+import com.turn.ttorrent.common.TorrentCreator;
+import com.turn.ttorrent.common.TorrentMetadata;
+import com.turn.ttorrent.common.TorrentSerializer;
+import com.turn.ttorrent.network.ConnectionManager;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.ConsoleAppender;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.Pipe;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+@Test
+public class HandshakeReceiverTest {
+
+ private HandshakeReceiver myHandshakeReceiver;
+ private byte[] mySelfId;
+ private Context myContext;
+ private TempFiles myTempFiles;
+
+ public HandshakeReceiverTest() {
+ if (Logger.getRootLogger().getAllAppenders().hasMoreElements()) // the root logger already has an appender
+ return;
+ BasicConfigurator.configure(new ConsoleAppender(new PatternLayout("[%d{MMdd HH:mm:ss,SSS} %t] %6p - %20.20c - %m %n")));
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ myTempFiles = new TempFiles();
+ Logger.getRootLogger().setLevel(Utils.getLogLevel());
+ mySelfId = "selfId1selfId2selfId".getBytes();
+ ByteBuffer selfId = ByteBuffer.wrap(mySelfId);
+ myContext = mock(Context.class);
+ PeersStorage peersStorage = new PeersStorage(); // real storages are returned by the mocked context
+ TorrentsStorage torrentsStorage = new TorrentsStorage();
+ when(myContext.getPeersStorage()).thenReturn(peersStorage);
+ when(myContext.getTorrentsStorage()).thenReturn(torrentsStorage);
+ peersStorage.setSelf(new Peer("127.0.0.1", 54645, selfId));
+ CommunicationManager communicationManager = mock(CommunicationManager.class); // NOTE(review): not used after the next line
+ when(communicationManager.getConnectionManager()).thenReturn(mock(ConnectionManager.class));
+ myHandshakeReceiver = new HandshakeReceiver(
+ myContext,
+ "127.0.0.1",
+ 45664,
+ false);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ myTempFiles.cleanup();
+ }
+
+ public void testReceiveHandshake() throws Exception {
+ Pipe p1 = Pipe.open();
+ Pipe p2 = Pipe.open();
+ ByteChannel client = new ByteSourceChannel(p1.source(), p2.sink()); // client reads from p1 and writes to p2
+ ByteChannel server = new ByteSourceChannel(p2.source(), p1.sink()); // server reads from p2 and writes to p1
+ String peerIdStr = "peerIdpeerIdpeerId22";
+ String torrentHashStr = "torrenttorrenttorren";
+ byte[] peerId = peerIdStr.getBytes();
+ byte[] torrentHash = torrentHashStr.getBytes();
+ Handshake hs = Handshake.craft(torrentHash, peerId);
+
+ if (hs == null) {
+ fail("Handshake instance is null");
+ }
+
+ ByteBuffer byteBuffer = hs.getData();
+ client.write(byteBuffer); // the handshake is put into the pipe before the receiver is invoked
+
+ final File tempFile = myTempFiles.createTempFile(1024 * 1024);
+ TorrentMetadata torrent = TorrentCreator.create(tempFile, URI.create(""), "test");
+ File torrentFile = myTempFiles.createTempFile();
+ FileOutputStream fos = new FileOutputStream(torrentFile);
+ fos.write(new TorrentSerializer().serialize(torrent));
+ fos.close();
+
+ final LoadedTorrent loadedTorrent = mock(LoadedTorrent.class);
+ final SharedTorrent sharedTorrent =
+ SharedTorrent.fromFile(torrentFile,
+ FairPieceStorageFactory.INSTANCE.createStorage(torrent, FileCollectionStorage.create(torrent, tempFile.getParentFile())),
+ loadedTorrent.getTorrentStatistic()); // loadedTorrent is an unstubbed mock at this point
+
+ TorrentLoader torrentsLoader = mock(TorrentLoader.class);
+ when(torrentsLoader.loadTorrent(loadedTorrent)).thenReturn(sharedTorrent);
+ when(myContext.getTorrentLoader()).thenReturn(torrentsLoader);
+ final ExecutorService executorService = Executors.newFixedThreadPool(1);
+ when(myContext.getExecutor()).thenReturn(executorService);
+ myContext.getTorrentsStorage().addTorrent(hs.getHexInfoHash(), loadedTorrent);
+
+ final AtomicBoolean onConnectionEstablishedInvoker = new AtomicBoolean(false);
+
+ final Semaphore semaphore = new Semaphore(0); // released by onConnectionEstablished() below
+ when(myContext.createSharingPeer(any(String.class),
+ anyInt(),
+ any(ByteBuffer.class),
+ any(SharedTorrent.class),
+ any(ByteChannel.class),
+ any(String.class),
+ anyInt()))
+ .thenReturn(new SharingPeer("127.0.0.1", 6881, ByteBuffer.wrap(peerId), sharedTorrent, null,
+ mock(PeerActivityListener.class), server, "TO", 1234) {
+ @Override
+ public void onConnectionEstablished() {
+ onConnectionEstablishedInvoker.set(true);
+ semaphore.release();
+ }
+ });
+
+ PeersStorage peersStorage = myContext.getPeersStorage();
+ assertEquals(0, myContext.getTorrentsStorage().activeTorrents().size());
+ assertEquals(peersStorage.getSharingPeers().size(), 0);
+ myHandshakeReceiver.processAndGetNext(server);
+ assertEquals(peersStorage.getSharingPeers().size(), 1);
+ ByteBuffer answer = ByteBuffer.allocate(byteBuffer.capacity()); // same capacity as the handshake that was sent
+ client.read(answer);
+ answer.rewind();
+ Handshake answerHs = Handshake.parse(answer);
+ assertEquals(answerHs.getPeerId(), mySelfId);
+ semaphore.tryAcquire(1, TimeUnit.SECONDS); // waits at most one second; the outcome is checked by the assert below
+ assertTrue(onConnectionEstablishedInvoker.get());
+ executorService.shutdown();
+ }
+
+ // TODO: 11/15/17 add negative tests (e.g. incorrect torrentID, incorrect handshake, etc.)
+
+ private static class ByteSourceChannel implements ByteChannel { // reads from a pipe source, writes to a pipe sink
+
+ private final Pipe.SourceChannel readChannel;
+ private final Pipe.SinkChannel writeChannel;
+
+ public ByteSourceChannel(Pipe.SourceChannel readChannel, Pipe.SinkChannel writeChannel) {
+ this.readChannel = readChannel;
+ this.writeChannel = writeChannel;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ return readChannel.read(dst);
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ return this.writeChannel.write(src);
+ }
+
+ @Override
+ public boolean isOpen() {
+ throw new RuntimeException("not implemented");
+ }
+
+ @Override
+ public void close() throws IOException {
+ readChannel.close();
+ writeChannel.close();
+ }
+ }
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/FairPieceStorageFactoryTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/FairPieceStorageFactoryTest.java
new file mode 100644
index 0000000..6e35507
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/FairPieceStorageFactoryTest.java
@@ -0,0 +1,101 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.common.TorrentFile;
+import com.turn.ttorrent.common.TorrentMetadata;
+import org.jetbrains.annotations.NotNull;
+import org.testng.annotations.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+@Test
+public class FairPieceStorageFactoryTest {
+
+  public void testCreatingStorageForLargeFile() throws Exception {
+    // 3000 pieces of 1 MiB each: the total size does not fit into an int,
+    // which is why totalSize below is computed as a long
+    int pieceLength = 1024 * 1024;
+    int pieceCount = 3000;
+
+    // the last piece may be shorter than the others
+    final int lastPieceLength = pieceLength / 2;
+    long totalSize = (long) (pieceCount - 1) * pieceLength + lastPieceLength;
+
+    TorrentMetadata metadata = mock(TorrentMetadata.class);
+    when(metadata.getPieceLength()).thenReturn(pieceLength);
+    when(metadata.getPiecesCount()).thenReturn(pieceCount);
+    when(metadata.getFiles()).thenReturn(Collections.singletonList(new TorrentFile(Collections.singletonList("test.avi"), totalSize, "")));
+    when(metadata.getPiecesHashes()).thenReturn(new byte[pieceCount * 20]);
+    // set once the storage is asked to fill a buffer whose capacity equals the last piece length
+    final AtomicBoolean isReadInvokedForLastPiece = new AtomicBoolean(false);
+    TorrentByteStorage storage = new TorrentByteStorage() {
+      @Override
+      public void open(boolean seeder) {
+        // nothing to open
+      }
+
+      @Override
+      public int read(ByteBuffer buffer, long position) {
+        if (buffer.capacity() == lastPieceLength) {
+          isReadInvokedForLastPiece.set(true);
+        }
+        buffer.putInt(1);
+        return 1;
+      }
+
+      @Override
+      public int write(ByteBuffer block, long position) {
+        throw notImplemented();
+      }
+
+      @NotNull
+      private RuntimeException notImplemented() {
+        return new RuntimeException("notImplemented");
+      }
+
+      @Override
+      public void finish() {
+        throw notImplemented();
+      }
+
+      @Override
+      public boolean isFinished() {
+        throw notImplemented();
+      }
+
+      @Override
+      public boolean isBlank(long position, long size) {
+        return false;
+      }
+
+      @Override
+      public boolean isBlank() {
+        return false;
+      }
+
+      @Override
+      public void delete() {
+        throw notImplemented();
+      }
+
+      @Override
+      public void close() {
+        // nothing to close
+      }
+    };
+
+    PieceStorage pieceStorage = FairPieceStorageFactory.INSTANCE.createStorage(metadata, storage);
+
+    assertTrue(isReadInvokedForLastPiece.get());
+    assertEquals(pieceStorage.getAvailablePieces().cardinality(), 0);
+    assertFalse(pieceStorage.isFinished());
+  }
+
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/FileCollectionStorageTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/FileCollectionStorageTest.java
new file mode 100644
index 0000000..60db718
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/FileCollectionStorageTest.java
@@ -0,0 +1,89 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.TempFiles;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * User: loyd
+ * Date: 11/24/13
+ */
+public class FileCollectionStorageTest {
+
+ private TempFiles tempFiles;
+
+ @BeforeMethod
+ public void setUp() {
+ tempFiles = new TempFiles();
+ }
+
+ @AfterMethod
+ public void tearDown() {
+ tempFiles.cleanup();
+ }
+
+ @Test
+ public void testSelect() throws Exception {
+ final File file1 = tempFiles.createTempFile();
+ final File file2 = tempFiles.createTempFile();
+
+ final List<FileStorage> files = new ArrayList<FileStorage>();
+ files.add(new FileStorage(file1, 0, 2));
+ files.add(new FileStorage(file2, 2, 2));
+ final FileCollectionStorage storage = new FileCollectionStorage(files, 4);
+
+ storage.open(false);
+ try {
+ // since all of these files already exist, we are considered finished
+ assertTrue(storage.isFinished());
+
+ // write to first file works
+ write(new byte[]{1, 2}, 0, storage);
+ check(new byte[]{1, 2}, file1);
+
+ // write to second file works
+ write(new byte[]{5, 6}, 2, storage);
+ check(new byte[]{5, 6}, file2);
+
+ // write to two files works
+ write(new byte[]{8, 9, 10, 11}, 0, storage);
+ check(new byte[]{8, 9}, file1);
+ check(new byte[]{10, 11}, file2);
+
+ // make sure partial write into next file works
+ write(new byte[]{100, 101, 102}, 0, storage);
+ check(new byte[]{102, 11}, file2);
+ } finally {
+ storage.close();
+ }
+ }
+
+ private void write(byte[] bytes, int offset, FileCollectionStorage storage) throws IOException {
+ storage.write(ByteBuffer.wrap(bytes), offset);
+ storage.finish();
+ }
+
+ private void check(byte[] bytes, File f) throws IOException {
+ final byte[] temp = new byte[bytes.length];
+ FileInputStream fileInputStream = new FileInputStream(f);
+ final int totalRead;
+ try {
+ totalRead = fileInputStream.read(temp);
+ } finally {
+ fileInputStream.close();
+ }
+ assertEquals(totalRead, temp.length);
+ assertEquals(temp, bytes);
+ }
+}
\ No newline at end of file
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/PieceStorageImplTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/PieceStorageImplTest.java
new file mode 100644
index 0000000..9cac770
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/storage/PieceStorageImplTest.java
@@ -0,0 +1,84 @@
+package com.turn.ttorrent.client.storage;
+
+import com.turn.ttorrent.client.ByteArrayStorage;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.BitSet;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class PieceStorageImplTest {
+
+  private PieceStorage pieceStorage;
+  private int pieceSize;
+  private int pieceCount;
+  private byte[] allPieces;
+
+  @BeforeMethod
+  public void setUp() throws IOException {
+    // reference data: every byte holds its own offset, truncated to a byte
+    pieceSize = 12;
+    pieceCount = 8;
+    ByteArrayStorage storage = new ByteArrayStorage(pieceSize * pieceCount);
+    pieceStorage = new PieceStorageImpl(storage, new BitSet(), pieceCount, pieceSize);
+    allPieces = new byte[pieceCount * pieceSize];
+    for (int i = 0; i < allPieces.length; i++) {
+      allPieces[i] = (byte) i;
+    }
+  }
+
+  @Test
+  public void testStorage() throws IOException {
+    // nothing is available before the first piece is saved
+    assertEquals(pieceStorage.getAvailablePieces().cardinality(), 0);
+    byte[] firstPieceData = Arrays.copyOfRange(allPieces, pieceSize, 2 * pieceSize);
+    pieceStorage.savePiece(1, firstPieceData);
+    byte[] thirdPieceData = Arrays.copyOfRange(allPieces, 3 * pieceSize, 4 * pieceSize);
+    pieceStorage.savePiece(3, thirdPieceData);
+
+    BitSet availablePieces = pieceStorage.getAvailablePieces();
+    assertEquals(availablePieces.cardinality(), 2);
+    assertTrue(availablePieces.get(1));
+    assertTrue(availablePieces.get(3));
+
+    // saved pieces are read back unchanged when read as a whole
+    byte[] actualFirstPieceData = pieceStorage.readPiecePart(1, 0, pieceSize);
+    byte[] actualThirdPieceData = pieceStorage.readPiecePart(3, 0, pieceSize);
+    assertEquals(actualFirstPieceData, firstPieceData);
+    assertEquals(actualThirdPieceData, thirdPieceData);
+
+    //check that reading by parts works correctly
+    byte[] firstPiecePart = pieceStorage.readPiecePart(1, 0, pieceSize / 2);
+    byte[] secondPiecePart = pieceStorage.readPiecePart(1, pieceSize / 2, pieceSize / 2);
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    outputStream.write(firstPiecePart);
+    outputStream.write(secondPiecePart);
+    assertEquals(outputStream.toByteArray(), firstPieceData);
+  }
+
+  @Test
+  public void testFullStorage() throws IOException {
+    assertEquals(pieceStorage.getAvailablePieces().cardinality(), 0);
+    for (int i = 0; i < pieceCount; i++) {
+      assertEquals(pieceStorage.getAvailablePieces().cardinality(), i);
+      pieceStorage.savePiece(i, Arrays.copyOfRange(allPieces, i * pieceSize, (i + 1) * pieceSize));
+    }
+    assertEquals(pieceStorage.getAvailablePieces().cardinality(), pieceCount);
+  }
+
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testReadUnavailablePiece() throws IOException {
+    pieceStorage.readPiecePart(45, 0, pieceSize);
+  }
+
+  @AfterMethod
+  public void tearDown() throws IOException {
+    pieceStorage.close();
+  }
+}
diff --git a/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/strategy/RequestStrategyImplAnyInterestingTest.java b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/strategy/RequestStrategyImplAnyInterestingTest.java
new file mode 100644
index 0000000..56457b7
--- /dev/null
+++ b/ttorrent-master/ttorrent-client/src/test/java/com/turn/ttorrent/client/strategy/RequestStrategyImplAnyInterestingTest.java
@@ -0,0 +1,53 @@
+package com.turn.ttorrent.client.strategy;
+
+import com.turn.ttorrent.client.Piece;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.BitSet;
+import java.util.SortedSet;
+
+/** Tests for {@link RequestStrategyImplAnyInteresting}. */
+public class RequestStrategyImplAnyInterestingTest {
+
+  private final int myPiecesTotal = 10;
+  private final Piece[] myPieces = new Piece[myPiecesTotal];
+  private final RequestStrategy myRequestStrategy = new RequestStrategyImplAnyInteresting();
+
+  @BeforeClass
+  public void init() {
+    for (int i = 0; i < myPieces.length; i++) {
+      myPieces[i] = new Piece(null, i, 0, new byte[0]);
+    }
+  }
+
+  @Test
+  public void choosePieceNoInterestingTest() {
+    Piece actual = myRequestStrategy.choosePiece(new BitSet(), myPieces);
+    Assert.assertNull(actual);
+  }
+
+  @Test
+  public void choosePieceOneInterestingTest() {
+    BitSet interesting = new BitSet();
+    for (int i = 0; i < myPieces.length; i++) {
+      interesting.clear();
+      interesting.set(i);
+      Piece expected = myPieces[i];
+      Piece actual = myRequestStrategy.choosePiece(interesting, myPieces);
+      Assert.assertEquals(actual, expected);
+    }
+  }
+
+  @Test
+  public void choosePieceTest() {
+    BitSet interesting = new BitSet();
+    int interestingFrom = 1;
+    int interestingTo = 5;
+    interesting.set(interestingFrom, interestingTo);
+    Piece actual = myRequestStrategy.choosePiece(interesting, myPieces);
+    // BitSet.set(from, to) leaves bit 'to' clear, so the chosen index must stay strictly below it
+    Assert.assertTrue(actual.getIndex() >= interestingFrom && actual.getIndex() < interestingTo);
+  }
+}