【Java Deep Foundations】ThreadPoolExecutor.execute() source code analysis

King Wang

January 3, 2022

Layer 1: a first look at the blocking queue BlockingQueue (workQueue) and the worker-adding method addWorker()

Questions raised at layer 1:

  1. How do corePoolSize and the BlockingQueue take part in scheduling?
  2. What is the Worker that shows up here?
  3. Why does maximumPoolSize not appear?
  4. What does the "worker count" returned by workerCountOf actually mean?
public void execute(Runnable command) {
    // ctl is the pool's control word: it packs the run state + worker count into one int (bit fields)
    int c = ctl.get();
    // worker count < corePoolSize: call addWorker(Runnable r, boolean core)
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true)) // 【Note 1】 core == true: the new worker is bounded by corePoolSize
            return;                   // added successfully, return directly
        c = ctl.get();                // adding failed, re-read the control word
    }
    // worker count >= corePoolSize: try to put the task into the blocking queue (BlockingQueue)
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // after the task is enqueued, check the pool state again
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // no live worker left: make sure at least one exists to drain the queue
    }
    // worker count >= corePoolSize and enqueueing failed, probably because the queue is full
    else if (!addWorker(command, false)) // add a non-core worker
        reject(command);
}

Layer 2: addWorker() is where threads are actually created

【Answers】Questions from layer 1:

  1. How do corePoolSize and the BlockingQueue take part in scheduling? (see the sketch after this list)
    When the worker count < corePoolSize, a new task goes straight to addWorker(task, true)【core = true】; the BlockingQueue is not involved.
    When the worker count >= corePoolSize:
    -> enqueueing into the BlockingQueue succeeds: nothing special happens;
    -> enqueueing into the BlockingQueue fails: the task goes straight to addWorker(task, false)【core = false】.
  2. What is the Worker that shows up here?
    ThreadPoolExecutor.Worker is an inner class of ThreadPoolExecutor:
    private final class Worker extends AbstractQueuedSynchronizer implements Runnable
    A Worker is a thread-safe wrapper around a Runnable.
  3. Why does maximumPoolSize not appear?
    maximumPoolSize takes part through the second parameter core of addWorker(Runnable r, boolean core); see the analysis below.
    The layer-2 code contains the check: wc >= (core ? corePoolSize : maximumPoolSize)
  4. What does the "worker count" returned by workerCountOf actually mean?
    It is unpacked from ctl: the number of Worker threads the pool has started and not yet retired.
    This includes workers currently blocked in getTask() waiting for work, so it is not "threads running a task right now".
    How that blocking works appears in layer 4.
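
To see these scheduling rules in action, here is a minimal, hypothetical sketch; the pool sizes, queue capacity and sleep time are arbitrary illustration values, not taken from the article.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecuteOrderDemo {
    public static void main(String[] args) {
        // corePoolSize = 2, maximumPoolSize = 4, bounded queue of 2 (all values are arbitrary)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));

        for (int i = 1; i <= 6; i++) {
            final int id = i;
            pool.execute(() -> {
                System.out.println("task " + id + " on " + Thread.currentThread().getName());
                try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
            // tasks 1-2: new core workers; tasks 3-4: queued; tasks 5-6: extra workers up to maximumPoolSize
            System.out.println("after task " + id + ": poolSize=" + pool.getPoolSize()
                    + " queued=" + pool.getQueue().size());
        }
        pool.shutdown();
    }
}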

Goals of layer 2:

  1. What exactly does addWorker(Runnable r, boolean core) do, and how does it relate to threads?
  2. What kind of container is workers?
// The source code reduced to the following logic
private boolean addWorker(Runnable firstTask, boolean core) {
    retry: // keep looping until the conditions for adding a worker are met
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        for (;;) {
            int wc = workerCountOf(c);
            // Here is where maximumPoolSize appears. When the caller passes core == true,
            // the upper bound on the worker count is corePoolSize;
            // when core == false, the upper bound is maximumPoolSize.
            // The boolean therefore guards two different failure cases:
            // exceeding the core limit vs. exceeding the maximum limit.
            /* 【Important】 When the caller saw worker count >= corePoolSize and enqueueing into the
               BlockingQueue failed, the task arrives here with core == false; in other words a
               brand-new thread is created to "jump the queue" and execute it. */
            if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    // thread-creation logic:
    try {
        w = new Worker(firstTask); // wrap the submitted Runnable into a Worker
        // 【Key】 see the Worker(Runnable firstTask) constructor in the next code block
        final Thread t = w.thread;
        workers.add(w); // note the difference between workers and workQueue:
                        // workers is a HashSet, so far only seen used for bookkeeping
        int s = workers.size();
        if (s > largestPoolSize)
            largestPoolSize = s; // refresh the high-water mark, kept in sync for monitoring
        workerAdded = true;
    } ...
    if (workerAdded) {
        t.start(); // start the wrapped Runnable, so every addWorker(Runnable) starts a thread
        workerStarted = true;
    }
    return workerStarted;
}

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this); // constructing a Worker creates a new thread
}
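
Because the Worker constructor obtains its thread from getThreadFactory(), passing a custom ThreadFactory to the ThreadPoolExecutor constructor is the usual hook for naming or configuring the pool's threads. A small sketch; the pool parameters and the "biz-pool-" name prefix are made up.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedFactoryDemo {
    public static void main(String[] args) {
        ThreadFactory namedFactory = new ThreadFactory() {
            private final AtomicInteger seq = new AtomicInteger(1);
            @Override
            public Thread newThread(Runnable r) { // r is the Worker created by addWorker
                return new Thread(r, "biz-pool-" + seq.getAndIncrement());
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), namedFactory);
        pool.execute(() -> System.out.println("running on " + Thread.currentThread().getName()));
        pool.shutdown();
    }
}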



Layer 3: the concrete execution logic behind t.start()

【Answers】Questions from layer 2:

  1. What exactly does addWorker(Runnable r, boolean core) do, and how does it relate to threads?
    Whatever the value of core, addWorker starts a new thread for the new Runnable.
    The core flag is only used to choose the bound in the check
    wc >= (core ? corePoolSize : maximumPoolSize)
    When the caller adds a core worker, the current worker count is checked against corePoolSize;
    when it adds a non-core worker, it is checked against maximumPoolSize; exceeding either bound means return false.
  2. What kind of container is workers?
    private final HashSet<Worker> workers = new HashSet<Worker>();
    It plays no part in task scheduling; so far it only shows up in the pool's bookkeeping and metrics (see the sketch below).
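
A quick sketch of those bookkeeping metrics; the pool parameters and task count are arbitrary. getPoolSize() and getLargestPoolSize() expose what the pool tracks through the workers set and the largestPoolSize field.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolMetricsDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        for (int i = 0; i < 4; i++) {
            pool.execute(() -> {
                try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            });
        }
        // monitoring values derived from the workers set and the largestPoolSize high-water mark
        System.out.println("poolSize=" + pool.getPoolSize()
                + " largestPoolSize=" + pool.getLargestPoolSize());
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}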

Goals of layer 3:

  1. What does t.start() run internally?
  2. Does getTask() touch the BlockingQueue (workQueue)?
// Worker.run()
public void run() {
    runWorker(this);
}

// ThreadPoolExecutor.runWorker(Worker w), keeping only the core logic:
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    boolean completedAbruptly = true;
    try {
        // 【Key】 keep fetching tasks and run each one that is fetched
        while (task != null || (task = getTask()) != null) {
            try {
                task.run(); // run the Runnable the worker was created with first, then whatever getTask() returns
            } finally {
                task = null; // clear it so the next iteration calls getTask() again
                w.completedTasks++;
            }
        }
        completedAbruptly = false;
    } finally {
        // on an exception, or when no task is left, retire the Worker -> workers.remove(w)
        processWorkerExit(w, completedAbruptly);
    }
}



Layer 4: the concrete implementation of getTask()

【Answers】Questions from layer 3:

  1. What t.start() actually runs, given final Thread t = w.thread:
    t.start() -> Worker.run() -> ThreadPoolExecutor.runWorker(this)
    The concrete logic of runWorker is:
    1. adding a task -> triggers a new thread -> that thread first runs the task it was created with, firstTask;
    2. after firstTask is processed, it keeps calling getTask() to fetch further tasks (it is not destroyed as soon as its own task is done);
    3. every worker follows this logic, so the more threads are added, the more tasks can be processed concurrently (see the sketch after this list).
  2. Does getTask() touch the BlockingQueue (workQueue)?
    Yes. The BlockingQueue decouples a task from the act of executing it, so every worker thread can compete for the tasks in workQueue.
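
A tiny sketch of that reuse; the thread and task counts are arbitrary. The printed thread names show each worker executing several tasks.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReuseDemo {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2); // only 2 worker threads
        for (int i = 1; i <= 6; i++) {
            final int id = i;
            // 6 tasks, 2 threads: each worker loops in runWorker()/getTask() and is reused
            pool.execute(() -> System.out.println("task " + id + " on " + Thread.currentThread().getName()));
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}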

Goals of layer 4:

  1. How does getTask() schedule the tasks sitting in the blocking queue workQueue?
// Keeping only the core logic
private Runnable getTask() {
    boolean timedOut = false; // did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int wc = workerCountOf(c);
        // are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        // a timed worker whose previous poll() timed out gives up: returning null
        // makes runWorker() leave its loop, so the thread dies
        if (timed && timedOut && compareAndDecrementWorkerCount(c))
            return null;
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

Focus on the poll(long timeout, TimeUnit unit) and take() methods of the BlockingQueue interface:

 /**
* Retrieves and removes the head of this queue, waiting if necessary
* until an element becomes available.
*
*/
E take() throws InterruptedException;
/**
* Retrieves and removes the head of this queue, waiting up to the
* specified wait time if necessary for an element to become available.
*
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

Conclusion:
take() retrieves an element from the BlockingQueue; if the queue is empty, it simply waits.
So take() is the core of the blocking behaviour.
poll(long timeout, TimeUnit unit) waits up to the given time for an element; if none arrives, it returns null.
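
A short sketch of the difference between the two calls; the timeout value is arbitrary.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollVsTakeDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // poll(timeout): waits at most 500 ms on an empty queue, then returns null
        String polled = queue.poll(500, TimeUnit.MILLISECONDS);
        System.out.println("poll on empty queue -> " + polled); // null

        // take(): would block forever on an empty queue, so add an element first
        queue.put("job-1");
        System.out.println("take -> " + queue.take()); // "job-1"
    }
}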

How the scheduling works:

 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
  • If 【worker count <= corePoolSize】, the core threads all block while fetching a task; go back to the layer-3 code:

     The key to keeping core threads alive: getTask() -> workQueue.take() blocks here
    while (task != null || (task = getTask()) != null) {
     ... }
    

    After a core thread created by the pool has finished its own task, it is the underlying workQueue.take() that keeps the thread from being destroyed.

  • If 【worker count > corePoolSize】, task fetching is no longer unbounded blocking: a worker polls with a timeout, and if nothing arrives within keepAliveTime it is destroyed once it is done.
    The pool only keeps at most corePoolSize threads that it never actively destroys.
    This also explains why, when 【maximumPoolSize > worker count >= corePoolSize】 and the queue is full, a thread can be created directly to run the task without worrying that it will linger forever.
    That guarantees tasks are finished promptly without piling up in the blocking queue; a small piece of the art of concurrent programming.
    If 【worker count = maximumPoolSize and the blocking queue is not full】, the task is added to the blocking queue;
    if 【worker count = maximumPoolSize and the blocking queue is full】, the reject(command) from layer 1 kicks in; the policy is customizable, and the default throws an exception.

  • The timed flag is worth a mention:
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    executor.allowCoreThreadTimeOut(true) is the configuration hook the pool provides; with it even core threads can time out and be destroyed after finishing their tasks (see the sketch below).
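
A minimal sketch of that configuration; the keep-alive time and sleep duration are arbitrary.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CoreTimeoutDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.allowCoreThreadTimeOut(true); // core workers now use poll(keepAliveTime) instead of take()

        pool.execute(() -> System.out.println("task on " + Thread.currentThread().getName()));
        Thread.sleep(3000); // wait longer than keepAliveTime

        // with allowCoreThreadTimeOut(true) even the "core" threads have timed out and exited
        System.out.println("pool size after idle period: " + pool.getPoolSize()); // expected 0
        pool.shutdown();
    }
}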


Summary

  • Division of responsibilities:

    Worker(Runnable firstTask) wraps a task into a thread-safe class.

    addWorker(Runnable r, boolean core) creates the thread.

    runWorker() executes tasks; its core is: while (task != null || (task = getTask()) != null) {}
    -> runWorker() uses this blocking loop to keep core threads from being destroyed and lets one created thread execute many tasks.
    -> task.run(): each fetched task is run by calling its run() method directly, saving the cost of creating yet another thread.

    The BlockingQueue decouples tasks from task execution, so a created thread is no longer tied to a single task.
    -> poll(): beyond corePoolSize, a worker that gets no task within the timeout stops waiting, returns to runWorker(), the loop ends, and the thread dies.
    -> take(): blocks while fetching a task; if the blocking queue is empty it simply waits, keeping the core thread alive.

  • Scheduling logic (see the sketch at the end of this summary):
    1. 【worker count < corePoolSize】: each task creates a thread via addWorker(r, true); the thread is not destroyed when its task finishes and goes on competing for other tasks.
    2. 【maximumPoolSize > worker count >= corePoolSize】:
      2.1 workQueue not full: the task joins the blocking queue workQueue and the existing threads compete to execute it;
      2.2 workQueue full: addWorker(r, false) creates a thread and runs r; later one of the now-surplus idle threads times out and dies, so roughly corePoolSize threads remain.
    3. 【worker count = maximumPoolSize】:
      3.1 workQueue not full: the task joins the blocking queue workQueue and the existing threads compete to execute it;
      3.2 workQueue full: reject() applies the rejection policy; it is customizable, and the default throws an exception.
  • A possible misunderstanding:
    1. How does the thread pool tell core threads apart from non-core threads?
      The answer: it does not distinguish them.
      The code only associates addWorker(r, true / false) with "core" / "non-core", and the flag merely selects which of two bounds is checked:
      passing true, addWorker returns false once the current worker count has reached corePoolSize, because the caller only intends to grow the pool up to corePoolSize;
      passing false, addWorker returns false once the current worker count has reached maximumPoolSize, because the caller only intends to grow the pool up to maximumPoolSize.
      The logic that really maintains the number of core threads is:

      boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      Runnable r = timed ?
      workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      

      Once the worker count > corePoolSize, every idle worker switches to poll(keepAliveTime, TimeUnit.NANOSECONDS);
      whichever surplus worker times out without getting a task (effectively an arbitrary one) dies naturally, bringing the count back down to corePoolSize.
      So core threads are not marked by the pool in any way; the pool simply keeps about corePoolSize threads alive and competing for tasks.
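
Finally, a minimal end-to-end sketch of the summary above; all pool sizes, the queue capacity and the sleep times are made-up illustration values. A burst of tasks grows the pool to maximumPoolSize, the task that no longer fits is rejected, and once the pool has been idle longer than keepAliveTime the surplus workers time out and the pool settles back to corePoolSize.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolLifecycleDemo {
    public static void main(String[] args) throws InterruptedException {
        // core = 2, max = 4, keepAlive = 1 s, bounded queue of 2 (arbitrary values)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2));

        Runnable slow = () -> {
            try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        };

        // 6 tasks: 2 start core workers, 2 are queued, 2 more push the pool to maximumPoolSize
        for (int i = 0; i < 6; i++) {
            pool.execute(slow);
        }
        System.out.println("during burst: poolSize=" + pool.getPoolSize()); // expected 4

        try {
            pool.execute(slow); // pool at max and queue full -> rejection policy (default: throw)
        } catch (RejectedExecutionException e) {
            System.out.println("7th task rejected");
        }

        Thread.sleep(4000); // let all tasks finish and keepAliveTime elapse
        // the surplus workers have timed out in poll(); no particular thread was "marked" as core
        System.out.println("after idle: poolSize=" + pool.getPoolSize()); // expected 2
        pool.shutdown();
    }
}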
