
Consuming events from Kafka

Be aware that Kafka guarantees at-least-once delivery, so the same message can reach your handler more than once. Make sure your handling logic is idempotent.
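One way to get idempotent handling is to remember which messages have already been processed and skip redeliveries. The sketch below is a minimal, in-memory illustration of that idea. `ProcessedMessageTracker`, `HandleOnceAsync`, and the notion of a string message key are hypothetical names introduced here; they are not part of the library. It also assumes each message carries some stable unique key. In a real service the keys would usually live in durable storage, or the write itself would be made idempotent (for example an upsert), because an in-memory set is lost on restart.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not part of the library: remembers which message keys
// have already been handled so that a redelivered message becomes a no-op.
public sealed class ProcessedMessageTracker
{
    private readonly ConcurrentDictionary<string, bool> _processed = new();

    // Runs 'handle' only if 'messageKey' has not been seen before.
    // Returns true if the work ran, false if the message was a duplicate.
    public async ValueTask<bool> HandleOnceAsync(string messageKey, Func<ValueTask> handle)
    {
        // TryAdd is atomic: only the first caller for a given key gets 'true'.
        if (!_processed.TryAdd(messageKey, true))
        {
            return false;
        }

        try
        {
            await handle();
            return true;
        }
        catch
        {
            // Forget the key so that a redelivery can retry the failed work.
            _processed.TryRemove(messageKey, out _);
            throw;
        }
    }
}
```

Inside a handler's `HandleMessage`, the actual work could be wrapped in a call such as `await tracker.HandleOnceAsync(key, () => Export(message))`, where `key` and `Export` stand for whatever identifier and logic your message type provides.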

Creating a simple handler

Implement the ISingletonMessageHandler interface. This will automatically register your handler and create a consumer.

// Created from the DI container, you can inject dependencies
public class EventExportHandler : ISingletonMessageHandler<EventExportMessage>
{
    public ValueTask HandleMessage(MessageContext messageContext, EventExportMessage message)
    {
        // Do your magic here
    }
}

Creating a dynamically scaled handler

This kind of handler scales out based on, for example, configuration: one consumer is created per configuration entry.

Implement the IPerConfigMessageHandler interface. This will automatically register your handler and create a consumer.

// Created from the DI container, you can inject dependencies
public class EventExportHandler : IPerConfigMessageHandler<EventExportMessage, EventExportConfiguration>
{
    // This will be set on init and whenever the configuration changes
    public EventExportConfiguration Config { get; set; }

    public ValueTask HandleMessage(MessageContext messageContext, EventExportMessage message)
    {
        // Do your magic here
    }
}

Then implement IMessageHandlerConfigProvider, which supplies the configs. Without this class, no consumers will be created for the IPerConfigMessageHandler classes.

// Created from the DI container, you can inject dependencies
public class EventExportConfigProvider : IMessageHandlerConfigProvider<EventExportHandler, EventExportConfiguration>
{
    public Task<EventExportConfiguration[]> GetConfigsAsync()
    {
        // Supply the list of configs. A consumer is created per config.
    }

    public string GetUniqueConfigId(EventExportConfiguration config)
    {
        // Return a unique id for the config, for example config.ID
    }

    public bool DoesConfigChangeRequireRestart(EventExportConfiguration oldConfig, EventExportConfiguration newConfig)
    {
        // Return true whenever the handler should be recreated (due to caching, for example).
        // Return true as little as possible.
        // NOTE: both configs share the same id, i.e.
        // GetUniqueConfigId(oldConfig) == GetUniqueConfigId(newConfig)
    }
}
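To make the restart decision more concrete, here is a sketch of what the bodies of `GetUniqueConfigId` and `DoesConfigChangeRequireRestart` might look like. Everything about the shape of `EventExportConfiguration` is an assumption made for illustration: only `ID` is hinted at above, while `TargetUrl` and `BatchSize` are hypothetical properties, and `EventExportConfigRules` is a hypothetical helper class. The sketch assumes the handler caches something built from `TargetUrl` (say, a connection), so a change there needs a new handler, whereas `BatchSize` is read from `Config` on every message and can change in place.

```csharp
using System;

// Hypothetical shape of the configuration, for illustration only.
public sealed record EventExportConfiguration(string ID, string TargetUrl, int BatchSize);

// Hypothetical helper holding logic that the provider methods could delegate to.
public static class EventExportConfigRules
{
    // The id must stay stable for the lifetime of a config entry.
    public static string GetUniqueConfigId(EventExportConfiguration config) => config.ID;

    // Restart only when a value the handler has cached changes.
    // BatchSize is deliberately ignored: the handler picks it up via Config.
    public static bool DoesConfigChangeRequireRestart(
        EventExportConfiguration oldConfig,
        EventExportConfiguration newConfig)
    {
        return !string.Equals(oldConfig.TargetUrl, newConfig.TargetUrl, StringComparison.Ordinal);
    }
}
```

Keeping the comparison limited to the cached values follows the advice to return true as little as possible: every true result means tearing down and recreating a handler and its consumer.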