

[Java Concurrency in Practice 14] Building Custom Synchronizers


January 6, 2022

The JDK contains many state-dependent classes, such as FutureTask, Semaphore, and BlockingQueue. Some of their operations have state-based preconditions: for example, a queue must be non-empty before an element can be taken, or a task must have completed before its result can be retrieved.

The easiest way to create a state-dependent class is to build on a state-dependent class that the JDK already provides, as ValueLatch does with CountDownLatch. If the library classes do not offer what you need, you can use the lower-level mechanisms provided by the Java language and class library, including:

  • Intrinsic condition queues
  • Explicit Condition objects
  • AbstractQueuedSynchronizer (AQS)

This chapter introduces each of them.

1 Managing State Dependence

This section first looks at the (painful) solutions based on polling and sleeping, and then shows how condition queues solve the problem of blocking a thread until a precondition holds.

The standard structure of a blocking state-dependent action:

void blockingAction() throws InterruptedException {
    acquire lock on object state
    while (precondition does not hold) {
        release lock
        wait until precondition might hold
        optionally fail if interrupted or timeout expires
        reacquire lock
    }
    perform action
}

Consider several ways to implement a blocking bounded buffer. Its operations have state-based preconditions:

  • You cannot take an element from an empty buffer
  • You cannot put an element into a full buffer

When a precondition does not hold, a state-dependent operation can:

  • Throw an exception
  • Return an error status (code)
  • Block until the object reaches the right state

Here is the base class. It is thread-safe but does not block.

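import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public abstract class BaseBoundedBuffer<V> {
    @GuardedBy("this") private final V[] buf;
    @GuardedBy("this") private int tail;
    @GuardedBy("this") private int head;
    @GuardedBy("this") private int count;

    @SuppressWarnings("unchecked") // generic array creation via cast
    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    // Stores v at the tail of the circular array; the caller must ensure the buffer is not full
    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }

    // Removes the element at the head; the caller must ensure the buffer is not empty
    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null; // drop the reference so it can be garbage collected
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }

    public synchronized final boolean isFull() {
        return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}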

The first approach is "check, then fail": the buffer reports the failed precondition and the caller has to deal with it, here by catching an exception. Returning an error status would also work, but it does not take the burden off the caller.

The caller can retry immediately without sleeping, which is called busy waiting or spin waiting; if the state does not change for a long time, this burns a lot of CPU time. Alternatively, the caller can sleep between attempts to give up the CPU, but then choosing the sleep interval is awkward:

  • Sleep too long and the thread may oversleep after the precondition has become true, which hurts responsiveness
  • Sleep too briefly and the repeated checks waste CPU cycles

The caller can also try Thread.yield, but that is not reliable either.

import net.jcip.annotations.ThreadSafe;

// BufferFullException and BufferEmptyException are assumed to be
// checked exceptions defined elsewhere.
@ThreadSafe
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public GrumpyBoundedBuffer() {
        this(100);
    }

    public GrumpyBoundedBuffer(int size) {
        super(size);
    }

    public synchronized void put(V v) throws BufferFullException {
        if (isFull())
            throw new BufferFullException();
        doPut(v);
    }

    public synchronized V take() throws BufferEmptyException {
        if (isEmpty())
            throw new BufferEmptyException();
        return doTake();
    }
}

class ExampleUsage {
    private GrumpyBoundedBuffer<String> buffer;
    int SLEEP_GRANULARITY = 50;

    void useBuffer() throws InterruptedException {
        while (true) {
            try {
                String item = buffer.take();
                // use item
                break;
            } catch (BufferEmptyException e) {
                // precondition failed: sleep, then retry
                Thread.sleep(SLEEP_GRANULARITY);
            }
        }
    }
}

An improvement that is more comfortable for the client is to let the buffer do the polling and sleeping itself:

import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    int SLEEP_GRANULARITY = 60;

    public SleepyBoundedBuffer() {
        this(100);
    }

    public SleepyBoundedBuffer(int size) {
        super(size);
    }

    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            // the lock is released before sleeping so other threads can proceed
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }

    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty())
                    return doTake();
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
}

When the check fails, the thread releases the lock so that others can make progress, sleeps, and then tests again, repeating until the operation succeeds. This works, but it forces a trade-off between CPU usage and responsiveness.

Polling and sleeping is a clumsy approach. It would be much simpler if there were a way to suspend a thread with a guarantee that it is woken promptly when a certain condition becomes true. That is exactly what condition queues provide.

A condition queue gets its name because it gives a group of threads, called the wait set, a way to wait for a specific condition to become true. Unlike typical queues in which the elements are data items, the elements of a condition queue are the threads waiting for the condition.

Just as every Java object can act as a lock, every object can also act as a condition queue, and the wait, notify, and notifyAll methods of Object make up the API for intrinsic condition queues. An object's intrinsic lock and its intrinsic condition queue are related: to call any of the condition queue methods on object X, you must hold the lock on X.

Object.wait atomically releases the lock and asks the OS to suspend the current thread, so that other threads can acquire the lock and modify the object's state. When the suspended thread wakes up, it reacquires the lock before returning.

import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())

    public BoundedBuffer() {
        this(100);
    }

    public BoundedBuffer(int size) {
        super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        doPut(v);
        notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }

    // BLOCKS-UNTIL: not-full
    // Alternate form of put() using conditional notification
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }
}

Note that condition queues do not let you do anything that cannot be done with polling and sleeping; they only make state dependence easier and more efficient to express.

2 Using Condition Queues

2.1 The Condition Predicate

The condition predicate is the precondition that makes an operation state-dependent in the first place:

  • For take, the condition predicate is "the buffer is not empty", and take must test it before proceeding
  • For put, the condition predicate is "the buffer is not full"

A condition wait involves an important three-way relationship among:

  • The lock
  • The wait method
  • The condition predicate

The condition predicate involves state variables, and those state variables are guarded by a lock, so the lock must be held before testing the condition predicate. The lock object and the condition queue object (the object on which wait and notify are invoked) must be the same object.
wait releases the lock, blocks the current thread, and waits until the specified timeout expires, the thread is interrupted, or the thread is awakened by a notification. After the thread wakes up, wait reacquires the lock before returning. A thread waking up from wait gets no special priority in reacquiring the lock; it contends for it like any other thread.

2.2 Waking Up Too Soon

Between the moment a thread is notified and the moment it reacquires the lock, another thread may acquire the lock and change the state variables. After reacquiring the lock, the awakened thread must therefore test the condition predicate again.

wait block ----------race to get lock ------------------------------------------get lock -----
^
wait block --------> race to get lock ------get lock------> perform action ---> release lock
^
notifyAll

There is also the case where you do not know why another thread called notify or notifyAll: the condition predicate this thread is waiting on may not be true at all. The thread still wakes up, reacquires the lock, tests the predicate, finds it false, and goes back to waiting. This is a false alarm.

For both reasons, the condition predicate must always be retested after wait returns.

When using condition waits (Object.wait or Condition.await):

  • Always have a condition predicate——some test of object state that must hold before proceeding;
  • Always test the condition predicate before calling wait, and again after returning from wait;
  • Always call wait in a loop;
  • Ensure that the state variables making up the condition predicate are guarded by the lock associated with the condition queue;
  • Hold the lock associated with the condition queue when calling wait, notify, or notifyAll;
  • Do not release the lock after checking the condition predicate but before acting on it.

The template is:

void stateDependentMethod() throws InterruptedException {
    // condition predicate must be guarded by lock
    synchronized (lock) {
        while (!conditionPredicate()) // always test the predicate in a loop
            lock.wait();              // wait on the same object that is synchronized on
        // object is now in desired state; act on it without releasing the lock
    }
}

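2.3 Missed Signals

A missed signal occurs when a thread waits for a condition that is already true, because the notification was issued before the thread started waiting. To avoid this, a notification must never be relied on to arrive after wait begins: always test the condition predicate while holding the lock before calling wait, so that a thread only waits when the predicate is really false.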
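The following is a minimal sketch of that rule, not code from the book. ReadyFlag, markReady, awaitReady, and signalThenWait are hypothetical names chosen for illustration. The event is recorded in a state variable before notifyAll is called, and the waiter tests that variable under the lock before it calls wait, so a notification that happens first is not missed.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical one-shot flag, used only to illustrate how missed signals are avoided.
class ReadyFlag {
    private boolean ready; // guarded by "this"

    // Records the event in shared state before notifying, so a late waiter still sees it.
    synchronized void markReady() {
        ready = true;
        notifyAll();
    }

    // Tests the predicate before waiting; if markReady() already ran, this returns at once.
    synchronized void awaitReady() throws InterruptedException {
        while (!ready) {
            wait();
        }
    }

    // Demonstration helper: notify first, wait afterwards, and report
    // whether the waiting thread finished within two seconds.
    static boolean signalThenWait() {
        ReadyFlag flag = new ReadyFlag();
        flag.markReady(); // the notification happens before anyone waits
        Thread waiter = new Thread(() -> {
            try {
                flag.awaitReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        try {
            waiter.join(TimeUnit.SECONDS.toMillis(2));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }
}
```

If awaitReady called wait() unconditionally instead of testing ready first, the waiter in signalThenWait would block forever, because the only notification was issued before it started waiting.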

2.4 Notification

To call notify or notifyAll you must also hold the lock associated with the condition queue object. When notify is called, the JVM selects one thread waiting on that condition queue to wake up, whereas notifyAll wakes up all of them. Because awakened threads cannot return from wait until they reacquire the lock, the notifying thread should release the lock quickly; otherwise the awakened threads simply move from waiting to being blocked on the lock. notifyAll can therefore cause a lot of lock contention, but this is a trade-off between performance and safety that has to be judged case by case.

Conditional notification, as in the following method, reduces this contention, but it is harder to get right, so it is a compromise:

public synchronized void alternatePut(V v) throws InterruptedException {
    while (isFull())
        wait();
    boolean wasEmpty = isEmpty();
    doPut(v);
    if (wasEmpty)
        notifyAll();
}

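Using notify instead of notifyAll makes it easy to lose a signal, so notifyAll is the usual choice. For example, a notify issued by take is meant for a thread waiting in put, but it may wake another thread waiting in take instead, and the thread in put is never notified. The signal is lost, or rather "hijacked".

notify may be used instead of notifyAll only when both of the following hold:

  • Uniform waiters: only one condition predicate is associated with the condition queue, and every thread executes the same logic after returning from wait
  • One-in, one-out: a notification enables at most one thread to proceed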

2.5 Example: A Gate Class

Similar to the CountDownLatch used in TestHarness in chapter 5, a recloseable gate can be built entirely from wait and notifyAll.

import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class ThreadGate {
    // CONDITION-PREDICATE: opened-since(n) (isOpen || generation>n)
    @GuardedBy("this") private boolean isOpen;
    @GuardedBy("this") private int generation;

    public synchronized void close() {
        isOpen = false;
    }

    public synchronized void open() {
        ++generation;
        isOpen = true;
        notifyAll();
    }

    // BLOCKS-UNTIL: opened-since(generation on entry)
    public synchronized void await() throws InterruptedException {
        // Remember the generation on arrival so that a thread is released
        // even if the gate is opened and closed again before it wakes up
        int arrivalGeneration = generation;
        while (!isOpen && arrivalGeneration == generation)
            wait();
    }
}

3 Explicit Condition Objects

Just as Lock is a generalization of intrinsic locks, Condition is a generalization of intrinsic condition queues.
The Condition API:

public interface Condition {
    void await() throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

Intrinsic condition queues have some drawbacks. Each intrinsic lock can have only one associated condition queue, so in BoundedBuffer multiple threads may wait on the same condition queue for different condition predicates, and notifyAll often wakes threads that are waiting for a different predicate. If you want to write a concurrent object with multiple condition predicates, or want more control over the visibility of the condition queue, you can use the more flexible Lock and Condition instead of intrinsic locks and condition queues.

A Condition is associated with a single Lock, just as an intrinsic condition queue is associated with a single intrinsic lock. Calling newCondition on a Lock creates a Condition, and you can create as many Condition objects per Lock as you need. Condition objects offer interruptible and uninterruptible waits, timed waits, and a choice of fair or nonfair queueing.

The equivalents of wait, notify, and notifyAll for Condition objects are await, signal, and signalAll.

The following is BoundedBuffer rewritten with explicit conditions:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class ConditionBoundedBuffer<T> {
    protected final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: notFull (count < items.length)
    private final Condition notFull = lock.newCondition();
    // CONDITION PREDICATE: notEmpty (count > 0)
    private final Condition notEmpty = lock.newCondition();
    private static final int BUFFER_SIZE = 100;
    @SuppressWarnings("unchecked") // generic array creation via cast
    @GuardedBy("lock") private final T[] items = (T[]) new Object[BUFFER_SIZE];
    @GuardedBy("lock") private int tail, head, count;

    // BLOCKS-UNTIL: notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();
            items[tail] = x;
            if (++tail == items.length)
                tail = 0;
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            T x = items[head];
            items[head] = null;
            if (++head == items.length)
                head = 0;
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

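Note the use of signal instead of signalAll, which greatly reduces the number of context switches and lock acquisitions triggered by each buffer operation.

As with intrinsic locks and condition queues, the three-way relationship among the lock, the condition predicate, and the condition variable must hold when using explicit Locks and Conditions: the state variables must be guarded by the Lock, and the Lock must be held when testing the predicate and when calling await and signal.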
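The timed waits offered by Condition can be sketched as follows. This is an illustrative example rather than code from the book: TimedSlot and its put and poll methods are hypothetical names. It relies on the documented behaviour of Condition.awaitNanos, which returns an estimate of the remaining wait time, so the loop can retest the predicate after every wake-up without exceeding the caller's overall time budget.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical single-slot holder, used only to show a timed condition wait.
class TimedSlot<T> {
    private final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: notEmpty (item != null)
    private final Condition notEmpty = lock.newCondition();
    private T item; // guarded by lock; null means the slot is empty

    void put(T value) {
        lock.lock();
        try {
            item = value;
            notEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns the item, or null if none arrived within the timeout.
    T poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (item == null) {
                if (nanos <= 0L) {
                    return null; // time budget exhausted
                }
                // awaitNanos returns an estimate of the time still remaining
                nanos = notEmpty.awaitNanos(nanos);
            }
            T result = item;
            item = null;
            return result;
        } finally {
            lock.unlock();
        }
    }
}
```

The predicate is still tested in a loop and under the lock, exactly as in the untimed case; the only addition is the bookkeeping of the remaining time.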

4 Anatomy of a Synchronizer

ReentrantLock and Semaphore have a lot in common. Both act as a gate that lets only a limited number of threads through at a time. Threads arrive at the gate and are:

  • Allowed through (lock or acquire returns successfully)
  • Made to wait (lock or acquire blocks)
  • Turned away (tryLock or tryAcquire returns false)

Both also support interruptible, uninterruptible, and timed acquisition, and both offer fair and nonfair queueing of waiting threads.

The following class uses a Lock to implement a counting Semaphore; initialized with a single permit, it behaves as a mutex.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class SemaphoreOnLock {
    private final Lock lock = new ReentrantLock();
    // CONDITION PREDICATE: permitsAvailable (permits > 0)
    private final Condition permitsAvailable = lock.newCondition();
    @GuardedBy("lock") private int permits;

    SemaphoreOnLock(int initialPermits) {
        lock.lock();
        try {
            permits = initialPermits;
        } finally {
            lock.unlock();
        }
    }

    // BLOCKS-UNTIL: permitsAvailable
    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (permits <= 0)
                permitsAvailable.await();
            --permits;
        } finally {
            lock.unlock();
        }
    }

    public void release() {
        lock.lock();
        try {
            ++permits;
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

In fact, many classes in java.util.concurrent are built on AbstractQueuedSynchronizer (AQS), for example CountDownLatch, ReentrantReadWriteLock, SynchronousQueue, and FutureTask (no longer the case after Java 7). AQS handles many of the details of implementing a synchronizer, such as FIFO queueing of waiting threads. Using AQS not only reduces the implementation effort, it also means you need not pay for multiple points of contention: a synchronizer built on AQS has only one point at which it might block, which reduces context-switch overhead and improves throughput. AQS was also designed with scalability in mind.

5 AbstractQueuedSynchronizer (AQS)

The basic operations of an AQS-based synchronizer are variants of acquire and release. Acquisition is the state-dependent operation and can always block.

For a class to be state-dependent it must have some state. AQS manages that state as a single integer, which subclasses manipulate through the protected methods getState, setState, and compareAndSetState. This is the template method pattern.

The canonical forms of acquire and release in AQS are as follows.

Acquire: first check whether the current state permits acquisition. If it does, acquire; otherwise either block or fail, depending on whether a blocking acquisition was requested. A thread that blocks is first added to the wait queue. Once the state permits acquisition, the thread updates the state and, if it was queued, is removed from the queue.

Release: update the state, and if the new state may allow a blocked thread to acquire, wake up one or more threads in the queue.

boolean acquire() throws InterruptedException {
    while (state does not permit acquire) {
        if (blocking acquisition requested) {
            enqueue current thread if not already queued
            block current thread
        }
        else
            return failure
    }
    possibly update synchronization state
    dequeue thread if it was queued
    return success
}

void release() {
    update synchronization state
    if (new state may permit a blocked thread to acquire)
        unblock one or more queued threads
}

Supporting these two operations requires three things:

  • Atomic management of the synchronization state
  • Blocking and unblocking threads
  • An ordered queue of waiting threads

1 Atomic operations on the state

AQS uses a 32-bit int to represent the state and updates it atomically with CAS operations, so the earlier material on atomic operations applies here. There is also a variant with a 64-bit state, AbstractQueuedLongSynchronizer, which is not discussed here.
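The CAS retry loop behind such state updates can be sketched with AtomicInteger. This is an illustration only and not AQS itself: CasPermits, tryTake, give, and available are hypothetical names, and AtomicInteger stands in for the state field that AQS manages internally.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical permit counter illustrating the CAS retry loop; this is not AQS.
class CasPermits {
    private final AtomicInteger state;

    CasPermits(int initial) {
        state = new AtomicInteger(initial);
    }

    // Tries to take one permit without blocking.
    boolean tryTake() {
        while (true) {
            int current = state.get();
            if (current == 0) {
                return false; // nothing available, fail immediately
            }
            if (state.compareAndSet(current, current - 1)) {
                return true;  // our update won
            }
            // Another thread changed the state between get() and the CAS; retry.
        }
    }

    // Returns one permit.
    void give() {
        state.incrementAndGet();
    }

    int available() {
        return state.get();
    }
}
```

The loop never holds a lock: a thread that loses the race simply rereads the state and tries again, which is the same pattern that tryAcquireShared implementations use with getState and compareAndSetState.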

2 Blocking and unblocking threads

Before JDK 5 the standard Java API offered no safe way to suspend (block) a thread and wake it up at some later point. Thread.suspend and Thread.resume have existed since JDK 1.0, but they are deprecated and should not be used.

On Linux, HotSpot hands the thread over to the kernel for blocking through pthread primitives such as pthread_mutex_lock.

Since JDK 5.0 this facility is exposed through native code in the LockSupport class:

  • LockSupport.park()
  • LockSupport.park(Object)
  • LockSupport.parkNanos(Object, long)
  • LockSupport.parkNanos(long)
  • LockSupport.parkUntil(Object, long)
  • LockSupport.parkUntil(long)
  • LockSupport.unpark(Thread)

In this API, park() is called by the current thread and causes it to block. The Object parameter of the overloads is the blocker, the object the thread is parked on, which lets monitoring tools show what a thread is blocked on. Because park() may return immediately or for no reason at all, it is normally called in a loop that rechecks the condition and decides whether to park again. park() returns for one of three reasons:

  • Some other thread invokes unpark with the current thread as the target;
  • Some other thread interrupts the current thread;
  • The call spuriously (that is, for no reason) returns.

The third case is what makes the loop necessary, much like the familiar while (checkCondition()) { Thread.sleep(time); } idiom.
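A minimal sketch of that loop, assuming a hypothetical ParkSignal class with a single waiting thread (none of these names come from the original text). The condition is held in a volatile flag, and park is only ever called inside a loop that rechecks it, so spurious returns and interrupts do not let the waiter proceed early.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical one-shot signal built directly on park/unpark; a single waiter is assumed.
class ParkSignal {
    private volatile boolean fired;
    private volatile Thread waiter;

    // Blocks the calling thread until fire() has been called.
    void await() {
        waiter = Thread.currentThread(); // publish ourselves before testing the flag
        boolean interrupted = false;
        while (!fired) {
            LockSupport.park(this);      // "this" is the blocker shown by monitoring tools
            if (Thread.interrupted()) {
                interrupted = true;      // remember the interrupt and keep waiting
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt(); // restore the interrupt status
        }
    }

    // Sets the flag first, then wakes the waiter if one has registered.
    void fire() {
        fired = true;
        LockSupport.unpark(waiter); // unpark(null) has no effect
    }
}
```

Because the waiter writes waiter before reading fired, and fire writes fired before reading waiter, at least one side always observes the other: either fire sees the registered thread and unparks it, or the waiter sees fired already set and never parks.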

3 An orderly queue

stay AQS Used in CHL List to solve the problem of ordered queues .

AQS Adopted CHL The model uses the following algorithm to complete FIFO In and out of the queue . The operation of the queue is through Lock-Free(CAS) operation .

Self realized CLH SpinLock as follows :

import java.util.concurrent.atomic.AtomicReference;

class ClhSpinLock {
    // Predecessor node of the current thread; initially null
    private final ThreadLocal<Node> prev = new ThreadLocal<>();
    // Node owned by the current thread
    private final ThreadLocal<Node> node = ThreadLocal.withInitial(Node::new);
    // Tail of the implicit queue; starts with a dummy node that is not locked
    private final AtomicReference<Node> tail = new AtomicReference<>(new Node());

    public void lock() {
        final Node node = this.node.get();
        node.locked = true;
        // A single atomic getAndSet appends this thread's node to the queue
        // and returns the previous tail, whose release we then wait for
        Node pred = this.tail.getAndSet(node);
        this.prev.set(pred);
        while (pred.locked) {
            // spin until the predecessor releases the lock
        }
    }

    public void unlock() {
        final Node node = this.node.get();
        node.locked = false;
        // Reuse the predecessor's node next time; our own node may still be
        // observed by the successor
        this.node.set(this.prev.get());
    }

    private static class Node {
        private volatile boolean locked;
    }
}

Enqueue: use a CAS operation in a loop. Each iteration reads the current tail and tries to swing the tail to the new node; if the tail changed in the meantime, the loop retries.

do {
    pred = tail.get();
} while (!tail.compareAndSet(pred, node));

Dequeue: each node also holds a status that determines whether its thread may leave the queue. While the condition is not satisfied the thread spins and waits; once it is satisfied, the head is advanced to the next node.

AQS has three core fields:

private volatile int state;

private transient volatile Node head;

private transient volatile Node tail;

Here state describes the synchronization state, for a lock typically how many times it has been acquired; for a simple mutex, state <= 1. Together with CAS operations, head and tail form the CLH FIFO queue.

The API for exclusive operation has no "Shared" in its method names, whereas the API used by shared synchronizers such as Semaphore and CountDownLatch carries "Shared" in the method names.

Some useful references:

java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)

http://gee.cs.oswego.edu/dl/papers/aqs.pdf (the AQS paper)

http://www.blogjava.net/xylz/archive/2010/07/08/325587.html (a more comprehensive walkthrough by another author)

http://suo.iteye.com/blog/1329460

http://www.infoq.com/cn/articles/jdk1.8-abstractqueuedsynchronizer

http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html

http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-clh-and-spin-lock.html

http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-locksupport-and-thread-interrupt.html

Exclusive synchronizers implement tryAcquire, tryRelease, and isHeldExclusively; shared ones implement tryAcquireShared and tryReleaseShared. The try-prefixed methods are the hooks of the template method pattern: AQS calls them to decide whether an operation can proceed. For example, if tryAcquireShared returns a negative value, the acquisition failed, and the thread is placed in the CLH queue and suspended.

For example, a simple binary latch:

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import net.jcip.annotations.ThreadSafe;

@ThreadSafe
public class OneShotLatch {
    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            // Succeed if latch is open (state == 1), else fail
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored) {
            setState(1); // Latch is now open
            return true; // Other threads may now be able to acquire
        }
    }
}

Here is a Mutex of my own.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * A simple lock-free (CAS-based) mutex. It is not reentrant.
 */
public class Mutex implements Lock {
    private static final int FREE = 0;
    private static final int BUSY = 1;

    private static class LockSync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4689388770786922019L;

        // Reports whether the mutex is held at all; the owning thread is not tracked
        protected boolean isHeldExclusively() {
            return getState() == BUSY;
        }

        // The argument is ignored; the state only moves between FREE and BUSY
        public boolean tryAcquire(int acquires) {
            return compareAndSetState(FREE, BUSY);
        }

        protected boolean tryRelease(int releases) {
            if (getState() == FREE) {
                throw new IllegalMonitorStateException();
            }
            setState(FREE);
            return true;
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final LockSync sync = new LockSync();

    public void lock() {
        sync.acquire(0);
    }

    public boolean tryLock() {
        return sync.tryAcquire(0);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(0, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(0);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(0);
    }
}
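The Mutex above treats the lock as "held" whenever its state is BUSY, regardless of which thread holds it. A possible variant, sketched here along the lines of the example in the AbstractQueuedSynchronizer Javadoc, records the owner with setExclusiveOwnerThread so that isHeldExclusively and unlock can check the calling thread. OwnedMutex and isHeldByCurrentThread are hypothetical names used for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a non-reentrant mutex that records its owning thread.
class OwnedMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int ignored) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int ignored) {
            if (!isHeldExclusively()) {
                // Either unlocked, or locked by a different thread
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // the volatile write publishes the release
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1
                    && getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    private final Sync sync = new Sync();

    void lock() {
        sync.acquire(1);
    }

    boolean tryLock() {
        return sync.tryAcquire(1);
    }

    void unlock() {
        sync.release(1);
    }

    boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }
}
```

With the owner recorded, a thread that does not hold the mutex gets an IllegalMonitorStateException from unlock instead of silently releasing another thread's lock.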

6 Synchronizer Classes Built on AQS

ReentrantLock

protected boolean tryAcquire(int ignored) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, 1)) {
            owner = current;
            return true;
        }
    } else if (current == owner) {
        setState(c+1);
        return true;
    }
    return false;
}
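The listing shows only the acquire side. A matching release can be sketched as below; this is an assumption about how the counterpart could look, not the JDK's actual ReentrantLock code. ReentrantSyncSketch and holdCount are hypothetical names, and the owner is kept with the setExclusiveOwnerThread and getExclusiveOwnerThread methods that AQS inherits, instead of the owner field used above.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical reentrant synchronizer sketch; the JDK's ReentrantLock differs in detail.
class ReentrantSyncSketch extends AbstractQueuedSynchronizer {
    @Override
    protected boolean tryAcquire(int ignored) {
        Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            setState(c + 1); // only the owner reaches this branch, so no CAS is needed
            return true;
        }
        return false;
    }

    @Override
    protected boolean tryRelease(int ignored) {
        if (Thread.currentThread() != getExclusiveOwnerThread()) {
            throw new IllegalMonitorStateException();
        }
        int c = getState() - 1;
        boolean free = (c == 0);
        if (free) {
            setExclusiveOwnerThread(null); // last hold released
        }
        setState(c);
        return free; // true only when the lock becomes available to others
    }

    // Number of holds by the owning thread, or 0 when unlocked.
    int holdCount() {
        return getState();
    }
}
```

tryRelease returns true only when the hold count drops to zero, which is what tells AQS to wake a queued thread; intermediate releases by the owner leave the waiters parked.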

Semaphore and CountDownLatch

protected int tryAcquireShared(int acquires) {
    while (true) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0
                || compareAndSetState(available, remaining))
            return remaining;
    }
}

protected boolean tryReleaseShared(int releases) {
    while (true) {
        int p = getState();
        if (compareAndSetState(p, p + releases))
            return true;
    }
}
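The code above covers Semaphore. CountDownLatch uses AQS in a similar way, with the state holding the current count: countDown maps to a shared release that decrements the count, and await maps to a shared acquire that succeeds only once the count has reached zero. The following is a sketch under that description; SimpleCountDownLatch is a hypothetical name and not the JDK class.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical countdown latch sketch built on AQS; not the JDK's CountDownLatch.
class SimpleCountDownLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int ignored) {
            // Succeed only when the count has reached zero
            return getState() == 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            while (true) {
                int c = getState();
                if (c == 0) {
                    return false; // already open, nothing to do
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0; // wake waiters only on the transition to zero
                }
            }
        }

        int count() {
            return getState();
        }
    }

    private final Sync sync;

    SimpleCountDownLatch(int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count < 0");
        }
        sync = new Sync(count);
    }

    void countDown() {
        sync.releaseShared(1);
    }

    void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    int getCount() {
        return sync.count();
    }
}
```

Unlike the Semaphore code, acquiring does not change the state, so once the count reaches zero every current and future call to await returns immediately.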
