Java Locks and Concurrency

(updated: )
  1. 1. Locks
    1. 1.1. Implicit Locks
    2. 1.2. Lock
    3. 1.3. ReadWriteLock
    4. 1.4. StampedLock
    5. 1.5. LockSupport
  2. 2. Concurrency
    1. 2.1. CountDownLatch
    2. 2.2. CyclicBarrier
    3. 2.3. Semaphore
    4. 2.4. Exchanger
    5. 2.5. Atomic
    6. 2.6. Blocking Queues
      1. 2.6.1. ArrayBlockingQueue
      2. 2.6.2. LinkedBlockingQueue / LinkedBlockingDeque
      3. 2.6.3. SynchronousQueue
      4. 2.6.4. DelayQueue
      5. 2.6.5. LinkedTransferQueue
      6. 2.6.6. PriorityBlockingQueue
    7. 2.7. Thread Pools
    8. 2.8. Future
      1. 2.8.1. CompletableFuture
    9. 2.9. ForkJoin
      1. 2.9.1. ForkJoinPool
      2. 2.9.2. Work Stealing Algorithm
      3. 2.9.3. ForkJoinTask
  3. 3. AQS
    1. 3.1. Classes
    2. 3.2. Exclusive Acquire
    3. 3.3. Release
  4. 4. Deadlocks Prevention
  5. 5. Reference

Locks

Implicit Locks

Or synchronized methods and synchronized blocks.

Each object in java has a intrinsic lock associated with it.

Synchronized methods use the lock of method owner object; Synchronized blocks, if you don’t pass in any object in, same as sychronized methods, otherwise, it use the object you passed in as the lock.

Synchronized methods and blocks are reentrant.

Drawbacks

  • fairness lock
  • read / write lock

Lock

Lock

Here indicates its implementation ReentrantLock. Use lock() to aquire the lock and unlock() to release it. And make sure applying the try-finally pattern with it, in case of exceptions.

ReadWriteLock

Or its implementation ReentrantReadWriteLock, which defined a pair of locks for read and write access. It allows multiply read threads holding the read lock when no write lock been hold. It would improve the throughput for circonstances that reads are more frequent than writes.

StampedLock

Also provide read/write locks, but is NOT reentrant.

The locking methods of a StampedLock return a stamp represented (a long value), which can be used to release a lock or to check if the lock is still valid.

  • readLock(): exclusive blocking
  • writeLock(): non-exclusive blocking
  • tryOptimisticRead(): returns a non-zero stamp only if the lock is not currently held in write mode, and you have to use validate(stamp) (might be repeatablly) to test if any write lock has break it
  • tryConvertToWriteLock(): try to convert a read lock to write lock

LockSupport

At most one permit, but permit can prepared before acquiring.

  • park() to block the thread waiting for a permit
  • unpark(thread) to add a permit (not accumulate) to the thread

  • thread.interrupt() can also unpark the thread, but won’t throw InterrupptedException

  • park will return if the caller’s thread was interrupted
  • unpark can be invoked before park
  • parking methods may return spuriously, so it is important to call park() in a loop that can repark the thread if it should not have resumed
  • the blocker object is for diagnose purpose

Concurrency

Concurrent Collections

CountDownLatch

A latch that multiply threads waiting for, will be notified only once when the counter reaches zero.

CyclicBarrier

Repeatable CountDownLatch.

Semaphore

Only n processes can access a certain resource at a given time.

  • acquire() will block until permits are available
  • release() to release a permit
  • acquireInterruptibly() acquire a resource, reattempting if it is interrupted
  • tryAcquire() can limit how long we will wait for a permit

Some tricks:

  • Release doesn’t have to be called by the same thread as acquire, or the permit is accumulate
  • Increase the number of permits at runtime (release() will always increase the number of permits)

Exchanger

GC-less exchange data between only two threads.

Atomic

Atomic classes utlize CAS (compare-and-swap), which is directly supported by CPUs, so it’s much faster than synchronized and locks.

  • AtomicBoolean
  • AtomicInteger
  • AtomicLong
  • AtomicReference
  • LongAdder: preferable if more write threads than read threads, but take more memory
  • LongAccumulator: generalized version of LongAdder, use LongBinaryOperator as operations

Blocking Queues

ArrayBlockingQueue

  • fixed size
  • array-based
  • best performance
  • single-lock double condition algorithm

LinkedBlockingQueue / LinkedBlockingDeque

  • linked-list based
  • 2 locks 2 conditions

SynchronousQueue

  • zero capacity
  • the thread inserting data will block until there is a thread to remove that data or vice-versa
  • does not permit null elements

DelayQueue

  • blocks the elements internally until a certain delay has expired
  • elements must implement java.util.concurrent.Delayed

LinkedTransferQueue

  • waits for consumer to consume the element (message passing need to be guaranteed)

PriorityBlockingQueue

  • concurrent version of PriorityQueue

Thread Pools

Executors

  • newCachedThreadPool(): reuses threads when possible, creates new ones as needed with no configured limit
  • newFixedThreadPool(int nThreads): uses only up to the number of threads specified
  • newScheduledThreadPool(int corePoolSize): schedules threads with delayed execution, returns ScheduledExecutorService with:
    • schedule(Runnable command, long delay, TimeUnit unit)
    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
  • newSingleThreadExecutor() and newSingleThreadScheduledExecutor(): a single thread/task is executing at a time.

Future

Future

CompletableFuture

Some methods

method parameter description
supplyAsync Supplier<U> -> U apply a function to the result
thenApply T -> U apply a function to the result
thenAccept T -> void like thenApplay but with no return value
handle (T, Throwable) -> U process the error result
whenComplete (T, Throwable) -> void like handle but with not return value
thenCompose T -> CompletableFuture invoke the function on the result and esecute returned future
thenRun Runnable execute the runnable

ForkJoin

The fork/join framework is to parallel processing tasks by trying to use all available processor cores.
Tasks are stored in a deque.

It can speed up processing of large tasks, but should follow these guidelines:

  • Use as few thread pools as possible
  • Use the default common thread pool
  • Use a reasonable threshold
  • Avoid any blocking in your ForkJoingTasks

ForkJoinPool

The ForkJoinPool which implemented ExecutorService, is the heart of the framework.

Work Stealing Algorithm

Free threads try to “steal” work from deques of busy threads from tail.

By default, a worker thread gets tasks from the head of its own deque. When it is empty, the thread takes a task from the tail of the deque of another busy thread or from the global entry queue

ForkJoinTask

The base type for tasks executed inside ForkJoinPool.
Your task class should extend either RecursiveAction for void tasks, or RecursiveTask<V> for tasks that return a value.

AQS

Or AbstractQueuedSynchronizer.
It provides a framework for implementing blocking locks and related synchronizers like semaphores, by using a CLH derived queue, where each node represent a thread, consists of the following properties:

properties description
prev:Node previous node
next:Node next node
thread:Thread the thread that enqueued this node
nextWaiter:Node Link to next node waiting on condition, or the special value SHARED
waitStatus:int see bellow

waitStatus

  • 0: new node or to be updated, or releasing
  • CANCELLED = 1: been cancelled due to timeout or interrupt, will be removed from the queue
  • SIGNAL = -1: the successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels
  • CONDITION = -2: on a condition queue, will not be used as a sync queue node until transferred (waitState -> 0)
  • PROPAGATE = -3: a releaseShared should be propagated to other nodes. in shared mode, the head node could be in this state

Classes

Classes

To implement your own synchronizer, you would focus on the following methods:

  • isHeldExclusively()
  • tryAcquire(int)
  • tryRelease(int)
  • tryAcquireShared(int)
  • tryReleaseShared(int)

Exclusive Acquire

Exclusive Acquire

Release

Release

Deadlocks Prevention

  • acquire multiple locks in a consistent order
  • don’t execute foreign code while holding a lock
  • use interruptible locks

Reference