iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >java中Pulsar InterruptedException 异常
  • 329
分享到

java中Pulsar InterruptedException 异常

java Pulsar InterruptedException异常java 异常 2023-02-23 11:02:55 329人浏览 独家记忆

Python 官方文档:入门教程 => 点击学习

摘要

目录背景前置排查Pulsar 源码排查定位问题总结背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lan

背景

今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。

和业务沟通后得知是在一个 grpc 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。

前置排查

拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常;

这样可以初步排除是 Pulsar 服务端的问题。

接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。

Pulsar 源码排查

既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。

首先第一步要得知具体使用的是 Pulsar-client 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 SpringBoot starter 所以第一步还得排查这个 starter 是否有影响。

通过查看源码基本排除了 starter 的嫌疑,里面只是简单的封装了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at 
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) 
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) 
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下来便只能是分析堆栈了,因为 Pulsar-client 的部分实现源码是没有直接打包到依赖中的,反编译的话许多代码行数对不上,所以需要将官方的源码拉到本地,切换到对于的分支进行查看。

这一步稍微有点麻烦,首先是代码库还挺大的,加上之前如果没有准备好 Pulsar 的开发环境的话估计会劝退一部分人;

但其实大部分问题都是网络造成的,只要配置一些 Maven 镜像多试几次总会编译成功。

我这里直接将分支切换到 branch-2.8

从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91

看起来是内部异步发送消息的时候抛了异常。

接着往下看到这里:

java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

看起来是这里没错,但是代码行数明显不对;因为 2.8 这个分支也是修复过几个版本,所以中间有修改导致代码行数与最新代码对不上也正常。

semaphore.get().acquire();

不过初步来看应该是这行代码抛出的线程终端异常,这里看起来只有他最有可能了。

为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。

我们点开java.util.concurrent.Semaphore#acquire()的源码,

    
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }    

通过源码会发现 acquire() 函数确实会响应中断,一旦检测到当前线程被中断后便会抛出 InterruptedException 异常。

定位问题

所以问题的原因基本确定了,就是在 Pulsar 的发送消息线程被中断了导致的,但为啥会被中断还需要继续排查。

我们知道线程中断是需要调用 Thread.currentThread().interrupt(); API的,首先猜测是否 Pulsar 客户端内部有个线程中断了这个发送线程。

于是我在 pulsar-client 这个模块中搜索了相关代码:

排除掉和 producer 不相关的地方,其余所有中断线程的代码都是在有了该异常之后继续传递而已;所以初步来看 pulsar-client 内部没有主动中断的操作。

既然 Pulsar 自己没有做,那就只可能是业务做的了?

于是我在业务代码中搜索了一下:

果然在业务代码中搜到了唯一一处中断的地方,而且通过调用关系得知这段代码是在消息发送前执行的,并且和 Pulsar 发送函数处于同一线程。

大概的伪代码如下:

        List.of(1, 2, 3).stream().map(e -> {
                    return CompletableFuture.supplyAsync(() -> {
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException ex) {
                            throw new RuntimeException(ex);
                        }
                        return e;
                    });
                }
        ).collect(Collectors.toList()).forEach(f -> {
            try {
                Integer integer = f.get();
                log.info("====" + integer);
                if (integer==3){
                    TimeUnit.SECONDS.sleep(10);
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
	   MessageId send = producer.newMessage().value(msg.getBytes()).send();

执行这段代码可以完全复现同样的堆栈。

幸好中断这里还打得有日志:

通过日志搜索发现异常的时间和这个中断的日志时间点完全重合,这样也就知道根本原因了。

因为业务线程和消息发送线程是同一个,在某些情况下会执行 Thread.currentThread().interrupt();,其实单纯执行这行函数并不会发生什么,只要没有去响应这个中断,也就是 Semaphore 源码中的判断了线程中断的标记:

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }

但恰好这里业务中断后自己并没有去判断这个标记,导致 Pulsar 内部去判断了,最终抛出了这个异常。

总结

所以归根结底还是这里的代码不合理导致的,首先是自己中断了线程但也没使用,从而导致有被其他基础库使用的可能,所以会造成了一些不可预知的后果。

再一个是不建议在业务代码中使用 Thread.currentThread().interrupt(); 这类代码,第一眼根本不知道是要干啥,也不易维护。

其实本质上线程中断也是线程间通信的一种手段,有这类需求完全可以换为内置的 BlockQueue 这类函数来实现。

以上就是java中Pulsar InterruptedException 异常的详细内容,更多关于 Pulsar InterruptedException异常的资料请关注编程网其它相关文章!

--结束END--

本文标题: java中Pulsar InterruptedException 异常

本文链接: https://www.lsjlt.com/news/197319.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • java中Pulsar InterruptedException 异常
    目录背景前置排查Pulsar 源码排查定位问题总结背景 今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lan...
    99+
    2023-02-23
    java Pulsar InterruptedException异常 java 异常
  • java中Pulsar InterruptedException异常怎么解决
    本篇内容主要讲解“java中Pulsar InterruptedException异常怎么解决”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“java中Pulsar Inter...
    99+
    2023-07-05
  • java高并发InterruptedException异常引发思考
    目录前言程序案例问题分析问题解决总结前言 InterruptedException异常可能没你想的那么简单! 当我们在调用Java对象的wait()方法或者线程的sleep()方法时...
    99+
    2024-04-02
  • JAVA中的异常
    目录 Throwable Error Exception 编译时异常 运行时异常 异常的处理 try-catch捕获并处理 finally throw throws 自定义异常类 在Java中,将程序执行过程中发生的不正常行为称为异常。...
    99+
    2023-09-14
    java 开发语言
  • Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据
    本篇文章为大家展示了Java中怎么利用pulsar-flink-connector读取pulsar catalog元数据,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。简介通过 pulsar-flin...
    99+
    2023-06-20
  • Java:详解Java中的异常
    目录Java异常常见异常throw和throws的区别final、finally、finalize的区别总结Java异常 Java中的异常:又称例外,是一个在程序执行期间发生的事件,...
    99+
    2024-04-02
  • 【Java】异常
    看似不起波澜的日复一日 会突然在某一天让人看到坚持的意义 目录 1.认识异常 1.1 异常的概念  1.2 常见的异常  2.异常的体系结构 3.异常的分类  3.1 编译时异常  3.2 运行时异常  4.异常的处理  4.1 事前防...
    99+
    2023-09-11
    java jvm 开发语言
  • Java中异常是什么
    这篇文章将为大家详细讲解有关Java中异常是什么,小编觉得挺实用的,因此分享给大家做个参考,希望大家阅读完这篇文章后可以有所收获。基础概念  (1)异常:Java程序在运行时期发生的不正常情况。     Java就按照面向对象的思想对不正常...
    99+
    2023-06-20
  • Java中有哪些常见的异常
    这篇文章主要介绍“Java中有哪些常见的异常”,在日常操作中,相信很多人在Java中有哪些常见的异常问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Java中有哪些常见的异常”的疑惑有所帮助!接下来,请跟着小编...
    99+
    2023-06-03
  • IllegalStateException(java异常)
    IllegalStateException是Java标准库中的一个异常类,表示在不合适或无效的情况下执行了某个方法或操作。 以下是一些可能会导致IllegalStateException异常的常见情况...
    99+
    2023-08-31
    java 开发语言 jvm
  • Java之异常
    作者简介: zoro-1,目前大二,正在学习Java,数据结构等 作者主页: zoro-1的主页 欢迎大家点赞 👍 收藏 ⭐ 加关注哦!💖💖 ...
    99+
    2023-09-21
    java 笔记 程序人生
  • Java中如何处理异常
    这篇文章主要为大家展示了“Java中如何处理异常”,内容简而易懂,条理清晰,希望能够帮助大家解决疑惑,下面让小编带领大家一起研究并学习一下“Java中如何处理异常”这篇文章吧。 在Finally中清理资源或者使用Try-With-Resou...
    99+
    2023-06-02
  • 解决Java中的IOException异常
    IOException是Java中的一个受检查异常(Checked Exception)。它是java.io包中定义的异常类之一,用于表示输入输出操作期间可能发生的错误或异常情况。 IOException继承自Exception类,并...
    99+
    2023-09-04
    java 开发语言
  • 老生常谈java数组中的常见异常
    数组的定义 1:单个变量能存储信息 2:用来存储具有相同数据类型的数据集合,可以使用共同的名字来引用数组中存储的数据。 特点 数组可以存储任何类型的数据,包括原始数据类型和引用数据类...
    99+
    2024-04-02
  • 驾驭异常:Java 异常处理中的大师级技巧
    1. 区分受检和非受检异常 受检异常:编译时必须处理或声明为抛出,通常表示编程错误。 非受检异常:运行时抛出,无法通过编译器检查,通常表示运行时错误。 2. 使用明确的异常类型 定义特定的异常类,而不是依赖于通用的 Exceptio...
    99+
    2024-04-02
  • Java中抛出异常和捕获异常有什么区别
    这篇文章主要介绍Java中抛出异常和捕获异常有什么区别,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!抛出异常:创建异常对象,封装异常信息然后通过throw将异常对象传递给调用者。不对异常进行处理只对异常进行抛出是非常...
    99+
    2023-06-15
  • 【Java】认识异常
    文章目录 一、异常的概念和体系结构1.异常的概念2.异常的体系结构3.异常的分类 二、异常的处理1.防御式异常2.异常的抛出3.异常的捕捉 三、异常的处理流程四、自定义异常类 一、异常的概念和体系结构 1.异常的概念 ...
    99+
    2023-12-22
    java 开发语言
  • Java中有哪些异常体系
    本篇文章为大家展示了Java中有哪些异常体系,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。Java的优点是什么1. 简单,只需理解基本的概念,就可以编写适合于各种情况的应用程序;2. 面向对象;3....
    99+
    2023-06-14
  • java中有哪些异常类型
    java中有哪些异常类型?很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。Java可以用来干什么Java主要应用于:1. web开发;2. Android开发;3....
    99+
    2023-06-14
  • Java中trycatch处理异常示例
     描述说明: public class TryCatchStu {   实例代码: public static void main(String[] args) { Sy...
    99+
    2024-04-02
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作