Nonblocking algorithms

Posted on

Java 5.0 provides supports for additional atomic operations. This allows to develop algorithm which are non-blocking algorithm, e.g. which do not require synchronization, but are based on low-level atomic hardware primitives such as compare-and-swap (CAS). A compare-and-swap operation check if the variable has a certain value and if it has this value it will perform this operation.
Non-blocking algorithm are usually much faster then blocking algorithms as the synchronization of threads appears on a much finer level (hardware).
For example this created a non-blocking counter which always increases. This example is contained in the project “de.vogella.concurrency.nonblocking.counter”.

   
package de.vogella.concurrency.nonblocking.counter;

import java.util.concurrent.atomic.AtomicInteger;

public class Counter {
private AtomicInteger value = new AtomicInteger();
public int getValue(){
return value.get();
}
public int increment(){
return value.incrementAndGet();
}

// Alternative implementation as increment but just make the
// implementation explicit
public int incrementLongVersion(){
int oldValue = value.get();
while (!value.compareAndSet(oldValue, oldValue+1)){
oldValue = value.get();
}
return oldValue+1;
}

}

And a test.

   
package de.vogella.concurrency.nonblocking.counter;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Test {
private static final int NTHREDS = 10;

public static void main(String[] args) {
final Counter counter = new Counter();
List<Future<Integer>> list = new ArrayList<Future<Integer>>();

ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Callable<Integer> worker = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int number = counter.increment();
System.out.println(number);
return number ;
}
};
Future<Integer> submit= executor.submit(worker);
list.add(submit);

}


// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
}
Set<Integer> set = new HashSet<Integer>();
for (Future<Integer> future : list) {
try {
set.add(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
if (list.size()!=set.size()){
throw new RuntimeException("Double-entries!!!");
}

}


}

The interesting part is how incrementAndGet() is implemented. It uses a CAS operation.

    
public final int incrementAndGet() {
for (;;) {
int current = get();
int next = current + 1;
if (compareAndSet(current, next))
return next;
}
}

The JDK itself makes more and more use of non-blocking algorithms to increase performance for every developer. Developing correct non-blocking algorithm is not a trivial task. For more information on non-blocking algorithm, e.g. examples for a non-blocking Stack and non-block LinkedList, please see http://www.ibm.com/developerworks/java/library/j-jtp04186/index.html

Advertisements

Futures and Callables

Posted on

The executor framework presented in the last chapter works with Runnables. Runnable do not return result.
In case you expect your threads to return a computed result you can use java.util.concurrent.Callable. Callables allow to return values after competition.
Callable uses generic to define the type of object which is returned.
If you submit a callable to an executor the framework returns a java.util.concurrent.Future. This futures can be used to check the status of a callable and to retrieve the result from the callable.
On the executor you can use the method submit to submit a Callable and to get a future. To retrieve the result of the future use the get() method.

   
package de.vogella.concurrency.callables;

import java.util.concurrent.Callable;

public class MyCallable implements Callable<Long> {
@Override
public Long call() throws Exception {
long sum = 0;
for (long i = 0; i <= 100; i++) {
sum += i;
}
return sum;
}

}

   
package de.vogella.concurrency.callables;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableFutures {
private static final int NTHREDS = 10;

public static void main(String[] args) {

ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
List<Future<Long>> list = new ArrayList<Future<Long>>();
for (int i = 0; i < 20000; i++) {
Callable<Long> worker = new MyCallable();
Future<Long> submit = executor.submit(worker);
list.add(submit);
}
long sum = 0;
System.out.println(list.size());
// Now retrieve the result
for (Future<Long> future : list) {
try {
sum += future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println(sum);
executor.shutdown();
}
}

Threads pools with the Executor Framework

Posted on

Thread pools manage a pool of worker threads. The thread pools contains a work queue which holds tasks waiting to get executed.
A thread pool can be described as a collection of runnables (work queue) and a connections of running threads. These threads are constantly running and are checking the work query for new work. If there is new work to be done they execute this Runnable. The Thread class itself provides a method, e.g. execute(Runnable r) to add runnables to the work queue.
The Executor framework provides example implementation of the java.util.concurrent.Executor interface, e.g. Executors.newFixedThreadPool(int n) which will create n worker threads. The ExecutorService adds lifecycle methods to the Executor, which allows to shutdown the Executor and to wait for termination.

Tip

If you want to use one thread pool with one thread which executes several runnables you can use Executors.newSingleThreadExecutor();

Create again the Runnable.

   
package de.vogella.concurrency.threadpools;


/**
* MyRunnable will count the sum of the number from 1 to the parameter
* countUntil and then write the result to the console.
* <p>
* MyRunnable is the task which will be performed
*
* @author Lars Vogel
*
*/

public class MyRunnable implements Runnable {
private final long countUntil;

MyRunnable(long countUntil) {
this.countUntil = countUntil;
}

@Override
public void run() {
long sum = 0;
for (long i = 1; i < countUntil; i++) {
sum += i;
}
System.out.println(sum);
}
}

Now you run your runnables with the executor framework.

   
package de.vogella.concurrency.threadpools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
private static final int NTHREDS = 10;

public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
for (int i = 0; i < 500; i++) {
Runnable worker = new MyRunnable(10000000L + i);
executor.execute(worker);
}
// This will make the executor accept no new threads
// and finish all existing threads in the queue
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {

}
System.out.println("Finished all threads");
}
}

In case the threads should return some value (result-bearing threads) then you could use java.util.concurrent.Callable.

Threads in Java

Posted on

The base means for concurrency are “java.lang.Threads”. Each thread will execute an object of type “java.lang.Runnable”. Runnable is an interface with defines only the one method “run()”. This method is called by “Thread” and contains the work which should be done. Therefore the “Runnable” is the task to perform. The Thread is the worker who is doing this task.
The following demonstrates a task (Runnable) which counts the sum of a given range of numbers. Create the Java project “de.vogella.concurrency.threads” for the example coding of this section.

   
package de.vogella.concurrency.threads;


/**
* MyRunnable will count the sum of the number from 1 to the parameter
* countUntil and then write the result to the console.
* <p>
* MyRunnable is the task which will be performed
*
* @author Lars Vogel
*
*/

public class MyRunnable implements Runnable {
private final long countUntil;

MyRunnable(long countUntil) {
this.countUntil = countUntil;
}

@Override
public void run() {
long sum = 0;
for (long i = 1; i < countUntil; i++) {
sum += i;
}
System.out.println(sum);
}
}

To perform a task (Runnables) you need to define and start a Thread. The following coding will create Threads, assigns a Runnable to each Thread, schedule the Threads to run and wait until all Threads are finished.

   
package de.vogella.concurrency.threads;

import java.util.ArrayList;
import java.util.List;

public class Main {

public static void main(String[] args) {
// We will store the threads so that we can check if they are done
List<Thread> threads = new ArrayList<Thread>();
// We will create 500 threads
for (int i = 0; i < 500; i++) {
Runnable task = new MyRunnable(10000000L + i);
Thread worker = new Thread(task);
// We can set the name of the thread
worker.setName(String.valueOf(i));
// Start the thread, never call method run() direct
worker.start();
// Remember the thread for later usage
threads.add(worker);
}
int running = 0;
do {
running = 0;
for (Thread thread : threads) {
if (thread.isAlive()) {
running++;
}
}
System.out.println("We have " + running + " running threads. ");
} while (running > 0);

}
}

Using Threads directly has the following disadvantages.

  • Creating a new thread causes some performance overhead
  • Too many threads can lead to reduced performance, as the CPU needs to switch between these threads.
  • You cannot easily control the number of threads, therefore you may run into out of memory errors due to too many threads.

The “java.util.concurrent” package offers improved support for concurrency compared to threads. The java.util.concurrent package helps solving several of the issues with threads.

Immutability and Defensive Copies

Posted on

1. Immutability

The simplest way to avoid problems with concurrency is to share only immutable data between threads. Immutable data is data which can not changed.
To make a class immutable make

  • all its fields final
  • the class declared as final
  • the this reference is not allowed to escape during construction
  • Any fields which refer to mutable data objects are
    • private
    • have no setter method
    • they are never directly returned of otherwise exposed to a caller
    • if they are changed internally in the class this change is not visible and has no effect outside of the class

An immutable class may have some mutable data which is uses to manages its state but from the outside this class nor any attribute of this classes can get changed.
For all mutable fields, e.g. Arrays, that are passed from the outside to the class during the construction phase, the class needs to make a defensive-copy of the elements to make sure that no other object from the outside still can change the data

2. Defensive Copies

You must protected your classes from calling code. Assume that calling code will do its best to change your data in a way you didn’t expect it. While this is especially true in case of immutable data it is also true for non-immutable data which you still not expect that this data is changed outside your class.
To protect your class against that you should copy data you receive and only return copies of data to calling code.
The following example creates a copy of a list (ArrayList) and returns only the copy of the list. This way the client of this class cannot remove elements from the list.

    
package de.vogella.performance.defensivecopy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class MyDataStructure {
List<String> list = new ArrayList<String>();

public void add(String s) {
list.add(s);
}


/**
* Makes a defensive copy of the List and return it
* This way cannot modify the list itself
*
* @return List<String>
*/

public List<String> getList() {
return Collections.unmodifiableList(list);
}
}

The Java memory model

Posted on

1. Overview

The Java memory model describes the communication between the memory of the threads and the main memory, e.g. then are changes of threads visible to others and then does a thread re-fresh its own memory from the main memory.
It also describes which operations are atomic and the ordering of operations.

2. Atomic operation

An atomic operation is an operation which is performed as a single unit of work without the possibility of interference from other operations.
In Java the language specification guarantees that that reading or writing a variable is atomic (unless the variable is of type long or double). Long and double are only atomic if they declared as volatile.
The operation i++ it not atomic in Java for the standard variables (int, long, etc). It first reads the value which is currently stored in i (atomic operations) and then it adds one to it (atomic operation). But between the read and the write the value of i might have changed. Java
Since Java 1.5 the java language provides atomic variables, e.g. AtomicInteger or AtomicLong which provide methods like getAndDecrement(), getAndIncrement() and getAndSet() which are atomic.

3. The “synchronized” keyword

For variables which are protected by synchronized it it guaranteed by the Java memory model that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock.

4. The “volatile” keyword

If a variable is declared as volatile then is guaranteed that any thread which reads the field will see the most recently written value.

Concurrency in Java

Posted on

1. Processes and Threads

A Java program runs in its own process. Java supports within one program the usage of threads.
Java supports threads as part of the Java language. Java 1.5 also provides improved support for concurrency with the in the package java.util.concurrent.
Java also provides locks to protect certain parts of the coding to be executed by several threads at the same time. The simplest way of locking a certain method or Java class is to use the keyword “synchronized” in a method or class declaration.

2. Synchronized

The synchronized keyword in Java ensures:

  • that only a single thread can execute a block of code at the same time
  • ensures that each thread entering a synchronized block of code sees the effects of all previous modifications that were guarded by the same lock

Synchronization is necessary for mutual exclusive access of code blocks and for reliable communication between threads.

synchronized can be used directly in the definition of a method. This would ensure that only one thread can access this method. Or you can used synchronized blocks which a named block (either a string or a object). Then all code which is protected by the same lock can only be executed by one thread.
For example the following datastructure will ensure that only one thread can access the inner block of add() and next()

    
package de.vogella.pagerank.crawler;

import java.util.ArrayList;
import java.util.List;


/**
* Data structure for a web crawler. Keeps track of the visited sites and keeps
* a list of sites which needs still to be crawled.
*
* @author Lars Vogel
*
*/

public class CrawledSites {
private List<String> crawledSites = new ArrayList<String>();
private List<String> linkedSites = new ArrayList<String>();

public void add(String site) {
synchronized (this) {
if (!crawledSites.contains(site)) {
linkedSites.add(site);
}
}
}


/**
* Get next site to crawl. Can return null (if nothing to crawl)
*/

public String next() {
if (linkedSites.size() == 0) {
return null;
}
synchronized (this) {
// Need to check again if size has changed
if (linkedSites.size() > 0) {
String s = linkedSites.get(0);
linkedSites.remove(0);
crawledSites.add(s);
return s;
}
return null;
}
}

}

3. Volatile

If a variable is declared as volatile then is guaranteed that any thread which reads the field will see the most recently written value. The keyword volatile will not perform any mutual exclusive lock on the variable.
As of Java 5 write access to a volatile variable will also update non-volatile variables which were modified by the same thread. This can also be used to update values within a reference variable, e.g. for a volatile variable person. In this case you must use a temporary variable person and use the setter to initialize the variable and then assign the temporary variable to the final variable. This will then make the address changes of this variable and the values visible to other threads.