
Kafka: subscribe to multiple topics #3412


tahseelit1 created

Does the current version of the ABP Framework support subscribing to multiple topics? If not, please advise on a possible workaround, and let me know whether you are planning to add this in your next release.

Thanks


3 Answer(s)
  • liangshiwei created
    Support Team, Fullstack Developer

    Hi,

    Can you explain it in more detail? Thanks.

  • tahseelit1 created

    I want to implement KafkaDistributedEventBus so that one microservice subscribes to multiple topics.

  • liangshiwei created
    Support Team, Fullstack Developer

    Hi,

    I'm sorry to say that with the current design this is not possible. Kafka is different from RabbitMQ: it does not have exchanges and routing keys.

    However, a Kafka consumer can subscribe to multiple topics, so you can try this:

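    using System.Reflection;
    using System.Threading.Tasks;
    using Confluent.Kafka;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using Volo.Abp.DependencyInjection;
    using Volo.Abp.ExceptionHandling;
    using Volo.Abp.Kafka;
    using Volo.Abp.Threading;

    // Custom consumer exposed in place of the default KafkaMessageConsumer.
    [ExposeServices(typeof(KafkaMessageConsumer), typeof(IKafkaMessageConsumer))]
    public class MyKafkaMessageConsumer : KafkaMessageConsumer
    {
        // The base class's Consumer property has no accessible setter,
        // so it is assigned through reflection.
        private static readonly PropertyInfo ConsumerProperty;

        static MyKafkaMessageConsumer()
        {
            var type = typeof(KafkaMessageConsumer);
            ConsumerProperty = type.GetProperty("Consumer", BindingFlags.Instance | BindingFlags.NonPublic);
        }

        public MyKafkaMessageConsumer(IConsumerPool consumerPool, IExceptionNotifier exceptionNotifier,
            IOptions<AbpKafkaOptions> options, IProducerPool producerPool, AbpAsyncTimer timer) : base(consumerPool,
            exceptionNotifier, options, producerPool, timer)
        {
        }

        protected override void Consume()
        {
            // Take a consumer from the pool and store it on the base class.
            ConsumerProperty.SetValue(this, ConsumerPool.Get(GroupId, ConnectionName));

            Task.Factory.StartNew(async () =>
            {
                // Subscribe to every topic this service should receive; extend the list as needed.
                Consumer.Subscribe(new[] { "MyTopicName", "MyTopicName2" /* , ... */ });

                while (true)
                {
                    try
                    {
                        var consumeResult = Consumer.Consume();

                        // Skip end-of-partition markers; they carry no message.
                        if (consumeResult.IsPartitionEOF)
                        {
                            continue;
                        }

                        await HandleIncomingMessage(consumeResult);
                    }
                    catch (ConsumeException ex)
                    {
                        // Log and report the failure, then keep consuming.
                        Logger.LogException(ex, LogLevel.Warning);
                        await ExceptionNotifier.NotifyAsync(ex, logLevel: LogLevel.Warning);
                    }
                }
            }, TaskCreationOptions.LongRunning);
        }
    }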
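
For comparison, the multi-topic subscription that the override above relies on can be tried outside ABP with the plain Confluent.Kafka client. This is only a minimal sketch, not part of the original answer: the broker address `localhost:9092`, the group id `my-service`, and the class name are assumptions for illustration, and the topic names are the placeholders from the reply.

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

public static class MultiTopicConsumerSketch
{
    public static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // assumption: local broker
            GroupId = "my-service",                     // hypothetical consumer group
            AutoOffsetReset = AutoOffsetReset.Earliest  // start from the oldest message if no offset is stored
        };

        // Stop the loop cleanly on Ctrl+C.
        using var cts = new CancellationTokenSource();
        Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };

        using var consumer = new ConsumerBuilder<string, byte[]>(config).Build();

        // A single Subscribe call with several names replaces any earlier subscription.
        consumer.Subscribe(new[] { "MyTopicName", "MyTopicName2" });

        try
        {
            while (true)
            {
                // Blocks until a message arrives or the token is cancelled.
                var result = consumer.Consume(cts.Token);

                // result.Topic tells which of the subscribed topics the message came from.
                var size = result.Message.Value?.Length ?? 0;
                Console.WriteLine($"{result.Topic}: {size} bytes");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when Ctrl+C cancels the token.
        }
        finally
        {
            // Leave the consumer group cleanly.
            consumer.Close();
        }
    }
}
```

Because every message arrives through the same loop, a handler that needs per-topic behaviour would branch on `result.Topic`.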
    
Made with ❤️ on ABP v8.2.0-preview Updated on March 25, 2024, 15:11