Java Locks and Concurrency
Locks
Implicit Locks
Or synchronized methods and synchronized blocks.
Each object in java has a intrinsic lock associated with it.
Synchronized methods use the lock of method owner object; Synchronized blocks, if you don’t pass in any object in, same as sychronized methods, otherwise, it use the object you passed in as the lock.
Synchronized methods and blocks are reentrant.
Drawbacks
- fairness lock
- read / write lock
Lock
Here indicates its implementation ReentrantLock
. Use lock()
to aquire the lock and unlock()
to release it. And make sure applying the try-finally pattern with it, in case of exceptions.
ReadWriteLock
Or its implementation ReentrantReadWriteLock
, which defined a pair of locks for read and write access. It allows multiply read threads holding the read lock when no write lock been hold. It would improve the throughput for circonstances that reads are more frequent than writes.
StampedLock
Also provide read/write locks, but is NOT reentrant.
The locking methods of a StampedLock return a stamp represented (a long value), which can be used to release a lock or to check if the lock is still valid.
readLock()
: exclusive blockingwriteLock()
: non-exclusive blockingtryOptimisticRead()
: returns a non-zero stamp only if the lock is not currently held in write mode, and you have to usevalidate(stamp)
(might be repeatablly) to test if any write lock has break ittryConvertToWriteLock()
: try to convert a read lock to write lock
LockSupport
At most one permit, but permit can prepared before acquiring.
park()
to block the thread waiting for a permitunpark(thread)
to add a permit (not accumulate) to the threadthread.interrupt() can also unpark the thread, but won’t throw InterrupptedException
- park will return if the caller’s thread was interrupted
- unpark can be invoked before park
- parking methods may return spuriously, so it is important to call park() in a loop that can repark the thread if it should not have resumed
- the blocker object is for diagnose purpose
Concurrency
CountDownLatch
A latch that multiply threads waiting for, will be notified only once when the counter reaches zero.
CyclicBarrier
Repeatable CountDownLatch
.
Semaphore
Only n processes can access a certain resource at a given time.
acquire()
will block until permits are availablerelease()
to release a permitacquireInterruptibly()
acquire a resource, reattempting if it is interruptedtryAcquire()
can limit how long we will wait for a permit
Some tricks:
- Release doesn’t have to be called by the same thread as acquire, or the permit is accumulate
- Increase the number of permits at runtime (
release()
will always increase the number of permits)
Exchanger
GC-less exchange data between only two threads.
Atomic
Atomic classes utlize CAS (compare-and-swap), which is directly supported by CPUs, so it’s much faster than synchronized and locks.
AtomicBoolean
AtomicInteger
AtomicLong
AtomicReference
LongAdder
: preferable if more write threads than read threads, but take more memoryLongAccumulator
: generalized version of LongAdder, useLongBinaryOperator
as operations
Blocking Queues
ArrayBlockingQueue
- fixed size
- array-based
- best performance
- single-lock double condition algorithm
LinkedBlockingQueue
/ LinkedBlockingDeque
- linked-list based
- 2 locks 2 conditions
SynchronousQueue
- zero capacity
- the thread inserting data will block until there is a thread to remove that data or vice-versa
- does not permit null elements
DelayQueue
- blocks the elements internally until a certain delay has expired
- elements must implement
java.util.concurrent.Delayed
LinkedTransferQueue
- waits for consumer to consume the element (message passing need to be guaranteed)
PriorityBlockingQueue
- concurrent version of
PriorityQueue
Thread Pools
newCachedThreadPool()
: reuses threads when possible, creates new ones as needed with no configured limitnewFixedThreadPool(int nThreads)
: uses only up to the number of threads specifiednewScheduledThreadPool(int corePoolSize)
: schedules threads with delayed execution, returnsScheduledExecutorService
with:schedule(Runnable command, long delay, TimeUnit unit)
scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
newSingleThreadExecutor()
andnewSingleThreadScheduledExecutor()
: a single thread/task is executing at a time.
Future
CompletableFuture
Some methods
method | parameter | description |
---|---|---|
supplyAsync | Supplier<U> -> U | apply a function to the result |
thenApply | T -> U | apply a function to the result |
thenAccept | T -> void | like thenApplay but with no return value |
handle | (T, Throwable) -> U | process the error result |
whenComplete | (T, Throwable) -> void | like handle but with not return value |
thenCompose | T -> CompletableFuture | invoke the function on the result and esecute returned future |
thenRun | Runnable | execute the runnable |
ForkJoin
The fork/join framework is to parallel processing tasks by trying to use all available processor cores.
Tasks are stored in a deque.
It can speed up processing of large tasks, but should follow these guidelines:
- Use as few thread pools as possible
- Use the default common thread pool
- Use a reasonable threshold
- Avoid any blocking in your ForkJoingTasks
ForkJoinPool
The ForkJoinPool which implemented ExecutorService
, is the heart of the framework.
Work Stealing Algorithm
Free threads try to “steal” work from deques of busy threads from tail.
By default, a worker thread gets tasks from the head of its own deque. When it is empty, the thread takes a task from the tail of the deque of another busy thread or from the global entry queue
ForkJoinTask
The base type for tasks executed inside ForkJoinPool.
Your task class should extend either RecursiveAction
for void tasks, or RecursiveTask<V>
for tasks that return a value.
AQS
Or AbstractQueuedSynchronizer
.
It provides a framework for implementing blocking locks and related synchronizers like semaphores, by using a CLH derived queue, where each node represent a thread, consists of the following properties:
properties | description |
---|---|
prev:Node | previous node |
next:Node | next node |
thread:Thread | the thread that enqueued this node |
nextWaiter:Node | Link to next node waiting on condition, or the special value SHARED |
waitStatus:int | see bellow |
waitStatus
- 0: new node or to be updated, or releasing
- CANCELLED = 1: been cancelled due to timeout or interrupt, will be removed from the queue
- SIGNAL = -1: the successor of this node is (or will soon be) blocked (via park), so the current node must unpark its successor when it releases or cancels
- CONDITION = -2: on a condition queue, will not be used as a sync queue node until transferred (waitState -> 0)
- PROPAGATE = -3: a releaseShared should be propagated to other nodes. in shared mode, the head node could be in this state
Classes
To implement your own synchronizer, you would focus on the following methods:
- isHeldExclusively()
- tryAcquire(int)
- tryRelease(int)
- tryAcquireShared(int)
- tryReleaseShared(int)
Exclusive Acquire
Release
Deadlocks Prevention
- acquire multiple locks in a consistent order
- don’t execute foreign code while holding a lock
- use interruptible locks
Reference
- https://www.javacodegeeks.com/2011/09/java-concurrency-tutorial.html
- http://winterbe.com/posts/2015/04/30/java8-concurrency-tutorial-synchronized-locks-examples/
- http://www.javarticles.com/2012/10/abstractqueuedsynchronizer-aqs.html
- https://www.ibm.com/developerworks/java/library/j-jtp05236/index.html
- http://gee.cs.oswego.edu/dl/papers/fj.pdf