广告
返回顶部
首页 > 资讯 > 精选 >Java8通过CompletableFuture怎么实现异步回调
  • 200
分享到

Java8通过CompletableFuture怎么实现异步回调

2023-06-30 11:06:38 200人浏览 八月长安
摘要

本篇内容介绍了“Java8通过CompletableFuture怎么实现异步回调”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1 什么是Co

本篇内容介绍了“Java8通过CompletableFuture怎么实现异步回调”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

1 什么是CompletableFuture?

CompletableFuture是Java 8 中新增的一个类,它是对Future接口的扩展。从下方的类继承关系图中我们看到其不仅实现了Future接口,还有CompletionStage接口,当Future需要显示地完成时,可以使用CompletionStage接口去支持完成时触发的函数和操作,当2个以上线程同时尝试完成、异常完成、取消一个CompletableFuture时,只有一个能成功。

CompletableFuture主要作用就是简化我们异步编程的复杂性,支持函数式编程,可以通过回调的方式处理计算结果。

Java8通过CompletableFuture怎么实现异步回调

2 为什么会有CompletableFuture ?

在java5中,jdk为我们提供了Callable和Future,使我们可以很容易的完成异步任务结果的获取,但是通过Future的get获取异步任务结果会导致主线程的阻塞,这样在某些场景下是非常消耗CPU资源的,进而Java8为我们提供了CompletableFuture,使我们无需阻塞等待,而是通过回调的方式去处理结果,并且还支持流式处理、组合异步任务等操作。

如果不熟悉CallableFuture的,可以看小编之前更新的这篇文章Java从源码看异步任务计算FutureTask

3 CompletableFuture 简单使用

下面我们就CompletableFuture 的使用进行简单分类:

创建任务:

  • supplyAsync/runAsync

异步回调:

  • thenApply/thenAccept/thenRun

  • thenApplyAsync/thenAcceptAsync/thenRunAsync

  • exceptionally

  • handle/whenComplete

组合处理:

  • thenCombine / thenAcceptBoth / runAfterBoth

  • applyToEither / acceptEither / runAfterEither

  • thenCompose

  • allOf / anyOf

具体内容请参照以下案例:

    public static void main(String[] args) throws Exception {        // 1.带返回值的异步任务(不指定线程池,默认ForkJoinPool.commonPool(),单核ThreadPerTaskExecutor)        CompletableFuture<Integer> cf1 = CompletableFuture.supplyAsync(() -> {            return 1 + 1;        });        System.out.println("cf1 result: " + cf1.get());        // 2.无返回值的异步任务(不指定线程池,默认ForkJoinPool.commonPool(),单核ThreadPerTaskExecutor)        CompletableFuture cf2 = CompletableFuture.runAsync(() -> {            int a = 1 + 1;        });        System.out.println("cf2 result: " + cf2.get());        // 3.指定线程池的带返回值的异步任务,runAsync同理        CompletableFuture<Integer> cf3 = CompletableFuture.supplyAsync(() -> {            return 1 + 1;        }, Executors.newCachedThreadPool());        System.out.println("cf3 result: " + cf3.get());        // 4.回调,任务执行完成后执行的动作        CompletableFuture<Integer> cf4 = cf1.thenApply((result) -> {            System.out.println("cf4回调拿到cf1的结果 result : " + result);            return result + 1;        });        System.out.println("cf4 result: " + cf4.get());        // 5.异步回调(将回调任务提交到线程池),任务执行完成后执行的动作后异步执行        CompletableFuture<Integer> cf5 = cf1.thenApplyAsync((result) -> {            System.out.println("cf5回调拿到cf1的结果 result : " + result);            return result + 1;        });        System.out.println("cf5 result: " + cf5.get());        // 6.回调(同thenApply但无返回结果),任务执行完成后执行的动作        CompletableFuture cf6 = cf1.thenAccept((result) -> {            System.out.println("cf6回调拿到cf1的结果 result : " + result);        });        System.out.println("cf6 result: " + cf6.get());        // 7.回调(同thenAccept但无入参),任务执行完成后执行的动作        CompletableFuture cf7 = cf1.thenRun(() -> {        });        System.out.println("cf7 result: " + cf7.get());        // 8.异常回调,任务执行出现异常后执行的动作        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {            throw new RuntimeException("出现异常");        });        CompletableFuture<Integer> cf8 = cf.exceptionally((result) -> {            return -1;        });        System.out.println("cf8 result: " + cf8.get());        // 9.当某个任务执行完成后执行的回调方法,会将执行结果或者执行期间抛出的异常传递给回调方法        //   如果是正常执行则异常为null,回调方法对应的CompletableFuture的result和该任务一致;        //   如果该任务正常执行,则get方法返回执行结果,如果是执行异常,则get方法抛出异常。        CompletableFuture<Integer> cf9 = cf1.handle((a, b) -> {            if (b != null) {                b.printStackTrace();            }            return a;        });        System.out.println("cf9 result: " + cf9.get());        // 10 与handle类似,无返回值        try {            CompletableFuture<Integer> cf10 = cf.whenComplete((a, b) -> {                if (b != null) {                    b.printStackTrace();                }            });            System.out.println("cf10 result: " + cf10.get());        } catch (Exception e) {            System.out.println("cf10 出现异常!!!");        }        // 11 组合处理(两个都完成,然后执行)有入参,有返回值        CompletableFuture<Integer> cf11 = cf1.thenCombine(cf3, (r1, r2) -> {            return r1 + r2;        });        System.out.println("cf11 result: " + cf11.get());        // 12 组合处理(两个都完成,然后执行)有入参,无返回值        CompletableFuture cf12 = cf1.thenAcceptBoth(cf3, (r1, r2) -> {        });        System.out.println("cf12 result: " + cf12.get());        // 13 组合处理(两个都完成,然后执行)无入参,无返回值        CompletableFuture cf13 = cf1.runAfterBoth(cf3, () -> {        });        System.out.println("cf13 result: " + cf13.get());        // 14 组合处理(有一个完成,然后执行)有入参,有返回值        CompletableFuture<Integer> cf14 = cf1.applyToEither(cf3, (r) -> {            return r;        });        System.out.println("cf14 result: " + cf14.get());        // 15 组合处理(有一个完成,然后执行)有入参,无返回值        CompletableFuture cf15 = cf1.acceptEither(cf3, (r) -> {        });        System.out.println("cf15 result: " + cf15.get());        // 16 组合处理(有一个完成,然后执行)无入参,无返回值        CompletableFuture cf16 = cf1.runAfterEither(cf3, () -> {        });        System.out.println("cf16 result: " + cf16.get());        // 17 方法执行后返回一个新的CompletableFuture        CompletableFuture<Integer> cf17 = cf1.thenCompose((r) -> {            return CompletableFuture.supplyAsync(() -> {                return 1 + 1;            });        });        System.out.println("cf17 result: " + cf17.get());        // 18 多个任务都执行成功才会继续执行        CompletableFuture.allOf(cf1,cf2,cf3).whenComplete((r, t) -> {            System.out.println(r);        });        // 18 多个任务任意一个执行成功就会继续执行        CompletableFuture.anyOf(cf1,cf2,cf3).whenComplete((r, t) -> {            System.out.println(r);        });    }

4 CompletableFuture 源码分析

首先我们可以从注释中看到,它对CompletionStageFuture接口扩展的一些描述,这些也是它的一些重点。

除了直接操作状态和结果的相关方法外,CompletableFuture还实现了CompletionStage接口的如下策略:

  • (1)为非异步方法的依赖完成提供的操作,可以由完成当前CompletableFuture的线程执行,也可以由完成方法的任何其他调用方执行。

  • (2)所有没有显式Executor参数的异步方法都使用ForkJoinPool.commonPool()执行(除非它不支持至少两个并行级别,在这种情况下,将创建一个新线程来运行每个任务)。为了简化监视、调试和跟踪,所有生成的异步任务都是CompletableFuture的实例,异步完成任务。

不了解ForkJoinPool的可以阅读小编之前更新的这篇文章一文带你了解Java中的ForkJoin。

  • (3)所有CompletionStage方法都是独立于其他公共方法实现的,因此一个方法的行为不会受到子类中其他方法重写的影响。

CompletableFuture实现了Future接口的如下策略:

  • 因为(与FutureTask不同)这个类对导致它完成的计算没有直接控制权,所以取消被视为另一种形式的异常完成,所以cancel操作被视为是另一种异常完成形式(new CancellationException()具有相同的效果。)。方法isCompletedExceptionally()可以用来确定一个CompletableFuture是否以任何异常的方式完成。

  • 如果异常完成时出现CompletionException,方法get()和get(long,TimeUnit)会抛出一个ExecutionException,其原因与相应CompletionException中的原因相同。为了简化在大多数上下文中的使用,该类还定义了join()和getNow()方法,在这些情况下直接抛出CompletionException。

4.1 创建异步任务

我们先看一下CompletableFuture是如何创建异步任务的,我们可以看到起创建异步任务的核心实现是两个入参,一个入参是Executor,另一个入参是Supplier(函数式编程接口)。其中也提供了一个入参的重载,一个入参的重载方法会获取默认的Executor,当系统是单核的会使用ThreadPerTaskExecutor,多核时使用ForkJoinPool.commonPool()

注意:这里默认ForkJoinPool.commonPool()线程池,如果所有异步任务都使用该线程池话,出现问题不容易定位,如果长时间占用该线程池可能影响其他业务的正常操作,stream的并行流也是使用的该线程池。

其中还封装了静态内部类AsyncSupply,该类代表这个异步任务,实现了Runnable,重写了run方法。

    private static final Executor asyncPool = useCommonPool ?        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();    private static final boolean useCommonPool =        (ForkJoinPool.getCommonPoolParallelism() > 1);public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {        return asyncSupplyStage(asyncPool, supplier);    }    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,                                                     Supplier<U> f) {        if (f == null) throw new NullPointerException();        CompletableFuture<U> d = new CompletableFuture<U>();        e.execute(new AsyncSupply<U>(d, f));        return d;    }    static final class AsyncSupply<T> extends ForkJoinTask<Void>            implements Runnable, AsynchronousCompletionTask {        CompletableFuture<T> dep; Supplier<T> fn;        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {            this.dep = dep; this.fn = fn;        }        public final Void getRawResult() { return null; }        public final void setRawResult(Void v) {}        public final boolean exec() { run(); return true; }        public void run() {            CompletableFuture<T> d; Supplier<T> f;            if ((d = dep) != null && (f = fn) != null) {                dep = null; fn = null;                if (d.result == null) {                    try {                        d.completeValue(f.get());                    } catch (Throwable ex) {                        d.completeThrowable(ex);                    }                }                d.postComplete();            }        }    }

Supplier类是一个函数式的接口,@FunctionalInterface注解就是函数式编程的标记。

package java.util.function;@FunctionalInterfacepublic interface Supplier<T> {    T get();}

4.2 异步任务回调

异步任务回调,我们以thenApply/thenApplyAsync为例来看一下其实现原理,方法名含有Async的会传入asyncPool。uniApplyStage方法通过判断e是否有值,来区分是从哪个方法进来的。thenApply不会传入 Executor,它优先让当前线程来执行后续 stage 的任务。

  • 当发现前一个 stage 已经执行完毕时,直接让当前线程来执行后续 stage 的 task。

  • 当发现前一个 stage 还没执行完毕时,则把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中。执行前一个 stage 的线程,执行完毕后,接着执行后续 stage 的 task。

thenApplyAsync会传入一个 Executor,它总是让 Executor 线程池里面的线程来执行后续 stage 的任务。

  • 把当前 stage 包装成一个 UniApply 对象,放到前一个 stage 的栈中,直接让 Executor 来执行。

    public <U> CompletableFuture<U> thenApply(        Function<? super T,? extends U> fn) {        return uniApplyStage(null, fn);    }    public <U> CompletableFuture<U> thenApplyAsync(        Function<? super T,? extends U> fn) {        return uniApplyStage(asyncPool, fn);    }    private <V> CompletableFuture<V> uniApplyStage(        Executor e, Function<? super T,? extends V> f) {        if (f == null) throw new NullPointerException();        CompletableFuture<V> d =  new CompletableFuture<V>();        // Async直接进入,不是Async执行uniApply尝试获取结果        if (e != null || !d.uniApply(this, f, null)) {            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);            push(c);            c.tryFire(SYNC);        }        return d;    }     final <S> boolean uniApply(CompletableFuture<S> a,                               Function<? super S,? extends T> f,                               UniApply<S,T> c) {        Object r; Throwable x;        // 判断当前CompletableFuture是否已完成,如果没完成则返回false;如果完成了则执行下面的逻辑。        if (a == null || (r = a.result) == null || f == null)            return false;        tryComplete: if (result == null) {            // 判断任务结果是否是AltResult类型            if (r instanceof AltResult) {                if ((x = ((AltResult)r).ex) != null) {                    completeThrowable(x, r);                    break tryComplete;                }                r = null;            }            try {                // 判断当前任务是否可以执行                if (c != null && !c.claim())                    return false;                // 获取任务结果                @SuppressWarnings("unchecked") S s = (S) r;                // 执行                completeValue(f.apply(s));            } catch (Throwable ex) {                completeThrowable(ex);            }        }        return true;    }    static final class UniApply<T,V> extends UniCompletion<T,V> {        Function<? super T,? extends V> fn;        UniApply(Executor executor, CompletableFuture<V> dep,                 CompletableFuture<T> src,                 Function<? super T,? extends V> fn) {            super(executor, dep, src); this.fn = fn;        }        final CompletableFuture<V> tryFire(int mode) {            CompletableFuture<V> d; CompletableFuture<T> a;            if ((d = dep) == null ||                !d.uniApply(a = src, fn, mode > 0 ? null : this))                return null;            dep = null; src = null; fn = null;            return d.postFire(a, mode);        }    }    final void push(UniCompletion<?,?> c) {        if (c != null) {            while (result == null && !tryPushStack(c))                lazySetNext(c, null); // clear on failure        }    }    final boolean completeValue(T t) {        return UNSAFE.compareAndSwapObject(this, RESULT, null,                                           (t == null) ? NIL : t);    }

4.3 异步任务组合

我们再thenCombine方法为例看一下CompletableFuture是如何处理组合任务的,我们可以看到thenCombine的源码与thenApply的源码基本上是一直的,只不过组合的时候不仅仅是判断一个,需要集合具体场景,判断多个CompletableFuture

    public <U,V> CompletableFuture<V> thenCombine(        CompletionStage<? extends U> other,        BiFunction<? super T,? super U,? extends V> fn) {        return biApplyStage(null, other, fn);    }    private <U,V> CompletableFuture<V> biApplyStage(        Executor e, CompletionStage<U> o,        BiFunction<? super T,? super U,? extends V> f) {        CompletableFuture<U> b;        if (f == null || (b = o.toCompletableFuture()) == null)            throw new NullPointerException();        CompletableFuture<V> d = new CompletableFuture<V>();        if (e != null || !d.biApply(this, b, f, null)) {            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);            bipush(b, c);            c.tryFire(SYNC);        }        return d;    }    final <R,S> boolean biApply(CompletableFuture<R> a,                                CompletableFuture<S> b,                                BiFunction<? super R,? super S,? extends T> f,                                BiApply<R,S,T> c) {        Object r, s; Throwable x;        // 此处不止要判断a还得判断b        if (a == null || (r = a.result) == null ||            b == null || (s = b.result) == null || f == null)            return false;        tryComplete: if (result == null) {            if (r instanceof AltResult) {                if ((x = ((AltResult)r).ex) != null) {                    completeThrowable(x, r);                    break tryComplete;                }                r = null;            }            // 这里不止判断a的结果r还要判断b的结果s            if (s instanceof AltResult) {                if ((x = ((AltResult)s).ex) != null) {                    completeThrowable(x, s);                    break tryComplete;                }                s = null;            }            // 最后将rr, ss传入            try {                if (c != null && !c.claim())                    return false;                @SuppressWarnings("unchecked") R rr = (R) r;                @SuppressWarnings("unchecked") S ss = (S) s;                completeValue(f.apply(rr, ss));            } catch (Throwable ex) {                completeThrowable(ex);            }        }        return true;    }    static final class BiApply<T,U,V> extends BiCompletion<T,U,V> {        BiFunction<? super T,? super U,? extends V> fn;        BiApply(Executor executor, CompletableFuture<V> dep,                CompletableFuture<T> src, CompletableFuture<U> snd,                BiFunction<? super T,? super U,? extends V> fn) {            super(executor, dep, src, snd); this.fn = fn;        }        // tryFire方法也同样的多可个b        final CompletableFuture<V> tryFire(int mode) {            CompletableFuture<V> d;            CompletableFuture<T> a;            CompletableFuture<U> b;            if ((d = dep) == null ||                !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this))                return null;            dep = null; src = null; snd = null; fn = null;            return d.postFire(a, b, mode);        }    }

“Java8通过CompletableFuture怎么实现异步回调”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注编程网网站,小编将为大家输出更多高质量的实用文章!

--结束END--

本文标题: Java8通过CompletableFuture怎么实现异步回调

本文链接: https://www.lsjlt.com/news/328316.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • Java8通过CompletableFuture怎么实现异步回调
    本篇内容介绍了“Java8通过CompletableFuture怎么实现异步回调”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!1 什么是Co...
    99+
    2023-06-30
  • Java8通过CompletableFuture实现异步回调
    目录1 什么是CompletableFuture?2 为什么会有CompletableFuture ?3 CompletableFuture 简单使用4 CompletableFut...
    99+
    2022-11-13
  • Java8 CompletableFuture异步多线程怎么实现
    这篇文章主要介绍了Java8 CompletableFuture异步多线程怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Java8 CompletableFuture异步多线程怎么实...
    99+
    2023-07-05
  • 怎么实现JavaScript异步回调
    这篇文章主要讲解了“怎么实现JavaScript异步回调”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么实现JavaScript异步回调”吧!什么是异步回调?异步回调是指在执行一个任务时,...
    99+
    2023-07-06
  • ASP.NET异步回调怎么实现
    本篇内容主要讲解“ASP.NET异步回调怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“ASP.NET异步回调怎么实现”吧!ASP.NET异步回调实例:首先,在Render事件中添加好一个...
    99+
    2023-06-17
  • golang异步回调怎么实现
    在Go语言中,可以使用goroutine和channel来实现异步回调的功能。 首先,我们可以创建一个goroutine来执行异步的...
    99+
    2023-10-25
    golang
  • ASP.NET中怎么实现异步回调
    ASP.NET中怎么实现异步回调,相信很多没有经验的人对此束手无策,为此本文总结了问题出现的原因和解决方法,通过这篇文章希望你能解决这个问题。ASP.NET异步回调的重要性:在实际开发中,ASP.NET 客户端异步回调技术才是王道。下面我们...
    99+
    2023-06-17
  • golang异步回调数据怎么实现
    在Go语言中,可以使用协程(goroutine)和通道(channel)来实现异步回调数据的功能。 首先,定义一个函数,该函数用于执...
    99+
    2023-10-27
    golang
  • 详解springboot通过Async注解实现异步任务及回调的方法
    目录前言什么是异步调用?1. 环境准备2. 同步调用3. 异步调用4. 异步回调前言 什么是异步调用? 异步调用是相对于同步调用而言的,同步调用是指程序按预定顺序一步步执行,每一步必...
    99+
    2022-11-13
  • golang定时任务中异步回调怎么实现
    在golang中,可以使用goroutine和channel来实现异步回调。 下面是一个示例代码: package main im...
    99+
    2023-10-22
    golang
  • java异步调用怎么实现
    在Java中,可以通过多线程、回调接口、Future和CompletableFuture等方式来实现异步调用。1. 多线程:创建一个...
    99+
    2023-09-16
    java
  • java怎么实现异步调用
    在Java中,可以使用多种方法实现异步调用,下面列举了其中的两种常用方式: 使用线程池:可以使用Java中的`ExecutorSe...
    99+
    2023-10-25
    java
  • php异步通知怎么实现
    在PHP中实现异步通知可以通过以下两种方式: 使用curl发送异步请求:在接收到请求后,使用curl函数发送一个异步请求到指定的处...
    99+
    2023-10-27
    php
  • ajax中怎么通过异步加载实现局部刷新
    这篇文章给大家介绍ajax中怎么通过异步加载实现局部刷新,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。[html] view plain copy <sc...
    99+
    2022-10-19
  • C#中怎么实现异步调用
    C#中怎么实现异步调用,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。首先,C#异步调用出来的新线程,必须回收,不回收是浪费资源的可耻行为,.NET也是不允许的,所以你别想钻...
    99+
    2023-06-17
  • VB.NET中怎么实现异步调用
    这篇文章给大家介绍VB.NET中怎么实现异步调用,内容非常详细,感兴趣的小伙伴们可以参考借鉴,希望对大家能有所帮助。VB.NET异步调用代码示例:private delegate Sub (()sub ...
    99+
    2023-06-17
  • WCF中怎么实现异步调用
    这篇文章将为大家详细讲解有关WCF中怎么实现异步调用,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。随着时代的发展,异步调用在编程中是不可缺少的,这里就关于WCF异步调用简单的和大家分享一下吧...
    99+
    2023-06-17
  • C#中怎么实现同步调用和异步调用
    今天就跟大家聊聊有关C#中怎么实现同步调用和异步调用,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。C#委托的Invoke方法用来进行同步调用。同步调用也可以叫阻塞调用,它将阻塞当前线...
    99+
    2023-06-17
  • Vue中的同步调用和异步调用怎么实现
    这篇“Vue中的同步调用和异步调用怎么实现”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希望大家阅读完这篇文章能有所收获,下面我们一起来看看这篇“Vue中的同步调用和异步调用怎么实现...
    99+
    2023-06-28
  • Vue Axios的异步通信怎么实现
    这篇文章主要介绍了Vue Axios的异步通信怎么实现的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Vue Axios的异步通信怎么实现文章都会有所收获,下面我们一起来看看吧。1.什么是AxiosAxios是一...
    99+
    2023-06-29
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作