Basics: Can't do asynchronous programming? I'll teach you! CompletableFuture (JDK 1.8)


January 6, 2022

Preface

Previously, when you needed to run a task asynchronously, you would typically create a Thread or use a thread pool (Executor). If you needed a return value, you called ExecutorService.submit to obtain a Future. But what if several tasks depend on one another and have to be combined? Synchronization utilities such as CountDownLatch and CyclicBarrier can do the job, but there is a simpler way: use CompletableFuture.
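To make the contrast concrete, here is a minimal sketch of a two-step dependency written both ways. The class and method names are made up for illustration, and the values 20 and 21 merely stand in for real work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: the same "step two depends on step one" flow, written twice.
final class FutureVsCompletableFuture {

    // Plain Future: the caller must block on step one before it can submit step two.
    static int withFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> first = pool.submit(() -> 20);
            int base = first.get(); // blocks the calling thread
            Future<Integer> second = pool.submit(() -> base + 1);
            return second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: the dependency is declared up front and the caller blocks only once.
    static int withCompletableFuture() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                .supplyAsync(() -> 20, pool)
                .thenApplyAsync(base -> base + 1, pool)
                .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(withFuture());
        System.out.println(withCompletableFuture());
    }
}
```

Both methods compute the same value; the difference is that the second one never has to hand an intermediate result back to the calling thread.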

  • Creating asynchronous tasks
  • Running tasks serially
  • Running tasks in parallel
  • Handling task results and exceptions
  • Simple combination of multiple tasks
  • Cancelling a task
  • Getting task results and checking whether a task has completed

Official account: search for "Sneak forward" on WeChat to follow along and join the discussion.


1 Creating asynchronous tasks

Create a CompletableFuture task from a Supplier

// Runs on the built-in thread pool ForkJoinPool.commonPool(); the task is built from supplier
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// Runs on the specified executor; the task is built from supplier
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

Create a CompletableFuture task from a Runnable

// Runs on the built-in thread pool ForkJoinPool.commonPool(); the task is built from runnable
public static CompletableFuture<Void> runAsync(Runnable runnable)
// Runs on the specified executor; the task is built from runnable
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
  • Examples of use

    ExecutorService executor = Executors.newSingleThreadExecutor();
    // runAsync usage
    CompletableFuture<Void> rFuture = CompletableFuture
        .runAsync(() -> System.out.println("hello siting"), executor);
    // supplyAsync usage
    CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> {
            System.out.print("hello ");
            return "siting";
        }, executor);
    // Block until done; a runAsync future has no result, so this prints null
    System.out.println(rFuture.join());
    // Block until done and take the result
    String name = future.join();
    System.out.println(name);
    executor.shutdown(); // The thread pool must be shut down
    -------- Output results --------
    hello siting
    null
    hello siting

Return a constant value as a CompletableFuture

    // Sometimes you need a CompletableFuture that is already completed with a known value
    public static <U> CompletableFuture<U> completedFuture(U value)
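One situation where an already-completed future is handy is a method whose signature is asynchronous but which can sometimes answer immediately. The sketch below assumes a hypothetical in-memory cache and a made-up findName lookup; neither comes from the original text.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: completedFuture on a cache hit, supplyAsync on a miss.
final class CompletedFutureDemo {
    // Hypothetical cache, used only for illustration.
    private static final Map<String, String> CACHE = new ConcurrentHashMap<>();

    static CompletableFuture<String> findName(String id) {
        String cached = CACHE.get(id);
        if (cached != null) {
            // Already known: wrap the constant, no thread is involved.
            return CompletableFuture.completedFuture(cached);
        }
        return CompletableFuture.supplyAsync(() -> {
            String loaded = "user-" + id; // stands in for a slow lookup
            CACHE.put(id, loaded);
            return loaded;
        });
    }

    public static void main(String[] args) {
        System.out.println(findName("42").join());   // loaded asynchronously
        System.out.println(findName("42").isDone()); // served from the cache, already done
    }
}
```

Callers treat both branches the same way, which is the point of wrapping the constant.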

2 Serial execution of tasks

Run action once the previous task has finished; action does not use the previous task's result and returns no value

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
// action runs on the specified thread pool
public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)
  • Examples of use

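    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> "hello siting", executor)
        .thenRunAsync(() -> System.out.println("OK"), executor);
    future.join(); // Wait for the chain to finish before shutting down the pool
    executor.shutdown();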
    -------- Output results --------
    OK

Run action once the previous task has finished; action consumes the previous task's result and returns no value

public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// action runs on the specified thread pool
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
  • Examples of use

    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture
        .supplyAsync(() -> "hello siting", executor)
        .thenAcceptAsync(System.out::println, executor);
    future.join(); // Wait for the chain to finish before shutting down the pool
    executor.shutdown();
    -------- Output results --------
    hello siting

Run fn once the previous task has finished; fn uses the previous task's result and returns a value

public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
// fn runs on the specified thread pool
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
  • Examples of use

    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "hello world", executor)
    .thenApplyAsync(data -> {
    System.out.println(data); return "OK";
    }, executor);
    System.out.println(future.join());
    executor.shutdown();
    -------- Output results --------
    hello world
    OK

thenCompose: run fn once the previous task has finished; fn uses the previous task's result and returns a value

  • Similar to thenApply. The difference is that the function passed to thenCompose returns a CompletionStage, whereas the function passed to thenApply returns a plain U. This makes thenCompose the better fit when chaining with other CompletableFuture tasks

    public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,
    Executor executor) 
  • Examples of use

    // The first task: an already-completed future holding a constant
    CompletableFuture<String> f = CompletableFuture.completedFuture("OK");
    // The second asynchronous task
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> "hello world", executor)
        .thenComposeAsync(data -> {
            System.out.println(data);
            return f; // Return the first task as the result of this stage
        }, executor);
    System.out.println(future.join());
    executor.shutdown();
    -------- Output results --------
    hello world
    OK
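The difference between thenApply and thenCompose shows up in the types when the function itself returns a future. The following sketch assumes a hypothetical asynchronous greet step; with thenApply the result is nested, with thenCompose it is flattened.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class comparing thenApply and thenCompose with a future-returning function.
final class ComposeVsApply {
    // Hypothetical asynchronous step, for illustration only.
    static CompletableFuture<String> greet(String name) {
        return CompletableFuture.supplyAsync(() -> "hello " + name);
    }

    // thenApply wraps whatever the function returns, so a future ends up inside a future.
    static String viaThenApply() {
        CompletableFuture<CompletableFuture<String>> nested =
            CompletableFuture.supplyAsync(() -> "siting").thenApply(ComposeVsApply::greet);
        return nested.join().join(); // two joins are needed to reach the value
    }

    // thenCompose unwraps the returned stage, so the chain stays one level deep.
    static String viaThenCompose() {
        CompletableFuture<String> flat =
            CompletableFuture.supplyAsync(() -> "siting").thenCompose(ComposeVsApply::greet);
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(viaThenApply());
        System.out.println(viaThenCompose());
    }
}
```

Both calls yield the same string; thenCompose simply spares the caller the nested type.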

3 Parallel execution of tasks

Once two CompletableFutures running in parallel have both finished, run action; action does not use the results of the two tasks and returns no value

public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)
  • Examples of use

    // The first task: an already-completed future holding a constant
    CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture
        // The second asynchronous task
        .supplyAsync(() -> "hello siting", executor)
        // () -> System.out.println("OK") is the third task
        .runAfterBothAsync(first, () -> System.out.println("OK"), executor);
    future.join(); // Wait for the third task before shutting down the pool
    executor.shutdown();
    -------- Output results --------
    OK

Once two CompletableFutures running in parallel have both finished, run action; action consumes the results of both tasks and returns no value

// After the calling task and other have both completed, run action; action consumes both results and returns no value
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
// Same as above, but action runs asynchronously on the default thread pool
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
// Same as above, but action runs asynchronously on the specified thread pool
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor)
  • Examples of use

    // The first task: an already-completed future holding a constant
    CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture
        // The second asynchronous task
        .supplyAsync(() -> "hello siting", executor)
        // (s, w) -> System.out.println(s) is the third task; s is this task's result, w is first's result
        .thenAcceptBothAsync(first, (s, w) -> System.out.println(s), executor);
    future.join(); // Wait for the third task before shutting down the pool
    executor.shutdown();
    -------- Output results --------
    hello siting

Once two CompletableFutures running in parallel have both finished, run fn; fn uses the results of both tasks and returns a value

// After the calling task and other have both completed, run fn; fn uses both results and returns a value
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
// Same as above, but fn runs asynchronously on the default thread pool
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
// Same as above, but fn runs asynchronously on the specified thread pool
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
  • Examples of use

    // The first task: an already-completed future holding a constant
    CompletableFuture<String> first = CompletableFuture.completedFuture("hello world");
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> future = CompletableFuture
        // The second asynchronous task
        .supplyAsync(() -> "hello siting", executor)
        // The (s, w) lambda is the third task; s is this task's result, w is first's result
        .thenCombineAsync(first, (s, w) -> {
            System.out.println(s);
            return "OK";
        }, executor);
    System.out.println(future.join());
    executor.shutdown();
    -------- Output results --------
    hello siting
    OK
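The example above ignores the second argument, so here is a small sketch in which the combining function really uses both results. The price and quantity suppliers are hypothetical stand-ins for two independent lookups.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: thenCombine merges the results of two independent tasks.
final class CombineDemo {
    static int total() {
        // Two independent tasks; the constants stand in for real lookups.
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);
        // The function runs only after both tasks have completed.
        return price.thenCombine(quantity, (p, q) -> p * q).join();
    }

    public static void main(String[] args) {
        System.out.println(total());
    }
}
```

The first lambda parameter is always the result of the future the method is called on, and the second is the result of other.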

4 Parallel execution: whichever task finishes first triggers the next task (the faster of the two)

When either the previous task or other completes, run action; action does not use any task result and returns no value

public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action)
// action runs on the specified thread pool
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action, Executor executor)
  • Examples of use

    // The first asynchronous task sleeps for 1 second to make sure it finishes last
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (Exception e) { }
        System.out.println("hello world");
        return "hello world";
    });
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture
        // The second asynchronous task
        .supplyAsync(() -> {
            System.out.println("hello siting");
            return "hello siting";
        }, executor)
        // () -> System.out.println("OK") is the third task
        .runAfterEitherAsync(first, () -> System.out.println("OK"), executor);
    future.join(); // Wait for the third task before shutting down the pool
    executor.shutdown();
    -------- Output results --------
    hello siting
    OK

When either the previous task or other completes, run action; action consumes the result of whichever task finished first and returns no value

public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other,
Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,
Consumer<? super T> action)
// action runs on the specified thread pool
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other,
Consumer<? super T> action, Executor executor)
  • Examples of use

    // The first asynchronous task sleeps for 1 second to make sure it finishes last
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (Exception e) { }
        return "hello world";
    });
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> future = CompletableFuture
        // The second asynchronous task
        .supplyAsync(() -> "hello siting", executor)
        // data -> System.out.println(data) is the third task
        .acceptEitherAsync(first, data -> System.out.println(data), executor);
    future.join(); // Wait for the third task before shutting down the pool
    executor.shutdown();
    -------- Output results --------
    hello siting 

When either the previous task or other completes, run fn; fn uses the result of whichever task finished first and returns a value

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other,
Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,
Function<? super T, U> fn)
// fn runs on the specified thread pool
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other,
Function<? super T, U> fn, Executor executor)
  • Examples of use

    // The first asynchronous task sleeps for 1 second to make sure it finishes last
    CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> {
        try { Thread.sleep(1000); } catch (Exception e) { }
        return "hello world";
    });
    ExecutorService executor = Executors.newSingleThreadExecutor();
    CompletableFuture<String> future = CompletableFuture
        // The second asynchronous task
        .supplyAsync(() -> "hello siting", executor)
        // The data lambda is the third task
        .applyToEitherAsync(first, data -> {
            System.out.println(data);
            return "OK";
        }, executor);
    System.out.println(future.join()); // join() returns the result; printing the future itself would not print "OK"
    executor.shutdown();
    -------- Output results --------
    hello siting
    OK
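Examples based on Thread.sleep depend on timing. A deterministic way to see the "first one wins" behaviour is to race a task against a future that is never completed, as in this sketch (class and variable names are illustrative only).

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: applyToEither picks the result of whichever future completes first.
final class EitherDemo {
    static String fastest() {
        // Never completed in this sketch: stands in for a task that is too slow.
        CompletableFuture<String> slow = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        // Only "fast" can ever complete, so its value is the one handed to the function.
        return slow.applyToEither(fast, winner -> winner + " wins").join();
    }

    public static void main(String[] args) {
        System.out.println(fastest());
    }
}
```

Because both futures must share the result type T, the function cannot tell which side produced the value unless the value itself says so.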

5 Handling task results or exceptions

exceptionally: handling exceptions

public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
  • If an exception occurs in an earlier stage of the chain, exceptionally is triggered; it is the equivalent of try…catch
  • Examples of use

    CompletableFuture<Integer> first = CompletableFuture
        .supplyAsync(() -> {
            if (true) {
                throw new RuntimeException("main error!");
            }
            return "hello world";
        })
        .thenApply(data -> 1)
        .exceptionally(e -> {
            e.printStackTrace(); // Handle the exception; failures from either of the two preceding stages end up here
            return 0;
        });
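As a runnable variation on the example above, the sketch below uses Integer.parseInt as a stand-in for a step that may fail. It also unwraps the CompletionException that a downstream stage typically receives, so the original cause can be inspected. The class and method names are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class: exceptionally as a fallback for a failing chain.
final class ExceptionallyDemo {
    // Returns the doubled number, or -1 if any earlier stage failed.
    static int parseOrDefault(String text) {
        return CompletableFuture
            .supplyAsync(() -> Integer.parseInt(text))
            .thenApply(n -> n * 2)   // skipped when parsing fails
            .exceptionally(e -> -1)  // fallback value
            .join();
    }

    // Reports which exception caused the failure, looking through the wrapper if there is one.
    static String failureKind(String text) {
        return CompletableFuture
            .supplyAsync(() -> Integer.parseInt(text))
            .thenApply(n -> "parsed " + n)
            .exceptionally(e -> {
                Throwable cause =
                    (e instanceof CompletionException && e.getCause() != null) ? e.getCause() : e;
                return "failed with " + cause.getClass().getSimpleName();
            })
            .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("21"));
        System.out.println(parseOrDefault("oops"));
        System.out.println(failureKind("oops"));
    }
}
```

When no stage fails, the exceptionally function is never called and the value passes through untouched.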

handle: run fn when the task completes, whether normally or exceptionally; the return value is whatever fn returns

  • Compared with exceptionally, handle can process the exception from the previous stage and can also process its normal result

    public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
    public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
    public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,
    Executor executor) 
  • Examples of use

    CompletableFuture<Integer> first = CompletableFuture
    .supplyAsync(() -> {
    if (true) { throw new RuntimeException("main error!"); }
    return "hello world";
    })
    .thenApply(data -> 1)
    .handleAsync((data,e) -> {
    e.printStackTrace(); // Handle the exception (e is non-null here because the first stage failed)
    return data;
    });
    System.out.println(first.join());
    -------- Output results --------
    java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!
    ... 5 more
    null
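Since handle is called on success as well as on failure, exactly one of its two arguments carries information, and the function usually checks the exception for null first. A minimal sketch, with a hypothetical describe method and a flag that forces the failure:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: one handle function covering both the success and the failure path.
final class HandleDemo {
    static String describe(boolean fail) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (fail) {
                    throw new IllegalStateException("main error!");
                }
                return "hello world";
            })
            // e is null on success; data is null on failure.
            .handle((data, e) -> e == null ? "ok: " + data : "recovered")
            .join();
    }

    public static void main(String[] args) {
        System.out.println(describe(false));
        System.out.println(describe(true));
    }
}
```

Calling e.printStackTrace() without the null check, as in the example above, would throw a NullPointerException on the success path.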

whenComplete: run action when the task completes, whether normally or exceptionally; the result of the stage is passed through unchanged

  • whenComplete differs from handle in that it does not take part in producing the returned result; think of it as a listener
  • Even if the action deals with the exception, the exception is still rethrown to callers outside the CompletableFuture
  • When using whenCompleteAsync, take thread safety into account if the action touches the result, because two threads may operate on the same result object at the same time

    public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
    public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,
    Executor executor) 
  • Examples of use

    CompletableFuture<AtomicBoolean> first = CompletableFuture
    .supplyAsync(() -> {
    if (true) { throw new RuntimeException("main error!"); }
    return "hello world";
    })
    .thenApply(data -> new AtomicBoolean(false))
    .whenCompleteAsync((data,e) -> {
    // The exception can be observed here, but it is still rethrown to the caller
    System.out.println(e.getMessage());
    });
    first.join();
    -------- Output results --------
    java.lang.RuntimeException: main error!
    Exception in thread "main" java.util.concurrent.CompletionException: java.lang.RuntimeException: main error!
    ... 5 more
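The listener behaviour can be checked with a short sketch: the action records what it saw, and the caller still receives either the untouched value or the rethrown exception. The names below are illustrative, and AtomicReference is used only so the recorded value is safely visible to the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class: whenComplete observes the outcome without changing it.
final class WhenCompleteDemo {
    static String run(boolean fail) {
        AtomicReference<String> seen = new AtomicReference<>("nothing");
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                if (fail) {
                    throw new IllegalStateException("main error!");
                }
                return "hello world";
            })
            .whenComplete((data, e) -> seen.set(e == null ? "value" : "exception"));
        try {
            String result = future.join(); // the value passes through unchanged
            return "observed " + seen.get() + ", result " + result;
        } catch (CompletionException ex) {
            // The listener saw the failure, yet join() still rethrows it.
            return "observed " + seen.get() + ", join rethrew "
                + ex.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

To replace the failure with a value, use handle or exceptionally instead.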

6 A simple combination of multiple tasks

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)


  • Examples of use

    CompletableFuture<Void> future = CompletableFuture
        .allOf(CompletableFuture.completedFuture("A"),
            CompletableFuture.completedFuture("B"));
    // Completes only when all of the tasks have completed
    future.join();
    CompletableFuture<Object> future2 = CompletableFuture
        .anyOf(CompletableFuture.completedFuture("C"),
            CompletableFuture.completedFuture("D"));
    // Completes as soon as any one of the tasks has completed
    future2.join();
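Because allOf returns CompletableFuture<Void>, the individual results have to be read back from the original futures. One common pattern, sketched here with made-up task values, is to join each future after allOf has completed, at which point join no longer blocks.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Hypothetical demo class: gathering results after allOf, and picking a winner with anyOf.
final class AllOfDemo {
    static List<String> collect() {
        List<CompletableFuture<String>> tasks = Arrays.asList(
            CompletableFuture.supplyAsync(() -> "A"),
            CompletableFuture.supplyAsync(() -> "B"),
            CompletableFuture.supplyAsync(() -> "C"));
        CompletableFuture<Void> all =
            CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));
        // Once "all" is done every task is done, so the joins below return immediately.
        return all
            .thenApply(ignored -> tasks.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()))
            .join();
    }

    static Object any() {
        // Never completed in this sketch, so the other future decides the outcome.
        CompletableFuture<String> pending = new CompletableFuture<>();
        return CompletableFuture.anyOf(pending, CompletableFuture.completedFuture("D")).join();
    }

    public static void main(String[] args) {
        System.out.println(collect());
        System.out.println(any());
    }
}
```

The collected list follows the order of the input list, not the order in which the tasks finished.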

8 Cancelling a task

// mayInterruptIfRunning has no effect; if the task has not yet completed, it is completed exceptionally with a CancellationException
public boolean cancel(boolean mayInterruptIfRunning)
// Whether the task was cancelled
public boolean isCancelled()
  • Examples of use

    CompletableFuture<Integer> future = CompletableFuture
        .supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (Exception e) { }
            return "hello world";
        })
        .thenApply(data -> 1);
    System.out.println("Before the task is cancelled: " + future.isCancelled());
    // If the task has not completed, it is completed with a CancellationException; handle it with exceptionally or handle
    future.cancel(true);
    System.out.println("After the task is cancelled: " + future.isCancelled());
    future = future.exceptionally(e -> {
        e.printStackTrace();
        return 0;
    });
    System.out.println(future.join());
    -------- Output results --------
    Before the task is cancelled: false
    After the task is cancelled: true
    java.util.concurrent.CancellationException
    at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
    at Test.main(Test.java:25)
    0
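That mayInterruptIfRunning has no effect means a body that is already running keeps running after cancel. The sketch below makes this observable with latches instead of sleeps; all names are illustrative, and the five-second wait is only a safety timeout.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: cancel() marks the future as cancelled but does not interrupt the body.
final class CancelDemo {
    static String run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                release.await(); // would throw if cancel() had interrupted this thread
            } catch (InterruptedException e) {
                return "interrupted";
            }
            finished.countDown();
            return "done";
        });
        try {
            started.await(); // make sure the body is already running
            boolean cancelled = task.cancel(true);
            release.countDown(); // let the body continue
            boolean bodyFinished = finished.await(5, TimeUnit.SECONDS);
            String joinOutcome;
            try {
                joinOutcome = task.join();
            } catch (CancellationException e) {
                joinOutcome = "CancellationException";
            }
            return "cancelled=" + cancelled + ", isCancelled=" + task.isCancelled()
                + ", bodyFinished=" + bodyFinished + ", join=" + joinOutcome;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the work itself has to stop, it needs its own signal, for example a flag that the body checks.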

9 Getting task results and checking completion

// Whether the task has completed
public boolean isDone()
// Block until done and return the result
public T join()
// Block until done and return the result; unlike join, get throws checked exceptions
public T get() throws InterruptedException, ExecutionException
// Block for at most the given time, then return the result
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
// Return the result if the task has completed; otherwise return valueIfAbsent
public T getNow(T valueIfAbsent)
// If the task has not completed, complete it with value as its result; retrieve the value with future.get
public boolean complete(T value)
// If the task has not completed, complete it exceptionally with ex
public boolean completeExceptionally(Throwable ex)
// Whether the task completed exceptionally
public boolean isCompletedExceptionally()
// Forcibly set the result to value, whether or not the task has already completed; similar to complete
public void obtrudeValue(T value)
// Forcibly make the task fail with ex, whether or not it has already completed; similar to completeExceptionally
public void obtrudeException(Throwable ex)
  • Examples of use

    CompletableFuture<Integer> future = CompletableFuture
        .supplyAsync(() -> {
            try { Thread.sleep(1000); } catch (Exception e) { }
            return "hello world";
        })
        .thenApply(data -> 1);
    System.out.println("Before the task is completed: " + future.isDone());
    future.complete(10);
    System.out.println("After the task is completed: " + future.join());
    -------- Output results --------
    Before the task is completed: false
    After the task is completed: 10
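The difference between join and get is easiest to see on a future that failed: join throws the unchecked CompletionException, while get throws the checked ExecutionException. The sketch below also shows getNow and the return value of complete; class and method names are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical demo class: how join, get, getNow and complete behave.
final class ResultAccessDemo {
    private static CompletableFuture<String> failed() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("boom"));
        return future;
    }

    static String joinFailure() {
        try {
            return failed().join();
        } catch (CompletionException e) { // unchecked
            return "join: CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    static String getFailure() {
        try {
            return failed().get();
        } catch (ExecutionException e) { // checked
            return "get: ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) { // checked
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String getNowThenComplete() {
        CompletableFuture<String> future = new CompletableFuture<>();
        String before = future.getNow("absent");      // not completed yet
        boolean first = future.complete("value");     // true: this call completed the future
        boolean second = future.complete("ignored");  // false: already completed
        return before + "," + first + "," + second + "," + future.getNow("absent");
    }

    public static void main(String[] args) {
        System.out.println(joinFailure());
        System.out.println(getFailure());
        System.out.println(getNowThenComplete());
    }
}
```

obtrudeValue differs from complete in that it would overwrite the result even after the future has completed.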

Corrections to any mistakes in the text are welcome.
