How to Integrate Kafka with Spring Boot and Delay Starting Consumers

2023-06-20 20:06:39 · 安东尼

This article shares how to integrate Kafka with Spring Boot and how to delay starting consumers. The technique is quite practical, so it is shared here for reference.

Integrating Kafka with Spring Boot and delaying consumer startup

When integrating Kafka with Spring Boot, consumers are usually declared with @KafkaListener, but listeners declared this way are started as soon as the Spring context starts. If specific consumers need to be started later, based on configuration, this approach cannot be used.
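For contrast, a typical @KafkaListener declaration looks like the sketch below (the class, topic and group names are placeholders, not from the original post); the listener container for it is created and started automatically when the application context is refreshed.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ImmediateConsumer {

    // Registered by Spring's @KafkaListener processing and started
    // as soon as the application context starts.
    @KafkaListener(id = "immediate-consumer", topics = "demo-topic", groupId = "demo-group")
    public void onMessage(String payload) {
        System.out.println("received: " + payload);
    }
}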

To make this convenient, I defined a custom annotation:

import org.springframework.kafka.annotation.TopicPartition;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayKafkaConsumer {

    String id() default "";

    String[] topics() default {};

    String errorHandler() default "";

    String groupId() default "";

    TopicPartition[] topicPartitions() default {};

    String beanRef() default "__listener";
}

The factory that works together with the annotation:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.*;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.*;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class MyKafkaConsumerFactory implements KafkaListenerConfigurer, BeanFactoryAware {

    private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class);

    private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar;

    private final AtomicInteger counter = new AtomicInteger();

    private BeanFactory beanFactory;

    private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();

    private BeanExpressionContext expressionContext;

    private final ListenerScope listenerScope = new ListenerScope();

    private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
            new KafkaHandlerMethodFactoryAdapter();

    // Capture the registrar so endpoints can be registered (and therefore started) later on demand.
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        this.kafkaListenerEndpointRegistrar = registrar;
        addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService);
    }

    // Register a prepared endpoint with the registrar.
    public void startConsumer(KafkaListenerEndpoint endpoint) {
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
    }

    // Scan the target bean for @DelayKafkaConsumer methods and register an endpoint for each.
    public void startConsumer(Object target) {
        logger.info("start consumer {} ...", target.getClass());
        Class<?> targetClass = AopUtils.getTargetClass(target);
        Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() {

                    @Override
                    public Set<DelayKafkaConsumer> inspect(Method method) {
                        Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }

                });
        if (annotatedMethods.size() == 0) {
            throw new IllegalArgumentException(target.getClass() + " need have method with @DelayKafkaConsumer");
        }
        for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) {
            Method method = entry.getKey();
            logger.info("find message listen handler method : {} , object : {}", method.getName(), target.getClass());
            for (DelayKafkaConsumer listener : entry.getValue()) {
                if (listener.topics().length == 0) {
                    logger.info("topics value is empty , will skip it , method : {} , target object : {}", method.getName(), target.getClass());
                    continue;
                }
                processKafkaListener(listener, method, target);
                logger.info("register method {} success , target object : {}", method.getName(), target.getClass());
            }
        }
        logger.info("{} consumer start complete .", target.getClass());
    }

    protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        processListener(endpoint, kafkaListener, bean, methodToUse);
    }

    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean,
                                   Object adminTarget) {
        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }

    private String getEndpointId(DelayKafkaConsumer kafkaListener) {
        if (StringUtils.hasText(kafkaListener.id())) {
            return resolve(kafkaListener.id());
        }
        else {
            return "Custom-Consumer" + this.counter.getAndIncrement();
        }
    }

    private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {
        String groupId = null;
        if (StringUtils.hasText(kafkaListener.groupId())) {
            groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
        }
        if (groupId == null && StringUtils.hasText(kafkaListener.id())) {
            groupId = id;
        }
        return groupId;
    }

    private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {
        String[] topics = kafkaListener.topics();
        List<String> result = new ArrayList<>();
        if (topics.length > 0) {
            for (int i = 0; i < topics.length; i++) {
                Object topic = resolveExpression(topics[i]);
                resolveAsString(topic, result);
            }
        }
        return result.toArray(new String[result.size()]);
    }

    private void resolveAsString(Object resolvedValue, List<String> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            result.add((String) resolvedValue);
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@DelayKafkaConsumer can't resolve '%s' as a String", resolvedValue));
        }
    }

    private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {
        TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        if (topicPartitions.length > 0) {
            for (TopicPartition topicPartition : topicPartitions) {
                result.addAll(resolveTopicPartitionsList(topicPartition));
            }
        }
        return result.toArray(new TopicPartitionInitialOffset[result.size()]);
    }

    private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
        Object topic = resolveExpression(topicPartition.topic());
        Assert.state(topic instanceof String,
                "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
        Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
        String[] partitions = topicPartition.partitions();
        PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
        Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        for (int i = 0; i < partitions.length; i++) {
            resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
        }

        for (PartitionOffset partitionOffset : partitionOffsets) {
            Object partitionValue = resolveExpression(partitionOffset.partition());
            Integer partition;
            if (partitionValue instanceof String) {
                Assert.state(StringUtils.hasText((String) partitionValue),
                        "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
                partition = Integer.valueOf((String) partitionValue);
            }
            else if (partitionValue instanceof Integer) {
                partition = (Integer) partitionValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                        topic, partitionOffset.partition(), partitionValue.getClass()));
            }

            Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
            Long initialOffset;
            if (initialOffsetValue instanceof String) {
                Assert.state(StringUtils.hasText((String) initialOffsetValue),
                        "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
                initialOffset = Long.valueOf((String) initialOffsetValue);
            }
            else if (initialOffsetValue instanceof Long) {
                initialOffset = (Long) initialOffsetValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                        topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
            }

            Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
            Boolean relativeToCurrent;
            if (relativeToCurrentValue instanceof String) {
                relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
            }
            else if (relativeToCurrentValue instanceof Boolean) {
                relativeToCurrent = (Boolean) relativeToCurrentValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                        topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
            }

            TopicPartitionInitialOffset topicPartitionOffset =
                    new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent);
            if (!result.contains(topicPartitionOffset)) {
                result.add(topicPartitionOffset);
            }
            else {
                throw new IllegalArgumentException(
                        String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                topicPartitionOffset));
            }
        }
        return result;
    }

    private void resolvePartitionAsInteger(String topic, Object resolvedValue,
                                           List<TopicPartitionInitialOffset> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            Assert.state(StringUtils.hasText((String) resolvedValue),
                    "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
            result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
        }
        else if (resolvedValue instanceof Integer[]) {
            for (Integer partition : (Integer[]) resolvedValue) {
                result.add(new TopicPartitionInitialOffset(topic, partition));
            }
        }
        else if (resolvedValue instanceof Integer) {
            result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
        }
    }

    private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {
        Set<DelayKafkaConsumer> listeners = new HashSet<>();
        DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }

    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    }
                    catch (NoSuchMethodException noMethod) {
                    }
                }
            }
            catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            }
            catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "target method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }

    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                    this.listenerScope);
        }
    }

    private String resolveExpressionAsString(String value, String attribute) {
        Object resolved = resolveExpression(value);
        if (resolved instanceof String) {
            return (String) resolved;
        }
        else {
            throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
    }

    private Object resolveExpression(String value) {
        String resolvedValue = resolve(value);
        return this.resolver.evaluate(resolvedValue, this.expressionContext);
    }

    private String resolve(String value) {
        if (this.beanFactory instanceof ConfigurableBeanFactory) {
            return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
        }
        return value;
    }

    private void addFormatters(FormatterRegistry registry) {
        for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
            registry.addConverter(converter);
        }
        for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
            registry.addConverter(converter);
        }
        for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) {
            registry.addFormatter(formatter);
        }
    }

    private <T> Collection<T> getBeansOfType(Class<T> type) {
        if (this.beanFactory instanceof ListableBeanFactory) {
            return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values();
        }
        else {
            return Collections.emptySet();
        }
    }

    private static class ListenerScope implements Scope {

        private final Map<String, Object> listeners = new HashMap<>();

        ListenerScope() {
            super();
        }

        public void addListener(String key, Object bean) {
            this.listeners.put(key, bean);
        }

        public void removeListener(String key) {
            this.listeners.remove(key);
        }

        @Override
        public Object get(String name, ObjectFactory<?> objectFactory) {
            return this.listeners.get(name);
        }

        @Override
        public Object remove(String name) {
            return null;
        }

        @Override
        public void registerDestructionCallback(String name, Runnable callback) {
        }

        @Override
        public Object resolveContextualObject(String key) {
            return this.listeners.get(key);
        }

        @Override
        public String getConversationId() {
            return null;
        }

    }

    private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {

        private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();

        private MessageHandlerMethodFactory messageHandlerMethodFactory;

        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
            this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
        }

        @Override
        public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
            return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
        }

        private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
            if (this.messageHandlerMethodFactory == null) {
                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
            }
            return this.messageHandlerMethodFactory;
        }

        private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
            defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory);
            ConfigurableBeanFactory cbf =
                    (MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ?
                            (ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null);
            defaultFactory.setConversionService(this.defaultFormattingConversionService);

            List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();

            // Annotation-based argument resolution
            argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
            argumentResolvers.add(new HeadersMethodArgumentResolver());

            // Type-based argument resolution
            final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
            argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
            argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {

                @Override
                protected boolean isEmptyPayload(Object payload) {
                    return payload == null || payload instanceof KafkaNull;
                }

            });
            defaultFactory.setArgumentResolvers(argumentResolvers);

            defaultFactory.afterPropertiesSet();
            return defaultFactory;
        }

    }
}

A consumer is started by calling startConsumer (calling it multiple times starts multiple consumers). The target must contain at least one method annotated with @DelayKafkaConsumer; the behavior is similar to @KafkaListener, except that I removed some features and kept only the commonly used parts. A usage sketch follows.
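As a rough usage sketch (the listener class, topic property, and start condition below are made up for illustration and are not part of the original post), a delayed consumer and the call that starts it could look like this:

import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Component
class OrderConsumer {

    // Not started at boot; it only begins consuming after startConsumer(...) is called.
    @DelayKafkaConsumer(id = "order-consumer", topics = "${order.topic}", groupId = "order-group")
    public void onMessage(String payload) {
        // handle the message
    }
}

@Configuration
class DelayedConsumerStarter {

    // Start the consumer only when a runtime condition is met,
    // e.g. a flag read from configuration or an external system.
    @Bean
    ApplicationRunner delayedKafkaStarter(MyKafkaConsumerFactory factory, OrderConsumer consumer) {
        return args -> {
            boolean enabled = Boolean.getBoolean("order.consumer.enabled"); // illustrative condition
            if (enabled) {
                factory.startConsumer(consumer);
            }
        };
    }
}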

This provides an annotation-based way to dynamically control consumers in a Spring Boot project. There are other ways to achieve the same effect, but I find this one the most convenient.

A Spring Boot Kafka consumer fails to start with the error: Failed to construct kafka consumer

As mentioned in an earlier post, our company works on an IoT project. The project uses MQTT + Kafka to communicate with devices; the protocol format used to be JSON and has now been changed to byte arrays (byte[]).

There are plenty of demos of the Spring Boot integration itself online; I will publish a Kafka demo of my own when time allows.

The error message is as follows:

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer

Cause analysis:

When communication was JSON-based, the consumer factory behind the ConcurrentMessageListenerContainer was built with a String key and a String value. The value is now of type byte[], so the correct value type must be specified when building the consumer factory.

The code is as follows:

public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
    factory.setConsumerFactory(consumerByteFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
}
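The method above references consumerByteFactory() and a concurrency field that are not shown in the post. A minimal sketch of what such a consumer factory might look like (the broker address and group id are placeholders) is:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

public ConsumerFactory<String, byte[]> consumerByteFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "byte-consumer-group");     // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // The value deserializer must match the byte[] payload; per the article,
    // a mismatched value type was what caused the startup failure.
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}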

A complete demo covering both the Kafka producer and the Kafka consumer will be put together in upcoming posts.

Thanks for reading! That is all for "How to integrate Kafka with Spring Boot and delay starting consumers". I hope the content above is helpful; if you found the article useful, please share it so more people can see it.
