
May 16, 2014

Java 7 fork and join tutorial with a diagram and an example

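Q. How does java.util.concurrent.RecursiveTask work?
A. A RecursiveTask is a task that returns a result and runs in a ForkJoinPool. Its compute() method either solves the problem directly when it is small enough, or splits it into smaller subtasks, forks them to run in parallel, and joins their results. In the following example, the numbers are recursively split into chunks of 3 or fewer, and the chunk sums are combined into the final sum.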

Here is an example in which an array of numbers is summed in parallel by multiple threads. The chunk size is 3, so each task sums 3 or fewer numbers. Chunks are submitted asynchronously (i.e. fork) and their results are then combined (i.e. join) to compute the final result.


Example. numbers = {1,2,3,4,5,6,7,8,9,10}, sum = 55; process them using the fork and join feature introduced in Java 7.





Here is the code

package com.fork.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.RecursiveTask;

class SumTask extends RecursiveTask<Integer> {

 static final int CHUNK_SIZE = 3; // execution batch size;

 Integer[] numbers;
 int begin;
 int end;

 SumTask(Integer[] numbers, int begin, int end) {
  this.numbers = numbers;
  this.begin = begin;
  this.end = end;
 }

 @Override
 protected Integer compute() {
        //sums the given number
  if (end - begin <= CHUNK_SIZE) {
   int sum = 0;
   List<Integer> processedNumbers = new ArrayList<>();
   for(int i=begin; i < end; ++i) {
    processedNumbers.add(numbers[i]);//just to track
    sum += numbers[i];
   }

   //tracking thread, numbers processed, and sum
   System.out.println(Thread.currentThread().getName() + " proceesing " +
      Arrays.asList(processedNumbers) +  ", sum = " + sum);
   return sum;
  }
  
  //create chunks, fork and join
  else {
   int mid = begin + (end - begin) / 2; //mid point to partition
   SumTask left  = new SumTask(numbers, begin, mid);  //left partition
   SumTask right = new SumTask(numbers, mid, end);    //right partition
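   left.fork();                    //schedule the left half to run asynchronously in the pool
   int leftAns = right.compute();  //compute the right half in the current thread (so leftAns holds the right half's sum)
   int rightAns = left.join();     //wait for the forked left half (so rightAns holds the left half's sum)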
   System.out.println("leftAns=" + leftAns + " + " + "rightAns=" + rightAns);
   return leftAns + rightAns;                         
  }

 }
}


Here is the test class with the main method

package com.fork.join;

import java.util.concurrent.ForkJoinPool;

public class SumTaskTest {

 public static void main(String[] args) {
  int numberOfCpuCores = Runtime.getRuntime().availableProcessors();
  ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfCpuCores);

  Integer[] numbers = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
  int sum = forkJoinPool.invoke(new SumTask(numbers, 0, numbers.length));
  
  System.out.println(sum);
 }

}


The output


ForkJoinPool-1-worker-1 proceesing [[6, 7]], sum = 13
ForkJoinPool-1-worker-3 proceesing [[8, 9, 10]], sum = 27
leftAns=27 + rightAns=13
ForkJoinPool-1-worker-2 proceesing [[3, 4, 5]], sum = 12
ForkJoinPool-1-worker-0 proceesing [[1, 2]], sum = 3
leftAns=12 + rightAns=3
leftAns=40 + rightAns=15
55


Q. Where to use fork/join as opposed to using the ExecutorService framework?

The Fork/Join framework in Java 7 is designed for work that can be broken down into smaller tasks, with the results of those tasks combined to produce the final result. Multicore processors are now widespread across server, desktop, and laptop hardware. They are also making their way into smaller devices, such as smartphones and tablets. Fork/Join offers serious gains for problems that can be solved recursively, because recursive divide-and-conquer maps naturally onto parallel execution on multicore platforms.

The fork/join tasks should operate as “pure” in-memory algorithms in which no I/O operations come into play. Also, communication between tasks through shared state should be avoided as much as possible, because that implies that locking might have to be performed. Ideally, tasks communicate only when one task forks another or when one task joins another.

ExecutorService continues to be a fine solution for many concurrent programming tasks. In scenarios where the work is naturally recursive and can be split into independent subtasks, it makes sense to use Fork/Join. The fork and join feature is also used by Java 8 parallel streams, which are typically written with lambda expressions.
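
To make the Java 8 connection concrete, here is a minimal sketch that sums the same numbers 1 to 10 with a parallel stream instead of a hand-written RecursiveTask. The class and method names are made up for illustration, and it needs Java 8 or later. By default a parallel stream runs its work on the common ForkJoinPool.

```java
import java.util.stream.IntStream;

class ParallelStreamSum {

    // Sums the integers from 1 to n (inclusive) using a parallel stream.
    // The splitting and joining is handled by the stream framework.
    static int sumUpTo(int n) {
        return IntStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(sumUpTo(10)); // prints 55
    }
}
```

For only 10 numbers the parallel version is unlikely to be faster than a plain loop. The point is only that the fork and join plumbing no longer has to be written by hand.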


Apr 12, 2014

Top 8 new features in Java 7 with examples -- part 2

This post continues from Top 8 new features in Java 7 with examples -- part 1.


#5 Multi-catch to avoid code duplication

public class Java7Feature4 {

 public static void main(String[] args) {

  //pre Java 7
  try {
   someMethod();
  } catch (CustomException1 ex1) {
   ex1.printStackTrace();
  } catch (CustomException2 ex2) {
   ex2.printStackTrace();
  }
  
  
  //Java 7 -- 5 lines as opposed to 7 lines.
  //no code duplication
  try {
   someMethod();
  } catch (CustomException1|CustomException2 ex) {
   ex.printStackTrace();
  }
 }

 public static void someMethod() throws CustomException1, CustomException2 {

 }

 public static class CustomException1 extends Exception {
  private static final long serialVersionUID = 1L;
 }

 public static class CustomException2 extends Exception {
  private static final long serialVersionUID = 1L;
 }

}

Note that the pipe ‘|’ character is used as the delimiter.

#6 Improved type inference for generic instance creation

This is a small change that makes generic declarations a little less verbose. As shown below, in Java 7 you can use the empty diamond "<>" on the right-hand side, and the compiler infers the type arguments from the left-hand side.

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Java7Feature4 {

 public static void main(String[] args) {
        //Pre Java 7
  getEmployeesWithManagersOld("a102");
  //Java 7
  getEmployeesWithManagersNew("a102");
 }

 
 public static Map<String, List<Employee>>  getEmployeesWithManagersOld(String empCode){
   if(empCode == null){
      return Collections.emptyMap();
   }
   
   //gives type safety warning. You need to add <String, List<Employee>> again on the RHS
   Map<String, List<Employee>> mapEmployees = new HashMap();
   
   
   return mapEmployees;
 }

 
 //Java 7
 public static Map<String, List<Employee>>  getEmployeesWithManagersNew(String empCode){
   if(empCode == null){
      return Collections.emptyMap();
   }
   
   //no duplication of the generic type arguments; the compiler infers them
   Map<String, List<Employee>> mapEmployees = new HashMap<>();
   //do something with mapEmployees
   
   return mapEmployees;
 }

  
 static class Employee {}
}


#7: More new I/O APIs for the Java platform (NIO - 2.0)

Those who have worked with Java I/O may still remember the headaches that the framework caused. It was never easy to work seamlessly across operating systems or multiple file systems. NIO 2.0 brings many enhancements. It also introduces new classes and interfaces, such as Path, Paths, FileSystem, FileSystems and others, that make life easier for developers working with multiple file systems.
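
As a rough illustration of the Path and Files classes, the sketch below writes a few lines to a temporary file, reads them back, and deletes the file. The class name, method name, and temp file prefix are made up for illustration. Only Java 7 methods of java.nio.file.Files are used.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

class NioFilesSketch {

    // Writes the given lines to a temporary file, reads them back,
    // and always deletes the file afterwards.
    static List<String> roundTrip(List<String> lines) throws IOException {
        Path tempFile = Files.createTempFile("nio2-sketch", ".txt"); // hypothetical prefix
        try {
            Files.write(tempFile, lines, StandardCharsets.UTF_8);
            return Files.readAllLines(tempFile, StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(tempFile);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip(Arrays.asList("Big", "brown fox"))); // prints [Big, brown fox]
    }
}
```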

Another very handy feature is the WatchService for file change notifications. It can monitor a directory for changes as demonstrated below.

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class Java7Feature7 {

 public static void main(String[] args) throws IOException, InterruptedException {
  
  // Java 7
  Path path = Paths.get("c:\\Temp\\simple.txt");
  System.out.println(path.getFileName());
  System.out.println(path.getRoot());
  System.out.println(path.getParent());

  // Java 7 file change watch service
  WatchService watchService = FileSystems.getDefault().newWatchService();
  
  //register the Temp folder with the watch service for creation, modification, and deletion of files
  path.getParent().register(watchService, StandardWatchEventKinds.ENTRY_CREATE,
    StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);

  //wait for incoming events
  while (true) {
   final WatchKey key = watchService.take();
   for (WatchEvent<?> watchEvent : key.pollEvents()) {
    final Kind<?> kind = watchEvent.kind();
    // Overflow event
    if (StandardWatchEventKinds.OVERFLOW == kind) {
     continue; // loop
    } else if (StandardWatchEventKinds.ENTRY_CREATE == kind || StandardWatchEventKinds.ENTRY_MODIFY == kind
      || StandardWatchEventKinds.ENTRY_DELETE == kind) {
     @SuppressWarnings("unchecked")
     final WatchEvent<Path> watchEventPath = (WatchEvent<Path>) watchEvent;
     final Path entry = watchEventPath.context();

     System.out.println(kind + "-->" + entry);

    }

   }

   if (!key.reset()) {
    break;
   }
  }

  // deleting a file is as easy as.
  Files.deleteIfExists(path); // Java 7 feature as well.
 }

}


The output will be something like:

simple.txt
c:\
c:\Temp
ENTRY_CREATE-->New Text Document.txt
ENTRY_DELETE-->New Text Document.txt
ENTRY_CREATE-->File1.txt
ENTRY_MODIFY-->File1.txt


#8: Fork and Join

Java 7 introduced the Fork and Join framework, which distributes work across multiple cores and then joins the partial results into the final result. Making effective use of parallel cores in a Java program has always been a challenge. Fork/Join follows a divide-and-conquer approach: it breaks the task at hand into mini-tasks until each mini-task is simple enough to be solved without further splitting. One important concept to note in this framework is that ideally no worker thread is idle. The pool implements a work-stealing algorithm in which idle workers “steal” work from workers that are busy.

The example below demonstrates this with a simple task of summing up 10 numbers. If the count of numbers to be added is greater than 5, the array is split into two halves, which are forked to be processed as separate tasks. The forked sums are then joined to give the overall total of the 10 numbers from 1 to 10, which is 55. The total of numbers 1 to 5 is 15, and 6 to 10 is 40.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class Java7Feature8 {

 static int[] numbers = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

 public static void main(String[] args) throws IOException, InterruptedException {
     int numberOfCpuCores = Runtime.getRuntime().availableProcessors();
     ForkJoinPool forkJoinPool = new ForkJoinPool(numberOfCpuCores);
     int sum = forkJoinPool.invoke(new ChunkingTask(numbers));
     System.out.println(sum);
 }

 //inner class
 static class SumCalculatorTask extends RecursiveTask<Integer> {
  int[] numbers;

  SumCalculatorTask(int[] numbers) {
   this.numbers = numbers;
  }

  @Override
  protected Integer compute() {
   int sum = 0;
   for (int i : numbers) {
    sum += i;
   }
   
   System.out.println(Thread.currentThread().getName() + " sum = " + sum);

   return sum;
  }
 }

 
 //inner class
 
 /**
  * 
  *chunking size is 5
  */
 static class ChunkingTask extends RecursiveTask<Integer> {

  private static final int CHUNK_SIZE = 5;
  int[] numbers;

  ChunkingTask(int[] numbers) {
   this.numbers = numbers;
  }

  @Override
  protected Integer compute() {
   int sum = 0;
   List<RecursiveTask<Integer>> forks = new ArrayList<>();
   
   //if the numbers size is > CHUNK_SIZE fork them
   if (numbers.length > CHUNK_SIZE) {
    ChunkingTask chunk1 = new ChunkingTask(Arrays.copyOfRange(numbers, 0, numbers.length / 2));
    ChunkingTask chunk2 = new ChunkingTask(Arrays.copyOfRange(numbers, numbers.length / 2, numbers.length));
    forks.add(chunk1);
    forks.add(chunk2);
    chunk1.fork();
    chunk2.fork();
   //size is less than or equal to CHUNK_SIZE start summing them 
   } else {
    SumCalculatorTask sumCalculatorTask = new SumCalculatorTask(numbers);
    forks.add(sumCalculatorTask);
    sumCalculatorTask.fork();
   }

   // Combine the result from all the tasks
   //join 
   for (RecursiveTask<Integer> task : forks) {
    sum += task.join();
   }

   return sum;
  }

 }

}


Output is:

ForkJoinPool-1-worker-2 sum = 15
ForkJoinPool-1-worker-2 sum = 40
55

Java 8's Arrays.parallelSort( ... ) makes use of this fork and join feature to sort an array in parallel.
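
A minimal sketch of calling Arrays.parallelSort is shown here. The class and method names are made up for illustration, and it needs Java 8 or later. The array is copied first because parallelSort sorts in place.

```java
import java.util.Arrays;

class ParallelSortSketch {

    // Returns a sorted copy of the input, leaving the original array untouched.
    static int[] sortedCopy(int[] input) {
        int[] copy = Arrays.copyOf(input, input.length);
        Arrays.parallelSort(copy); // sorts the copy in place, in ascending order
        return copy;
    }

    public static void main(String[] args) {
        int[] numbers = { 9, 3, 7, 1, 10, 2 };
        System.out.println(Arrays.toString(sortedCopy(numbers))); // prints [1, 2, 3, 7, 9, 10]
    }
}
```

For an array this small the JDK will most likely just sort sequentially. The parallel path only pays off for large arrays.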


Apr 11, 2014

Top 8 new features in Java 7 with examples

There are several small new features and enhancements in Java 7. The major features and enhancements are coming in Java 8. Let's look at the new features in Java 7.

#1: String in switch statement

public class Java7Feature1 {

 private static String color = "BLUE";

 private enum Color {
  RED, GREEN
 };

 public static void main(String[] args) {

  // Pre Java 5
  if (color.equals("RED")) {
   System.out.println("Color is Red");
  } else if (color.equals("GREEN")) {
   System.out.println("Color is Green");
  } else {
   System.out.println("Color not found");
  }

  // Java 5 enum. try/catch is required for colours other than RED and GREEN 
  try {
   switch (Color.valueOf(color)) {
   case RED:
    System.out.println("Color is Red");
    break;
   case GREEN:
    System.out.println("Color is Green");
   }
  } catch (IllegalArgumentException e) {
   System.out.println("Color not found");
  }

  // Java 7 String in switch statement for simplicity & better readability
  //the compiler generally generates more efficient bytecode for a String switch than for chained if-else
  //using types with enums is only useful when it serves a meaningful purpose
  //the value for color could come from database, and string in switch is handy for this
  switch (color) {
  case "RED":
   System.out.println("Color is Red");
   break;
  case "GREEN":
   System.out.println("Color is Green");
   break;
  default:
   System.out.println("Color not found");
  }
 }

}



Output is:

Color not found
Color not found
Color not found


#2 Binary integral literals

public class Java7Feature2 {

 public static void main(String[] args) {
  // Pre Java 7
  int n = Integer.parseInt("10000000", 2);
  System.out.println(n);

  n = 1 << 7;
  System.out.println(n);

  // Java 7
  n = 0b10000000; // 128 = 2^7
  System.out.println(n);
 }
}


Output:

128
128
128


#3: Underscores for better readability in numeric literals

public class Java7Feature3 {

 public static void main(String[] args) {
         //pre Java 7
   int million = 1000000;
   System.out.println(million);
   
   //Java 7. More readable
   million = 1_000_000;
   System.out.println(million);
      
   //consecutive underscores are allowed
   int ten_million = 10__000_000;
   System.out.println(ten_million);
      
   //underscores can be used in other numeric types
   double million_dollars_5_cents = 1_000_000.0_5d;
   System.out.println(million_dollars_5_cents);
   
   //illegal to have underscores 
   //1. start or end a literal with an underscore _10.00, 10.00_
   //2. have underscores before or after a decimal point 10_.00, 10._00
 }
}

Output:

1000000
1000000
10000000
1000000.05
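
Underscores also work in binary, hexadecimal and long literals, which combines nicely with feature #2. A small sketch follows. The class and constant names are made up for illustration.

```java
class LiteralSketch {

    static final int LOW_BYTE_MASK = 0b1111_1111;            // binary literal, 255
    static final int HEX_WORD = 0xFF_FF;                     // hexadecimal literal, 65535
    static final long MAX_LONG = 9_223_372_036_854_775_807L; // same value as Long.MAX_VALUE

    public static void main(String[] args) {
        System.out.println(LOW_BYTE_MASK);               // prints 255
        System.out.println(HEX_WORD);                    // prints 65535
        System.out.println(MAX_LONG == Long.MAX_VALUE);  // prints true
    }
}
```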



#4: AutoCloseable interface. 

Java 5 introduced the Closeable interface, and Java 7 has introduced the AutoCloseable interface to avoid the unsightly try/catch/finally blocks (with a nested try/catch inside finally) needed to close a resource. It also prevents potential resource leaks caused by not closing a resource properly. java.io.InputStream and java.io.OutputStream now implement the AutoCloseable interface (via Closeable, which extends AutoCloseable as of Java 7).



try-with-resources is one of the most useful additions in Java 7.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

public class Java7Feature4 {

 public static void main(String[] args) {
  // pre Java 7
  BufferedReader br = null;

  try {
   File f = new File("c://temp/simple.txt");
   InputStream is = new FileInputStream(f);
   InputStreamReader isr = new InputStreamReader(is);
   br = new BufferedReader(isr);

   String read;

   while ((read = br.readLine()) != null) {
    System.out.println(read);
   }
  } catch (IOException ioe) {
   ioe.printStackTrace();
  } finally {
   try {
    if (br != null)
     br.close();
   } catch (IOException ex) {
    ex.printStackTrace();
   }
  }

  
  // Java 7 -- more concise 11 lines as opposed to 20 lines
  try (InputStream is = new FileInputStream(new File("c://temp/simple.txt"));
    InputStreamReader isr = new InputStreamReader(is);
    BufferedReader br2 = new BufferedReader(isr);) {

   String read;

   while ((read = br2.readLine()) != null) {
    System.out.println(read);
   }

  }
  catch (IOException ioe) {
   ioe.printStackTrace();
  }

 }

}

The output is:

Big
brown fox
jumped over the fence
Big
brown fox
jumped over the fence


The try statement can now declare multiple resources in the parentheses, and each declaration should create an object that implements the new java.lang.AutoCloseable interface. The AutoCloseable interface consists of just one method: void close() throws Exception. Each AutoCloseable resource declared in the try statement is closed automatically, in the reverse order of declaration. If an exception is thrown in the try block and another exception is thrown while closing the resource, the first exception is the one eventually thrown to the caller, and the second is attached to it as a suppressed exception (available through getSuppressed()).

Think of the close() method as being called implicitly when the try block exits, before any catch or finally block runs.
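
The rule about which exception reaches the caller can be checked with a small sketch. FailingResource is a hypothetical resource, invented here, whose close() always fails. The try block also fails, and the catch block shows that the exception from the try block wins while the exception from close() travels along with it.

```java
class SuppressedSketch {

    // Hypothetical resource whose close() always throws.
    static class FailingResource implements AutoCloseable {
        @Override
        public void close() {
            throw new IllegalStateException("close failed");
        }
    }

    // Fails inside the try block, then fails again while closing the resource,
    // and reports which exception was caught and which one was suppressed.
    static String describe() {
        try (FailingResource resource = new FailingResource()) {
            throw new IllegalArgumentException("body failed");
        } catch (IllegalArgumentException primary) {
            Throwable[] suppressed = primary.getSuppressed();
            return primary.getMessage() + " / suppressed: " + suppressed[0].getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints body failed / suppressed: close failed
    }
}
```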


Features 5 to 8 are covered in Top 8 new features in Java 7 with examples -- part 2.
