Sunday 26 July 2015

Creating a ThreadPool in java

Couple of ThreadPools introduced in Java 6.0 onwards. Based on the requirement we can pick one of these ThreadPool and use it in our Application.
To understand the existing ThreadPool, we need to know what does it mean by ThreadPool.







ThreadPool maintains 2 Queues, one is WorkerQueue and another is WorkQueue.
Workers Queue keep track of Worker who are going to work on the submitted work and number of work can be configured by Application developer. There is a separate discussion all together to decide number of worker thread. WorkQueue is use to store the Submitted tasks.

Thread can be Created onDemand or As soon as it initialized. If there is no task in the WorkQueue then all the thread will wait for task to submit. As soon as task gets submitted to WorkQueue, one of the Thread from worker queue will pick the task and execute it.



How we can implement our own ThreadPool or How can we achieve this functionality.

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MyThreadPool {

 private BlockingQueue<Runnable> taskQueue = null;
 private Set<MyThread> threads = new HashSet<MyThread>();
 private boolean isStopped = false;

 public MyThreadPool(int noOfThreads) {
  taskQueue = new LinkedBlockingQueue<Runnable>();

  for (int i = 0; i < noOfThreads; i++) {
   threads.add(new MyThread(taskQueue));
  }
  for (MyThread thread : threads) {
   thread.start();
  }
 }

 public synchronized void execute(Runnable task) throws Exception {
  if (this.isStopped)
   throw new IllegalStateException("ThreadPool is stopped");
  this.taskQueue.add(task);
 }

 public synchronized void stop() {
  this.isStopped = true;
  for (MyThread thread : threads) {
   thread.doStop();
  }
 }

}






/*
* My Thread class
*/

import java.util.concurrent.BlockingQueue;

public class MyThread extends Thread {

 private BlockingQueue<Runnable> taskQueue = null;
 private boolean isStopped = false;

 public MyThread(BlockingQueue<Runnable> queue) {
  taskQueue = queue;
 }

 public void run() {
  while (!isStopped()) {
   try {
    Runnable runnable = taskQueue.take();
    runnable.run();
   } catch (Exception e) {
    //log the exception
   }
  }
 }

 public synchronized void doStop() {
  isStopped = true;
  this.interrupt(); 
  // stop the thread and it will come out from run
       // method.
 }

 public synchronized boolean isStopped() {
  return isStopped;
 }
}







Test Class

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class ThreadPoolTest {
    public static void main(String[] args) {
        TestCallable testCallable = new TestCallable();
        FutureTask<String> futureTask = new FutureTask<String>(testCallable);
        FutureTask<String> futureTask1 = new FutureTask<String>(testCallable);
        FutureTask<String> futureTask2 = new FutureTask<String>(testCallable);
        FutureTask<String> futureTask3 = new FutureTask<String>(testCallable);
        MyThreadPool myThreadPool = new MyThreadPool(5);
        try {
            myThreadPool.execute(futureTask);
            myThreadPool.execute(futureTask1);
            myThreadPool.execute(futureTask2);
            myThreadPool.execute(futureTask3);
            System.out.println(futureTask.get());
            System.out.println(futureTask1.get());
            System.out.println(futureTask2.get());
            System.out.println(futureTask3.get());
            myThreadPool.stop();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static class TestCallable implements Callable<String> {
        @Override
        public String call() throws Exception {
            return "Done";
        }

    }
}


No comments:

Post a Comment