Introduction
In this tutorial I am going to show how to create custom thread pool in Java.
Multi-threaded and multi-process programmings are great ways to optimize CPU usage and get things done quickly.
What is thread pool?
Thread pool is a collection of already created worker threads ready to perform certain tasks. Instead of creating and discarding thread once the task is done, thread pool reuses the thread in the form of worker thread. The worker thread differently exists from Runnable or Callable tasks it executes and often used to execute multiple tasks.
Why do you need Thread Pool?
Creation of Threads in Java is a costly IO operation because thread objects use a significant amount of memory.
In a large scale applications, allocating and deallocating many thread objects create a memory management overhead, because creation of a thread object takes a time. So as soon as client requests come, the actual jobs do not start and clients will see a slight delay. And a time comes when application receives more requests than it can handle immediately and as a result application will stop responding to the requests.
Therefore it is not advisable to create & destroy thread(s) every often. It is recommended to use pool of threads as per the needs.
Custom Thread Pool
Here custom thread pool will be implemented using BlockingQueue that is used for storing tasks.
The responsibility of the BlockingQueue is to hold Runnables, and to have a way to poll them and check if the BlockingQueue is empty or not in order to help the threads in the pool utilize their resources better.
package com.roytuts.java.custom.threadpool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
public class CustomThreadPool {
// holds tasks
private BlockingQueue<Runnable> runnableQueue;
// holds the pool of worker threads
private List<WorkerThread> threads;
// check if shutdown is initiated
private AtomicBoolean isThreadPoolShutDownInitiated;
public CustomThreadPool(final int noOfThreads) {
this.runnableQueue = new LinkedBlockingQueue<>();
this.threads = new ArrayList<>(noOfThreads);
this.isThreadPoolShutDownInitiated = new AtomicBoolean(false);
// create worker threads
for (int i = 1; i <= noOfThreads; i++) {
WorkerThread thread = new WorkerThread(runnableQueue, this);
thread.setName("Worker Thread - " + i);
thread.start();
threads.add(thread);
}
}
public void execute(Runnable r) throws InterruptedException {
if (!isThreadPoolShutDownInitiated.get()) {
runnableQueue.put(r);
} else {
throw new InterruptedException("Thread Pool shutdown is initiated, unable to execute task");
}
}
public void shutdown() {
isThreadPoolShutDownInitiated = new AtomicBoolean(true);
}
private class WorkerThread extends Thread {
// holds tasks
private BlockingQueue<Runnable> taskQueue;
// check if shutdown is initiated
private CustomThreadPool threadPool;
public WorkerThread(BlockingQueue<Runnable> taskQueue, CustomThreadPool threadPool) {
this.taskQueue = taskQueue;
this.threadPool = threadPool;
}
@Override
public void run() {
try {
// continue until all tasks finished processing
while (!threadPool.isThreadPoolShutDownInitiated.get() || !taskQueue.isEmpty()) {
Runnable r;
// Poll a runnable from the queue and execute it
while ((r = taskQueue.poll()) != null) {
r.run();
}
Thread.sleep(1);
}
} catch (RuntimeException | InterruptedException e) {
throw new CustomThreadPoolException(e);
}
}
}
private class CustomThreadPoolException extends RuntimeException {
private static final long serialVersionUID = 1L;
public CustomThreadPoolException(Throwable t) {
super(t);
}
}
}
Here the above CustomThreadPool class has inner private class that is basically polling the BlockingQueue and executes the Runnables.
isThreadPoolShutDownInitiated()
is declared as AtomicBoolean
that provides a boolean
value, which can be read and written atomically.
Testing Custom Thread Pool
Create below class to test the custom ThreadPool. Here custom ThreadPool is created with two threads and runnables are submitted to this ThreadPool.
package com.roytuts.java.custom.threadpool;
public class CustomThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
Runnable r = () -> {
try {
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + " is executing task.");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
CustomThreadPool threadPool = new CustomThreadPool(2);
threadPool.execute(r);
threadPool.execute(r);
threadPool.shutdown();
// threadPool.execute(r);
}
}
Output
Executing the above class you will see the following output:
Worker Thread - 1 is executing task.
Worker Thread - 2 is executing task.