Java ExecutorService for multi-threading -- coding question and tutorial
Q. Can you code in Java for the following scenario?
Write a multi-threaded SumEngine that takes a SumRequest with two operands (the input numbers to add), as shown below:
package com.mycompany.metrics;

import java.util.UUID;

public class SumRequest {

    private String id = UUID.randomUUID().toString();
    private int operand1;
    private int operand2;

    protected int getOperand1() {
        return operand1;
    }

    protected void setOperand1(int operand1) {
        this.operand1 = operand1;
    }

    protected int getOperand2() {
        return operand2;
    }

    protected void setOperand2(int operand2) {
        this.operand2 = operand2;
    }

    protected String getId() {
        return id;
    }

    @Override
    public String toString() {
        return "SumRequest [id=" + id + ", operand1=" + operand1 + ", operand2=" + operand2 + "]";
    }
}
and returns a SumResponse with a result.
package com.mycompany.metrics;

public class SumResponse {

    private String requestId;
    private int result;

    protected String getRequestId() {
        return requestId;
    }

    protected void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    protected int getResult() {
        return result;
    }

    protected void setResult(int result) {
        this.result = result;
    }

    @Override
    public String toString() {
        return "SumResponse [requestId=" + requestId + ", result=" + result + "]";
    }
}

A. Processing a request and returning a response is a very common programming task. Here is some basic sample code to get started. The processor interface is generic, so it can take any type of object as request and response.
Step 1: Define the interface that performs the sum operation. Note that generics are used, so the request and response types are not fixed.
package com.mycompany.metrics;

/**
 * R -- Generic request type, S -- Generic response type
 */
public interface SumProcessor<R, S> {

    // interface methods are implicitly abstract
    S sum(R request);
}
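To illustrate what the generic parameters buy you, here is a minimal sketch in which one interface of the same shape is implemented for two unrelated request types. The names Processor, ArraySumProcessor and LengthProcessor are hypothetical and not part of the original code; the interface is redeclared only so that the example compiles on its own.

```java
// Same shape as SumProcessor<R, S>; redeclared here so the sketch is self-contained.
interface Processor<R, S> {
    S process(R request);
}

// Hypothetical implementation: the request is an int array, the response is its total.
class ArraySumProcessor implements Processor<int[], Integer> {
    @Override
    public Integer process(int[] request) {
        int total = 0;
        for (int value : request) {
            total += value;
        }
        return total;
    }
}

// Hypothetical implementation: the request is a String, the response is its length.
class LengthProcessor implements Processor<String, Integer> {
    @Override
    public Integer process(String request) {
        return request.length();
    }
}

class GenericProcessorDemo {
    public static void main(String[] args) {
        Processor<int[], Integer> sums = new ArraySumProcessor();
        Processor<String, Integer> lengths = new LengthProcessor();
        System.out.println(sums.process(new int[] {1, 2, 3})); // prints 6
        System.out.println(lengths.process("executor"));       // prints 8
    }
}
```

The compiler checks each implementation against its own request and response types, so no casts are needed at the call site.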
Step 2: Define the implementation of the above interface. It takes a SumRequest and returns a SumResponse.
package com.mycompany.metrics;

// Note: the class-level type parameters R and S are unused here;
// the interface is bound to SumRequest and SumResponse.
public class SumProcessorImpl<R, S> implements SumProcessor<SumRequest, SumResponse> {

    @Override
    public SumResponse sum(SumRequest request) {
        System.out.println(Thread.currentThread().getName() + " processing request .... " + request);
        SumResponse resp = new SumResponse();
        resp.setRequestId(request.getId());
        resp.setResult(request.getOperand1() + request.getOperand2());
        return resp;
    }
}
Step 3: Write the multi-threaded SumEngine. The entry point is the public method execute(SumRequest... request), which takes one or more SumRequest objects via varargs. The ExecutorService is the thread pool. Each task is an anonymous implementation of the Callable interface that closes over its request; the tasks are submitted to the pool and executed by whichever pool thread is available.
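Before reading the full engine, it may help to see the pool and task mechanics in isolation. The following is a minimal sketch, not the engine itself: the class SubmitCallableDemo and the method addOnPool are hypothetical names, and the pool size of 2 is an arbitrary assumption. It submits one anonymous Callable to a fixed thread pool and waits on the returned Future.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitCallableDemo {

    // Submits a single addition to a small pool and blocks until the result is ready.
    static int addOnPool(final int a, final int b) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Anonymous Callable: it captures a and b from the enclosing method.
            Callable<Integer> task = new Callable<Integer>() {
                @Override
                public Integer call() {
                    return a + b;
                }
            };
            Future<Integer> future = pool.submit(task);
            return future.get(); // waits for the pool thread to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            pool.shutdown(); // release the pool threads in every case
        }
    }

    public static void main(String[] args) {
        System.out.println(addOnPool(1, 2)); // prints 3
    }
}
```

Shutting the pool down in a finally block is a design choice of this sketch: it keeps the pool threads from lingering if the task throws.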
package com.mycompany.metrics;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class SumEngine {

    private final AtomicInteger requestsCount = new AtomicInteger();
    ExecutorService executionService = null;

    // executes requests to sum
    public void execute(SumRequest... request) {
        executionService = Executors.newFixedThreadPool(5); // create a thread pool
        try {
            List<Callable<SumResponse>> tasks = createExecuteTasks(request);
            List<Future<SumResponse>> results = execute(tasks);
            for (Future<SumResponse> result : results) {
                try {
                    System.out.println(Thread.currentThread().getName() + ": Response = " + result.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            // initiates an orderly shutdown of the thread pool, even if something above fails
            executionService.shutdown();
        }
    }

    // create tasks
    private List<Callable<SumResponse>> createExecuteTasks(SumRequest[] requests) {
        List<Callable<SumResponse>> tasks = new LinkedList<Callable<SumResponse>>();
        executingRequests(requests.length);
        for (SumRequest req : requests) {
            Callable<SumResponse> task = createTask(req);
            tasks.add(task);
        }
        return tasks;
    }

    // increment the requests counter
    private void executingRequests(int count) {
        requestsCount.addAndGet(count);
    }

    // creates callable (i.e. executable or runnable) tasks
    private Callable<SumResponse> createTask(final SumRequest request) {
        // anonymous implementation of Callable.
        // Pre Java 8's way of creating closures
        Callable<SumResponse> task = new Callable<SumResponse>() {
            @Override
            public SumResponse call() throws Exception {
                System.out.println(Thread.currentThread().getName() + ": Request = " + request);
                SumProcessor<SumRequest, SumResponse> processor = new SumProcessorImpl<>();
                SumResponse result = processor.sum(request);
                return result;
            }
        };
        return task;
    }

    // executes the tasks
    private <T> List<Future<T>> execute(List<Callable<T>> tasks) {
        try {
            // invokes sum(sumRequest) by running the call() closure built in createTask
            return executionService.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
            // return an empty list so the caller's loop does not hit a null
            return Collections.emptyList();
        }
    }

    public int getRequestsCount() {
        return requestsCount.get();
    }
}
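The comment in createTask calls the anonymous class the pre-Java 8 way of creating closures. On Java 8 or later, the same task can be written as a lambda. The sketch below assumes that setup; LambdaTaskDemo and sumAll are hypothetical names, and plain int operands stand in for SumRequest so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class LambdaTaskDemo {

    // Lambda form of createTask: the lambda closes over the two operands.
    static Callable<Integer> createTask(int operand1, int operand2) {
        return () -> operand1 + operand2;
    }

    // Runs one task per operand pair and collects the sums in input order.
    static int[] sumAll(int[][] pairs) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int[] pair : pairs) {
                tasks.add(createTask(pair[0], pair[1]));
            }
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            int[] sums = new int[futures.size()];
            for (int i = 0; i < sums.length; i++) {
                sums[i] = futures.get(i).get();
            }
            return sums;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while summing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A sum task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] sums = sumAll(new int[][] {{1, 2}, {2, 3}, {3, 4}});
        System.out.println(Arrays.toString(sums)); // prints [3, 5, 7]
    }
}
```

The lambda is shorter but behaves like the anonymous class: in both forms the captured values must be final or effectively final.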
Step 4: Write SumEngineTest to run the engine from its main method. It loops through the numbers 1 to 5 and adds each number to its successor: 1+2=3, 2+3=5, 3+4=7, 4+5=9, and 5+6=11.
package com.mycompany.metrics;

import java.util.ArrayList;
import java.util.List;

public class SumEngineTest {

    public static void main(String[] args) throws Exception {
        SumEngine se = new SumEngine();
        List<SumRequest> list = new ArrayList<>();

        // sums 1+2, 2+3, 3+4, etc
        for (int i = 1; i <= 5; i++) {
            SumRequest req = new SumRequest();
            req.setOperand1(i);
            req.setOperand2(i + 1);
            list.add(req);
        }

        // convert the list to an array for the varargs parameter
        SumRequest[] req = new SumRequest[list.size()];
        se.execute(list.toArray(req));
    }
}
Sample output (the ids and the order of the pool threads' lines vary from run to run):
pool-1-thread-2: Request = SumRequest [id=bca23e97-3a6f-4e42-aff4-5ed5f7de2783, operand1=2, operand2=3]
pool-1-thread-4: Request = SumRequest [id=36d95b35-09f0-4e93-99e4-715ea7cb33c9, operand1=4, operand2=5]
pool-1-thread-3: Request = SumRequest [id=31ccd137-349a-4b7a-93b1-e51f62c11ba9, operand1=3, operand2=4]
pool-1-thread-1: Request = SumRequest [id=4bfa782a-c695-4de6-9593-cbfd357c3535, operand1=1, operand2=2]
pool-1-thread-5: Request = SumRequest [id=c653f469-6a6f-45b6-99f2-ed58620fd144, operand1=5, operand2=6]
pool-1-thread-4 processing request .... SumRequest [id=36d95b35-09f0-4e93-99e4-715ea7cb33c9, operand1=4, operand2=5]
pool-1-thread-2 processing request .... SumRequest [id=bca23e97-3a6f-4e42-aff4-5ed5f7de2783, operand1=2, operand2=3]
pool-1-thread-1 processing request .... SumRequest [id=4bfa782a-c695-4de6-9593-cbfd357c3535, operand1=1, operand2=2]
pool-1-thread-3 processing request .... SumRequest [id=31ccd137-349a-4b7a-93b1-e51f62c11ba9, operand1=3, operand2=4]
pool-1-thread-5 processing request .... SumRequest [id=c653f469-6a6f-45b6-99f2-ed58620fd144, operand1=5, operand2=6]
main: Response = SumResponse [requestId=4bfa782a-c695-4de6-9593-cbfd357c3535, result=3]
main: Response = SumResponse [requestId=bca23e97-3a6f-4e42-aff4-5ed5f7de2783, result=5]
main: Response = SumResponse [requestId=31ccd137-349a-4b7a-93b1-e51f62c11ba9, result=7]
main: Response = SumResponse [requestId=36d95b35-09f0-4e93-99e4-715ea7cb33c9, result=9]
main: Response = SumResponse [requestId=c653f469-6a6f-45b6-99f2-ed58620fd144, result=11]
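In the output above, the pool threads pick up requests in an arbitrary order, while the Response lines printed by main follow the order in which the requests were passed in (3, 5, 7, 9, 11). That is consistent with invokeAll, which returns its futures in the same order as the task list regardless of which task finishes first. The following sketch demonstrates this with deliberately staggered delays; InvokeAllOrderDemo, resultsInTaskOrder and the delay values are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllOrderDemo {

    // The first task sleeps longest, so tasks tend to finish in reverse order,
    // yet the futures returned by invokeAll follow the order of the task list.
    static String resultsInTaskOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            final long[] delaysMillis = {60, 30, 0};
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                final int index = i;
                tasks.add(() -> {
                    Thread.sleep(delaysMillis[index]);
                    return "task-" + index;
                });
            }
            StringBuilder joined = new StringBuilder();
            for (Future<String> future : pool.invokeAll(tasks)) {
                joined.append(future.get()).append(' ');
            }
            return joined.toString().trim();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new IllegalStateException("Interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(resultsInTaskOrder()); // prints task-0 task-1 task-2
    }
}
```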