Java Concurrent Programming: Usage and Internals of ThreadPoolExecutor, the Core of Java Thread Pools

King Wang

January 3, 2022

Contents

    • Introducing the thread pool
    • The Executor framework
    • ThreadPoolExecutor in detail
      • Constructors
      • Important variables
      • Thread pool execution flow
      • Task queue workQueue
      • Task rejection policy
      • Shutting down the thread pool
    • Creating a thread pool with ThreadPoolExecutor
    • References

Introducing the thread pool

Threads are the foundation of concurrent programming. In the previous posts, the examples were all thread-based, and each of them created its threads directly. That is the easy way, but it raises a problem: the number of threads.

Suppose a complex system needs many threads. Creating them this way consumes a great deal of system resources. First, creating and destroying a thread takes time; if every small task created its own thread, the efficiency of the system would drop significantly. Second, threads occupy memory; a large number of threads compete for memory, and if this is not handled properly the program can run out of memory. This is clearly not what we want.

So what is the solution? A good idea is to reuse threads. Not all threads are running at the same time, and some are idle at any given moment. If idle threads can be put to use, far fewer threads need to be created. We can keep a certain number of threads in a container: when a thread is needed, an idle one is taken from the container, and when it has finished its work it is not destroyed but returned to the container to wait for the next task. Such a container is commonly called a thread pool. Thread pools are an effective way to manage threads. The management process can be pictured as follows:
[Figure: thread pool management process]

The Executor framework

Java provides a framework for managing threads: the Executor framework. It was introduced in JDK 1.5 and lives in the java.util.concurrent package. The framework controls the starting, execution, and shutdown of threads, which simplifies concurrent programming. Its core members are shown in this class diagram:
[Figure: class diagram of the core Executor framework members]
Executor: the top-level interface. It defines a single method, execute, which accepts a Runnable and replaces the usual explicit creation and starting of a thread.

ExecutorService: extends the Executor interface and adds methods for managing the executor's lifecycle and for submitting tasks that return results.

ScheduledExecutorService: an interface for delayed and periodic scheduling; extends ExecutorService.

AbstractExecutorService: an abstract class that provides default implementations of the ExecutorService submission methods.

ThreadPoolExecutor: the core thread pool class, which implements the basic thread pool operations.

Executors: a factory class that creates a range of preconfigured thread pools.
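As a rough illustration of how these members fit together, the sketch below obtains an ExecutorService from the Executors factory, submits a task, and shuts the pool down. The class name ExecutorFrameworkDemo and the helper sumOnPool are made up for this example; only the java.util.concurrent types are real JDK API.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ExecutorFrameworkDemo {
    // Hypothetical helper: computes 1 + 2 + ... + n on a pool thread.
    static int sumOnPool(int n) {
        // Executors is the factory; the object it returns implements ExecutorService.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // submit() is declared by ExecutorService and returns a Future for the result.
            Future<Integer> future = pool.submit(() -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum;
            });
            return future.get(); // blocks until the task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOnPool(100)); // prints 5050
    }
}
```

The caller never creates or starts a Thread itself; that is the point of the Executor abstraction.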

ThreadPoolExecutor in detail

The above are the basic members of the Executor framework, and the most important of them is undoubtedly ThreadPoolExecutor. To understand how thread pools work in Java you have to understand this class first, and the best way to do that is to read its source code.

Constructors

Opening the ThreadPoolExecutor source code, we find that the class provides four constructors:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

As you can see, the ThreadPoolExecutor constructors take quite a few parameters. The fourth constructor is the core one: the other three delegate to it, and it performs the actual initialization.

Here is what the constructor parameters mean:

  • corePoolSize: the core size of the thread pool. When a task is submitted, the pool creates a new thread to run it, and keeps doing so for each new task until the number of threads reaches corePoolSize. After that no new thread is created right away; tasks are placed in the work queue instead.

  • maximumPoolSize: the maximum number of threads the pool is allowed to create.

  • workQueue: a blocking queue that holds tasks waiting to be executed. It holds only the Runnable tasks submitted through the execute method. Three queue types are commonly used: SynchronousQueue, LinkedBlockingQueue, and ArrayBlockingQueue.

  • keepAliveTime: the maximum time an idle thread may stay alive. By default it applies only to threads in excess of corePoolSize, that is, when there are more threads than there is work.

  • unit: the time unit of keepAliveTime. TimeUnit is an enum in the java.util.concurrent package.

  • threadFactory: the thread factory used to create new threads.

  • handler: the rejection policy, applied when the pool cannot take on a task because the queue is full and the thread limit has been reached, or because the pool has been shut down.
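To make the parameter list concrete, here is a small sketch that calls the seven-argument constructor and labels each argument. PoolConfigDemo, namedFactory, and the chosen sizes are assumptions made for illustration, not recommended settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolConfigDemo {
    // Hypothetical factory that gives worker threads a recognizable name.
    static ThreadFactory namedFactory(String prefix) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> new Thread(r, prefix + "-" + seq.getAndIncrement());
    }

    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2,                                      // corePoolSize
                4,                                      // maximumPoolSize
                30, TimeUnit.SECONDS,                   // keepAliveTime and its unit
                new ArrayBlockingQueue<Runnable>(8),    // workQueue (bounded)
                namedFactory("demo"),                   // threadFactory
                new ThreadPoolExecutor.AbortPolicy());  // handler
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPool();
        // The task prints the name assigned by the custom thread factory.
        pool.execute(() -> System.out.println(Thread.currentThread().getName()));
        pool.shutdown();
    }
}
```

Passing a maximumPoolSize smaller than corePoolSize, or a null queue, factory, or handler, would trip the argument checks shown in the fourth constructor.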

Important variables

We are done with the constructors; now let's look at several important member variables of the ThreadPoolExecutor class:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl: the field that controls the state of the thread pool. As the methods runStateOf, workerCountOf, and ctlOf show, it packs two pieces of information into a single int held in an AtomicInteger: the run state of the pool (runState) in the high 3 bits, and the number of effective threads in the pool (workerCount) in the low 29 bits.

COUNT_BITS: a constant with the value 29 (Integer.SIZE - 3), used to compute CAPACITY and to shift the state values into the high bits.

CAPACITY: the upper limit on the number of effective threads (workerCount), equal to (1 << 29) - 1.
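The bit packing is easier to follow when run in isolation. The sketch below copies the constants and the three helper methods into a standalone class (CtlBitsDemo is a name invented here; the real fields are private to ThreadPoolExecutor) and packs a state and a count into one int.

```java
class CtlBitsDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;       // 29
    static final int CAPACITY   = (1 << COUNT_BITS) - 1;  // low 29 bits all set
    static final int RUNNING    = -1 << COUNT_BITS;       // high bits 111
    static final int SHUTDOWN   =  0 << COUNT_BITS;       // high bits 000
    static final int STOP       =  1 << COUNT_BITS;       // high bits 001

    // Same bodies as the helpers in ThreadPoolExecutor.
    static int runStateOf(int c)     { return c & ~CAPACITY; }
    static int workerCountOf(int c)  { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5);
        // Binary form: 111 in the top three bits, the count 5 in the low bits.
        System.out.println(Integer.toBinaryString(c));
        System.out.println(workerCountOf(c));          // 5
        System.out.println(runStateOf(c) == RUNNING);  // true
        // States are ordered numerically, which is why the source can write rs >= SHUTDOWN.
        System.out.println(RUNNING < SHUTDOWN && SHUTDOWN < STOP); // true
    }
}
```

Keeping both values in one int lets the pool update state and count together with a single compare-and-set on ctl.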

The five constants that follow represent the run state of the thread pool:

  • RUNNING: accepts new tasks and processes tasks in the blocking queue.
  • SHUTDOWN: does not accept new tasks, but still processes the tasks already in the queue.
  • STOP: does not accept new tasks, does not process queued tasks, and interrupts the threads that are running tasks.
  • TIDYING: all tasks have terminated and workerCount (the number of effective threads) is 0. The thread that moves the pool into this state calls the terminated() hook method.
  • TERMINATED: the terminated() method has completed.

The state transitions look roughly like this (image from https://www.cnblogs.com/liuzhihu/p/8177371.html):
[Figure: thread pool state transition diagram]

With the constructors and the basic fields understood, we can move on to the important methods of the class.

Thread pool execution flow

The execute method

The core scheduling method of ThreadPoolExecutor is execute(). Calling it submits a task to the thread pool, which then takes care of running it. The working logic of ThreadPoolExecutor can be traced step by step from this method. Here is its source:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // Read ctl, which as described above records both runState and workerCount
    int c = ctl.get();
    /*
     * workerCountOf returns the current number of worker threads.
     * If it is less than corePoolSize, addWorker() creates a new worker
     * thread and hands it this task as its first task.
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // Adding the worker failed, so re-read ctl
        c = ctl.get();
    }
    // If the pool is running, try to put the task on the work queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        /*
         * If the pool is no longer running, remove the task from the queue
         * and call reject (which invokes the handler) to refuse it
         */
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // If no worker threads are left, start one so the queued task gets run
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /*
     * Reaching this branch means one of two things:
     * 1. the pool is no longer in the RUNNING state;
     * 2. the pool is RUNNING, but workerCount >= corePoolSize and workQueue is full.
     * addWorker is called again, this time with false as the second argument,
     * so the thread limit is maximumPoolSize; if it fails, the task is rejected.
     */
    else if (!addWorker(command, false))
        reject(command);
}

To summarize the logic of the code:

1. If the number of worker threads is less than corePoolSize, call addWorker to create a thread that runs the task.

2. Otherwise, try to put the task on the work queue workQueue.

3. If the task was queued successfully, check the pool state again. If the pool is no longer running, remove the task from the queue and call reject, which refuses the task according to the rejection policy. If the pool is still running but has no worker threads, call addWorker to create a new thread.

4. If the task could not be queued (the queue is full), call addWorker to create a thread that runs the task. The second argument is false, so the limit is maximumPoolSize. If the thread cannot be created because the number of threads is already at maximumPoolSize, reject is called (which invokes the handler) to refuse the task.
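The steps above can be observed from outside through the public API. The following sketch is only an illustration: ExecuteFlowDemo and the sizes (2 core threads, 4 maximum, queue capacity 2) are made up. It submits tasks that block on a latch, so the pool cannot make progress, and records how the pool reacts to each batch.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteFlowDemo {
    // Returns {pool size after 2 tasks, queue size after 4, pool size after 6, 1 if the 7th was rejected}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocked);
            pool.execute(blocked);
            seen[0] = pool.getPoolSize();      // below corePoolSize: one new thread per task
            pool.execute(blocked);
            pool.execute(blocked);
            seen[1] = pool.getQueue().size();  // core threads busy: tasks are queued
            pool.execute(blocked);
            pool.execute(blocked);
            seen[2] = pool.getPoolSize();      // queue full: grow toward maximumPoolSize
            try {
                pool.execute(blocked);         // queue full and pool at maximum
            } catch (RejectedExecutionException e) {
                seen[3] = 1;                   // the default AbortPolicy throws
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // [2, 2, 4, 1]
    }
}
```

Note the order: the queue is filled before any thread beyond corePoolSize is created, which often surprises people who expect the pool to grow to maximumPoolSize first.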

The whole execution process can be drawn as follows:
[Figure: flowchart of the execute method]
That is the general logic of the execute method. Next, let's look at how addWorker is implemented.

The addWorker method

The source is as follows:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        /*
         * If the pool state is SHUTDOWN or later, refuse to add a worker,
         * except when the state is exactly SHUTDOWN, no first task is given,
         * and the queue still holds tasks that need a thread to drain them.
         */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            /*
             * The core parameter selects the bound for the worker count:
             * core == true compares against corePoolSize,
             * core == false compares against maximumPoolSize.
             * If the worker count has reached the bound, return false.
             */
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // Try to increment workerCount with a CAS; on success, leave the outer loop
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // The CAS failed, so read ctl again
            c = ctl.get();  // Re-read ctl
            // If the run state changed in the meantime, restart the outer loop
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // Create a Worker object w
        w = new Worker(firstTask);
        // The thread t that the thread factory created for w
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // workers is a HashSet that holds every Worker in the pool
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // Start the thread
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

As the code shows, the main job of addWorker is to create a new thread in the pool and start it. The firstTask parameter specifies the first task the new thread should run, and the core parameter decides whether the number of threads is checked against corePoolSize or maximumPoolSize. The method first checks the state of the pool and the queue, then creates a new Worker object together with its thread, and finally starts the thread.
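addWorker is private, but its effect with a null first task can be seen through the public method prestartAllCoreThreads(), which in the JDK sources is a loop over addWorker(null, true). The sketch below is a minimal illustration under that assumption; PrestartDemo is a made-up class name.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    // Returns {pool size before prestart, threads started, pool size after}.
    static int[] prestart() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();              // threads are created lazily
            int started = pool.prestartAllCoreThreads();  // workers with no first task
            int after = pool.getPoolSize();
            return new int[] {before, started, after};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(prestart())); // [0, 2, 2]
    }
}
```

Only two threads are started even though maximumPoolSize is 4, because the core flag makes corePoolSize the bound.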

The Worker class

From the logic of addWorker we can see that every thread in the pool is wrapped and maintained by a corresponding Worker object. To get to the bottom of this we need to look at the Worker class, starting with its source:

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    public void run() {
        runWorker(this);
    }

    // Lock methods
    //
    // The value 0 represents the unlocked state.
    // The value 1 represents the locked state.

    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}

The constructor shows that when a Worker is instantiated, it stores the Runnable argument firstTask in its field of the same name and uses the pool's current ThreadFactory to create a new thread, passing itself as that thread's Runnable.

Because Worker implements the Runnable interface, starting the Worker's thread actually invokes the Worker's run() method, which in turn calls runWorker. Let's look at its implementation:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // Take the worker's first task
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Leave the initial state (-1) so that the worker can be interrupted
    w.unlock(); // allow interrupts
    // Stays true if the loop is left because of an exception; processWorkerExit checks this flag
    boolean completedAbruptly = true;
    try {
        // If task is null, fetch the next task from the queue through getTask()
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            /*
             * In other words:
             * if the pool is stopping, make sure the current thread is interrupted;
             * otherwise, make sure its interrupt flag is cleared.
             */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook that runs before each task. In ThreadPoolExecutor it is
                // an empty method that subclasses can override.
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

To sum up the logic of runWorker:

1. A while loop keeps fetching tasks from the queue through getTask();

2. If the pool is stopping, make sure the current thread is interrupted; otherwise make sure it is not interrupted;

3. Call the task's run() method to execute it, and set task to null afterwards;

4. When getTask() returns null, the loop ends and processWorkerExit() is called.
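The beforeExecute and afterExecute calls that wrap task.run() in runWorker are protected hooks, so a subclass can use them to observe each task. The sketch below is one possible use; HookedPool and its events list are names invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class HookedPool extends ThreadPoolExecutor {
    // Hypothetical event log, thread-safe because hooks run on worker threads.
    final List<String> events = Collections.synchronizedList(new ArrayList<String>());

    HookedPool() {
        super(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        events.add("before");
    }

    @Override
    protected void afterExecute(Runnable r, Throwable thrown) {
        super.afterExecute(r, thrown);
        // thrown is null when the task returned normally
        events.add(thrown == null ? "after" : "after with " + thrown);
    }

    static List<String> runOneTask() {
        HookedPool pool = new HookedPool();
        pool.execute(() -> pool.events.add("task"));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(pool.events);
    }

    public static void main(String[] args) {
        System.out.println(runOneTask()); // [before, task, after]
    }
}
```

Both hooks run on the worker thread that executes the task, so anything they touch must be thread-safe.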

That covers the flow of runWorker(). Now let's see how getTask() works.

The getTask method

getTask() fetches a task from the work queue. Here is its source:

private Runnable getTask() {
    // Records whether the last attempt to fetch a task from the queue timed out
    boolean timedOut = false; // Did the last poll() time out?
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            // Decrement workerCount by 1
            decrementWorkerCount();
            return null;
        }
        int wc = workerCountOf(c);
        // Are workers subject to culling?
        /*
         * timed tells whether this worker should wait with a timeout:
         * either allowCoreThreadTimeOut is set (it defaults to false),
         * or wc > corePoolSize, i.e. there are more threads than corePoolSize.
         */
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }
        try {
            /*
             * If timed is true, fetch the task with workQueue.poll, which
             * returns null if no task arrives within keepAliveTime.
             * If timed is false, call workQueue.take, which blocks
             * until a task is available in the queue.
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // r is null, so timed was true and the poll timed out; record that in timedOut
            timedOut = true;
        } catch (InterruptedException retry) {
            // Interrupted while waiting: reset timedOut to false and go around the loop again
            timedOut = false;
        }
    }
}

The getTask code looks simple, but there is a lot going on inside. Let's focus on its two if checks:

1. On entering getTask, the pool state is checked first. If rs >= SHUTDOWN, two further conditions are tested:

1) whether rs is at least STOP; 2) whether the queue is empty.

If either holds, workerCount is decremented by 1 and null is returned, because there is nothing more for this worker to do. Once the pool state is SHUTDOWN or later, no new tasks can be added to the queue, so an empty queue means every task has been taken; and in STOP or later, the queued tasks are not supposed to run at all.

2. The second if check is a little convoluted but important, because it controls the number of effective threads in the pool. Taking the conditions one by one:

wc > maximumPoolSize: whether the current number of threads exceeds maximumPoolSize. This rarely happens, unless maximumPoolSize is changed while the program is running, for example by calling the setMaximumPoolSize method of ThreadPoolExecutor.

timed && timedOut: if true, this worker is subject to the idle timeout, and its previous attempt to fetch a task from the queue timed out.

wc > 1 || workQueue.isEmpty(): there is more than one worker thread, or the blocking queue is empty.

compareAndDecrementWorkerCount: decrements workerCount by 1 with a compare-and-set; if the CAS fails, the loop is retried.

As the analysis of execute showed, when the number of threads in the pool is at least corePoolSize and less than maximumPoolSize, and workQueue is full, worker threads can still be added.

But if a worker calling getTask() times out without getting a task, that is, timedOut is true, the queue stayed empty for a whole keepAliveTime. The pool evidently does not need that many threads, so the threads in excess of corePoolSize can be destroyed: getTask returns null for them until the number of threads is back down to corePoolSize.

When getTask returns null, runWorker leaves its loop because no task could be fetched, and calls processWorkerExit().
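The timeout path in getTask can be watched through the public API. In the sketch below, which is only an illustration with made-up names (KeepAliveDemo) and arbitrary timings, allowCoreThreadTimeOut(true) makes timed true even for core threads, so idle workers leave once keepAliveTime has elapsed and the pool size drops to zero.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class KeepAliveDemo {
    // Returns {threads started, threads left after the idle timeout}.
    static int[] shrink() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true); // core threads now poll with a timeout too
        int started = pool.prestartAllCoreThreads();
        // Wait (up to 5 seconds) for the idle workers to time out and exit.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        int remaining = pool.getPoolSize();
        pool.shutdown();
        return new int[] {started, remaining};
    }

    public static void main(String[] args) {
        int[] r = shrink();
        System.out.println("started=" + r[0] + ", remaining=" + r[1]);
    }
}
```

Without the allowCoreThreadTimeOut call, the two core threads would block in workQueue.take() indefinitely and the pool size would stay at 2.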

The processWorkerExit method

The main job of processWorkerExit is to remove the Worker object. Here is its source:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // On an abrupt (exceptional) exit workerCount has not been adjusted yet, so decrement it here
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // Remove the Worker from the workers set, i.e. remove a worker thread from the pool
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

That completes the whole path that starts at the execute method. To summarize:

call execute -> create a Worker object and its thread -> the thread calls runWorker, which fetches tasks through getTask() and invokes their run method -> getTask() keeps taking tasks from the queue until it decrements workerCount by 1 and returns null -> processWorkerExit removes the Worker object.

As a flowchart (image from https://www.cnblogs.com/liuzhihu/p/8177371.html):
[Figure: flowchart from execute through processWorkerExit]

Task queue workQueue

We have mentioned workQueue many times already. It is the task queue that holds tasks waiting to be executed, and it is of type BlockingQueue. The source comments of ThreadPoolExecutor describe three commonly used queue types in detail:

  • SynchronousQueue: direct handoff. This queue has no capacity. A submitted task is handed straight to a thread instead of being held. If no thread is idle, a new thread is created to run the task; if the maximum number of threads has been reached, the rejection policy is applied. For this reason maximumPoolSize is usually set to Integer.MAX_VALUE with this kind of queue, so that tasks are not rejected easily.

  • ArrayBlockingQueue: bounded task queue. A capacity must be given to limit the length of the queue. When a task arrives and the number of threads is below corePoolSize, a new (core) thread is created to run it; once corePoolSize is reached, the task is put in the queue. If the queue is full and the total number of threads is below maximumPoolSize, a new thread is created to run the task; if maximumPoolSize has been reached, the rejection policy is applied.

  • LinkedBlockingQueue: unbounded task queue (when created without a capacity). The queue places no practical limit on the number of tasks, so tasks can keep piling up until system resources are exhausted. When a task arrives and the current number of threads is below corePoolSize, a new thread is created to run it; once the number of threads equals corePoolSize, the task is queued and waits. Because the queue never fills up, maximumPoolSize has no effect.
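The difference between the three queue types shows up in how many threads the same workload creates. The sketch below is an illustration with made-up names and sizes (QueueChoiceDemo, 1 core thread, 8 maximum): it submits blocking tasks to a pool built on the given queue and reports the pool size and queue length.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueChoiceDemo {
    // Submits `tasks` blocking tasks and returns {pool size, queue size}.
    static int[] submit(BlockingQueue<Runnable> queue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 8, 1, TimeUnit.SECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the thread so nothing completes early
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Direct handoff: nothing is queued, every task gets its own thread.
        System.out.println(Arrays.toString(submit(new SynchronousQueue<Runnable>(), 4)));    // [4, 0]
        // Bounded: queue fills to capacity 2 first, then one extra thread is added.
        System.out.println(Arrays.toString(submit(new ArrayBlockingQueue<Runnable>(2), 4))); // [2, 2]
        // Unbounded: the pool never grows past corePoolSize.
        System.out.println(Arrays.toString(submit(new LinkedBlockingQueue<Runnable>(), 4))); // [1, 3]
    }
}
```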

Task rejection policy

When the task queue is full and the number of threads has reached maximumPoolSize, a new task is handled by the rejection policy. The JDK provides four policies:

  1. AbortPolicy: throws a RejectedExecutionException. This is the default policy;
  2. CallerRunsPolicy: runs the task on the thread that called execute;
  3. DiscardOldestPolicy: discards the task at the head of the blocking queue (the oldest one) and then retries the current task;
  4. DiscardPolicy: silently discards the task.
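As an illustration of one of these policies, the sketch below saturates a deliberately tiny pool (one thread, queue capacity 1) configured with CallerRunsPolicy and records which thread ends up running the rejected task. RejectionDemo and whoRanThirdTask are names made up for this example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class RejectionDemo {
    // Returns the name of the thread that executed the task the pool could not accept.
    static String whoRanThirdTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> runner = new AtomicReference<>();
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocked); // occupies the only worker thread
            pool.execute(blocked); // fills the single queue slot
            // Queue full and pool at maximumPoolSize: the handler runs this on the caller.
            pool.execute(() -> runner.set(Thread.currentThread().getName()));
            return runner.get();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(whoRanThirdTask()); // prints "main" when run from the main thread
    }
}
```

Because the submitting thread is busy running the task itself, CallerRunsPolicy also slows down submission, which acts as a simple form of back-pressure.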

Shutting down the thread pool

ThreadPoolExecutor provides two methods for shutting down the pool, shutdown() and shutdownNow():

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN);
        interruptIdleWorkers();
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

We will not walk through the code; here is a summary of how the two methods behave:

  • shutdown(): does not terminate the pool immediately. The pool terminates only after all tasks already in the queue have been executed, but it accepts no new tasks.
  • shutdownNow(): stops the pool right away. It tries to interrupt the tasks being executed, empties the task queue, and returns the tasks that were never started.
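The contrast between the two methods can be sketched with two single-threaded pools. ShutdownDemo and its helper are invented for this example, and the 5-second waits are arbitrary.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownDemo {
    static ThreadPoolExecutor singleThreadPool() {
        return new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }

    // Returns {tasks completed after shutdown(), tasks returned by shutdownNow(), 1 if terminated}.
    static int[] compare() {
        try {
            // shutdown(): queued tasks still run to completion.
            ThreadPoolExecutor graceful = singleThreadPool();
            AtomicInteger done = new AtomicInteger();
            for (int i = 0; i < 3; i++) {
                graceful.execute(done::incrementAndGet);
            }
            graceful.shutdown();
            graceful.awaitTermination(5, TimeUnit.SECONDS);

            // shutdownNow(): the running task is interrupted, queued tasks are handed back.
            ThreadPoolExecutor abrupt = singleThreadPool();
            CountDownLatch never = new CountDownLatch(1);
            abrupt.execute(() -> {
                try {
                    never.await(); // blocks until shutdownNow() interrupts the worker
                } catch (InterruptedException e) {
                    // interrupted by shutdownNow(); just let the task end
                }
            });
            for (int i = 0; i < 3; i++) {
                abrupt.execute(() -> { });
            }
            List<Runnable> pending = abrupt.shutdownNow();
            abrupt.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {done.get(), pending.size(), abrupt.isTerminated() ? 1 : 0};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(compare())); // [3, 3, 1]
    }
}
```

Neither method waits for the pool to finish; awaitTermination is what blocks the caller until the pool has actually terminated.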

Creating a thread pool with ThreadPoolExecutor

That is all for the inner workings of ThreadPoolExecutor. Now let's see how to create a thread pool instance with it. The code is as follows:

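import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// ThreadPoolDemo is an arbitrary class name chosen to make the listing compilable
public class ThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService service = new ThreadPoolExecutor(5, 10, 300, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(5));
        // Write the task body as a lambda expression
        Runnable run = () -> {
            try {
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + " Being implemented ");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        for (int i = 0; i < 10; i++) {
            service.execute(run);
        }
        // Always remember to shut the pool down
        service.shutdown();
    }
}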

In the code above, the pool is created with 5 core threads, a maximum of 10 threads, and a queue of capacity 5. The 10 submitted tasks occupy the 5 core threads and then the 5 queue slots, so the queue never overflows and no threads beyond the 5 core threads are created: at most 5 threads run at the same time. Running main produces the following output:

pool-1-thread-3 Being implemented
pool-1-thread-1 Being implemented
pool-1-thread-4 Being implemented
pool-1-thread-5 Being implemented
pool-1-thread-3 Being implemented
pool-1-thread-2 Being implemented
pool-1-thread-1 Being implemented
pool-1-thread-4 Being implemented
pool-1-thread-5 Being implemented

As you can see, only 5 threads in the pool are doing the work, which means the threads really are being reused and our ThreadPoolExecutor instance behaves as expected.

References

https://www.cnblogs.com/liuzhihu/p/8177371.html

https://www.cnblogs.com/dolphin0520/p/3932921.html

《实战Java高并发程序设计》 (Practical Java High-Concurrency Programming)
