Consuming events from Kafka
Be aware that Kafka guarantees at-least-once delivery, so the same message can be handed to your handler more than once. Make sure your handling logic is idempotent.
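One common way to make a handler idempotent is to remember which messages have already been processed and skip redeliveries. The following is a minimal sketch of that idea only. `ProcessedMessageTracker`, `ExportCounter`, and the string message id are hypothetical names chosen for illustration and are not part of the messaging library. It also assumes each message carries a stable unique id.

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not part of the messaging library):
// remembers the ids of messages that were already handled.
public sealed class ProcessedMessageTracker
{
    private readonly ConcurrentDictionary<string, bool> _seen =
        new ConcurrentDictionary<string, bool>();

    // Returns true the first time an id is seen, false for a redelivery.
    public bool TryMarkProcessed(string messageId)
    {
        return _seen.TryAdd(messageId, true);
    }
}

// Hypothetical stand-in for the "magic" inside a handler.
public sealed class ExportCounter
{
    private readonly ProcessedMessageTracker _tracker = new ProcessedMessageTracker();
    private int _exported;

    public int Exported => _exported;

    // Safe to call repeatedly with the same id: only the first call has an effect.
    public void Handle(string messageId)
    {
        if (!_tracker.TryMarkProcessed(messageId))
        {
            return; // duplicate delivery, already handled
        }

        Interlocked.Increment(ref _exported);
    }
}
```

Calling `Handle("a")` twice and `Handle("b")` once leaves `Exported` at 2. The in-memory set is lost on restart, so a real handler would more likely keep the processed ids in a durable store, or make the side effect itself naturally idempotent (for example an upsert keyed by the message id).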
Creating a simple handler
Implement the ISingletonMessageHandler interface. This will automatically register your handler and create a consumer.
    // Created from the DI container, you can inject dependencies
    public class EventExportHandler : ISingletonMessageHandler<EventExportMessage>
    {
        public ValueTask HandleMessage(MessageContext messageContext, EventExportMessage message)
        {
            // Do your magic here
        }
    }

Creating a dynamically scaled handler
This handler scales out based on, for example, configuration: one consumer is created per config.
Implement the IPerConfigMessageHandler interface. This will automatically register your handler and create a consumer.
    // Created from the DI container, you can inject dependencies
    public class EventExportHandler : IPerConfigMessageHandler<EventExportMessage, EventExportConfiguration>
    {
        // This will be set on init and whenever the configuration changes
        public EventExportConfiguration Config { get; set; }

        public ValueTask HandleMessage(MessageContext messageContext, EventExportMessage message)
        {
            // Do your magic here
        }
    }

Then implement the IMessageHandlerConfigProvider that will provide the configs. Without this class, no consumers will be created for the IPerConfigMessageHandler classes.
    // Created from the DI container, you can inject dependencies
    public class EventExportConfigProvider : IMessageHandlerConfigProvider<EventExportHandler, EventExportConfiguration>
    {
        public Task<EventExportConfiguration[]> GetConfigsAsync()
        {
            // Supply the list of configs. A consumer is created per config.
        }

        public string GetUniqueConfigId(EventExportConfiguration config)
        {
            // Return a unique id for the config, please use something like: config.ID
        }

        public bool DoesConfigChangeRequireRestart(EventExportConfiguration oldConfig, EventExportConfiguration newConfig)
        {
            // Return true whenever the handler should be recreated (due to caching, for example).
            // Return true as little as possible.
            // NOTE: both configs have the same id here:
            // GetUniqueConfigId(oldConfig) == GetUniqueConfigId(newConfig)
        }
    }
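To illustrate the restart decision, here is a small sketch of how the two config-related methods might be filled in. The config shape (`ExampleExportConfiguration` with `Id`, `TargetUrl`, and `BatchSize`) and the `ExampleConfigRules` class are assumptions made up for this example. The real `EventExportConfiguration` and provider interface may look different, and the sketch deliberately does not implement the library's interface. It assumes the handler caches something derived from `TargetUrl` (such as a client) and reads `BatchSize` fresh on every message.

```csharp
using System;

// Hypothetical config shape; the real EventExportConfiguration may differ.
public sealed class ExampleExportConfiguration
{
    public string Id { get; set; } = "";
    public string TargetUrl { get; set; } = "";
    public int BatchSize { get; set; }
}

// Hypothetical stand-alone versions of the provider's decision methods.
public static class ExampleConfigRules
{
    // The id must stay stable across edits to the same config.
    public static string GetUniqueConfigId(ExampleExportConfiguration config)
    {
        return config.Id;
    }

    // Restart only when a value the handler is assumed to cache has changed.
    // A changed BatchSize is picked up through the updated Config property,
    // so it does not force the handler to be recreated.
    public static bool DoesConfigChangeRequireRestart(
        ExampleExportConfiguration oldConfig,
        ExampleExportConfiguration newConfig)
    {
        return !string.Equals(oldConfig.TargetUrl, newConfig.TargetUrl, StringComparison.Ordinal);
    }
}
```

With these rules, changing only `BatchSize` keeps the existing handler running, while changing `TargetUrl` causes it to be recreated. Comparing only the fields that feed cached state keeps restarts rare, in line with the advice to return true as little as possible.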