Wednesday, 26 April 2017

Thread Pools

Java [Thread Pool] [Executor]: manages a pool of worker threads and holds a [queue] of tasks waiting to be executed.

1) We add [Runnable] tasks to the queue. The executor takes tasks from the queue and assigns each one to a free worker thread.
2) java.util.concurrent.Executor is the base interface. java.util.concurrent.Executors is not an implementation of it but a factory class whose static methods return ready-made pools.
   We use these factory methods to create a thread pool in Java.


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SimpleThreadPool
{
    public static void main(String args[]) throws InterruptedException
    {
        // Five worker threads.
        // Executor (base interface) --> ExecutorService (sub-interface) --> ThreadPoolExecutor (implementation)
        ExecutorService executor = Executors.newFixedThreadPool(5);
        // We submit 10 tasks, so 5 wait in the queue until worker threads become free.
        // The pool takes care of assigning tasks to threads.
        for (int i = 0; i < 10; i++)
        {
            // Submit a Runnable task.
            Runnable runnable = new WorkerThread("" + i);
            // submit() would return a Future instead; use it when you need to read a result.
            executor.execute(runnable);
        }
        // The JVM does not exit while non-daemon threads (such as the pool's workers) are alive,
        // so the pool must be shut down.
        executor.shutdown();
        // Block the main thread until the pool terminates, without busy-waiting.
        while (!executor.awaitTermination(1, TimeUnit.SECONDS))
        {
            // Still running; keep waiting.
        }
        System.out.println("Finished all threads");
    }
}


public class WorkerThread implements Runnable
{
    private final String command;

    WorkerThread(String command)
    {
        this.command = command;
    }

    @Override
    public void run()
    {
        System.out.println(Thread.currentThread().getName() + " Start. command = " + command);
        processCommand();
        System.out.println(Thread.currentThread().getName() + " End.");
    }

    // Simulates one second of work.
    private void processCommand()
    {
        try
        {
            Thread.sleep(1000);
        }
        catch (InterruptedException e)
        {
            // Restore the interrupt flag so the pool can see the interruption.
            Thread.currentThread().interrupt();
        }
    }

    // Used by RejectedExecutionHandlerImpl when it reports a rejected task.
    @Override
    public String toString()
    {
        return this.command;
    }
}

Output (the order of the lines varies from run to run):
*************************************
pool-1-thread-2 Start. command = 1
pool-1-thread-5 Start. command = 4
pool-1-thread-3 Start. command = 2
pool-1-thread-4 Start. command = 3
pool-1-thread-1 Start. command = 0
pool-1-thread-1 End.
pool-1-thread-3 End.
pool-1-thread-5 End.
pool-1-thread-2 End.
pool-1-thread-4 End.
pool-1-thread-2 Start. command = 8
pool-1-thread-3 Start. command = 7
pool-1-thread-5 Start. command = 6
pool-1-thread-1 Start. command = 5
pool-1-thread-4 Start. command = 9
pool-1-thread-4 End.
pool-1-thread-2 End.
pool-1-thread-3 End.
pool-1-thread-1 End.
pool-1-thread-5 End.
Finished all threads
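
The comment in SimpleThreadPool mentions that submit() returns a Future for reading results. A minimal sketch of that usage follows. The class name SubmitDemo and the helper squareAll are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the original example.
class SubmitDemo
{
    // Submits one Callable per number and collects the results in submission order.
    static List<Integer> squareAll(int count)
    {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try
        {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < count; i++)
            {
                final int n = i;
                // submit() hands the task to the pool and returns a Future immediately.
                futures.add(executor.submit(() -> n * n));
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : futures)
            {
                // get() blocks until that particular task has finished.
                results.add(future.get());
            }
            return results;
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for results", e);
        }
        catch (ExecutionException e)
        {
            throw new IllegalStateException("a task failed", e.getCause());
        }
        finally
        {
            executor.shutdown();
        }
    }

    public static void main(String[] args)
    {
        System.out.println(squareAll(10)); // prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    }
}
```

Because the futures are read in the order they were submitted, the result list is ordered even though the tasks may finish in any order.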

***************************************************************************************************************


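1) Using ThreadPoolExecutor directly gives more control than the Executors factory methods.
2) With ThreadPoolExecutor we can specify the core pool size (threads kept alive even when idle), the maximum pool size, the keep-alive time for threads above the core size, the work queue, and a RejectedExecutionHandler for tasks that cannot be accepted because the queue is full and the pool is already at its maximum size, or because the pool has been shut down.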


import java.util.concurrent.ThreadPoolExecutor;

public class MyMonitorThread implements Runnable
{
    private final ThreadPoolExecutor executor;
    // volatile: shutdown() is called from a different thread than run().
    private volatile boolean run = true;
    private final int seconds;

    MyMonitorThread(ThreadPoolExecutor executor, int delay)
    {
        this.executor = executor;
        this.seconds = delay;
    }

    public void shutdown()
    {
        run = false;
    }

    @Override
    public void run()
    {
        while (run)
        {
            // [current pool size / core pool size], followed by task counters and pool state.
            System.out.println(String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",
                executor.getPoolSize(), executor.getCorePoolSize(), executor.getActiveCount(), executor.getCompletedTaskCount(),
                executor.getTaskCount(), executor.isShutdown(), executor.isTerminated()));

            try
            {
                Thread.sleep(seconds * 1000);
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        }
    }
}

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler
{
    // Called by the pool, on the submitting thread, for each task it cannot accept.
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor)
    {
        System.out.println(r.toString() + " is rejected.");
    }
}

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class WorkerPool
{
    public static void main(String args[]) throws InterruptedException
    {
        RejectedExecutionHandlerImpl rejectedExecutionHandlerImpl = new RejectedExecutionHandlerImpl();
        ThreadFactory threadFactory = Executors.defaultThreadFactory();
        // Core pool size 2, maximum pool size 4, keep-alive time of 10 seconds for idle threads
        // above the core size, and a bounded work queue with capacity 2.
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2),
            threadFactory, rejectedExecutionHandlerImpl);

        // Start a monitor that prints pool statistics every 3 seconds.
        MyMonitorThread myMonitorThread = new MyMonitorThread(executorPool, 3);
        Thread thread = new Thread(myMonitorThread);
        thread.start();

        // Submit 10 tasks; those the pool can neither run nor queue go to the rejection handler.
        for (int i = 0; i < 10; i++)
        {
            executorPool.execute(new WorkerThread("cmd" + i));
        }

        Thread.sleep(30000);
        // Stop accepting tasks and let the accepted ones finish.
        executorPool.shutdown();
        Thread.sleep(5000);
        // Stop the monitor so the JVM can exit.
        myMonitorThread.shutdown();
    }
}
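
With the WorkerPool settings (core size 2, maximum size 4, queue capacity 2), the pool can hold at most 4 running + 2 queued = 6 tasks at once, so when all ten tasks arrive before any finishes, four are rejected. The sketch below reproduces that count. It is an illustration only: the class RejectionDemo, the nested NumberedTask and the CountDownLatch that keeps tasks from finishing early are assumptions added here, not part of the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class RejectionDemo
{
    // A task that blocks until the latch is released, so no task can finish
    // while the others are still being submitted.
    private static final class NumberedTask implements Runnable
    {
        final int id;
        private final CountDownLatch release;

        NumberedTask(int id, CountDownLatch release)
        {
            this.id = id;
            this.release = release;
        }

        @Override
        public void run()
        {
            try
            {
                release.await();
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Submits ten tasks to a 2/4 pool with a queue of 2 and returns the rejected ids.
    static List<Integer> rejectedTaskIds()
    {
        // The handler runs on the thread that calls execute(), so a plain list is enough.
        List<Integer> rejected = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(2),
            (task, executor) -> rejected.add(((NumberedTask) task).id));

        for (int i = 0; i < 10; i++)
        {
            pool.execute(new NumberedTask(i, release));
        }

        release.countDown(); // let the accepted tasks finish
        pool.shutdown();
        try
        {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args)
    {
        System.out.println("Rejected: " + rejectedTaskIds()); // prints Rejected: [6, 7, 8, 9]
    }
}
```

Tasks 0 and 1 start the two core threads, tasks 2 and 3 fill the queue, tasks 4 and 5 start the two extra threads, and tasks 6 to 9 have nowhere to go. New threads above the core size are created only once the queue is full.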

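ScheduledThreadPoolExecutor (Executors.newScheduledThreadPool) --> to run tasks after a delay or periodically.
Cached thread pool (Executors.newCachedThreadPool) --> creates a new thread whenever no idle one is available, with no upper bound. Not recommended when the number of tasks is unbounded.
Fixed thread pool (Executors.newFixedThreadPool) --> a fixed number of threads.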
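
As a rough illustration of periodic scheduling, the sketch below runs a task every 100 ms until it has fired a given number of times. The class name ScheduledDemo, the helper runTicks and the 100 ms period are assumptions chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original example.
class ScheduledDemo
{
    // Runs a periodic task until it has fired `ticks` times, then returns the count.
    static int runTicks(int ticks)
    {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger fired = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);

        // First run immediately, then every 100 ms.
        // Successive runs of one periodic task never overlap.
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0)
            {
                fired.incrementAndGet();
                done.countDown();
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        try
        {
            done.await();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        // With the default policy, shutdown() also stops further periodic runs.
        scheduler.shutdown();
        return fired.get();
    }

    public static void main(String[] args)
    {
        System.out.println("Task fired " + runTicks(3) + " times");
    }
}
```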
***************************************************************************************************************

1) ThreadPoolExecutor (which implements the Executor and ExecutorService interfaces) has 4 constructors, but because of their complexity the Java concurrency API provides the 'Executors' class to construct executors.

2) shutdown() stops the pool from accepting new tasks; running and queued tasks still run to completion. It returns immediately and does not wait for them.
3) awaitTermination() blocks the calling thread until the pool terminates, the timeout elapses, or the thread is interrupted.
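
shutdown() and awaitTermination() from points 2 and 3 are commonly combined into one helper. The sketch below shows one way to do it, under the assumption that falling back to shutdownNow() after the timeout is acceptable. The class ShutdownDemo and the method shutdownAndAwait are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original example.
class ShutdownDemo
{
    // Returns true if the pool terminated within the given time.
    static boolean shutdownAndAwait(ExecutorService pool, long timeout, TimeUnit unit)
    {
        pool.shutdown(); // no new tasks; already accepted tasks keep running
        try
        {
            if (pool.awaitTermination(timeout, unit))
            {
                return true;
            }
            // Timed out: interrupt running tasks and discard queued ones.
            pool.shutdownNow();
            return pool.awaitTermination(timeout, unit);
        }
        catch (InterruptedException e)
        {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args)
    {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++)
        {
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + " working"));
        }
        System.out.println("Terminated: " + shutdownAndAwait(pool, 5, TimeUnit.SECONDS));
    }
}
```

Once shutdown() has been called, a further execute() on the same pool is passed to the rejection handler, which by default throws RejectedExecutionException.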

http://www.journaldev.com/1069/threadpoolexecutor-java-thread-pool-example-executorservice
http://howtodoinjava.com/core-java/multi-threading/java-thread-pool-executor-example/
https://dzone.com/articles/scalable-java-thread-pool-executor
