tracker

Change-Id: I8f8ac81f9c4d7c7650cd64d2dade701dc6c11dce
diff --git a/ttorrent-master/network/pom.xml b/ttorrent-master/network/pom.xml
new file mode 100644
index 0000000..72c994b
--- /dev/null
+++ b/ttorrent-master/network/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- Parent POM, resolved from the directory above this module. -->
+    <parent>
+        <groupId>com.turn</groupId>
+        <artifactId>ttorrent</artifactId>
+        <version>1.3.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <!-- This module's own coordinates; its version (1.0) is declared separately from the parent's version. -->
+    <name>ttorrent/network</name>
+    <url>http://turn.github.com/ttorrent/</url>
+    <artifactId>ttorrent-network</artifactId>
+    <version>1.0</version>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <!-- Default-scope dependency on ttorrent-common. -->
+        <dependency>
+            <groupId>com.turn</groupId>
+            <artifactId>ttorrent-common</artifactId>
+            <version>1.3.0-SNAPSHOT</version>
+        </dependency>
+        <!-- Available on the test classpath only. -->
+        <dependency>
+            <groupId>com.turn</groupId>
+            <artifactId>ttorrent-test-api</artifactId>
+            <version>1.0</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachment.java
new file mode 100644
index 0000000..acba652
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachment.java
@@ -0,0 +1,10 @@
+package com.turn.ttorrent.network;
+
+public interface AcceptAttachment {
+
+  /**
+   * @return the channel listener factory used to create listeners for new connections
+   */
+  ChannelListenerFactory getChannelListenerFactory();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachmentImpl.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachmentImpl.java
new file mode 100644
index 0000000..ce0b051
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/AcceptAttachmentImpl.java
@@ -0,0 +1,32 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+public class AcceptAttachmentImpl implements AcceptAttachment, TimeoutAttachment {
+  /** Factory returned unchanged by {@link #getChannelListenerFactory()}. */
+  private final ChannelListenerFactory myListenerFactory;
+
+  public AcceptAttachmentImpl(ChannelListenerFactory channelListenerFactory) {
+    myListenerFactory = channelListenerFactory;
+  }
+
+  @Override
+  public ChannelListenerFactory getChannelListenerFactory() {
+    return this.myListenerFactory;
+  }
+  /** Always {@code false}: this attachment never reports a timeout. */
+  @Override
+  public boolean isTimeoutElapsed(long currentTimeMillis) {
+    return false;
+  }
+  /** No-op: no communication time is tracked by this attachment. */
+  @Override
+  public void communicatedNow(long currentTimeMillis) {
+  }
+  /** No-op. */
+  @Override
+  public void onTimeoutElapsed(SocketChannel channel) throws IOException {
+    // intentionally empty
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ChannelListenerFactory.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ChannelListenerFactory.java
new file mode 100644
index 0000000..25e111c
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ChannelListenerFactory.java
@@ -0,0 +1,7 @@
+package com.turn.ttorrent.network;
+
+public interface ChannelListenerFactory {
+  /** @return a connection listener; NOTE(review): presumably a new instance per call, confirm against implementations */
+  ConnectionListener newChannelListener();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectTask.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectTask.java
new file mode 100644
index 0000000..5906873
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectTask.java
@@ -0,0 +1,59 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.SocketChannel;
+
+public class ConnectTask implements TimeoutAttachment, ReadAttachment {
+  // Target address, listener and timeout bookkeeping of one connection attempt.
+  private final String myHost;
+  private final int myPort;
+  private final ConnectionListener myConnectionListener;
+  private final int myTimeoutMillis;
+  private long myLastCommunicationTime;
+
+  public ConnectTask(String host, int port, ConnectionListener connectionListener, long lastCommunicationTime, int timeoutMillis) {
+    myHost = host;
+    myPort = port;
+    myConnectionListener = connectionListener;
+    myLastCommunicationTime = lastCommunicationTime;
+    myTimeoutMillis = timeoutMillis;
+  }
+
+  public String getHost() {
+    return this.myHost;
+  }
+
+  public int getPort() {
+    return this.myPort;
+  }
+
+  @Override
+  public ConnectionListener getConnectionListener() {
+    return this.myConnectionListener;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("ConnectTask{");
+    text.append("myHost='").append(myHost).append('\'');
+    text.append(", myPort=").append(myPort).append('}');
+    return text.toString();
+  }
+  /** Reports a timeout once the last recorded communication is older than {@code currentTimeMillis} minus the timeout. */
+  @Override
+  public boolean isTimeoutElapsed(long currentTimeMillis) {
+    final long oldestAllowedCommunication = currentTimeMillis - myTimeoutMillis;
+    return myLastCommunicationTime < oldestAllowedCommunication;
+  }
+  /** Records {@code currentTimeMillis} as the time of the last communication. */
+  @Override
+  public void communicatedNow(long currentTimeMillis) {
+    myLastCommunicationTime = currentTimeMillis;
+  }
+  /** Reports the timeout to the connection listener as a {@link SocketTimeoutException}. */
+  @Override
+  public void onTimeoutElapsed(SocketChannel channel) throws IOException {
+    myConnectionListener.onError(channel, new SocketTimeoutException());
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionClosedException.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionClosedException.java
new file mode 100644
index 0000000..f6dca57
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionClosedException.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+
+public class ConnectionClosedException extends IOException {
+  public ConnectionClosedException() {
+    super();
+  }
+  /** @param cause the failure that made the connection unusable */
+  public ConnectionClosedException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionListener.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionListener.java
new file mode 100644
index 0000000..09f5888
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionListener.java
@@ -0,0 +1,32 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+
+public interface ConnectionListener {
+
+  /**
+   * Invoked when the specified socket channel contains data.
+   *
+   * @param socketChannel the socket channel that holds the data
+   * @throws IOException if an I/O error occurs
+   */
+  void onNewDataAvailable(SocketChannel socketChannel) throws IOException;
+
+  /**
+   * Invoked when a new connection has been established.
+   *
+   * @param socketChannel the socket channel of the new connection
+   * @throws IOException if an I/O error occurs
+   */
+  void onConnectionEstablished(SocketChannel socketChannel) throws IOException;
+
+  /**
+   * Invoked when an error occurs, e.g. with a SocketTimeoutException after a timeout.
+   *
+   * @param socketChannel the channel the error is associated with
+   * @param ex            the error that occurred
+   * @throws IOException if an I/O error occurs
+   */
+  void onError(SocketChannel socketChannel, Throwable ex) throws IOException;
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManager.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManager.java
new file mode 100644
index 0000000..76700a8
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManager.java
@@ -0,0 +1,175 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.keyProcessors.*;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.Channel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.util.Arrays;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.turn.ttorrent.Constants.DEFAULT_CLEANUP_RUN_TIMEOUT_MILLIS;
+import static com.turn.ttorrent.Constants.DEFAULT_SELECTOR_SELECT_TIMEOUT_MILLIS;
+
+public class ConnectionManager {
+  // Creates the selector, starts the worker that services it and closes everything again in close().
+  private static final Logger logger = TorrentLoggerFactory.getLogger(ConnectionManager.class);
+
+  private final Selector selector;
+  private final TimeService myTimeService;
+  private volatile ConnectionWorker myConnectionWorker;
+  private volatile int myBindPort; // volatile like the other fields published by initAndRunWorker
+  private final ConnectionManagerContext myContext;
+  private volatile ServerSocketChannel myServerSocketChannel;
+  private volatile Future<?> myWorkerFuture;
+  private final NewConnectionAllower myIncomingConnectionAllower;
+  private final NewConnectionAllower myOutgoingConnectionAllower;
+  private final TimeoutStorage socketTimeoutStorage = new TimeoutStorageImpl();
+  private final AtomicBoolean alreadyInit = new AtomicBoolean(false);
+  private final AtomicInteger mySendBufferSize;
+  private final AtomicInteger myReceiveBufferSize;
+
+  public ConnectionManager(ConnectionManagerContext context,
+                           TimeService timeService,
+                           NewConnectionAllower newIncomingConnectionAllower,
+                           NewConnectionAllower newOutgoingConnectionAllower,
+                           SelectorFactory selectorFactory,
+                           AtomicInteger mySendBufferSize,
+                           AtomicInteger myReceiveBufferSize) throws IOException {
+    this.mySendBufferSize = mySendBufferSize;
+    this.myReceiveBufferSize = myReceiveBufferSize;
+    this.selector = selectorFactory.newSelector();
+    this.myTimeService = timeService;
+    myContext = context;
+    this.myIncomingConnectionAllower = newIncomingConnectionAllower;
+    this.myOutgoingConnectionAllower = newOutgoingConnectionAllower;
+  }
+  /** Obtains the server channel, registers it for accept and submits the worker; throws IllegalStateException on a second call. */
+  public void initAndRunWorker(ServerChannelRegister serverChannelRegister) throws IOException {
+    if (alreadyInit.getAndSet(true)) {
+      throw new IllegalStateException("connection manager was already initialized");
+    }
+
+    myServerSocketChannel = serverChannelRegister.channelFor(selector);
+    myServerSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new AcceptAttachmentImpl(myContext));
+    myBindPort = myServerSocketChannel.socket().getLocalPort();
+    String serverName = myServerSocketChannel.socket().toString();
+    myConnectionWorker = new ConnectionWorker(selector, Arrays.asList(
+            new InvalidKeyProcessor(),
+            new AcceptableKeyProcessor(selector, serverName, myTimeService, myIncomingConnectionAllower, socketTimeoutStorage,
+                    mySendBufferSize, myReceiveBufferSize),
+            new ConnectableKeyProcessor(selector, myTimeService, socketTimeoutStorage,
+                    mySendBufferSize, myReceiveBufferSize),
+            new ReadableKeyProcessor(serverName),
+            new WritableKeyProcessor()), DEFAULT_SELECTOR_SELECT_TIMEOUT_MILLIS, DEFAULT_CLEANUP_RUN_TIMEOUT_MILLIS,
+            myTimeService,
+            new CleanupKeyProcessor(myTimeService),
+            myOutgoingConnectionAllower);
+    myWorkerFuture = myContext.getExecutor().submit(myConnectionWorker);
+  }
+
+  public void setSelectorSelectTimeout(int timeout) {
+    ConnectionWorker workerLocal = myConnectionWorker;
+    checkThatWorkerIsInit(workerLocal);
+    workerLocal.setSelectorSelectTimeout(timeout);
+  }
+
+  private void checkThatWorkerIsInit(ConnectionWorker worker) {
+    if (worker == null) throw new IllegalStateException("Connection manager is not initialized!");
+  }
+  /** @return false when the worker has not been created yet or the worker did not accept the task */
+  public boolean offerConnect(ConnectTask connectTask, int timeout, TimeUnit timeUnit) {
+    ConnectionWorker workerLocal = myConnectionWorker; // single volatile read, as in the setters
+    return workerLocal != null && workerLocal.offerConnect(connectTask, timeout, timeUnit);
+  }
+  /** @return false when the worker has not been created yet or the worker did not accept the task */
+  public boolean offerWrite(WriteTask writeTask, int timeout, TimeUnit timeUnit) {
+    ConnectionWorker workerLocal = myConnectionWorker; // single volatile read, as in the setters
+    return workerLocal != null && workerLocal.offerWrite(writeTask, timeout, timeUnit);
+  }
+
+  public int getBindPort() {
+    return myBindPort;
+  }
+  /** Stops the worker (waiting up to the given timeout), then closes the server channel, all registered channels and the selector. */
+  public void close(int timeout, TimeUnit timeUnit) {
+    logger.debug("try close connection manager...");
+    boolean successfullyClosed = true;
+    boolean interrupted = false;
+    ConnectionWorker workerLocal = myConnectionWorker;
+    if (workerLocal != null) {
+      Future<?> futureLocal = myWorkerFuture; // still null when submitting the worker failed
+      if (futureLocal != null) futureLocal.cancel(true);
+      try {
+        boolean shutdownCorrectly = workerLocal.stop(timeout, timeUnit);
+        if (!shutdownCorrectly) {
+          successfullyClosed = false;
+          logger.warn("unable to terminate worker in {} {}", timeout, timeUnit);
+        }
+      } catch (InterruptedException e) {
+        successfullyClosed = false;
+        interrupted = true; // restored at the end, after all resources have been released
+        LoggerUtils.warnAndDebugDetails(logger, "unable to await termination worker, thread was interrupted", e);
+      }
+    }
+    try {
+      ServerSocketChannel serverChannelLocal = myServerSocketChannel; // null when close() precedes initialization
+      if (serverChannelLocal != null) serverChannelLocal.close();
+    } catch (Throwable e) {
+      LoggerUtils.errorAndDebugDetails(logger, "unable to close server socket channel", e);
+      successfullyClosed = false;
+    }
+    if (this.selector.isOpen()) { // keys() throws ClosedSelectorException once the selector is closed (repeated close)
+      for (SelectionKey key : this.selector.keys()) {
+        try {
+          if (key.isValid()) {
+            key.channel().close();
+          }
+        } catch (Throwable e) {
+          logger.error("unable to close socket channel {}", key.channel());
+          successfullyClosed = false;
+          logger.debug("", e);
+        }
+      }
+    }
+    try {
+      this.selector.close();
+    } catch (Throwable e) {
+      LoggerUtils.errorAndDebugDetails(logger, "unable to close selector channel", e);
+      successfullyClosed = false;
+    }
+    if (successfullyClosed) {
+      logger.debug("connection manager is successfully closed");
+    } else {
+      logger.error("connection manager wasn't closed successfully");
+    }
+    if (interrupted) Thread.currentThread().interrupt();
+  }
+  /** Same as {@link #close(int, TimeUnit)} with a timeout of one minute. */
+  public void close() {
+    close(1, TimeUnit.MINUTES);
+  }
+
+  public void setCleanupTimeout(long timeoutMillis) {
+    ConnectionWorker workerLocal = myConnectionWorker;
+    checkThatWorkerIsInit(workerLocal);
+    workerLocal.setCleanupTimeout(timeoutMillis);
+  }
+
+  public void setSocketConnectionTimeout(long timeoutMillis) {
+    socketTimeoutStorage.setTimeout(timeoutMillis);
+  }
+
+  public void closeChannel(Channel channel) throws IOException {
+    channel.close();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManagerContext.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManagerContext.java
new file mode 100644
index 0000000..c95cbb5
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionManagerContext.java
@@ -0,0 +1,9 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.ExecutorService;
+
+public interface ConnectionManagerContext extends ChannelListenerFactory {
+  /** @return the executor service to which the connection worker is submitted */
+  ExecutorService getExecutor();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionWorker.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionWorker.java
new file mode 100644
index 0000000..2d83275
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ConnectionWorker.java
@@ -0,0 +1,259 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.keyProcessors.CleanupProcessor;
+import com.turn.ttorrent.network.keyProcessors.KeyProcessor;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+public class ConnectionWorker implements Runnable {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(ConnectionWorker.class);
+  private static final String SELECTOR_THREAD_NAME = "Torrent channels manager thread";
+  private volatile boolean stop = false;
+  private final Selector selector;
+  private final BlockingQueue<ConnectTask> myConnectQueue;
+  private final BlockingQueue<WriteTask> myWriteQueue;
+  private final Semaphore mySemaphore;
+  private final List<KeyProcessor> myKeyProcessors;
+  private final TimeService myTimeService;
+  private long lastCleanupTime;
+  private volatile int mySelectorTimeoutMillis;
+  private volatile long myCleanupTimeoutMillis;
+  private final CleanupProcessor myCleanupProcessor;
+  private final NewConnectionAllower myNewConnectionAllower;
+
+  ConnectionWorker(Selector selector,
+                   List<KeyProcessor> keyProcessors,
+                   int selectorTimeoutMillis,
+                   int cleanupTimeoutMillis,
+                   TimeService timeService,
+                   CleanupProcessor cleanupProcessor,
+                   NewConnectionAllower myNewConnectionAllower) {
+    this.selector = selector;
+    this.myTimeService = timeService;
+    this.lastCleanupTime = timeService.now();
+    this.mySelectorTimeoutMillis = selectorTimeoutMillis;
+    this.myCleanupTimeoutMillis = cleanupTimeoutMillis;
+    this.myCleanupProcessor = cleanupProcessor;
+    this.myNewConnectionAllower = myNewConnectionAllower;
+    this.mySemaphore = new Semaphore(1);
+    this.myConnectQueue = new LinkedBlockingQueue<ConnectTask>(100);
+    this.myKeyProcessors = keyProcessors;
+    this.myWriteQueue = new LinkedBlockingQueue<WriteTask>(5000);
+  }
+  /** Selector loop; exits when stop() was called, the thread is interrupted or the selector is closed. */
+  @Override
+  public void run() {
+    try {
+      mySemaphore.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to whoever runs this task
+      return;
+    }
+
+    final String oldName = Thread.currentThread().getName();
+    try {
+      Thread.currentThread().setName(SELECTOR_THREAD_NAME);
+      while (!stop && (!Thread.currentThread().isInterrupted())) {
+        try {
+          logger.trace("try select keys from selector");
+          int selected;
+          try {
+            selected = selector.select(mySelectorTimeoutMillis);
+          } catch (ClosedSelectorException e) {
+            break;
+          }
+          connectToPeersFromQueue();
+          processWriteTasks();
+          logger.trace("select keys from selector. Keys count is {}", selected);
+          if (selected != 0) {
+            processSelectedKeys();
+          }
+          if (needRunCleanup()) {
+            cleanup();
+          }
+        } catch (Throwable e) {
+          LoggerUtils.warnAndDebugDetails(logger, "unable to select channel keys. Error message {}", e.getMessage(), e);
+        }
+      }
+    } catch (Throwable e) {
+      LoggerUtils.errorAndDebugDetails(logger, "exception on cycle iteration", e);
+    } finally {
+      Thread.currentThread().setName(oldName);
+      mySemaphore.release();
+    }
+  }
+
+  private void cleanup() {
+    lastCleanupTime = myTimeService.now();
+    for (SelectionKey key : selector.keys()) {
+      if (!key.isValid()) continue;
+      myCleanupProcessor.processCleanup(key);
+    }
+  }
+
+  private boolean needRunCleanup() {
+    return (myTimeService.now() - lastCleanupTime) > myCleanupTimeoutMillis;
+  }
+  /** Moves queued write tasks to the write queue of their channel's key attachment and enables OP_WRITE on that key. */
+  private void processWriteTasks() {
+
+    final Iterator<WriteTask> iterator = myWriteQueue.iterator();
+    while (iterator.hasNext()) {
+      WriteTask writeTask = iterator.next();
+      if (stop || Thread.currentThread().isInterrupted()) {
+        return;
+      }
+      logger.trace("try register channel for write. Write task is {}", writeTask);
+      SocketChannel socketChannel = (SocketChannel) writeTask.getSocketChannel();
+      if (!socketChannel.isOpen()) {
+        iterator.remove();
+        writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Channel is not open"), new ConnectionClosedException());
+        continue;
+      }
+      SelectionKey key = socketChannel.keyFor(selector);
+      if (key == null) {
+        logger.warn("unable to find key for channel {}", socketChannel);
+        iterator.remove();
+        writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Can not find key for the channel"), new ConnectionClosedException());
+        continue;
+      }
+      Object attachment = key.attachment();
+      if (!(attachment instanceof WriteAttachment)) {
+        logger.error("incorrect attachment {} for channel {}", attachment, socketChannel);
+        iterator.remove();
+        writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Incorrect attachment instance for the key"), new ConnectionClosedException());
+        continue;
+      }
+      WriteAttachment keyAttachment = (WriteAttachment) attachment;
+      if (keyAttachment.getWriteTasks().offer(writeTask)) {
+        iterator.remove();
+        try {
+          key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        } catch (CancelledKeyException e) {
+          writeTask.getListener().onWriteFailed(getDefaultWriteErrorMessageWithSuffix(socketChannel, "Key is cancelled"), new ConnectionClosedException(e));
+        }
+      }
+    }
+  }
+
+  private String getDefaultWriteErrorMessageWithSuffix(SocketChannel socketChannel, String suffix) {
+    return "unable write data to channel " + socketChannel + ". " + suffix;
+  }
+  /** Drains the connect queue, opening one non-blocking channel per task and registering it for OP_CONNECT. */
+  private void connectToPeersFromQueue() {
+    ConnectTask connectTask;
+    while ((connectTask = myConnectQueue.poll()) != null) {
+      if (stop || Thread.currentThread().isInterrupted()) {
+        return;
+      }
+      logger.debug("try connect to peer. Connect task is {}", connectTask);
+      try {
+        SocketChannel socketChannel = SocketChannel.open();
+        socketChannel.configureBlocking(false);
+        socketChannel.register(selector, SelectionKey.OP_CONNECT, connectTask);
+        socketChannel.connect(new InetSocketAddress(connectTask.getHost(), connectTask.getPort()));
+      } catch (IOException e) { // NOTE(review): the channel opened above stays open here; confirm it is reclaimed by the cleanup processor
+        LoggerUtils.warnAndDebugDetails(logger, "unable connect. Connect task is {}", connectTask, e);
+      }
+    }
+  }
+  /** Asks the loop to stop; with a positive timeout, waits for run() to finish and returns whether it did. */
+  public boolean stop(int timeout, TimeUnit timeUnit) throws InterruptedException {
+    stop = true;
+    if (timeout <= 0) {
+      return true;
+    }
+    boolean workerFinished = mySemaphore.tryAcquire(timeout, timeUnit);
+    if (workerFinished) mySemaphore.release(); // give the permit back so that a repeated stop() or a late run() does not block on it
+    return workerFinished;
+  }
+
+  private void processSelectedKeys() {
+    Set<SelectionKey> selectionKeys = selector.selectedKeys();
+    for (SelectionKey key : selectionKeys) {
+      if (stop || Thread.currentThread().isInterrupted()) {
+        return;
+      }
+      try {
+        processSelectedKey(key);
+      } catch (Exception e) {
+        logger.warn("error {} in processing key. Close channel {}", e.getMessage(), key.channel());
+        logger.debug("", e);
+        try {
+          key.channel().close();
+        } catch (IOException ioe) {
+          LoggerUtils.errorAndDebugDetails(logger, "unable close bad channel", ioe);
+        }
+      }
+    }
+    selectionKeys.clear();
+  }
+
+  private void processSelectedKey(SelectionKey key) throws IOException {
+    logger.trace("try process key for channel {}", key.channel());
+    myCleanupProcessor.processSelected(key);
+    if (!key.channel().isOpen()) {
+      key.cancel();
+      return;
+    }
+    for (KeyProcessor keyProcessor : myKeyProcessors) {
+      if (keyProcessor.accept(key)) {
+        keyProcessor.process(key);
+      }
+    }
+  }
+  /** @return false when new connections are not allowed or the task could not be queued within the timeout */
+  public boolean offerConnect(ConnectTask connectTask, int timeout, TimeUnit timeUnit) {
+    if (!myNewConnectionAllower.isNewConnectionAllowed()) {
+      logger.info("can not add connect task {} to queue. New connection is not allowed", connectTask);
+      return false;
+    }
+    return addTaskToQueue(connectTask, timeout, timeUnit, myConnectQueue);
+  }
+  /** Queues the write task; when it cannot be queued, the task's listener is told via onWriteFailed and false is returned. */
+  public boolean offerWrite(WriteTask writeTask, int timeout, TimeUnit timeUnit) {
+    boolean done = addTaskToQueue(writeTask, timeout, timeUnit, myWriteQueue);
+    if (!done) {
+      writeTask.getListener().onWriteFailed("unable add task " + writeTask + " to the queue. Maybe queue is overload", null);
+    }
+    return done;
+  }
+
+  private <T> boolean addTaskToQueue(T task, int timeout, TimeUnit timeUnit, BlockingQueue<T> queue) {
+    try {
+      if (queue.offer(task, timeout, timeUnit)) {
+        logger.trace("added task {}. Wake up selector", task);
+        selector.wakeup();
+        return true;
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // keep the interrupt visible to the calling thread
+      logger.debug("Task {} interrupted before was added to queue", task);
+    }
+    logger.debug("Task {} was not added", task);
+    return false;
+  }
+
+  void setCleanupTimeout(long timeoutMillis) {
+    this.myCleanupTimeoutMillis = timeoutMillis;
+  }
+
+  void setSelectorSelectTimeout(int timeout) {
+    mySelectorTimeoutMillis = timeout;
+  }
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/FirstAvailableChannel.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/FirstAvailableChannel.java
new file mode 100644
index 0000000..b31a7ef
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/FirstAvailableChannel.java
@@ -0,0 +1,49 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+
+public class FirstAvailableChannel implements ServerChannelRegister {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(FirstAvailableChannel.class);
+
+  private final int firstTryPort;
+  private final int lastTryPort;
+
+  public FirstAvailableChannel(int firstTryPort, int lastTryPort) {
+    this.firstTryPort = firstTryPort;
+    this.lastTryPort = lastTryPort;
+  }
+  /** Opens a non-blocking server channel bound to the first free port in [firstTryPort, lastTryPort]; throws IOException if none is free. */
+  @NotNull
+  @Override
+  public ServerSocketChannel channelFor(Selector selector) throws IOException {
+    ServerSocketChannel myServerSocketChannel = selector.provider().openServerSocketChannel();
+    boolean bound = false;
+    try {
+      myServerSocketChannel.configureBlocking(false);
+      for (int port = firstTryPort; port <= lastTryPort && !bound; port++) {
+        try {
+          myServerSocketChannel.socket().bind(new InetSocketAddress(port));
+          bound = true;
+        } catch (IOException e) {
+          //try next port
+          logger.debug("Could not bind to port {}, trying next port...", port);
+        }
+      }
+      if (!bound) {
+        logger.error("No available ports in range [{}, {}] for the BitTorrent client!", firstTryPort, lastTryPort);
+        throw new IOException("No available port for the BitTorrent client!");
+      }
+      return myServerSocketChannel;
+    } finally {
+      if (!bound) myServerSocketChannel.close(); // do not leak the channel when nothing was bound or an exception is propagating
+    }
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/NewConnectionAllower.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/NewConnectionAllower.java
new file mode 100644
index 0000000..c1f834f
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/NewConnectionAllower.java
@@ -0,0 +1,10 @@
+package com.turn.ttorrent.network;
+
+public interface NewConnectionAllower {
+
+  /**
+   * @return {@code true} if a new connection may be accepted or a connection to another peer may be opened
+   */
+  boolean isNewConnectionAllowed();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadAttachment.java
new file mode 100644
index 0000000..2072e30
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadAttachment.java
@@ -0,0 +1,9 @@
+package com.turn.ttorrent.network;
+
+public interface ReadAttachment {
+
+  /**
+   * @return the connection listener associated with the key that carries this attachment
+   */
+  ConnectionListener getConnectionListener();
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadWriteAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadWriteAttachment.java
new file mode 100644
index 0000000..6bb06e4
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ReadWriteAttachment.java
@@ -0,0 +1,50 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class ReadWriteAttachment implements ReadAttachment, WriteAttachment, TimeoutAttachment {
+  /** Capacity of the queue returned by {@link #getWriteTasks()}. */
+  private final static int WRITE_TASK_QUEUE_SIZE = 150;
+
+  private final ConnectionListener myListener;
+  private final BlockingQueue<WriteTask> myPendingWrites = new LinkedBlockingQueue<WriteTask>(WRITE_TASK_QUEUE_SIZE);
+  private final long myTimeoutMillis;
+  private long myLastCommunicationTime;
+
+  /** @param lastCommunicationTime initial value of the timestamp that {@link #isTimeoutElapsed(long)} compares against */
+  public ReadWriteAttachment(ConnectionListener connectionListener, long lastCommunicationTime, long timeoutMillis) {
+    myListener = connectionListener;
+    myTimeoutMillis = timeoutMillis;
+    myLastCommunicationTime = lastCommunicationTime;
+  }
+
+  @Override
+  public ConnectionListener getConnectionListener() {
+    return this.myListener;
+  }
+
+  @Override
+  public BlockingQueue<WriteTask> getWriteTasks() {
+    return this.myPendingWrites;
+  }
+  /** Reports a timeout once the last recorded communication is older than {@code currentTimeMillis} minus the timeout. */
+  @Override
+  public boolean isTimeoutElapsed(long currentTimeMillis) {
+    final long oldestAllowedCommunication = currentTimeMillis - myTimeoutMillis;
+    return myLastCommunicationTime < oldestAllowedCommunication;
+  }
+  /** Records {@code currentTimeMillis} as the time of the last communication. */
+  @Override
+  public void communicatedNow(long currentTimeMillis) {
+    myLastCommunicationTime = currentTimeMillis;
+  }
+  /** Reports the timeout to the connection listener as a {@link SocketTimeoutException}. */
+  @Override
+  public void onTimeoutElapsed(SocketChannel channel) throws IOException {
+    myListener.onError(channel, new SocketTimeoutException());
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/SelectorFactory.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/SelectorFactory.java
new file mode 100644
index 0000000..b3b99b2
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/SelectorFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2000-2018 JetBrains s.r.o.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+/** Factory for {@link Selector} instances. */
+public interface SelectorFactory {
+
+  /**
+   * @return a new {@link Selector} instance
+   * @throws IOException if an I/O error occurs
+   */
+  Selector newSelector() throws IOException;
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ServerChannelRegister.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ServerChannelRegister.java
new file mode 100644
index 0000000..8f69c5d
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/ServerChannelRegister.java
@@ -0,0 +1,20 @@
+package com.turn.ttorrent.network;
+
+import org.jetbrains.annotations.NotNull;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+/** Supplies the server socket channel that is bound to a given selector. */
+public interface ServerChannelRegister {
+
+  /**
+   * Creates a new server channel and binds it to the specified selector.
+   * @param selector selector the new channel is bound to
+   * @return the newly created server channel
+   * @throws IOException if an I/O error occurs
+   */
+  @NotNull
+  ServerSocketChannel channelFor(Selector selector) throws IOException;
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutAttachment.java
new file mode 100644
index 0000000..6478666
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutAttachment.java
@@ -0,0 +1,29 @@
+package com.turn.ttorrent.network;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+/** Key attachment contract for tracking the inactivity timeout of a channel. */
+public interface TimeoutAttachment {
+
+  /**
+   * @param currentTimeMillis current time, in milliseconds, used for the timeout calculation
+   * @return {@code true} if and only if the timeout has elapsed
+   */
+  boolean isTimeoutElapsed(long currentTimeMillis);
+
+  /**
+   * Sets the last communication time to the given current time.
+   *
+   * @param currentTimeMillis current time in milliseconds
+   */
+  void communicatedNow(long currentTimeMillis);
+
+  /**
+   * Must be invoked when the timeout has elapsed.
+   *
+   * @param channel channel of the key associated with this attachment
+   * @throws IOException if an I/O error occurs
+   */
+  void onTimeoutElapsed(SocketChannel channel) throws IOException;
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorage.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorage.java
new file mode 100644
index 0000000..ff14d92
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorage.java
@@ -0,0 +1,13 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.TimeUnit;
+/** Holder of a timeout value that can be read back in milliseconds. */
+public interface TimeoutStorage {
+  /** Sets the timeout; {@code millis} is the new value in milliseconds. */
+  void setTimeout(long millis);
+  /** Sets the timeout given as {@code timeout} units of {@code timeUnit}. */
+  void setTimeout(int timeout, TimeUnit timeUnit);
+  /** @return the timeout in milliseconds */
+  long getTimeoutMillis();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorageImpl.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorageImpl.java
new file mode 100644
index 0000000..529f0da
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/TimeoutStorageImpl.java
@@ -0,0 +1,30 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link TimeoutStorage} that keeps the timeout, in milliseconds, in an {@link AtomicLong}.
+ * The stored value is zero until one of the setters is called.
+ */
+public class TimeoutStorageImpl implements TimeoutStorage {
+
+  private final AtomicLong timeoutMillis = new AtomicLong();
+
+  @Override
+  public void setTimeout(long millis) {
+    this.timeoutMillis.set(millis);
+  }
+
+  @Override
+  public void setTimeout(int timeout, TimeUnit timeUnit) {
+    // convert to milliseconds and reuse the millisecond setter
+    final long millis = timeUnit.toMillis(timeout);
+    setTimeout(millis);
+  }
+
+  @Override
+  public long getTimeoutMillis() {
+    return this.timeoutMillis.get();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteAttachment.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteAttachment.java
new file mode 100644
index 0000000..3497279
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteAttachment.java
@@ -0,0 +1,12 @@
+package com.turn.ttorrent.network;
+
+import java.util.concurrent.BlockingQueue;
+/** Attachment view that exposes the queue of write tasks. */
+public interface WriteAttachment {
+
+  /**
+   * @return queue used to offer and peek write tasks
+   */
+  BlockingQueue<WriteTask> getWriteTasks();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteListener.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteListener.java
new file mode 100644
index 0000000..4743e53
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteListener.java
@@ -0,0 +1,18 @@
+package com.turn.ttorrent.network;
+/** Callback informed about the outcome of a write. */
+public interface WriteListener {
+
+  /**
+   * Invoked when the write failed for any reason.
+   *
+   * @param message error description
+   * @param e       the causing exception if there is one, otherwise {@code null}
+   */
+  void onWriteFailed(String message, Throwable e);
+
+  /**
+   * Invoked when the write completed correctly.
+   */
+  void onWriteDone();
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteTask.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteTask.java
new file mode 100644
index 0000000..838752b
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/WriteTask.java
@@ -0,0 +1,48 @@
+package com.turn.ttorrent.network;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+/**
+ * Holder for one pending write: the target channel, the buffer with the bytes to send and
+ * the listener associated with the write.
+ */
+public class WriteTask {
+
+  private final ByteChannel socketChannel;
+  private final ByteBuffer byteBuffer;
+  private final WriteListener listener;
+
+  /**
+   * @param socketChannel channel of this write task
+   * @param byteBuffer    buffer of this write task
+   * @param listener      listener of this write task
+   */
+  public WriteTask(ByteChannel socketChannel, ByteBuffer byteBuffer, WriteListener listener) {
+    this.socketChannel = socketChannel;
+    this.byteBuffer = byteBuffer;
+    this.listener = listener;
+  }
+
+  public ByteChannel getSocketChannel() {
+    return this.socketChannel;
+  }
+
+  public ByteBuffer getByteBuffer() {
+    return this.byteBuffer;
+  }
+
+  public WriteListener getListener() {
+    return this.listener;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("WriteTask{");
+    text.append("socketChannel=").append(socketChannel);
+    text.append(", byteBuffer=").append(byteBuffer);
+    text.append(", listener=").append(listener);
+    text.append('}');
+    return text.toString();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/AcceptableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/AcceptableKeyProcessor.java
new file mode 100644
index 0000000..e3b07c2
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/AcceptableKeyProcessor.java
@@ -0,0 +1,104 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.*;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Key processor for acceptable keys: accepts the pending connection of a server channel and
+ * registers the accepted socket channel on the selector for reading.
+ */
+public class AcceptableKeyProcessor implements KeyProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(AcceptableKeyProcessor.class);
+
+  private final Selector mySelector;
+  private final String myServerSocketLocalAddress;
+  private final TimeService myTimeService;
+  private final NewConnectionAllower myNewConnectionAllower;
+  private final TimeoutStorage myTimeoutStorage;
+  private final AtomicInteger mySendBufferSize;
+  private final AtomicInteger myReceiveBufferSize;
+
+  /**
+   * @param selector                 selector the accepted channels are registered with
+   * @param serverSocketLocalAddress local server address, used in log messages
+   * @param timeService              source of the current time for new key attachments
+   * @param newConnectionAllower     decides whether an accepted connection is kept
+   * @param timeoutStorage           source of the timeout for new key attachments
+   * @param sendBufferSize           send buffer size passed to the socket setup helper
+   * @param receiveBufferSize        receive buffer size passed to the socket setup helper
+   */
+  public AcceptableKeyProcessor(Selector selector,
+                                String serverSocketLocalAddress,
+                                TimeService timeService,
+                                NewConnectionAllower newConnectionAllower,
+                                TimeoutStorage timeoutStorage,
+                                AtomicInteger sendBufferSize,
+                                AtomicInteger receiveBufferSize) {
+    this.mySelector = selector;
+    this.myServerSocketLocalAddress = serverSocketLocalAddress;
+    this.myTimeService = timeService;
+    this.myNewConnectionAllower = newConnectionAllower;
+    this.myTimeoutStorage = timeoutStorage;
+    this.mySendBufferSize = sendBufferSize;
+    this.myReceiveBufferSize = receiveBufferSize;
+  }
+
+  /**
+   * Accepts a connection from the server channel of the key. The key is cancelled when its
+   * channel or attachment has an unexpected type. A connection that is not allowed is closed
+   * right away; otherwise a new listener is notified, the channel is made non-blocking and it
+   * is registered for {@code OP_READ} with a fresh {@link ReadWriteAttachment}.
+   *
+   * @param key acceptable selection key
+   * @throws IOException if accepting, configuring or registering the channel fails
+   */
+  @Override
+  public void process(SelectionKey key) throws IOException {
+    SelectableChannel channel = key.channel();
+    if (!(channel instanceof ServerSocketChannel)) {
+      logger.error("incorrect instance of server channel. Can not accept connections");
+      key.cancel();
+      return;
+    }
+    Object attachment = key.attachment();
+    if (!(attachment instanceof AcceptAttachment)) {
+      logger.error("incorrect instance of server channel key attachment");
+      key.cancel();
+      return;
+    }
+    ChannelListenerFactory channelListenerFactory = ((AcceptAttachment) attachment).getChannelListenerFactory();
+
+    SocketChannel socketChannel = ((ServerSocketChannel) channel).accept();
+    if (socketChannel == null) {
+      // accept() returns null on a non-blocking server channel without a pending connection
+      logger.trace("server {} has no pending connection to accept", myServerSocketLocalAddress);
+      return;
+    }
+    logger.trace("server {} get new connection from {}", new Object[]{myServerSocketLocalAddress, socketChannel.socket()});
+
+    if (!myNewConnectionAllower.isNewConnectionAllowed()) {
+      logger.info("new connection is not allowed. New connection is closed");
+      socketChannel.close();
+      return;
+    }
+
+    ConnectionListener stateConnectionListener = channelListenerFactory.newChannelListener();
+    stateConnectionListener.onConnectionEstablished(socketChannel);
+    socketChannel.configureBlocking(false);
+    KeyProcessorUtil.setBuffersSizeIfNecessary(socketChannel, mySendBufferSize.get(), myReceiveBufferSize.get());
+    ReadWriteAttachment keyAttachment = new ReadWriteAttachment(stateConnectionListener, myTimeService.now(), myTimeoutStorage.getTimeoutMillis());
+    socketChannel.register(mySelector, SelectionKey.OP_READ, keyAttachment);
+  }
+
+  @Override
+  public boolean accept(SelectionKey key) {
+    return key.isValid() && key.isAcceptable();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessor.java
new file mode 100644
index 0000000..34e7f78
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessor.java
@@ -0,0 +1,76 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.LoggerUtils;
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.TimeoutAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Cleanup processor that closes channels whose timeout attachment reports an elapsed timeout
+ * and refreshes the last communication time of keys that were selected.
+ */
+public class CleanupKeyProcessor implements CleanupProcessor {
+
+  private final static Logger logger = TorrentLoggerFactory.getLogger(CleanupKeyProcessor.class);
+
+  private final TimeService myTimeService;
+
+  /**
+   * @param timeService source of the current time for timeout checks and updates
+   */
+  public CleanupKeyProcessor(TimeService timeService) {
+    this.myTimeService = timeService;
+  }
+
+  /**
+   * Cancels keys without a usable timeout attachment or socket channel. When the timeout of
+   * the attachment has elapsed, the channel is closed, the key is cancelled and the attachment
+   * is told about the timeout; an {@link IOException} on that path is logged, not rethrown.
+   */
+  @Override
+  public void processCleanup(SelectionKey key) {
+    final TimeoutAttachment timeoutAttachment = KeyProcessorUtil.getAttachmentAsTimeoutOrNull(key);
+    if (timeoutAttachment == null) {
+      key.cancel();
+      return;
+    }
+    final boolean timedOut = timeoutAttachment.isTimeoutElapsed(myTimeService.now());
+    if (!timedOut) {
+      return;
+    }
+
+    final SocketChannel socketChannel = KeyProcessorUtil.getCastedChannelOrNull(key);
+    if (socketChannel == null) {
+      key.cancel();
+      return;
+    }
+
+    logger.debug("channel {} was inactive in specified timeout. Close channel...", socketChannel);
+    try {
+      socketChannel.close();
+      key.cancel();
+      timeoutAttachment.onTimeoutElapsed(socketChannel);
+    } catch (IOException e) {
+      LoggerUtils.errorAndDebugDetails(logger, "unable close channel {}", socketChannel, e);
+    }
+  }
+
+  /**
+   * Records the current time as the last communication time of the key's attachment; the key
+   * is cancelled when it carries no timeout attachment.
+   */
+  @Override
+  public void processSelected(SelectionKey key) {
+    final TimeoutAttachment timeoutAttachment = KeyProcessorUtil.getAttachmentAsTimeoutOrNull(key);
+    if (timeoutAttachment != null) {
+      timeoutAttachment.communicatedNow(myTimeService.now());
+      return;
+    }
+    key.cancel();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupProcessor.java
new file mode 100644
index 0000000..c3d50c7
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/CleanupProcessor.java
@@ -0,0 +1,21 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import java.nio.channels.SelectionKey;
+/** Callbacks for the cleanup procedure and for activity on selection keys. */
+public interface CleanupProcessor {
+
+  /**
+   * Invoked while the cleanup procedure is running. The processor may cancel the key and/or close its channel.
+   *
+   * @param key key to clean up
+   */
+  void processCleanup(SelectionKey key);
+
+  /**
+   * Invoked when there is any activity on the channel associated with this key.
+   *
+   * @param key key whose channel showed activity
+   */
+  void processSelected(SelectionKey key);
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ConnectableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ConnectableKeyProcessor.java
new file mode 100644
index 0000000..6b6474a
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ConnectableKeyProcessor.java
@@ -0,0 +1,110 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TimeService;
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ConnectTask;
+import com.turn.ttorrent.network.ConnectionListener;
+import com.turn.ttorrent.network.ReadWriteAttachment;
+import com.turn.ttorrent.network.TimeoutStorage;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.NoRouteToHostException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Key processor for connectable keys: finishes an outgoing connection and, on success,
+ * registers the channel on the selector for reading.
+ */
+public class ConnectableKeyProcessor implements KeyProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(ConnectableKeyProcessor.class);
+
+  private final Selector mySelector;
+  private final TimeService myTimeService;
+  private final TimeoutStorage myTimeoutStorage;
+  private final AtomicInteger mySendBufferSize;
+  private final AtomicInteger myReceiveBufferSize;
+
+  /**
+   * @param selector          selector the connected channels are registered with
+   * @param timeService       source of the current time for new key attachments
+   * @param timeoutStorage    source of the timeout for new key attachments
+   * @param sendBufferSize    send buffer size passed to the socket setup helper
+   * @param receiveBufferSize receive buffer size passed to the socket setup helper
+   */
+  public ConnectableKeyProcessor(Selector selector,
+                                 TimeService timeService,
+                                 TimeoutStorage timeoutStorage,
+                                 AtomicInteger sendBufferSize,
+                                 AtomicInteger receiveBufferSize) {
+    this.mySelector = selector;
+    this.myTimeService = timeService;
+    this.myTimeoutStorage = timeoutStorage;
+    this.mySendBufferSize = sendBufferSize;
+    this.myReceiveBufferSize = receiveBufferSize;
+  }
+
+  /**
+   * Completes the connection of the key's channel. The key is cancelled when its channel or
+   * attachment has an unexpected type. A failed connect is reported to the task's listener
+   * through {@code onError}; a successful one is registered for {@code OP_READ} and announced
+   * through {@code onConnectionEstablished}.
+   *
+   * @param key connectable selection key
+   * @throws IOException if an I/O error other than the handled connect failures occurs
+   */
+  @Override
+  public void process(SelectionKey key) throws IOException {
+    final SelectableChannel keyChannel = key.channel();
+    if (!(keyChannel instanceof SocketChannel)) {
+      logger.warn("incorrect instance of channel. The key is cancelled");
+      key.cancel();
+      return;
+    }
+    final SocketChannel socketChannel = (SocketChannel) keyChannel;
+    final Object keyAttachment = key.attachment();
+    if (!(keyAttachment instanceof ConnectTask)) {
+      logger.warn("incorrect instance of attachment for channel {}. The key for the channel is cancelled", socketChannel);
+      key.cancel();
+      return;
+    }
+    final ConnectTask task = (ConnectTask) keyAttachment;
+    final ConnectionListener listener = task.getConnectionListener();
+
+    boolean connected;
+    try {
+      connected = socketChannel.finishConnect();
+    } catch (NoRouteToHostException e) {
+      logger.info("Could not connect to {}:{}, received NoRouteToHostException", task.getHost(), task.getPort());
+      listener.onError(socketChannel, e);
+      return;
+    } catch (ConnectException e) {
+      logger.info("Could not connect to {}:{}, received ConnectException", task.getHost(), task.getPort());
+      listener.onError(socketChannel, e);
+      return;
+    }
+    if (!connected) {
+      logger.info("Could not connect to {}:{}", task.getHost(), task.getPort());
+      listener.onError(socketChannel, null);
+      return;
+    }
+
+    socketChannel.configureBlocking(false);
+    KeyProcessorUtil.setBuffersSizeIfNecessary(socketChannel, mySendBufferSize.get(), myReceiveBufferSize.get());
+    ReadWriteAttachment readWriteAttachment = new ReadWriteAttachment(listener, myTimeService.now(), myTimeoutStorage.getTimeoutMillis());
+    socketChannel.register(mySelector, SelectionKey.OP_READ, readWriteAttachment);
+    logger.debug("setup new TCP connection with {}", socketChannel);
+    listener.onConnectionEstablished(socketChannel);
+  }
+
+  @Override
+  public boolean accept(SelectionKey key) {
+    return key.isValid() && key.isConnectable();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/InvalidKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/InvalidKeyProcessor.java
new file mode 100644
index 0000000..259f48f
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/InvalidKeyProcessor.java
@@ -0,0 +1,45 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ReadAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Key processor for invalid keys: reports a {@link CancelledKeyException} to the connection
+ * listener found in the key's attachment.
+ */
+public class InvalidKeyProcessor implements KeyProcessor {
+
+  private final static Logger logger = TorrentLoggerFactory.getLogger(InvalidKeyProcessor.class);
+
+  /**
+   * Notifies the listener of the key's {@link ReadAttachment} about the invalid key. When the
+   * attachment is not a {@link ReadAttachment} or the channel is not a {@link SocketChannel},
+   * the key is cancelled and nothing is reported.
+   */
+  @Override
+  public void process(SelectionKey key) throws IOException {
+    final Object attachment = key.attachment();
+    final SelectableChannel channel = key.channel();
+    // instanceof is false for null, so a missing attachment takes this branch as well
+    final boolean supported = attachment instanceof ReadAttachment && channel instanceof SocketChannel;
+    if (!supported) {
+      key.cancel();
+      return;
+    }
+
+    logger.trace("drop invalid key {}", channel);
+    ((ReadAttachment) attachment).getConnectionListener().onError((SocketChannel) channel, new CancelledKeyException());
+  }
+
+  @Override
+  public boolean accept(SelectionKey key) {
+    return !key.isValid();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessor.java
new file mode 100644
index 0000000..2a8f51e
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessor.java
@@ -0,0 +1,22 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+/** Handler for selection keys; {@code accept} tells whether {@code process} applies to a key. */
+public interface KeyProcessor {
+
+  /**
+   * Processes the passed key.
+   *
+   * @param key key to process
+   * @throws IOException if an I/O error occurs
+   */
+  void process(SelectionKey key) throws IOException;
+
+  /**
+   * @param key key to check for acceptance
+   * @return {@code true} if and only if this processor can process the key
+   */
+  boolean accept(SelectionKey key);
+
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessorUtil.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessorUtil.java
new file mode 100644
index 0000000..c097942
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/KeyProcessorUtil.java
@@ -0,0 +1,67 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.TimeoutAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Static helpers shared by the key processors of this package.
+ */
+public class KeyProcessorUtil {
+
+  private final static Logger logger = TorrentLoggerFactory.getLogger(KeyProcessorUtil.class);
+
+  /**
+   * @param key selection key whose attachment is inspected
+   * @return the attachment as {@link TimeoutAttachment}, or {@code null} (after logging an
+   * error) when the key has no attachment or one of another type
+   */
+  public static TimeoutAttachment getAttachmentAsTimeoutOrNull(SelectionKey key) {
+    Object attachment = key.attachment();
+    if (attachment instanceof TimeoutAttachment) {
+      return (TimeoutAttachment) attachment;
+    }
+    logger.error("unable to cast attachment {} to timeout attachment type", attachment);
+    return null;
+  }
+
+  /**
+   * @param key selection key whose channel is inspected
+   * @return the channel as {@link SocketChannel}, or {@code null} (after logging an error)
+   * when the channel is of another type
+   */
+  public static SocketChannel getCastedChannelOrNull(SelectionKey key) {
+    SelectableChannel channel = key.channel();
+    if (channel instanceof SocketChannel) {
+      return (SocketChannel) channel;
+    }
+    // pass the channel so the "{}" placeholder of the message is actually substituted
+    logger.error("unable to cast channel {} to specified type", channel);
+    return null;
+  }
+
+  /**
+   * Applies the given buffer sizes to the socket of the channel; a size that is not positive
+   * leaves the corresponding socket option unchanged.
+   *
+   * @param socketChannel     channel whose socket is configured
+   * @param sendBufferSize    send buffer size in bytes, ignored unless positive
+   * @param receiveBufferSize receive buffer size in bytes, ignored unless positive
+   * @throws IOException if setting a socket option fails
+   */
+  public static void setBuffersSizeIfNecessary(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize) throws IOException {
+    final Socket socket = socketChannel.socket();
+    if (sendBufferSize > 0) {
+      socket.setSendBufferSize(sendBufferSize);
+    }
+    if (receiveBufferSize > 0) {
+      socket.setReceiveBufferSize(receiveBufferSize);
+    }
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ReadableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ReadableKeyProcessor.java
new file mode 100644
index 0000000..d209919
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/ReadableKeyProcessor.java
@@ -0,0 +1,62 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ConnectionListener;
+import com.turn.ttorrent.network.ReadAttachment;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Key processor for readable keys: hands the channel over to the connection listener stored
+ * in the key's attachment.
+ */
+public class ReadableKeyProcessor implements KeyProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(ReadableKeyProcessor.class);
+
+  private final String myServerSocketLocalAddress;
+
+  /**
+   * @param serverSocketLocalAddress local server address, used in log messages
+   */
+  public ReadableKeyProcessor(String serverSocketLocalAddress) {
+    this.myServerSocketLocalAddress = serverSocketLocalAddress;
+  }
+
+  /**
+   * Notifies the listener of the key's {@link ReadAttachment} that new data is available. A
+   * key whose channel is not a {@link SocketChannel} is cancelled; a channel whose key carries
+   * no {@link ReadAttachment} is closed.
+   */
+  @Override
+  public void process(SelectionKey key) throws IOException {
+    final SelectableChannel keyChannel = key.channel();
+    if (!(keyChannel instanceof SocketChannel)) {
+      logger.warn("incorrect instance of channel. The key is cancelled");
+      key.cancel();
+      return;
+    }
+
+    final SocketChannel socketChannel = (SocketChannel) keyChannel;
+    logger.trace("server {} get new data from {}", myServerSocketLocalAddress, socketChannel);
+
+    final Object keyAttachment = key.attachment();
+    if (keyAttachment instanceof ReadAttachment) {
+      final ConnectionListener listener = ((ReadAttachment) keyAttachment).getConnectionListener();
+      listener.onNewDataAvailable(socketChannel);
+      return;
+    }
+
+    logger.warn("incorrect instance of attachment for channel {}", new Object[]{socketChannel.socket()});
+    socketChannel.close();
+  }
+
+  @Override
+  public boolean accept(SelectionKey key) {
+    return key.isValid() && key.isReadable();
+  }
+}
diff --git a/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessor.java b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessor.java
new file mode 100644
index 0000000..dea5be2
--- /dev/null
+++ b/ttorrent-master/network/src/main/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessor.java
@@ -0,0 +1,84 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.common.TorrentLoggerFactory;
+import com.turn.ttorrent.network.ConnectionClosedException;
+import com.turn.ttorrent.network.WriteAttachment;
+import com.turn.ttorrent.network.WriteTask;
+import org.slf4j.Logger;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+/**
+ * Key processor for writable keys: writes the head of the attachment's write task queue to
+ * the channel and informs the task's listener about completion or failure.
+ */
+public class WritableKeyProcessor implements KeyProcessor {
+
+  private static final Logger logger = TorrentLoggerFactory.getLogger(WritableKeyProcessor.class);
+
+  /**
+   * Writes the first queued task to the key's channel. With an empty queue the key's interest
+   * set is reduced to {@code OP_READ}. A task is removed and its listener gets
+   * {@code onWriteDone} once its buffer is fully written; otherwise it stays queued for the
+   * next writable event. On an I/O failure the listener of the current task is notified
+   * exactly once, the queue is cleared and the key is cancelled.
+   *
+   * @param key writable selection key
+   * @throws IOException declared by {@link KeyProcessor}; write errors are handled here
+   */
+  @Override
+  public void process(SelectionKey key) throws IOException {
+    SelectableChannel channel = key.channel();
+    if (!(channel instanceof SocketChannel)) {
+      logger.warn("incorrect instance of channel. The key is cancelled");
+      key.cancel();
+      return;
+    }
+
+    SocketChannel socketChannel = (SocketChannel) channel;
+
+    Object attachment = key.attachment();
+    if (!(attachment instanceof WriteAttachment)) {
+      logger.error("incorrect instance of attachment for channel {}", channel);
+      key.cancel();
+      return;
+    }
+
+    WriteAttachment keyAttachment = (WriteAttachment) attachment;
+
+    // peek once instead of isEmpty() + peek(), so a concurrently drained queue cannot yield null below
+    WriteTask processedTask = keyAttachment.getWriteTasks().peek();
+    if (processedTask == null) {
+      key.interestOps(SelectionKey.OP_READ);
+      return;
+    }
+
+    try {
+      int writeCount = socketChannel.write(processedTask.getByteBuffer());
+      if (writeCount < 0) {
+        // no listener call here: the catch block below reports the failure, exactly once
+        throw new EOFException("Reached end of stream while writing");
+      }
+
+      if (!processedTask.getByteBuffer().hasRemaining()) {
+        processedTask.getListener().onWriteDone();
+        keyAttachment.getWriteTasks().remove();
+      }
+
+    } catch (IOException e) {
+      processedTask.getListener().onWriteFailed("I/O error occurs on write to channel " + socketChannel, new ConnectionClosedException(e));
+      // NOTE(review): listeners of the other queued tasks are not notified when the queue is cleared
+      keyAttachment.getWriteTasks().clear();
+      key.cancel();
+    }
+  }
+
+  @Override
+  public boolean accept(SelectionKey key) {
+    return key.isValid() && key.isWritable();
+  }
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionManagerTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionManagerTest.java
new file mode 100644
index 0000000..e35ffe8
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionManagerTest.java
@@ -0,0 +1,189 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.MockTimeService;
+import org.apache.log4j.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+/** Exercises accept, read and outgoing-connect handling of {@link ConnectionManager} over loopback sockets. */
+public class ConnectionManagerTest {
+
+  private ConnectionManager myConnectionManager;
+  private ExecutorService myExecutorService;
+  private ConnectionListener connectionListener;
+  private ConnectionManagerContext myContext;
+  // Configures a console appender for log4j unless the root logger already has appenders.
+  public ConnectionManagerTest() {
+    if (Logger.getRootLogger().getAllAppenders().hasMoreElements())
+      return;
+    BasicConfigurator.configure(new ConsoleAppender(new PatternLayout("[%d{MMdd HH:mm:ss,SSS} %t] %6p - %20.20c - %m %n")));
+    Logger.getRootLogger().setLevel(Level.ALL);
+  }
+  // Builds the ConnectionManager under test from mocks, a single-thread executor and a real selector.
+  @BeforeMethod
+  public void setUp() throws Exception {
+    Logger.getRootLogger().setLevel(Level.INFO);
+    myContext = mock(ConnectionManagerContext.class);
+    myExecutorService = Executors.newSingleThreadExecutor();
+    when(myContext.getExecutor()).thenReturn(myExecutorService);
+    final SelectorFactory selectorFactory = mock(SelectorFactory.class);
+    when(selectorFactory.newSelector()).thenReturn(Selector.open());
+    NewConnectionAllower newConnectionAllower = mock(NewConnectionAllower.class);
+    when(newConnectionAllower.isNewConnectionAllowed()).thenReturn(true);
+    myConnectionManager = new ConnectionManager(
+            myContext,
+            new MockTimeService(),
+            newConnectionAllower,
+            newConnectionAllower,
+            selectorFactory,
+            new AtomicInteger(),
+            new AtomicInteger());
+  }
+  // A second initAndRunWorker call is expected to fail with IllegalStateException.
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testThatDoubleInitThrowException() {
+    try {
+      myConnectionManager.initAndRunWorker(new FirstAvailableChannel(6881, 6889));
+    } catch (IOException e) {
+      fail("unable to init and run worker", e);
+    }
+    try {
+      myConnectionManager.initAndRunWorker(new FirstAvailableChannel(6881, 6889));
+    } catch (IOException e) {
+      fail("unable to init and run worker", e);
+    }
+  }
+  // Accepts two client sockets, reads data and end-of-stream from them, then connects out to a local server socket.
+  @Test
+  public void canAcceptAndReadData() throws IOException, InterruptedException {
+    final AtomicInteger acceptCount = new AtomicInteger();
+    final AtomicInteger readCount = new AtomicInteger();
+    final AtomicInteger connectCount = new AtomicInteger();
+    final AtomicInteger lastReadBytesCount = new AtomicInteger();
+    final ByteBuffer byteBuffer = ByteBuffer.allocate(10);
+
+    final Semaphore semaphore = new Semaphore(0);
+
+    this.connectionListener = new ConnectionListener() {
+      @Override
+      public void onNewDataAvailable(SocketChannel socketChannel) throws IOException {
+        readCount.incrementAndGet();
+        lastReadBytesCount.set(socketChannel.read(byteBuffer));
+        if (lastReadBytesCount.get() == -1) {
+          socketChannel.close();
+        }
+        semaphore.release();
+      }
+
+      @Override
+      public void onConnectionEstablished(SocketChannel socketChannel) throws IOException {
+        acceptCount.incrementAndGet();
+        semaphore.release();
+      }
+
+      @Override
+      public void onError(SocketChannel socketChannel, Throwable ex) {
+        // errors are ignored in this test
+      }
+    };
+
+    when(myContext.newChannelListener()).thenReturn(connectionListener);
+
+    myConnectionManager.initAndRunWorker(new FirstAvailableChannel(6881, 6889));
+
+    assertEquals(acceptCount.get(), 0);
+    assertEquals(readCount.get(), 0);
+    int serverPort = myConnectionManager.getBindPort();
+
+    Socket socket = new Socket("127.0.0.1", serverPort);
+
+    tryAcquireOrFail(semaphore); // wait until the connection is accepted
+
+    assertTrue(socket.isConnected());
+    assertEquals(acceptCount.get(), 1);
+    assertEquals(readCount.get(), 0);
+
+    Socket socketSecond = new Socket("127.0.0.1", serverPort);
+
+    tryAcquireOrFail(semaphore); // wait until the connection is accepted
+
+    assertTrue(socketSecond.isConnected());
+    assertEquals(acceptCount.get(), 2);
+    assertEquals(readCount.get(), 0);
+    socketSecond.close();
+    tryAcquireOrFail(semaphore); // wait for the read that reports the closed connection
+    assertEquals(readCount.get(), 1);
+    assertEquals(acceptCount.get(), 2);
+    assertEquals(lastReadBytesCount.get(), -1);
+    byteBuffer.rewind();
+    assertEquals(byteBuffer.get(), 0);
+    byteBuffer.rewind();
+    String writeStr = "abc";
+    OutputStream outputStream = socket.getOutputStream();
+    outputStream.write(writeStr.getBytes());
+    tryAcquireOrFail(semaphore); // wait until the bytes are read
+    assertEquals(readCount.get(), 2);
+    assertEquals(lastReadBytesCount.get(), 3);
+    byte[] expected = new byte[byteBuffer.capacity()];
+    System.arraycopy(writeStr.getBytes(), 0, expected, 0, writeStr.length());
+    assertEquals(byteBuffer.array(), expected);
+    outputStream.close();
+    socket.close();
+    tryAcquireOrFail(semaphore); // wait for the read that reports the closed connection
+    assertEquals(readCount.get(), 3);
+
+    int otherPeerPort = 7575; // NOTE(review): fixed port; the test fails if 7575 is already in use
+    ServerSocket ss = new ServerSocket(otherPeerPort); // NOTE(review): never closed in this test
+    assertEquals(connectCount.get(), 0);
+    myConnectionManager.offerConnect(new ConnectTask("127.0.0.1", otherPeerPort, new ConnectionListener() {
+      @Override
+      public void onNewDataAvailable(SocketChannel socketChannel) throws IOException {
+        // incoming data is not checked for the outgoing connection
+      }
+
+      @Override
+      public void onConnectionEstablished(SocketChannel socketChannel) throws IOException {
+        connectCount.incrementAndGet();
+        semaphore.release();
+      }
+
+      @Override
+      public void onError(SocketChannel socketChannel, Throwable ex) {
+        // errors are ignored in this test
+      }
+    }, 0, 100), 1, TimeUnit.SECONDS);
+    ss.accept(); // blocks until a connection arrives; the accepted socket is discarded
+    tryAcquireOrFail(semaphore); // wait until onConnectionEstablished has released a permit
+    assertEquals(connectCount.get(), 1);
+  }
+  // Closes the manager and waits up to 10 seconds for the executor to terminate.
+  @AfterMethod
+  public void tearDown() throws Exception {
+    this.myConnectionManager.close();
+    myExecutorService.shutdown();
+    assertTrue(myExecutorService.awaitTermination(10, TimeUnit.SECONDS));
+  }
+  // Fails the test unless a permit becomes available within 500 ms.
+  private void tryAcquireOrFail(Semaphore semaphore) throws InterruptedException {
+    if (!semaphore.tryAcquire(500, TimeUnit.MILLISECONDS)) {
+      fail("don't get signal from connection receiver that connection selected");
+    }
+  }
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionWorkerTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionWorkerTest.java
new file mode 100644
index 0000000..237d163
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/ConnectionWorkerTest.java
@@ -0,0 +1,48 @@
+package com.turn.ttorrent.network;
+
+import com.turn.ttorrent.MockTimeService;
+import com.turn.ttorrent.network.keyProcessors.CleanupProcessor;
+import com.turn.ttorrent.network.keyProcessors.KeyProcessor;
+import org.testng.annotations.Test;
+
+import java.nio.channels.*;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+
+import static org.mockito.Mockito.*;
+
+/** Checks how {@link ConnectionWorker} hands a selected key over to its key processors. */
+@Test
+public class ConnectionWorkerTest {
+  /** A selected key must reach process() only on the processor whose accept() returned true. */
+  public void testCleanupIsCalled() throws Exception {
+    final SelectionKey mockKey = mock(SelectionKey.class);
+    final KeyProcessor acceptProcessor = mock(KeyProcessor.class);
+    final KeyProcessor notAcceptProcessor = mock(KeyProcessor.class);
+    final Selector mockSelector = mock(Selector.class);
+    // a real channel is opened for the mocked key, so it is closed again in the finally block
+    final SelectableChannel channel = SocketChannel.open();
+    try {
+      // the first select() returns 1, the second one throws ClosedSelectorException
+      when(mockSelector.select(anyLong())).thenReturn(1).thenThrow(new ClosedSelectorException());
+      when(mockSelector.selectedKeys()).thenReturn(new HashSet<SelectionKey>(Collections.singleton(mockKey)));
+      when(mockKey.isValid()).thenReturn(true);
+      when(mockKey.channel()).thenReturn(channel);
+      when(acceptProcessor.accept(mockKey)).thenReturn(true);
+      when(notAcceptProcessor.accept(mockKey)).thenReturn(false);
+      ConnectionWorker connectionWorker = new ConnectionWorker(mockSelector,
+              Arrays.asList(acceptProcessor, notAcceptProcessor), 10, 0, new MockTimeService(),
+              mock(CleanupProcessor.class), mock(NewConnectionAllower.class));
+      connectionWorker.run();
+      verify(mockSelector).selectedKeys();
+      verify(acceptProcessor).accept(mockKey);
+      verify(acceptProcessor).process(mockKey);
+      verify(notAcceptProcessor).accept(mockKey);
+      verifyNoMoreInteractions(notAcceptProcessor);
+    } finally {
+      channel.close();
+    }
+  }
+}
+
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/StateChannelListenerTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/StateChannelListenerTest.java
new file mode 100644
index 0000000..5c38fac
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/StateChannelListenerTest.java
@@ -0,0 +1,4 @@
+package com.turn.ttorrent.network;
+
+public class StateChannelListenerTest { // NOTE(review): empty placeholder - no test methods are declared yet
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/WorkingReceiverTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/WorkingReceiverTest.java
new file mode 100644
index 0000000..434d17a
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/WorkingReceiverTest.java
@@ -0,0 +1,4 @@
+package com.turn.ttorrent.network;
+
+public class WorkingReceiverTest { // NOTE(review): empty placeholder - no test methods are declared yet
+}
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessorTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessorTest.java
new file mode 100644
index 0000000..3797d30
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/CleanupKeyProcessorTest.java
@@ -0,0 +1,86 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.MockTimeService;
+import com.turn.ttorrent.network.ConnectionListener;
+import com.turn.ttorrent.network.TimeoutAttachment;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+
+import static org.mockito.Mockito.*;
+
+/** Tests for {@link CleanupKeyProcessor} using a mocked key and attachment and a {@link MockTimeService}. */
+@Test
+public class CleanupKeyProcessorTest {
+
+  private MockTimeService myTimeService;
+  private TimeoutAttachment myTimeoutAttachment;
+  private SelectionKey myKey;
+  private SelectableChannel myChannel;
+
+  /** Builds fresh mocks and opens a real socket channel for the key before every test. */
+  @BeforeMethod
+  public void setUp() throws Exception {
+    myTimeService = new MockTimeService();
+    myTimeoutAttachment = mock(TimeoutAttachment.class);
+    myKey = mock(SelectionKey.class);
+    myChannel = SocketChannel.open();
+    when(myKey.channel()).thenReturn(myChannel);
+    when(myKey.interestOps()).thenReturn(SelectionKey.OP_READ);
+    // NOTE(review): SelectionKey.attach is final in the JDK, so this presumably runs the real method - confirm
+    myKey.attach(myTimeoutAttachment);
+  }
+
+  /** Every processSelected call must pass the time service's current time to the attachment. */
+  public void testSelected() {
+    long oldTime = 10;
+    myTimeService.setTime(oldTime);
+
+    CleanupProcessor cleanupProcessor = new CleanupKeyProcessor(myTimeService);
+    cleanupProcessor.processSelected(myKey);
+
+    verify(myTimeoutAttachment).communicatedNow(eq(oldTime));
+
+    long newTime = 100;
+    myTimeService.setTime(newTime);
+    cleanupProcessor.processSelected(myKey);
+
+    verify(myTimeoutAttachment).communicatedNow(eq(newTime));
+  }
+
+  /** When the attachment reports an elapsed timeout, the key is cancelled and the attachment notified. */
+  public void testCleanupWillCloseWithTimeout() throws Exception {
+    when(myTimeoutAttachment.isTimeoutElapsed(anyLong())).thenReturn(true);
+
+    CleanupProcessor cleanupProcessor = new CleanupKeyProcessor(myTimeService);
+    cleanupProcessor.processCleanup(myKey);
+
+    verify(myKey).cancel();
+    verify(myKey).channel();
+    verify(myTimeoutAttachment).onTimeoutElapsed(any(SocketChannel.class));
+    verifyNoMoreInteractions(myKey);
+  }
+
+  /** While the attachment reports that the timeout has not elapsed, the key must not be cancelled. */
+  public void testCleanupWithoutClose() {
+    when(myTimeoutAttachment.isTimeoutElapsed(anyLong())).thenReturn(false);
+
+    myTimeService.setTime(200);
+
+    CleanupProcessor cleanupProcessor = new CleanupKeyProcessor(myTimeService);
+    cleanupProcessor.processCleanup(myKey);
+
+    verify(myTimeoutAttachment).isTimeoutElapsed(myTimeService.now());
+    verify(myKey, never()).cancel();
+  }
+
+  /** Closes the socket channel opened in setUp. */
+  @AfterMethod
+  public void tearDown() throws Exception {
+    myChannel.close();
+  }
+}
\ No newline at end of file
diff --git a/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessorTest.java b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessorTest.java
new file mode 100644
index 0000000..b784952
--- /dev/null
+++ b/ttorrent-master/network/src/test/java/com/turn/ttorrent/network/keyProcessors/WritableKeyProcessorTest.java
@@ -0,0 +1,102 @@
+package com.turn.ttorrent.network.keyProcessors;
+
+import com.turn.ttorrent.network.WriteAttachment;
+import com.turn.ttorrent.network.WriteListener;
+import com.turn.ttorrent.network.WriteTask;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.BlockingQueue;
+
+import static org.mockito.Mockito.*;
+
+/** Tests for {@link WritableKeyProcessor} run against a mocked key, channel and write-task queue. */
+@Test
+public class WritableKeyProcessorTest {
+
+  private SelectionKey myKey;
+  private SocketChannel myChannel;
+  private WritableKeyProcessor myWritableKeyProcessor;
+  private WriteAttachment myWriteAttachment;
+  private BlockingQueue<WriteTask> myQueue;
+
+  /** Recreates the mocks and the processor under test before every test method. */
+  @SuppressWarnings("unchecked")
+  @BeforeMethod
+  public void setUp() throws Exception {
+    myWritableKeyProcessor = new WritableKeyProcessor();
+    myChannel = mock(SocketChannel.class);
+    myWriteAttachment = mock(WriteAttachment.class);
+    myQueue = mock(BlockingQueue.class);
+    myKey = mock(SelectionKey.class);
+    when(myKey.channel()).thenReturn(myChannel);
+    when(myKey.interestOps()).thenReturn(SelectionKey.OP_WRITE);
+  }
+
+  /** A buffer fully consumed by a single write must be reported through onWriteDone. */
+  public void testThatOnWriteDoneInvoked() throws Exception {
+    final ByteBuffer data = ByteBuffer.allocate(10);
+    final WriteListener listener = mock(WriteListener.class);
+
+    // imitate a write that consumes the whole buffer
+    stubWriteMovingPositionTo(data, data.capacity());
+
+    processWithHeadTask(new WriteTask(myChannel, data, listener));
+
+    verify(listener).onWriteDone();
+  }
+
+  /** An IOException thrown by the channel must be reported through onWriteFailed. */
+  public void testThatOnWriteFailedInvokedIfChannelThrowException() throws Exception {
+    final WriteListener listener = mock(WriteListener.class);
+    when(myChannel.write(any(ByteBuffer.class))).thenThrow(new IOException());
+
+    processWithHeadTask(new WriteTask(myChannel, ByteBuffer.allocate(1), listener));
+
+    verify(listener).onWriteFailed(anyString(), any(Throwable.class));
+  }
+
+  /** A write that leaves data in the buffer must not be reported as done. */
+  public void checkThatWriteTaskDoesntRemovedIfBufferIsNotWrittenInOneStep() throws Exception {
+    final ByteBuffer data = ByteBuffer.allocate(10);
+    final WriteListener listener = mock(WriteListener.class);
+
+    // imitate a partial write that leaves the last byte of the buffer unwritten
+    stubWriteMovingPositionTo(data, data.capacity() - 1);
+
+    processWithHeadTask(new WriteTask(myChannel, data, listener));
+
+    verify(listener, never()).onWriteDone();
+  }
+
+  /**
+   * Stubs {@code myChannel.write(data)} so that it moves the buffer position to
+   * {@code newPosition} and returns that value as the written byte count.
+   */
+  private void stubWriteMovingPositionTo(final ByteBuffer data, final int newPosition) throws Exception {
+    when(myChannel.write(eq(data))).then(new Answer<Integer>() {
+      @Override
+      public Integer answer(InvocationOnMock invocationOnMock) throws Throwable {
+        data.position(newPosition);
+        return newPosition;
+      }
+    });
+  }
+
+  /**
+   * Makes {@code task} the head of the mocked queue, exposes the queue through the mocked
+   * attachment, attaches that attachment to the key and runs the processor on the key.
+   */
+  private void processWithHeadTask(WriteTask task) throws Exception {
+    when(myQueue.peek()).thenReturn(task);
+    when(myWriteAttachment.getWriteTasks()).thenReturn(myQueue);
+    myKey.attach(myWriteAttachment);
+    myWritableKeyProcessor.process(myKey);
+  }
+}