tracker
Change-Id: I8f8ac81f9c4d7c7650cd64d2dade701dc6c11dce
diff --git a/ttorrent-master/network/pom.xml b/ttorrent-master/network/pom.xml
new file mode 100644
index 0000000..72c994b
--- /dev/null
+++ b/ttorrent-master/network/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <name>ttorrent/network</name>
+ <url>http://turn.github.com/ttorrent/</url>
+ <artifactId>ttorrent-network</artifactId>
+ <version>1.0</version>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-common</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.turn</groupId>
+ <artifactId>ttorrent-test-api</artifactId>
+ <version>1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachment.java
new file mode 100644
index 0000000..acba652
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachment.java
@@ -0,0 +1,10 @@
+package com.turn.ttorrent.network;
+
+public interface AcceptAttachment {
+
+ /**
+ * @return channel listener factory for create listeners for new connections
+ */
+ ChannelListenerFactory getChannelListenerFactory();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachmentImpl.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachmentImpl.java
new file mode 100644
index 0000000..ce0b051
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachmentImpl.java
@@ -0,0 +1,32 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+public class AcceptAttachmentImpl implements AcceptAttachment, TimeoutAttachment {
+
+ private final ChannelListenerFactory myChannelListenerFactory;
+
+ public AcceptAttachmentImpl(ChannelListenerFactory channelListenerFactory) {
+ this.myChannelListenerFactory = channelListenerFactory;
+ }
+
+ @Override
+ public ChannelListenerFactory getChannelListenerFactory() {
+ return myChannelListenerFactory;
+ }
+
+ @Override
+ public boolean isTimeoutElapsed(long currentTimeMillis) {
+ return false;//accept attachment doesn't closed by timeout
+ }
+
+ @Override
+ public void communicatedNow(long currentTimeMillis) {
+ }
+
+ @Override
+ public void onTimeoutElapsed(SocketChannel channel) throws IOException {
+
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ChannelListenerFactory.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ChannelListenerFactory.java
new file mode 100644
index 0000000..25e111c
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ChannelListenerFactory.java
@@ -0,0 +1,7 @@
+package com.turn.ttorrent.network;
+
+public interface ChannelListenerFactory {
+
+ ConnectionListener newChannelListener();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectTask.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectTask.java
new file mode 100644
index 0000000..5906873
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectTask.java
@@ -0,0 +1,59 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.SocketChannel;
+
+public class ConnectTask implements TimeoutAttachment, ReadAttachment {
+
+ private long lastCommunicationTime;
+ private final int myTimeoutMillis;
+ private final String myHost;
+ private final int myPort;
+ private final ConnectionListener myConnectionListener;
+
+ public ConnectTask(String host, int port, ConnectionListener connectionListener, long lastCommunicationTime, int timeoutMillis) {
+ this.myHost = host;
+ this.myPort = port;
+ this.myConnectionListener = connectionListener;
+ this.myTimeoutMillis = timeoutMillis;
+ this.lastCommunicationTime = lastCommunicationTime;
+ }
+
+ public String getHost() {
+ return myHost;
+ }
+
+ public int getPort() {
+ return myPort;
+ }
+
+ @Override
+ public ConnectionListener getConnectionListener() {
+ return myConnectionListener;
+ }
+
+ @Override
+ public String toString() {
+ return "ConnectTask{" +
+ "myHost='" + myHost + '\'' +
+ ", myPort=" + myPort +
+ '}';
+ }
+
+ @Override
+ public boolean isTimeoutElapsed(long currentTimeMillis) {
+ long minTimeForKeepAlive = currentTimeMillis - myTimeoutMillis;
+ return minTimeForKeepAlive > lastCommunicationTime;
+ }
+
+ @Override
+ public void communicatedNow(long currentTimeMillis) {
+ lastCommunicationTime = currentTimeMillis;
+ }
+
+ @Override
+ public void onTimeoutElapsed(SocketChannel channel) throws IOException {
+ myConnectionListener.onError(channel, new SocketTimeoutException());
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionClosedException.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionClosedException.java
new file mode 100644
index 0000000..f6dca57
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionClosedException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+
+public class ConnectionClosedException extends IOException {
+
+ public ConnectionClosedException() {
+ }
+
+ public ConnectionClosedException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionListener.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionListener.java
new file mode 100644
index 0000000..09f5888
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionListener.java
@@ -0,0 +1,32 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+public interface ConnectionListener {
+
+ /**
+ * invoked when specified socket channel contains any data
+ *
+ * @param socketChannel specified socket channel with data
+ * @throws IOException if an I/O error occurs
+ */
+ void onNewDataAvailable(SocketChannel socketChannel) throws IOException;
+
+ /**
+ * invoked when get new connection
+ *
+ * @param socketChannel specified socket channel
+ * @throws IOException if an I/O error occurs
+ */
+ void onConnectionEstablished(SocketChannel socketChannel) throws IOException;
+
+ /**
+ * invoked when an error occurs
+ *
+ * @param socketChannel specified channel, associated with this channel
+ * @param ex specified exception
+ * @throws IOException if an I/O error occurs
+ */
+ void onError(SocketChannel socketChannel, Throwable ex) throws IOException;
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManager.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManager.java
new file mode 100644
index 0000000..76700a8
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManager.java
@@ -0,0 +1,175 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.keyProcessors.*;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Arrays;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.turn.ttorrent.Constants.DEFAULT_CLEANUP_RUN_TIMEOUT_MILLIS;
+import static com.turn.ttorrent.Constants.DEFAULT_SELECTOR_SELECT_TIMEOUT_MILLIS;
+
+public class ConnectionManager {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(ConnectionManager.class);
+
+ private final Selector selector;
+ private final TimeService myTimeService;
+ private volatile ConnectionWorker myConnectionWorker;
+ private int myBindPort;
+ private final ConnectionManagerContext myContext;
+ private volatile ServerSocketChannel myServerSocketChannel;
+ private volatile Future<?> myWorkerFuture;
+ private final NewConnectionAllower myIncomingConnectionAllower;
+ private final NewConnectionAllower myOutgoingConnectionAllower;
+ private final TimeoutStorage socketTimeoutStorage = new TimeoutStorageImpl();
+ private final AtomicBoolean alreadyInit = new AtomicBoolean(false);
+ private final AtomicInteger mySendBufferSize;
+ private final AtomicInteger myReceiveBufferSize;
+
+ public ConnectionManager(ConnectionManagerContext context,
+ TimeService timeService,
+ NewConnectionAllower newIncomingConnectionAllower,
+ NewConnectionAllower newOutgoingConnectionAllower,
+ SelectorFactory selectorFactory,
+ AtomicInteger mySendBufferSize,
+ AtomicInteger myReceiveBufferSize) throws IOException {
+ this.mySendBufferSize = mySendBufferSize;
+ this.myReceiveBufferSize = myReceiveBufferSize;
+ this.selector = selectorFactory.newSelector();
+ this.myTimeService = timeService;
+ myContext = context;
+ this.myIncomingConnectionAllower = newIncomingConnectionAllower;
+ this.myOutgoingConnectionAllower = newOutgoingConnectionAllower;
+ }
+
+ public void initAndRunWorker(ServerChannelRegister serverChannelRegister) throws IOException {
+
+ boolean wasInit = alreadyInit.getAndSet(true);
+
+ if (wasInit) {
+ throw new IllegalStateException("connection manager was already initialized");
+ }
+
+ myServerSocketChannel = serverChannelRegister.channelFor(selector);
+ myServerSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAttachmentImpl(myContext));
+ myBindPort = myServerSocketChannel.socket().getLocalPort();
+ String serverName = myServerSocketChannel.socket().toString();
+ myConnectionWorker = new ConnectionWorker(selector, Arrays.asList(
+ new InvalidKeyProcessor(),
+ new AcceptableKeyProcessor(selector, serverName, myTimeService, myIncomingConnectionAllower, socketTimeoutStorage,
+ mySendBufferSize, myReceiveBufferSize),
+ new ConnectableKeyProcessor(selector, myTimeService, socketTimeoutStorage,
+ mySendBufferSize, myReceiveBufferSize),
+ new ReadableKeyProcessor(serverName),
+ new WritableKeyProcessor()), DEFAULT_SELECTOR_SELECT_TIMEOUT_MILLIS, DEFAULT_CLEANUP_RUN_TIMEOUT_MILLIS,
+ myTimeService,
+ new CleanupKeyProcessor(myTimeService),
+ myOutgoingConnectionAllower);
+ myWorkerFuture = myContext.getExecutor().submit(myConnectionWorker);
+ }
+
+ public void setSelectorSelectTimeout(int timeout) {
+ ConnectionWorker workerLocal = myConnectionWorker;
+ checkThatWorkerIsInit(workerLocal);
+ workerLocal.setSelectorSelectTimeout(timeout);
+ }
+
+ private void checkThatWorkerIsInit(ConnectionWorker worker) {
+ if (worker == null) throw new IllegalStateException("Connection manager is not initialized!");
+ }
+
+ public boolean offerConnect(ConnectTask connectTask, int timeout, TimeUnit timeUnit) {
+ if (myConnectionWorker == null) {
+ return false;
+ }
+ return myConnectionWorker.offerConnect(connectTask, timeout, timeUnit);
+ }
+
+ public boolean offerWrite(WriteTask writeTask, int timeout, TimeUnit timeUnit) {
+ if (myConnectionWorker == null) {
+ return false;
+ }
+ return myConnectionWorker.offerWrite(writeTask, timeout, timeUnit);
+ }
+
+
+ public int getBindPort() {
+ return myBindPort;
+ }
+
+ public void close(int timeout, TimeUnit timeUnit) {
+ logger.debug("try close connection manager...");
+ boolean successfullyClosed = true;
+ if (myConnectionWorker != null) {
+ myWorkerFuture.cancel(true);
+ try {
+ boolean shutdownCorrectly = myConnectionWorker.stop(timeout, timeUnit);
+ if (!shutdownCorrectly) {
+ successfullyClosed = false;
+ logger.warn("unable to terminate worker in {} {}", timeout, timeUnit);
+ }
+ } catch (InterruptedException e) {
+ successfullyClosed = false;
+ LoggerUtils.warnAndDebugDetails(logger, "unable to await termination worker, thread was interrupted", e);
+ }
+ }
+ try {
+ this.myServerSocketChannel.close();
+ } catch (Throwable e) {
+ LoggerUtils.errorAndDebugDetails(logger, "unable to close server socket channel", e);
+ successfullyClosed = false;
+ }
+ for (SelectionKey key : this.selector.keys()) {
+ try {
+ if (key.isValid()) {
+ key.channel().close();
+ }
+ } catch (Throwable e) {
+ logger.error("unable to close socket channel {}", key.channel());
+ successfullyClosed = false;
+ logger.debug("", e);
+ }
+ }
+ try {
+ this.selector.close();
+ } catch (Throwable e) {
+ LoggerUtils.errorAndDebugDetails(logger, "unable to close selector channel", e);
+ successfullyClosed = false;
+ }
+ if (successfullyClosed) {
+ logger.debug("connection manager is successfully closed");
+ } else {
+ logger.error("connection manager wasn't closed successfully");
+ }
+ }
+
+ public void close() {
+ close(1, TimeUnit.MINUTES);
+ }
+
+ public void setCleanupTimeout(long timeoutMillis) {
+ ConnectionWorker workerLocal = myConnectionWorker;
+ checkThatWorkerIsInit(workerLocal);
+ workerLocal.setCleanupTimeout(timeoutMillis);
+ }
+
+ public void setSocketConnectionTimeout(long timeoutMillis) {
+ socketTimeoutStorage.setTimeout(timeoutMillis);
+ }
+
+ public void closeChannel(Channel channel) throws IOException {
+ channel.close();
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManagerContext.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManagerContext.java
new file mode 100644
index 0000000..c95cbb5
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManagerContext.java
@@ -0,0 +1,9 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.ExecutorService;
+
+public interface ConnectionManagerContext extends ChannelListenerFactory {
+
+ ExecutorService getExecutor();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionWorker.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionWorker.java
new file mode 100644
index 0000000..2d83275
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionWorker.java
@@ -0,0 +1,259 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.keyProcessors.CleanupProcessor;
+import com.turn.ttorrent.network.keyProcessors.KeyProcessor;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class ConnectionWorker implements Runnable {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(ConnectionWorker.class);
+ private static final String SELECTOR_THREAD_NAME = "Torrent channels manager thread";
+ private volatile boolean stop = false;
+ private final Selector selector;
+ private final BlockingQueue<ConnectTask> myConnectQueue;
+ private final BlockingQueue<WriteTask> myWriteQueue;
+ private final Semaphore mySemaphore;
+ private final List<KeyProcessor> myKeyProcessors;
+ private final TimeService myTimeService;
+ private long lastCleanupTime;
+ private volatile int mySelectorTimeoutMillis;
+ private volatile long myCleanupTimeoutMillis;
+ private final CleanupProcessor myCleanupProcessor;
+ private final NewConnectionAllower myNewConnectionAllower;
+
+ ConnectionWorker(Selector selector,
+ List<KeyProcessor> keyProcessors,
+ int selectorTimeoutMillis,
+ int cleanupTimeoutMillis,
+ TimeService timeService,
+ CleanupProcessor cleanupProcessor,
+ NewConnectionAllower myNewConnectionAllower) {
+ this.selector = selector;
+ this.myTimeService = timeService;
+ this.lastCleanupTime = timeService.now();
+ this.mySelectorTimeoutMillis = selectorTimeoutMillis;
+ this.myCleanupTimeoutMillis = cleanupTimeoutMillis;
+ this.myCleanupProcessor = cleanupProcessor;
+ this.myNewConnectionAllower = myNewConnectionAllower;
+ this.mySemaphore = new Semaphore(1);
+ this.myConnectQueue = new LinkedBlockingQueue<ConnectTask>(100);
+ this.myKeyProcessors = keyProcessors;
+ this.myWriteQueue = new LinkedBlockingQueue<WriteTask>(5000);
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ mySemaphore.acquire();
+ } catch (InterruptedException e) {
+ return;
+ }
+
+ final String oldName = Thread.currentThread().getName();
+
+ try {
+
+ Thread.currentThread().setName(SELECTOR_THREAD_NAME);
+
+ while (!stop && (!Thread.currentThread().isInterrupted())) {
+ try {
+ logger.trace("try select keys from selector");
+ int selected;
+ try {
+ selected = selector.select(mySelectorTimeoutMillis);
+ } catch (ClosedSelectorException e) {
+ break;
+ }
+ connectToPeersFromQueue();
+ processWriteTasks();
+ logger.trace("select keys from selector. Keys count is " + selected);
+ if (selected != 0) {
+ processSelectedKeys();
+ }
+ if (needRunCleanup()) {
+ cleanup();
+ }
+ } catch (Throwable e) {
+ LoggerUtils.warnAndDebugDetails(logger, "unable to select channel keys. Error message {}", e.getMessage(), e);
+ }
+ }
+ } catch (Throwable e) {
+ LoggerUtils.errorAndDebugDetails(logger, "exception on cycle iteration", e);
+ } finally {
+ Thread.currentThread().setName(oldName);
+ mySemaphore.release();
+ }
+ }
+
+ private void cleanup() {
+ lastCleanupTime = myTimeService.now();
+ for (SelectionKey key : selector.keys()) {
+ if (!key.isValid()) continue;
+ myCleanupProcessor.processCleanup(key);
+ }
+ }
+
+ private boolean needRunCleanup() {
+ return (myTimeService.now() - lastCleanupTime) > myCleanupTimeoutMillis;
+ }
+
+ private void processWriteTasks() {
+
+ final Iterator<WriteTask> iterator = myWriteQueue.iterator();
+ while (iterator.hasNext()) {
+ WriteTask writeTask = iterator.next();
+ if (stop || Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ logger.trace("try register channel for write. Write task is {}", writeTask);
+ SocketChannel socketChannel = (SocketChannel) writeTask.getSocketChannel();
+ if (!socketChannel.isOpen()) {
+ iterator.remove();
+ writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Channel is not open"), new ConnectionClosedException());
+ continue;
+ }
+ SelectionKey key = socketChannel.keyFor(selector);
+ if (key == null) {
+ logger.warn("unable to find key for channel {}", socketChannel);
+ iterator.remove();
+ writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Can not find key for the channel"), new ConnectionClosedException());
+ continue;
+ }
+ Object attachment = key.attachment();
+ if (!(attachment instanceof WriteAttachment)) {
+ logger.error("incorrect attachment {} for channel {}", attachment, socketChannel);
+ iterator.remove();
+ writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Incorrect attachment instance for the key"), new ConnectionClosedException());
+ continue;
+ }
+ WriteAttachment keyAttachment = (WriteAttachment) attachment;
+ if (keyAttachment.getWriteTasks().offer(writeTask)) {
+ iterator.remove();
+ try {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ } catch (CancelledKeyException e) {
+ writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Key is cancelled"), new ConnectionClosedException(e));
+ }
+ }
+ }
+ }
+
+ private String getDefaultWriteErrorMessageWithSuffix(SocketChannel socketChannel, String suffix) {
+ return "unable write data to channel " + socketChannel + ". " + suffix;
+ }
+
+ private void connectToPeersFromQueue() {
+ ConnectTask connectTask;
+ while ((connectTask = myConnectQueue.poll()) != null) {
+ if (stop || Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ logger.debug("try connect to peer. Connect task is {}", connectTask);
+ try {
+ SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
+ socketChannel.register(selector, SelectionKey.OP_CONNECT, connectTask);
+ socketChannel.connect(new InetSocketAddress(connectTask.getHost(), connectTask.getPort()));
+ } catch (IOException e) {
+ LoggerUtils.warnAndDebugDetails(logger, "unable connect. Connect task is {}", connectTask, e);
+ }
+ }
+ }
+
+ public boolean stop(int timeout, TimeUnit timeUnit) throws InterruptedException {
+ stop = true;
+ if (timeout <= 0) {
+ return true;
+ }
+ return mySemaphore.tryAcquire(timeout, timeUnit);
+ }
+
+ private void processSelectedKeys() {
+ Set<SelectionKey> selectionKeys = selector.selectedKeys();
+ for (SelectionKey key : selectionKeys) {
+ if (stop || Thread.currentThread().isInterrupted()) {
+ return;
+ }
+ try {
+ processSelectedKey(key);
+ } catch (Exception e) {
+ logger.warn("error {} in processing key. Close channel {}", e.getMessage(), key.channel());
+ logger.debug("", e);
+ try {
+ key.channel().close();
+ } catch (IOException ioe) {
+ LoggerUtils.errorAndDebugDetails(logger, "unable close bad channel", ioe);
+ }
+ }
+ }
+ selectionKeys.clear();
+ }
+
+ private void processSelectedKey(SelectionKey key) throws IOException {
+ logger.trace("try process key for channel {}", key.channel());
+ myCleanupProcessor.processSelected(key);
+ if (!key.channel().isOpen()) {
+ key.cancel();
+ return;
+ }
+ for (KeyProcessor keyProcessor : myKeyProcessors) {
+ if (keyProcessor.accept(key)) {
+ keyProcessor.process(key);
+ }
+ }
+ }
+
+ public boolean offerConnect(ConnectTask connectTask, int timeout, TimeUnit timeUnit) {
+ if (!myNewConnectionAllower.isNewConnectionAllowed()) {
+ logger.info("can not add connect task {} to queue. New connection is not allowed", connectTask);
+ return false;
+ }
+ return addTaskToQueue(connectTask, timeout, timeUnit, myConnectQueue);
+ }
+
+ public boolean offerWrite(WriteTask writeTask, int timeout, TimeUnit timeUnit) {
+ boolean done = addTaskToQueue(writeTask, timeout, timeUnit, myWriteQueue);
+ if (!done) {
+ writeTask.getListener().onWriteFailed("unable add task " + writeTask + " to the queue. Maybe queue is overload", null);
+ }
+ return done;
+ }
+
+ private <T> boolean addTaskToQueue(T task, int timeout, TimeUnit timeUnit, BlockingQueue<T> queue) {
+ try {
+ if (queue.offer(task, timeout, timeUnit)) {
+ logger.trace("added task {}. Wake up selector", task);
+ selector.wakeup();
+ return true;
+ }
+ } catch (InterruptedException e) {
+ logger.debug("Task {} interrupted before was added to queue", task);
+ }
+ logger.debug("Task {} was not added", task);
+ return false;
+ }
+
+ void setCleanupTimeout(long timeoutMillis) {
+ this.myCleanupTimeoutMillis = timeoutMillis;
+ }
+
+ void setSelectorSelectTimeout(int timeout) {
+ mySelectorTimeoutMillis = timeout;
+ }
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/FirstAvailableChannel.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/FirstAvailableChannel.java
new file mode 100644
index 0000000..b31a7ef
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/FirstAvailableChannel.java
@@ -0,0 +1,49 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+
+public class FirstAvailableChannel implements ServerChannelRegister {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(FirstAvailableChannel.class);
+
+ private final int firstTryPort;
+ private final int lastTryPort;
+
+ public FirstAvailableChannel(int firstTryPort, int lastTryPort) {
+ this.firstTryPort = firstTryPort;
+ this.lastTryPort = lastTryPort;
+ }
+
+ @NotNull
+ @Override
+ public ServerSocketChannel channelFor(Selector selector) throws IOException {
+ ServerSocketChannel myServerSocketChannel = selector.provider().openServerSocketChannel();
+ myServerSocketChannel.configureBlocking(false);
+ int bindPort = -1;
+ for (int port = firstTryPort; port <= lastTryPort; port++) {
+ try {
+ InetSocketAddress tryAddress = new InetSocketAddress(port);
+ myServerSocketChannel.socket().bind(tryAddress);
+ bindPort = tryAddress.getPort();
+ break;
+ } catch (IOException e) {
+ //try next port
+ logger.debug("Could not bind to port {}, trying next port...", port);
+ }
+ }
+ if (bindPort == -1) {
+ logger.error(String.format(
+ "No available ports in range [%d, %d] for the BitTorrent client!", firstTryPort, lastTryPort
+ ));
+ throw new IOException("No available port for the BitTorrent client!");
+ }
+ return myServerSocketChannel;
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/NewConnectionAllower.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/NewConnectionAllower.java
new file mode 100644
index 0000000..c1f834f
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/NewConnectionAllower.java
@@ -0,0 +1,10 @@
+package com.turn.ttorrent.network;
+
+public interface NewConnectionAllower {
+
+ /**
+ * @return true if we can accept new connection or can connect to other peer
+ */
+ boolean isNewConnectionAllowed();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadAttachment.java
new file mode 100644
index 0000000..2072e30
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadAttachment.java
@@ -0,0 +1,9 @@
+package com.turn.ttorrent.network;
+
+public interface ReadAttachment {
+
+ /**
+ * @return connection listener, associated with key with current attachment
+ */
+ ConnectionListener getConnectionListener();
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadWriteAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadWriteAttachment.java
new file mode 100644
index 0000000..6bb06e4
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadWriteAttachment.java
@@ -0,0 +1,50 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ReadWriteAttachment implements ReadAttachment, WriteAttachment, TimeoutAttachment {
+
+ private final static int WRITE_TASK_QUEUE_SIZE = 150;
+
+ private long lastCommunicationTime;
+ private final ConnectionListener connectionListener;
+ private final long myTimeoutMillis;
+ private final BlockingQueue<WriteTask> writeTasks;
+
+ public ReadWriteAttachment(ConnectionListener connectionListener, long lastCommunicationTime, long timeoutMillis) {
+ this.connectionListener = connectionListener;
+ this.writeTasks = new LinkedBlockingQueue<WriteTask>(WRITE_TASK_QUEUE_SIZE);
+ this.lastCommunicationTime = lastCommunicationTime;
+ this.myTimeoutMillis = timeoutMillis;
+ }
+
+ @Override
+ public ConnectionListener getConnectionListener() {
+ return connectionListener;
+ }
+
+ @Override
+ public BlockingQueue<WriteTask> getWriteTasks() {
+ return writeTasks;
+ }
+
+ @Override
+ public boolean isTimeoutElapsed(long currentTimeMillis) {
+ long minTimeForKeepAlive = currentTimeMillis - myTimeoutMillis;
+ return minTimeForKeepAlive > lastCommunicationTime;
+ }
+
+ @Override
+ public void communicatedNow(long currentTimeMillis) {
+ lastCommunicationTime = currentTimeMillis;
+ }
+
+ @Override
+ public void onTimeoutElapsed(SocketChannel channel) throws IOException {
+ connectionListener.onError(channel, new SocketTimeoutException());
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/SelectorFactory.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/SelectorFactory.java
new file mode 100644
index 0000000..b3b99b2
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/SelectorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+
+public interface SelectorFactory {
+
+ /**
+ * @return new {@link Selector} instance
+ * @throws IOException if any io error occurs
+ */
+ Selector newSelector() throws IOException;
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ServerChannelRegister.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ServerChannelRegister.java
new file mode 100644
index 0000000..8f69c5d
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ServerChannelRegister.java
@@ -0,0 +1,20 @@
+package com.turn.ttorrent.network;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+
+public interface ServerChannelRegister {
+
+ /**
+ * Create new channel and bind to specified selector
+ *
+ * @param selector specified selector
+ * @return new created server channel
+ */
+ @NotNull
+ ServerSocketChannel channelFor(Selector selector) throws IOException;
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutAttachment.java
new file mode 100644
index 0000000..6478666
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutAttachment.java
@@ -0,0 +1,29 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+public interface TimeoutAttachment {
+
+ /**
+ * @param currentTimeMillis current time for timeout calculation
+ * @return true, if and only if timeout was elapsed
+ */
+ boolean isTimeoutElapsed(long currentTimeMillis);
+
+ /**
+ * set last communication time to current time
+ *
+ * @param currentTimeMillis current time in milliseconds
+ */
+ void communicatedNow(long currentTimeMillis);
+
+ /**
+ * must be invoked if timeout was elapsed
+ *
+ * @param channel specified channel for key associated with this attachment
+ * @throws IOException if an I/O error occurs
+ */
+ void onTimeoutElapsed(SocketChannel channel) throws IOException;
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorage.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorage.java
new file mode 100644
index 0000000..ff14d92
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorage.java
@@ -0,0 +1,13 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TimeoutStorage {
+
+ void setTimeout(long millis);
+
+ void setTimeout(int timeout, TimeUnit timeUnit);
+
+ long getTimeoutMillis();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorageImpl.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorageImpl.java
new file mode 100644
index 0000000..529f0da
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorageImpl.java
@@ -0,0 +1,24 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class TimeoutStorageImpl implements TimeoutStorage {
+
+ private final AtomicLong timeoutMillis = new AtomicLong();
+
+ @Override
+ public void setTimeout(long millis) {
+ timeoutMillis.set(millis);
+ }
+
+ @Override
+ public void setTimeout(int timeout, TimeUnit timeUnit) {
+ setTimeout(timeUnit.toMillis(timeout));
+ }
+
+ @Override
+ public long getTimeoutMillis() {
+ return timeoutMillis.get();
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteAttachment.java
new file mode 100644
index 0000000..3497279
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteAttachment.java
@@ -0,0 +1,12 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.BlockingQueue;
+
+public interface WriteAttachment {
+
+ /**
+ * @return queue for offer/peek write tasks
+ */
+ BlockingQueue<WriteTask> getWriteTasks();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteListener.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteListener.java
new file mode 100644
index 0000000..4743e53
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteListener.java
@@ -0,0 +1,18 @@
+package com.turn.ttorrent.network;
+
+public interface WriteListener {
+
+ /**
+ * invoked if write is failed by any reason
+ *
+ * @param message error description
+ * @param e exception if exist. Otherwise null
+ */
+ void onWriteFailed(String message, Throwable e);
+
+ /**
+ * invoked if write done correctly
+ */
+ void onWriteDone();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteTask.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteTask.java
new file mode 100644
index 0000000..838752b
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteTask.java
@@ -0,0 +1,38 @@
+package com.turn.ttorrent.network;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+public class WriteTask {
+
+ private final ByteChannel socketChannel;
+ private final ByteBuffer byteBuffer;
+ private final WriteListener listener;
+
+ public WriteTask(ByteChannel socketChannel, ByteBuffer byteBuffer, WriteListener listener) {
+ this.socketChannel = socketChannel;
+ this.byteBuffer = byteBuffer;
+ this.listener = listener;
+ }
+
+ public ByteChannel getSocketChannel() {
+ return socketChannel;
+ }
+
+ public ByteBuffer getByteBuffer() {
+ return byteBuffer;
+ }
+
+ public WriteListener getListener() {
+ return listener;
+ }
+
+ @Override
+ public String toString() {
+ return "WriteTask{" +
+ "socketChannel=" + socketChannel +
+ ", byteBuffer=" + byteBuffer +
+ ", listener=" + listener +
+ '}';
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/AcceptableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/AcceptableKeyProcessor.java
new file mode 100644
index 0000000..e3b07c2
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/AcceptableKeyProcessor.java
@@ -0,0 +1,77 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.*;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class AcceptableKeyProcessor implements KeyProcessor {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(AcceptableKeyProcessor.class);
+
+ private final Selector mySelector;
+ private final String myServerSocketLocalAddress;
+ private final TimeService myTimeService;
+ private final NewConnectionAllower myNewConnectionAllower;
+ private final TimeoutStorage myTimeoutStorage;
+ private final AtomicInteger mySendBufferSize;
+ private final AtomicInteger myReceiveBufferSize;
+
+ public AcceptableKeyProcessor(Selector selector,
+ String serverSocketLocalAddress,
+ TimeService timeService,
+ NewConnectionAllower newConnectionAllower,
+ TimeoutStorage timeoutStorage,
+ AtomicInteger sendBufferSize,
+ AtomicInteger receiveBufferSize) {
+ this.mySelector = selector;
+ this.myServerSocketLocalAddress = serverSocketLocalAddress;
+ this.myTimeService = timeService;
+ this.myNewConnectionAllower = newConnectionAllower;
+ this.myTimeoutStorage = timeoutStorage;
+ this.mySendBufferSize = sendBufferSize;
+ this.myReceiveBufferSize = receiveBufferSize;
+ }
+
+ @Override
+ public void process(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+ if (!(channel instanceof ServerSocketChannel)) {
+ logger.error("incorrect instance of server channel. Can not accept connections");
+ key.cancel();
+ return;
+ }
+ Object attachment = key.attachment();
+ if (!(attachment instanceof AcceptAttachment)) {
+ logger.error("incorrect instance of server channel key attachment");
+ key.cancel();
+ return;
+ }
+ ChannelListenerFactory channelListenerFactory = ((AcceptAttachment) attachment).getChannelListenerFactory();
+
+ SocketChannel socketChannel = ((ServerSocketChannel) key.channel()).accept();
+ logger.trace("server {} get new connection from {}", new Object[]{myServerSocketLocalAddress, socketChannel.socket()});
+
+ if (!myNewConnectionAllower.isNewConnectionAllowed()) {
+ logger.info("new connection is not allowed. New connection is closed");
+ socketChannel.close();
+ return;
+ }
+
+ ConnectionListener stateConnectionListener = channelListenerFactory.newChannelListener();
+ stateConnectionListener.onConnectionEstablished(socketChannel);
+ socketChannel.configureBlocking(false);
+ KeyProcessorUtil.setBuffersSizeIfNecessary(socketChannel, mySendBufferSize.get(), myReceiveBufferSize.get());
+ ReadWriteAttachment keyAttachment = new ReadWriteAttachment(stateConnectionListener, myTimeService.now(), myTimeoutStorage.getTimeoutMillis());
+ socketChannel.register(mySelector, SelectionKey.OP_READ, keyAttachment);
+ }
+
+ @Override
+ public boolean accept(SelectionKey key) {
+ return key.isValid() && key.isAcceptable();
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessor.java
new file mode 100644
index 0000000..34e7f78
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessor.java
@@ -0,0 +1,58 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.TimeoutAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class CleanupKeyProcessor implements CleanupProcessor {
+
+ private final static Logger logger = TorrentLoggerFactory.getLogger(CleanupKeyProcessor.class);
+
+ private final TimeService myTimeService;
+
+ public CleanupKeyProcessor(TimeService timeService) {
+ this.myTimeService = timeService;
+ }
+
+ @Override
+ public void processCleanup(SelectionKey key) {
+ TimeoutAttachment attachment = KeyProcessorUtil.getAttachmentAsTimeoutOrNull(key);
+ if (attachment == null) {
+ key.cancel();
+ return;
+ }
+ if (attachment.isTimeoutElapsed(myTimeService.now())) {
+
+ SocketChannel channel = KeyProcessorUtil.getCastedChannelOrNull(key);
+ if (channel == null) {
+ key.cancel();
+ return;
+ }
+
+ logger.debug("channel {} was inactive in specified timeout. Close channel...", channel);
+ try {
+ channel.close();
+ key.cancel();
+ attachment.onTimeoutElapsed(channel);
+ } catch (IOException e) {
+ LoggerUtils.errorAndDebugDetails(logger, "unable close channel {}", channel, e);
+ }
+ }
+ }
+
+ @Override
+ public void processSelected(SelectionKey key) {
+ TimeoutAttachment attachment = KeyProcessorUtil.getAttachmentAsTimeoutOrNull(key);
+ if (attachment == null) {
+ key.cancel();
+ return;
+ }
+ attachment.communicatedNow(myTimeService.now());
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupProcessor.java
new file mode 100644
index 0000000..c3d50c7
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupProcessor.java
@@ -0,0 +1,21 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import java.nio.channels.SelectionKey;
+
+public interface CleanupProcessor {
+
+ /**
+ * invoked when the cleanup procedure is running. Processor can cancel key and/or close channel if necessary
+ *
+ * @param key specified key
+ */
+ void processCleanup(SelectionKey key);
+
+ /**
+ * invoked when get any activity for channel associated with this key
+ *
+ * @param key specified key
+ */
+ void processSelected(SelectionKey key);
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ConnectableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ConnectableKeyProcessor.java
new file mode 100644
index 0000000..6b6474a
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ConnectableKeyProcessor.java
@@ -0,0 +1,88 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ConnectTask;
+import com.turn.ttorrent.network.ConnectionListener;
+import com.turn.ttorrent.network.ReadWriteAttachment;
+import com.turn.ttorrent.network.TimeoutStorage;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class ConnectableKeyProcessor implements KeyProcessor {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(ConnectableKeyProcessor.class);
+
+ private final Selector mySelector;
+ private final TimeService myTimeService;
+ private final TimeoutStorage myTimeoutStorage;
+ private final AtomicInteger mySendBufferSize;
+ private final AtomicInteger myReceiveBufferSize;
+
+ public ConnectableKeyProcessor(Selector selector,
+ TimeService timeService,
+ TimeoutStorage timeoutStorage,
+ AtomicInteger sendBufferSize,
+ AtomicInteger receiveBufferSize) {
+ this.mySelector = selector;
+ this.myTimeService = timeService;
+ this.myTimeoutStorage = timeoutStorage;
+ this.mySendBufferSize = sendBufferSize;
+ this.myReceiveBufferSize = receiveBufferSize;
+ }
+
+ @Override
+ public void process(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+ if (!(channel instanceof SocketChannel)) {
+ logger.warn("incorrect instance of channel. The key is cancelled");
+ key.cancel();
+ return;
+ }
+ SocketChannel socketChannel = (SocketChannel) channel;
+ Object attachment = key.attachment();
+ if (!(attachment instanceof ConnectTask)) {
+ logger.warn("incorrect instance of attachment for channel {}. The key for the channel is cancelled", socketChannel);
+ key.cancel();
+ return;
+ }
+ final ConnectTask connectTask = (ConnectTask) attachment;
+ final ConnectionListener connectionListener = connectTask.getConnectionListener();
+ final boolean isConnectFinished;
+ try {
+ isConnectFinished = socketChannel.finishConnect();
+ } catch (NoRouteToHostException e) {
+ logger.info("Could not connect to {}:{}, received NoRouteToHostException", connectTask.getHost(), connectTask.getPort());
+ connectionListener.onError(socketChannel, e);
+ return;
+ } catch (ConnectException e) {
+ logger.info("Could not connect to {}:{}, received ConnectException", connectTask.getHost(), connectTask.getPort());
+ connectionListener.onError(socketChannel, e);
+ return;
+ }
+ if (!isConnectFinished) {
+ logger.info("Could not connect to {}:{}", connectTask.getHost(), connectTask.getPort());
+ connectionListener.onError(socketChannel, null);
+ return;
+ }
+ socketChannel.configureBlocking(false);
+ KeyProcessorUtil.setBuffersSizeIfNecessary(socketChannel, mySendBufferSize.get(), myReceiveBufferSize.get());
+ ReadWriteAttachment keyAttachment = new ReadWriteAttachment(connectionListener, myTimeService.now(), myTimeoutStorage.getTimeoutMillis());
+ socketChannel.register(mySelector, SelectionKey.OP_READ, keyAttachment);
+ logger.debug("setup new TCP connection with {}", socketChannel);
+ connectionListener.onConnectionEstablished(socketChannel);
+ }
+
+ @Override
+ public boolean accept(SelectionKey key) {
+ return key.isValid() && key.isConnectable();
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/InvalidKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/InvalidKeyProcessor.java
new file mode 100644
index 0000000..259f48f
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/InvalidKeyProcessor.java
@@ -0,0 +1,44 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ReadAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class InvalidKeyProcessor implements KeyProcessor {
+
+ private final static Logger logger = TorrentLoggerFactory.getLogger(InvalidKeyProcessor.class);
+
+ @Override
+ public void process(SelectionKey key) throws IOException {
+ final Object attachment = key.attachment();
+ final SelectableChannel channel = key.channel();
+ if (attachment == null) {
+ key.cancel();
+ return;
+ }
+ if (!(attachment instanceof ReadAttachment)) {
+ key.cancel();
+ return;
+ }
+ if (!(channel instanceof SocketChannel)) {
+ key.cancel();
+ return;
+ }
+ final SocketChannel socketChannel = (SocketChannel) channel;
+ final ReadAttachment readAttachment = (ReadAttachment) attachment;
+
+ logger.trace("drop invalid key {}", channel);
+ readAttachment.getConnectionListener().onError(socketChannel, new CancelledKeyException());
+ }
+
+ @Override
+ public boolean accept(SelectionKey key) {
+ return !key.isValid();
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessor.java
new file mode 100644
index 0000000..2a8f51e
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessor.java
@@ -0,0 +1,22 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+
+public interface KeyProcessor {
+
+ /**
+ * processes the passed key
+ *
+ * @param key key for processing
+ * @throws IOException if an I/O error occurs
+ */
+ void process(SelectionKey key) throws IOException;
+
+ /**
+ * @param key specified key for check acceptance
+ * @return true if and only if processor can process this key.
+ */
+ boolean accept(SelectionKey key);
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessorUtil.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessorUtil.java
new file mode 100644
index 0000000..c097942
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessorUtil.java
@@ -0,0 +1,44 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.TimeoutAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class KeyProcessorUtil {
+
+ private final static Logger logger = TorrentLoggerFactory.getLogger(KeyProcessorUtil.class);
+
+ public static TimeoutAttachment getAttachmentAsTimeoutOrNull(SelectionKey key) {
+ Object attachment = key.attachment();
+ if (attachment instanceof TimeoutAttachment) {
+ return (TimeoutAttachment) attachment;
+ }
+ logger.error("unable to cast attachment {} to timeout attachment type", attachment);
+ return null;
+ }
+
+ public static SocketChannel getCastedChannelOrNull(SelectionKey key) {
+ SelectableChannel channel = key.channel();
+ if (channel instanceof SocketChannel) {
+ return (SocketChannel) channel;
+ }
+ logger.error("unable to cast channel {} to specified type");
+ return null;
+ }
+
+ public static void setBuffersSizeIfNecessary(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize) throws IOException {
+ final Socket socket = socketChannel.socket();
+ if (sendBufferSize > 0) {
+ socket.setSendBufferSize(sendBufferSize);
+ }
+ if (receiveBufferSize > 0) {
+ socket.setReceiveBufferSize(receiveBufferSize);
+ }
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ReadableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ReadableKeyProcessor.java
new file mode 100644
index 0000000..d209919
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ReadableKeyProcessor.java
@@ -0,0 +1,49 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ConnectionListener;
+import com.turn.ttorrent.network.ReadAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class ReadableKeyProcessor implements KeyProcessor {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(ReadableKeyProcessor.class);
+
+ private final String myServerSocketLocalAddress;
+
+ public ReadableKeyProcessor(String serverSocketLocalAddress) {
+ this.myServerSocketLocalAddress = serverSocketLocalAddress;
+ }
+
+ @Override
+ public void process(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+ if (!(channel instanceof SocketChannel)) {
+ logger.warn("incorrect instance of channel. The key is cancelled");
+ key.cancel();
+ return;
+ }
+
+ SocketChannel socketChannel = (SocketChannel) channel;
+ logger.trace("server {} get new data from {}", myServerSocketLocalAddress, socketChannel);
+
+ Object attachment = key.attachment();
+ if (!(attachment instanceof ReadAttachment)) {
+ logger.warn("incorrect instance of attachment for channel {}", new Object[]{socketChannel.socket()});
+ socketChannel.close();
+ return;
+ }
+ ConnectionListener connectionListener = ((ReadAttachment) attachment).getConnectionListener();
+ connectionListener.onNewDataAvailable(socketChannel);
+ }
+
+ @Override
+ public boolean accept(SelectionKey key) {
+ return key.isValid() && key.isReadable();
+ }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessor.java
new file mode 100644
index 0000000..dea5be2
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessor.java
@@ -0,0 +1,69 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ConnectionClosedException;
+import com.turn.ttorrent.network.WriteAttachment;
+import com.turn.ttorrent.network.WriteTask;
+import org.slf4j.Logger;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+public class WritableKeyProcessor implements KeyProcessor {
+
+ private static final Logger logger = TorrentLoggerFactory.getLogger(WritableKeyProcessor.class);
+
+ @Override
+ public void process(SelectionKey key) throws IOException {
+ SelectableChannel channel = key.channel();
+ if (!(channel instanceof SocketChannel)) {
+ logger.warn("incorrect instance of channel. The key is cancelled");
+ key.cancel();
+ return;
+ }
+
+ SocketChannel socketChannel = (SocketChannel) channel;
+
+ Object attachment = key.attachment();
+ if (!(attachment instanceof WriteAttachment)) {
+ logger.error("incorrect instance of attachment for channel {}", channel);
+ key.cancel();
+ return;
+ }
+
+ WriteAttachment keyAttachment = (WriteAttachment) attachment;
+
+ if (keyAttachment.getWriteTasks().isEmpty()) {
+ key.interestOps(SelectionKey.OP_READ);
+ return;
+ }
+
+ WriteTask processedTask = keyAttachment.getWriteTasks().peek();
+
+ try {
+ int writeCount = socketChannel.write(processedTask.getByteBuffer());
+ if (writeCount < 0) {
+ processedTask.getListener().onWriteFailed("Reached end of stream while writing", null);
+ throw new EOFException("Reached end of stream while writing");
+ }
+
+ if (!processedTask.getByteBuffer().hasRemaining()) {
+ processedTask.getListener().onWriteDone();
+ keyAttachment.getWriteTasks().remove();
+ }
+
+ } catch (IOException e) {
+ processedTask.getListener().onWriteFailed("I/O error occurs on write to channel " + socketChannel, new ConnectionClosedException(e));
+ keyAttachment.getWriteTasks().clear();
+ key.cancel();
+ }
+ }
+
+ @Override
+ public boolean accept(SelectionKey key) {
+ return key.isValid() && key.isWritable();
+ }
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionManagerTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionManagerTest.java
new file mode 100644
index 0000000..e35ffe8
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionManagerTest.java
@@ -0,0 +1,189 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.MockTimeService;
+import org.apache.log4j.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+public class ConnectionManagerTest {
+
+ private ConnectionManager myConnectionManager;
+ private ExecutorService myExecutorService;
+ private ConnectionListener connectionListener;
+ private ConnectionManagerContext myContext;
+
+ public ConnectionManagerTest() {
+ if (Logger.getRootLogger().getAllAppenders().hasMoreElements())
+ return;
+ BasicConfigurator.configure(new ConsoleAppender(new PatternLayout("[%d{MMdd HH:mm:ss,SSS} %t] %6p - %20.20c - %m %n")));
+ Logger.getRootLogger().setLevel(Level.ALL);
+ }
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ Logger.getRootLogger().setLevel(Level.INFO);
+ myContext = mock(ConnectionManagerContext.class);
+ myExecutorService = Executors.newSingleThreadExecutor();
+ when(myContext.getExecutor()).thenReturn(myExecutorService);
+ final SelectorFactory selectorFactory = mock(SelectorFactory.class);
+ when(selectorFactory.newSelector()).thenReturn(Selector.open());
+ NewConnectionAllower newConnectionAllower = mock(NewConnectionAllower.class);
+ when(newConnectionAllower.isNewConnectionAllowed()).thenReturn(true);
+ myConnectionManager = new ConnectionManager(
+ myContext,
+ new MockTimeService(),
+ newConnectionAllower,
+ newConnectionAllower,
+ selectorFactory,
+ new AtomicInteger(),
+ new AtomicInteger());
+ }
+
+ @Test(expectedExceptions = IllegalStateException.class)
+ public void testThatDoubleInitThrowException() {
+ try {
+ myConnectionManager.initAndRunWorker(new FirstAvailableChannel(6881, 6889));
+ } catch (IOException e) {
+ fail("unable to init and run worker", e);
+ }
+ try {
+ myConnectionManager.initAndRunWorker(new FirstAvailableChannel(6881, 6889));
+ } catch (IOException e) {
+ fail("unable to init and run worker", e);
+ }
+ }
+
+ @Test
+ public void canAcceptAndReadData() throws IOException, InterruptedException {
+ final AtomicInteger acceptCount = new AtomicInteger();
+ final AtomicInteger readCount = new AtomicInteger();
+ final AtomicInteger connectCount = new AtomicInteger();
+ final AtomicInteger lastReadBytesCount = new AtomicInteger();
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+
+ final Semaphore semaphore = new Semaphore(0);
+
+ this.connectionListener = new ConnectionListener() {
+ @Override
+ public void onNewDataAvailable(SocketChannel socketChannel) throws IOException {
+ readCount.incrementAndGet();
+ lastReadBytesCount.set(socketChannel.read(byteBuffer));
+ if (lastReadBytesCount.get() == -1) {
+ socketChannel.close();
+ }
+ semaphore.release();
+ }
+
+ @Override
+ public void onConnectionEstablished(SocketChannel socketChannel) throws IOException {
+ acceptCount.incrementAndGet();
+ semaphore.release();
+ }
+
+ @Override
+ public void onError(SocketChannel socketChannel, Throwable ex) {
+
+ }
+ };
+
+ when(myContext.newChannelListener()).thenReturn(connectionListener);
+
+ myConnectionManager.initAndRunWorker(new FirstAvailableChannel(6881, 6889));
+
+ assertEquals(acceptCount.get(), 0);
+ assertEquals(readCount.get(), 0);
+ int serverPort = myConnectionManager.getBindPort();
+
+ Socket socket = new Socket("127.0.0.1", serverPort);
+
+ tryAcquireOrFail(semaphore);//wait until connection is accepted
+
+ assertTrue(socket.isConnected());
+ assertEquals(acceptCount.get(), 1);
+ assertEquals(readCount.get(), 0);
+
+ Socket socketSecond = new Socket("127.0.0.1", serverPort);
+
+ tryAcquireOrFail(semaphore);//wait until connection is accepted
+
+ assertTrue(socketSecond.isConnected());
+ assertEquals(acceptCount.get(), 2);
+ assertEquals(readCount.get(), 0);
+ socketSecond.close();
+ tryAcquireOrFail(semaphore);//wait read that connection is closed
+ assertEquals(readCount.get(), 1);
+ assertEquals(acceptCount.get(), 2);
+ assertEquals(lastReadBytesCount.get(), -1);
+ byteBuffer.rewind();
+ assertEquals(byteBuffer.get(), 0);
+ byteBuffer.rewind();
+ String writeStr = "abc";
+ OutputStream outputStream = socket.getOutputStream();
+ outputStream.write(writeStr.getBytes());
+ tryAcquireOrFail(semaphore);//wait until read bytes
+ assertEquals(readCount.get(), 2);
+ assertEquals(lastReadBytesCount.get(), 3);
+ byte[] expected = new byte[byteBuffer.capacity()];
+ System.arraycopy(writeStr.getBytes(), 0, expected, 0, writeStr.length());
+ assertEquals(byteBuffer.array(), expected);
+ outputStream.close();
+ socket.close();
+ tryAcquireOrFail(semaphore);//wait read that connection is closed
+ assertEquals(readCount.get(), 3);
+
+ int otherPeerPort = 7575;
+ ServerSocket ss = new ServerSocket(otherPeerPort);
+ assertEquals(connectCount.get(), 0);
+ myConnectionManager.offerConnect(new ConnectTask("127.0.0.1", otherPeerPort, new ConnectionListener() {
+ @Override
+ public void onNewDataAvailable(SocketChannel socketChannel) throws IOException {
+
+ }
+
+ @Override
+ public void onConnectionEstablished(SocketChannel socketChannel) throws IOException {
+ connectCount.incrementAndGet();
+ semaphore.release();
+ }
+
+ @Override
+ public void onError(SocketChannel socketChannel, Throwable ex) {
+
+ }
+ }, 0, 100), 1, TimeUnit.SECONDS);
+ ss.accept();
+ tryAcquireOrFail(semaphore);
+ assertEquals(connectCount.get(), 1);
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ this.myConnectionManager.close();
+ myExecutorService.shutdown();
+ assertTrue(myExecutorService.awaitTermination(10, TimeUnit.SECONDS));
+ }
+
+ private void tryAcquireOrFail(Semaphore semaphore) throws InterruptedException {
+ if (!semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)) {
+ fail("don't get signal from connection receiver that connection selected");
+ }
+ }
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionWorkerTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionWorkerTest.java
new file mode 100644
index 0000000..237d163
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionWorkerTest.java
@@ -0,0 +1,48 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.MockTimeService;
+import com.turn.ttorrent.network.keyProcessors.CleanupProcessor;
+import com.turn.ttorrent.network.keyProcessors.KeyProcessor;
+import org.testng.annotations.Test;
+
+import java.nio.channels.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.*;
+
+@Test
+public class ConnectionWorkerTest {
+
+ public void testCleanupIsCalled() throws Exception {
+
+ final SelectionKey mockKey = mock(SelectionKey.class);
+ final SelectableChannel channel = SocketChannel.open();
+ final KeyProcessor acceptProcessor = mock(KeyProcessor.class);
+ final KeyProcessor notAcceptProcessor = mock(KeyProcessor.class);
+
+ Selector mockSelector = mock(Selector.class);
+ when(mockSelector.select(anyLong())).thenReturn(1).thenThrow(new ClosedSelectorException());
+ when(mockSelector.selectedKeys()).thenReturn(new HashSet<SelectionKey>(Collections.singleton(mockKey)));
+ when(mockKey.isValid()).thenReturn(true);
+ when(mockKey.channel()).thenReturn(channel);
+ when(acceptProcessor.accept(mockKey)).thenReturn(true);
+ when(notAcceptProcessor.accept(mockKey)).thenReturn(false);
+ ConnectionWorker connectionWorker = new ConnectionWorker(
+ mockSelector,
+ Arrays.asList(acceptProcessor, notAcceptProcessor),
+ 10,
+ 0,
+ new MockTimeService(),
+ mock(CleanupProcessor.class),
+ mock(NewConnectionAllower.class));
+ connectionWorker.run();
+ verify(mockSelector).selectedKeys();
+ verify(acceptProcessor).accept(mockKey);
+ verify(acceptProcessor).process(mockKey);
+ verify(notAcceptProcessor).accept(mockKey);
+ verifyNoMoreInteractions(notAcceptProcessor);
+ }
+}
+
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/StateChannelListenerTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/StateChannelListenerTest.java
new file mode 100644
index 0000000..5c38fac
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/StateChannelListenerTest.java
@@ -0,0 +1,4 @@
+package com.turn.ttorrent.network;
+
+public class StateChannelListenerTest {
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/WorkingReceiverTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/WorkingReceiverTest.java
new file mode 100644
index 0000000..434d17a
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/WorkingReceiverTest.java
@@ -0,0 +1,4 @@
+package com.turn.ttorrent.network;
+
+public class WorkingReceiverTest {
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessorTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessorTest.java
new file mode 100644
index 0000000..3797d30
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessorTest.java
@@ -0,0 +1,86 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.MockTimeService;
+import com.turn.ttorrent.network.ConnectionListener;
+import com.turn.ttorrent.network.TimeoutAttachment;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import static org.mockito.Mockito.*;
+
+@Test
+public class CleanupKeyProcessorTest {
+
+ private final int CLOSE_TIMEOUT = 100;
+
+ private MockTimeService myTimeService;
+ private TimeoutAttachment myTimeoutAttachment;
+ private SelectionKey myKey;
+ private SelectableChannel myChannel;
+ private ConnectionListener myConnectionListener;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ myTimeService = new MockTimeService();
+ myConnectionListener = mock(ConnectionListener.class);
+ myTimeoutAttachment = mock(TimeoutAttachment.class);
+ myKey = mock(SelectionKey.class);
+ myChannel = SocketChannel.open();
+ when(myKey.channel()).thenReturn(myChannel);
+ when(myKey.interestOps()).thenReturn(SelectionKey.OP_READ);
+ myKey.attach(myTimeoutAttachment);
+ }
+
+ public void testSelected() {
+
+ long oldTime = 10;
+ myTimeService.setTime(oldTime);
+
+ CleanupProcessor cleanupProcessor = new CleanupKeyProcessor(myTimeService);
+ cleanupProcessor.processSelected(myKey);
+
+ verify(myTimeoutAttachment).communicatedNow(eq(oldTime));
+
+ long newTime = 100;
+ myTimeService.setTime(newTime);
+
+ cleanupProcessor.processSelected(myKey);
+
+ verify(myTimeoutAttachment).communicatedNow(eq(newTime));
+ }
+
+ public void testCleanupWillCloseWithTimeout() throws Exception {
+
+ when(myTimeoutAttachment.isTimeoutElapsed(anyLong())).thenReturn(true);
+
+ CleanupProcessor cleanupProcessor = new CleanupKeyProcessor(myTimeService);
+ cleanupProcessor.processCleanup(myKey);
+
+ verify(myKey).cancel();
+ verify(myKey).channel();
+ verify(myTimeoutAttachment).onTimeoutElapsed(any(SocketChannel.class));
+ verifyNoMoreInteractions(myKey);
+ }
+
+ public void testCleanupWithoutClose() {
+ when(myTimeoutAttachment.isTimeoutElapsed(anyLong())).thenReturn(false);
+
+ myTimeService.setTime(200);
+
+ CleanupProcessor cleanupProcessor = new CleanupKeyProcessor(myTimeService);
+ cleanupProcessor.processCleanup(myKey);
+
+ verify(myTimeoutAttachment).isTimeoutElapsed(myTimeService.now());
+ verify(myKey, never()).cancel();
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ myChannel.close();
+ }
+}
\ No newline at end of file
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessorTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessorTest.java
new file mode 100644
index 0000000..b784952
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessorTest.java
@@ -0,0 +1,102 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.network.WriteAttachment;
+import com.turn.ttorrent.network.WriteListener;
+import com.turn.ttorrent.network.WriteTask;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.BlockingQueue;
+
+import static org.mockito.Mockito.*;
+
+@Test
+public class WritableKeyProcessorTest {
+
+ private SelectionKey myKey;
+ private SocketChannel myChannel;
+ private WritableKeyProcessor myWritableKeyProcessor;
+ private WriteAttachment myWriteAttachment;
+ private BlockingQueue<WriteTask> myQueue;
+
+
+ @SuppressWarnings("unchecked")
+ @BeforeMethod
+ public void setUp() throws Exception {
+ myKey = mock(SelectionKey.class);
+ myChannel = mock(SocketChannel.class);
+ myWritableKeyProcessor = new WritableKeyProcessor();
+ when(myKey.channel()).thenReturn(myChannel);
+ when(myKey.interestOps()).thenReturn(SelectionKey.OP_WRITE);
+ myWriteAttachment = mock(WriteAttachment.class);
+ myQueue = mock(BlockingQueue.class);
+ }
+
+ public void testThatOnWriteDoneInvoked() throws Exception {
+ final ByteBuffer data = ByteBuffer.allocate(10);
+
+ //imitate writing byte buffer
+ when(myChannel.write(eq(data))).then(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+ data.position(data.capacity());
+ return data.capacity();
+ }
+ });
+
+ WriteListener listener = mock(WriteListener.class);
+
+ when(myQueue.peek()).thenReturn(new WriteTask(myChannel, data, listener));
+ when(myWriteAttachment.getWriteTasks()).thenReturn(myQueue);
+
+ myKey.attach(myWriteAttachment);
+
+ myWritableKeyProcessor.process(myKey);
+
+ verify(listener).onWriteDone();
+ }
+
+ public void testThatOnWriteFailedInvokedIfChannelThrowException() throws Exception {
+ when(myChannel.write(any(ByteBuffer.class))).thenThrow(new IOException());
+
+ WriteListener listener = mock(WriteListener.class);
+
+ when(myQueue.peek()).thenReturn(new WriteTask(myChannel, ByteBuffer.allocate(1), listener));
+ when(myWriteAttachment.getWriteTasks()).thenReturn(myQueue);
+ myKey.attach(myWriteAttachment);
+
+ myWritableKeyProcessor.process(myKey);
+
+ verify(listener).onWriteFailed(anyString(), any(Throwable.class));
+ }
+
+ public void checkThatWriteTaskDoesntRemovedIfBufferIsNotWrittenInOneStep() throws Exception {
+ final ByteBuffer data = ByteBuffer.allocate(10);
+
+ //imitate writing only one byte of byte buffer
+ when(myChannel.write(eq(data))).then(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+ data.position(data.capacity() - 1);
+ return data.position();
+ }
+ });
+
+ WriteListener listener = mock(WriteListener.class);
+
+ when(myQueue.peek()).thenReturn(new WriteTask(myChannel, data, listener));
+ when(myWriteAttachment.getWriteTasks()).thenReturn(myQueue);
+
+ myKey.attach(myWriteAttachment);
+
+ myWritableKeyProcessor.process(myKey);
+
+ verify(listener, never()).onWriteDone();
+ }
+}