How to create Custom Thread Pool in Java

Introduction

In this tutorial I am going to show how to create a custom thread pool in Java.

Multi-threaded and multi-process programming are great ways to make better use of the CPU and get things done quickly.

What is a thread pool?

A thread pool is a collection of already created worker threads that are ready to perform tasks. Instead of creating a thread for each task and discarding it once the task is done, a thread pool reuses its threads as worker threads. A worker thread exists independently of the Runnable or Callable tasks it executes and is often used to execute many tasks, one after another.
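
The reuse described above can be observed with the JDK's own fixed-size pool before building a custom one. The following is only a minimal sketch: the class name WorkerReuseDemo and the helper runTasks are made up for illustration, and Executors.newFixedThreadPool stands in for the custom pool developed later in this tutorial.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the tutorial's source code
public class WorkerReuseDemo {

	// Runs taskCount tasks on a pool of poolSize threads and returns
	// the names of the threads that actually executed them
	public static Set<String> runTasks(int poolSize, int taskCount) {
		Set<String> workerNames = ConcurrentHashMap.newKeySet();
		ExecutorService pool = Executors.newFixedThreadPool(poolSize);
		for (int i = 0; i < taskCount; i++) {
			// each task only records which thread ran it
			pool.execute(() -> workerNames.add(Thread.currentThread().getName()));
		}
		pool.shutdown(); // stop accepting tasks, let queued ones finish
		try {
			pool.awaitTermination(5, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return workerNames;
	}

	public static void main(String[] args) {
		Set<String> names = runTasks(1, 5);
		System.out.println("5 tasks ran on " + names.size() + " thread(s)");
	}
}
```

With a pool size of 1, all five tasks are executed by the same worker thread, so the returned set contains a single name.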

Why do you need a thread pool?

Creating threads in Java is costly: each thread needs its own stack, and a regular (platform) thread is backed by an operating system thread, so threads use a significant amount of memory and take time to start.

In a large-scale application, creating and discarding many threads adds memory management overhead, and creating each thread takes time. If a thread is created only when a client request arrives, the actual job cannot start immediately and the client sees a slight delay. Eventually the application receives more requests than it can handle immediately and, as a result, it may stop responding.

Therefore it is not advisable to create and destroy threads frequently. It is recommended to use a pool of threads sized according to your needs.

Custom Thread Pool

Here the custom thread pool is implemented using a BlockingQueue that stores the submitted tasks.

The responsibility of the BlockingQueue is to hold the Runnables and to give the worker threads a way to poll them and to check whether the queue is empty, so that the threads in the pool can pick up work as soon as it is available.
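
To make the queue operations concrete, here is a small sketch of the three calls the pool relies on: put, poll and isEmpty. The class name TaskQueueDemo and the helper drain are assumptions made for this illustration only; the tasks are drained on the calling thread rather than by worker threads.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the tutorial's source code
public class TaskQueueDemo {

	// Runs every queued task on the calling thread and returns how many ran
	public static int drain(BlockingQueue<Runnable> queue) {
		int executed = 0;
		Runnable task;
		// poll() removes the head, or returns null right away if the queue is empty
		while ((task = queue.poll()) != null) {
			task.run();
			executed++;
		}
		return executed;
	}

	public static void main(String[] args) throws InterruptedException {
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
		queue.put(() -> System.out.println("task 1"));
		queue.put(() -> System.out.println("task 2"));
		System.out.println("empty before drain: " + queue.isEmpty());
		System.out.println("executed: " + drain(queue));
		System.out.println("empty after drain: " + queue.isEmpty());
	}
}
```

A LinkedBlockingQueue created without a capacity is effectively unbounded, so put never has to wait for free space here.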

package com.roytuts.java.custom.threadpool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class CustomThreadPool {

	// holds tasks
	private BlockingQueue<Runnable> runnableQueue;

	// holds the pool of worker threads
	private List<WorkerThread> threads;

	// check if shutdown is initiated
	private AtomicBoolean isThreadPoolShutDownInitiated;

	public CustomThreadPool(final int noOfThreads) {
		this.runnableQueue = new LinkedBlockingQueue<>();
		this.threads = new ArrayList<>(noOfThreads);
		this.isThreadPoolShutDownInitiated = new AtomicBoolean(false);
		// create worker threads
		for (int i = 1; i <= noOfThreads; i++) {
			WorkerThread thread = new WorkerThread(runnableQueue, this);
			thread.setName("Worker Thread - " + i);
			thread.start();
			threads.add(thread);
		}
	}

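	public void execute(Runnable r) throws InterruptedException {
		if (!isThreadPoolShutDownInitiated.get()) {
			// put() can throw InterruptedException if the caller is interrupted while waiting
			runnableQueue.put(r);
		} else {
			// a rejected task is not an interruption, so report it as an illegal state
			throw new IllegalStateException("Thread Pool shutdown is initiated, unable to execute task");
		}
	}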

	public void shutdown() {
		// update the existing flag instead of replacing the AtomicBoolean instance
		isThreadPoolShutDownInitiated.set(true);
	}

	private class WorkerThread extends Thread {
		// queue shared with the pool, from which tasks are taken
		private BlockingQueue<Runnable> taskQueue;

		// owning pool, used to read its shutdown flag
		private CustomThreadPool threadPool;

		public WorkerThread(BlockingQueue<Runnable> taskQueue, CustomThreadPool threadPool) {
			this.taskQueue = taskQueue;
			this.threadPool = threadPool;
		}

		@Override
		public void run() {
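			try {
				// keep running until shutdown is initiated and the queue is drained
				while (!threadPool.isThreadPoolShutDownInitiated.get() || !taskQueue.isEmpty()) {
					Runnable r;
					// poll() returns null immediately when the queue is empty
					while ((r = taskQueue.poll()) != null) {
						r.run();
					}
					// short pause so that an idle worker does not spin at full speed
					Thread.sleep(1);
				}
			} catch (InterruptedException e) {
				// restore the interrupt flag before reporting the failure
				Thread.currentThread().interrupt();
				throw new CustomThreadPoolException(e);
			} catch (RuntimeException e) {
				// a failing task terminates this worker thread
				throw new CustomThreadPoolException(e);
			}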
		}
	}

	private class CustomThreadPoolException extends RuntimeException {
		private static final long serialVersionUID = 1L;

		public CustomThreadPoolException(Throwable t) {
			super(t);
		}
	}

}

The CustomThreadPool class above has a private inner class, WorkerThread, that polls the BlockingQueue and executes the Runnables it finds.
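
As a possible variation on the polling loop, a worker can wait on the queue with a timeout instead of sleeping between polls. This is a sketch under stated assumptions, not the tutorial's implementation: the class TimedPollWorkerDemo, the helpers startWorker and runDemo, and the 50 ms timeout are all chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the tutorial's source code
public class TimedPollWorkerDemo {

	// Starts one worker that waits up to 50 ms (assumed value) for each task
	public static Thread startWorker(BlockingQueue<Runnable> queue, AtomicBoolean shutdown) {
		Thread worker = new Thread(() -> {
			try {
				// same exit condition as the tutorial: shutdown requested and queue drained
				while (!shutdown.get() || !queue.isEmpty()) {
					Runnable task = queue.poll(50, TimeUnit.MILLISECONDS);
					if (task != null) {
						task.run();
					}
				}
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt(); // keep the interrupt flag and exit
			}
		}, "Timed Poll Worker");
		worker.start();
		return worker;
	}

	// Queues taskCount counter increments, signals shutdown and returns how many ran
	public static int runDemo(int taskCount) {
		BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
		AtomicBoolean shutdown = new AtomicBoolean(false);
		AtomicInteger completed = new AtomicInteger();
		Thread worker = startWorker(queue, shutdown);
		for (int i = 0; i < taskCount; i++) {
			queue.add(completed::incrementAndGet); // unbounded queue, add always succeeds
		}
		shutdown.set(true);
		try {
			worker.join(); // the worker exits once the queue is drained
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
		}
		return completed.get();
	}

	public static void main(String[] args) {
		System.out.println("completed tasks: " + runDemo(3));
	}
}
```

The timed poll lets an idle worker block inside the queue until a task arrives, while still waking up regularly to notice the shutdown flag.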

isThreadPoolShutDownInitiated is a field declared as an AtomicBoolean, which holds a boolean value that can be read and written atomically.
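
The atomic read and write can be illustrated in isolation. In this sketch the class ShutdownFlagDemo and its methods are hypothetical names; it uses compareAndSet, which is one way to make sure that only the first caller actually initiates the shutdown.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the tutorial's source code
public class ShutdownFlagDemo {

	// a single shared flag object whose value is updated in place
	private final AtomicBoolean shutdownInitiated = new AtomicBoolean(false);

	// Returns true only for the caller that flips the flag from false to true
	public boolean initiateShutdown() {
		return shutdownInitiated.compareAndSet(false, true);
	}

	public boolean isShutdownInitiated() {
		return shutdownInitiated.get();
	}

	public static void main(String[] args) {
		ShutdownFlagDemo flag = new ShutdownFlagDemo();
		System.out.println(flag.isShutdownInitiated()); // false
		System.out.println(flag.initiateShutdown());    // true
		System.out.println(flag.initiateShutdown());    // false, already shut down
		System.out.println(flag.isShutdownInitiated()); // true
	}
}
```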

Testing Custom Thread Pool

Create the class below to test the custom thread pool. Here the custom thread pool is created with two threads and two Runnables are submitted to it.

package com.roytuts.java.custom.threadpool;

public class CustomThreadPoolTest {

	public static void main(String[] args) throws InterruptedException {
		Runnable r = () -> {
			try {
				Thread.sleep(1000);
				System.out.println(Thread.currentThread().getName() + " is executing task.");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		};

		CustomThreadPool threadPool = new CustomThreadPool(2);

		threadPool.execute(r);
		threadPool.execute(r);
		threadPool.shutdown();

		// threadPool.execute(r);
	}

}

Output

Executing the above class, you will see output similar to the following (the order of the two lines may vary):

Worker Thread - 1 is executing task.
Worker Thread - 2 is executing task.
