first floor : See the blocking queue for the first time BlockingQueue:workQueue、 Task loading method Worker.addWorker()
The first level of thinking :
- corePoolSize BlockingQueue How to directly participate in scheduling
- It’s happening Worker What is it?
- Why didn’t it show up maximumPoolSize
- workerCountOf What is the definition of the number of active threads obtained
public void execute(Runnable command) {
// Thread pool metrics : Gets the thread pool state + Number of active threads ( Use binary bits to identify )
int c = ctl.get();
// Number of active threads < corePoolSize Call the task loading method Worker.addWorker(Runnable r, boolean core)
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 【 notes 1】 Parameter is true, Mark the added task as the core task
// Load successfully 【 Go straight back to 】
return;
// Refresh the thread pool indicator if the load fails
c = ctl.get();
}
// Number of active threads > corePoolSize, Add task to blocking queue BlockingQueue success
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// After the task is added successfully, monitor the thread pool status again
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// Number of active threads > corePoolSize And add blocking queues Failure , The reason may be that the blocking queue is full
else if (!addWorker(command, false)) // Add non core tasks
reject(command);
}
The second floor :Worker.addWorker() Really created threads
【 answer 】 The first level of thinking :
- corePoolSize BlockingQueue How to directly participate in scheduling
When Number of active threads < corePoolSize, Add a new one 【core = true】 The task will be directly addWorker( Mission ), No participation BlockingQueue The logic of
When Number of active threads > corePoolSize
—> Added to the BlockingQueue success : No special treatment
—> Added to the BlockingQueue Failure : Add a new one 【core = false】 The task will be directly addWorker( Mission ) - It’s happening Worker What is it?
ThreadPoolExecutor.Worker
ThreadPoolExecutor The inner class of
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
Worker
, It’s encapsulated Runnerable A thread safe class of - Why didn’t it show up maximumPoolSize
maximumPoolSize The logic of participation lies in addWorker(Runnable r, boolean core) The second parameter core in , See analysis below
There are statements in the second layer of code :wc >= (core ? corePoolSize : maximumPoolSize))
- workerCountOf What is the definition of the number of active threads obtained : Thread executing task
Number of core threads It’s not equal to Number of active threads , Because core threads can also block waiting tasks .
Blocked threads do not belong to the number of active threads
The principle of blocking will appear in the fourth layer .
The goal of the second level :
- addWorker(Runnerable r, boolean core) What did you do specifically , What does it have to do with threads
- workes What is the container
// Reduce the source code to the following logic
private boolean addWorker(Runnable firstTask, boolean core) {
retry: // The loop does not go down until it meets the adding conditions
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
for (;;) {
int wc = workerCountOf(c);
// maximumPoolSize There is , The outer layer comes in core == true, Indicates that the maximum number of active threads is determined by corePoolSize decision
// This Boolean maintains two exceptions : Whether to add core thread is abnormal , Whether to add to the maximum number of threads is abnormal
// Pass in core == false , It means that the upper limit of the active thread is determined by maximumPoolSize decision
/* 【 important 】 When the outer layer Number of active threads > corePoolSize
Added to the BlockingQueue Failure : Newly added 【core = false】 The task will be directly addWorker( Mission )
In other words, a new thread will be created in the above case “ Jump the queue ” perform
*/
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
// New thread logic :
try {
w = new Worker(firstTask); // Added Runnable Encapsulated into Worker
// 【 a key 】 See the next code area Construction method Worker(Runnable firstTask)
final Thread t = w.thread;
workers.add(w); // Distinguish workers and workerQueue The difference between , worker It's a HashSet For the time being, we can only see that it is used for statistical indicators
int s = workers.size();
if (s > largestPoolSize) {
largestPoolSize = s; // Refresh workers Number , Real time synchronous monitoring indicators
workerAdded = true;
}
} ...
if (workerAdded) {
t.start(); // Start the package Runnable, So every time addWorker(Runnable) It will open a thread
workerStarted = true;
}
return workerStarted;
}
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // Worker To build it will create a new thread
}
.
The third level :t.start() The concrete execution logic of
【 answer 】 The second level of thinking :
- addWorker(Runnerable r, boolean core) What did you do specifically , What does it have to do with threads
addWorker No matter what core What is the value of , It’s all for the new one Runnable Start a thread ,
core The value of is only used to maintain two exceptions
wc >= (core ? corePoolSize : maximumPoolSize)
The upper layer adds the core thread , Then judge whether the number of active threads is greater than that of core threads
The upper layer adds non core threads , Then determine whether the number of active threads is greater than the maximum number of threads , Any exception is return false - workes What is the container
private final HashSet<Worker> workers = new HashSet<Worker>();
It has nothing to do with thread scheduling , For the time being, it appears in the online process pool index statistics
The goal of the third level :
- t.start() What is the internal implementation of
- getTask Have you contacted BlockingQueue : workeQueue
// Worker.run()
public void run() {
runWorker(this);
}
// ThreadPoolExecutor.runnWorker(Worker w), Leave the core logic :
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;
try {
// 【 a key 】 Keep trying to get tasks , Get it and execute it
while (task != null || (task = getTask()) != null) {
try {
task.run(); // Give priority to the incoming Runnable, If getTask() Get to the task and execute
} finally {
w.completedTasks++;
}
}
completedAbruptly = false;
} finally {
// In case of exception or no task, the incoming Worker -> workers.remove(w)
processWorkerExit(w, completedAbruptly);
}
}
.
The fourth level :getTask() The concrete realization of
【 answer 】 The third level of thinking :
- t.start() The concrete realization of , because final Thread t = w.thread;
t.start() -> Worker.run() -> ThreadPoolExecutor.runWorker(this)
runWorker
The concrete logic is :- Add tasks -> Trigger new thread -> This thread takes precedence over the added tasks firstTask
- processed
firstTask
, Start usinggetTask()
Get other tasks ( It’s not like I’ll destroy it after I finish my task ) - Each task uses the above logic , So the more threads you add , The higher the number of tasks that can be processed concurrently
- getTask() Have you contacted BlockingQueue : workeQueue
yes , adopt BlockingQueue Decouple the task from the action of executing the task , Let all threads be able to preempt workeQueue The task in
The goal of the fourth level :
- getTask How to schedule blocking queues workeQueue The task in
// Keep only the core logic
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
Focus on BlockingQueue class
Medium .poll(long timeout, TimeUnit unit)
and take()
Method
/**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
*/
E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
Conclusion :
take()
Is to obtain BlockingQueue The elements in , If the queue is empty , Just waiting for therefore ,take It’s the core of blocking
poll(long timeout, TimeUnit unit)
It’s getting elements in unit time , If not, return NULL
How to schedule :
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
-
If 【 Number of active threads < corePoolSize】, All core threads are blocked to get tasks , Go back to the third layer and look at the code
The key to keeping the core thread alive :getTask() -> workQueue.take() Blocking here while (task != null || (task = getTask()) != null) { ... }
After the core thread created by thread pool has completed its own task , Through the bottom layer workQueue.take() Implementation does not destroy threads
-
If 【 Number of active threads > corePoolSize】, Start getting tasks without blocking , If not, the thread will be destroyed after execution
The thread pool will only keep… At most corePoolSize The number of core threads is not actively destroyed
It also explains dang 【maximumPoolSize > Number of active threads > corePoolSize】, To create a thread directly to perform tasks, you don’t need to worry about the thread not being destroyed
It can guarantee the timely completion of tasks , It doesn’t occupy the blocking queue , An art of concurrent programming .
If 【maximumPoolSize = Number of active threads , The blocking queue is not full 】 Add to blocking queue ,
【maximumPoolSize = Number of active threads , The blocking queue is full 】 First floorreject(command)
; It will work , This can be customized , The default is to throw exceptions -
Worth mentioning
timed
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
executor.allowsCoreThreadTimeOut(); Thread pool provides method configuration , The core thread can be destroyed after performing tasks
summary
-
Division of duties :
Worker(Runnable r,boolean core)
Mission , Encapsulate tasks as thread safe classesWorker.addWorker()
Create threadrunWorker()
Task execution , The core :while (task != null || (task =getTask()
) != null){}
–》runWorker()
Using a blocking loop to ensure that the core thread is not destroyed , And can let a created thread perform multiple tasks
–》task.run()
Every time I get a task , Perform alone run() Method , Save the cost of creating another threadBlockingQueue
hold Mission 、 Task execution decoupling , Make the created thread No longer a single mission .
–》poll()
More than the number of core threads If you can’t get the task, you can’t wait , Return torunWorker()
End of cycle , Let threads die
–》take()
Block acquisition task , If the blocking queue is empty, it will wait , Keep the core thread alive . -
Logic twist :
- 【 Active threads <= corePoolSize】 : A task Create a thread
Worker.addWorker(r, true)
, Do not destroy after the completion of the task , Used to compete for execution of other tasks - 【maximumPoolSize > Active threads > corePoolSize】 :
2.1workQueue
The queue is under , Join the blocking queueworkQueue
, Let the core thread compete for execution rights
2.2workQueue
The queue is full ,Worker.addWorker(r, false)
Create thread And implement r, And eliminated A core thread ( Keep the number of core threads the same ) - 【maximumPoolSize = Active threads 】 :
3.1workQueue
The queue is under , Join the blocking queueworkQueue
, Let the core thread compete for execution rights
3.2workQueue
The queue is full ,reject()
Use rejection strategy , The default is to throw exceptions
- 【 Active threads <= corePoolSize】 : A task Create a thread
-
Possible misunderstandings :
-
How can thread pools distinguish between core threads and non core threads
The answer is : Indistinguishes
The code is justaddWorker(r, true / false)
And the core / Non core relation , But there are two kinds of exception handling monitoring
Pass intrue
, The current number of thread pools > corePoolSize Just throw exceptions , Because the upper code logic is to increase the number of threads to <= corePoolSize
Pass infalse
, The current number of thread pools > maximumPoolSize Just throw exceptions , Because the upper code logic is to increase the number of threads to <= maximumPoolSize
The logic to really maintain the number of core threads is :boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
Once the number of active threads > corePoolSize. will Random You have to choose a thread to execute
poll(keepAliveTime, TimeUnit.NANOSECONDS)
Whether or not a task is taken , Will let the selected redundant threads die naturally , To maintain the number of core threads .
therefore , The core thread is not marked by the thread pool , Thread pools only maintain corePoolSize Number of threads competing for execution
-