Consuming events from Kafka
Be aware that Kafka guarantees at-least-once delivery, so the same message can reach your handler more than once. Make sure your handling logic is idempotent.
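One common way to get idempotent handling is to remember which messages have already been processed and skip repeats. The sketch below is only an illustration: `ProcessedMessageTracker` and `RunOnceAsync` are hypothetical names, not part of the library, and it assumes each message carries some stable identifier you can use as a key.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not part of the library: remembers which message IDs
// have already been handled so that a redelivered message is skipped.
public sealed class ProcessedMessageTracker
{
    // In-memory only. A real service would likely persist the IDs (for example
    // in a database table with a unique constraint) so they survive restarts.
    private readonly ConcurrentDictionary<string, byte> _seen = new();

    // Runs the action only the first time a given messageId is observed.
    // Returns true if the action ran, false if the ID was already recorded.
    public async ValueTask<bool> RunOnceAsync(string messageId, Func<ValueTask> action)
    {
        if (!_seen.TryAdd(messageId, 0))
        {
            return false; // duplicate delivery, skip the work
        }

        try
        {
            await action();
            return true;
        }
        catch
        {
            // Forget the ID so that a later redelivery can retry the work.
            _seen.TryRemove(messageId, out _);
            throw;
        }
    }
}
```

Inside `HandleMessage` you would wrap the side-effecting work in `RunOnceAsync`, keyed by whatever unique identifier your message type provides. Where that identifier comes from depends on your message contract and is not specified here.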
Creating a simple handler

Implement the ISingletonMessageHandler interface. This will automatically register your handler and create a consumer.
    // Created from the DI container, you can inject dependencies
    public class EventExportHandler : ISingletonMessageHandler<EventExportMessage>
    {
        public ValueTask HandleMessage(MessageContext messageContext, EventExportMessage message)
        {
            // Do your magic here
        }
    }
Creating a dynamically scaled handler

This handler will scale out based on e.g. configuration.

Implement the IPerConfigMessageHandler interface. This will automatically register your handler and create a consumer.
    // Created from the DI container, you can inject dependencies
    public class EventExportHandler : IPerConfigMessageHandler<EventExportMessage, EventExportConfiguration>
    {
        // This will be set on init and whenever the configuration changes
        public EventExportConfiguration Config { get; set; }

        public ValueTask HandleMessage(MessageContext messageContext, EventExportMessage message)
        {
            // Do your magic here
        }
    }
Then implement IMessageHandlerConfigProvider, which supplies the configs. Without this class, no consumers will be created for the IPerConfigMessageHandler classes.
    // Created from the DI container, you can inject dependencies
    public class EventExportConfigProvider : IMessageHandlerConfigProvider<EventExportHandler, EventExportConfiguration>
    {
        public Task<EventExportConfiguration[]> GetConfigsAsync()
        {
            // supply the list of configs. A consumer is created per config
        }

        public string GetUniqueConfigId(EventExportConfiguration config)
        {
            // return a unique id for the config, please use something like: config.ID
        }

        public bool DoesConfigChangeRequireRestart(EventExportConfiguration oldConfig, EventExportConfiguration newConfig)
        {
            // return true whenever the handler should be recreated (due to caching for example), return true as little as possible
            // NOTE: GetUniqueConfigId(oldConfig) == GetUniqueConfigId(newConfig)
        }
    }
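To make the three provider methods more concrete, here is a minimal sketch of what their bodies might look like. Everything in it is assumed for illustration: `SampleExportConfiguration` and its fields (`ID`, `TargetUrl`, `BatchSize`) are hypothetical, and the class mirrors the method shapes without implementing the library interface so that it compiles on its own.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical configuration shape, for illustration only.
public sealed record SampleExportConfiguration(string ID, string TargetUrl, int BatchSize);

// Mirrors the three provider methods without implementing the library
// interface, so the sketch is self-contained.
public sealed class SampleExportConfigProvider
{
    public Task<SampleExportConfiguration[]> GetConfigsAsync()
    {
        // Hard-coded here; a real provider would likely load these from a
        // database or application settings.
        var configs = new[]
        {
            new SampleExportConfiguration("export-a", "https://example.invalid/a", 100),
            new SampleExportConfiguration("export-b", "https://example.invalid/b", 500),
        };
        return Task.FromResult(configs);
    }

    // The ID must stay stable across changes to the same config.
    public string GetUniqueConfigId(SampleExportConfiguration config) => config.ID;

    public bool DoesConfigChangeRequireRestart(
        SampleExportConfiguration oldConfig,
        SampleExportConfiguration newConfig)
    {
        // Assumption: the handler caches something derived from TargetUrl, so
        // only a change to that field needs a fresh handler. A BatchSize change
        // can be read from the updated Config property instead.
        return !string.Equals(oldConfig.TargetUrl, newConfig.TargetUrl, StringComparison.Ordinal);
    }
}
```

Comparing only the fields that feed cached state keeps restarts rare, which matches the advice to return true as little as possible.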