// Package kafka wraps a Sarama consumer group that reads messages from a
// single topic and logs them.
package kafka

import (
	"context"

	"github.com/IBM/sarama"
	"github.com/rs/zerolog/log"
)

// Compile-time check that statusHandler implements the Sarama handler contract.
var _ sarama.ConsumerGroupHandler = (*statusHandler)(nil)

// StatusConsumer consumes one Kafka topic through a Sarama consumer group.
type StatusConsumer struct {
	group sarama.ConsumerGroup // consumer group used to join and read the topic
	topic string               // the single topic passed to Consume
}

// NewStatusConsumer returns a StatusConsumer that reads topic through
// consumerGroup.
//
// The returned error is currently always nil; it is kept in the signature so
// existing callers continue to compile.
func NewStatusConsumer(topic string, consumerGroup sarama.ConsumerGroup) (*StatusConsumer, error) {
	return &StatusConsumer{
		group: consumerGroup,
		topic: topic,
	}, nil
}

// FetchEvents joins the consumer group and processes messages from the
// configured topic until ctx is cancelled or Consume reports an error.
//
// Consume returns each time a server-side rebalance ends the current session,
// so it is called in a loop to re-join the group and pick up the new claims.
// A single call would silently stop consuming after the first rebalance.
//
// It returns nil when ctx has been cancelled and the last session ended
// cleanly, and the error from Consume otherwise.
func (c *StatusConsumer) FetchEvents(ctx context.Context) error {
	h := &statusHandler{}
	topics := []string{c.topic}

	for {
		if err := c.group.Consume(ctx, topics, h); err != nil {
			return err
		}
		// A nil return with a live context means the session ended because of
		// a rebalance; only stop once the caller asked us to.
		if ctx.Err() != nil {
			return nil
		}
	}
}

// statusHandler is the sarama.ConsumerGroupHandler used by StatusConsumer.
// It holds no state, so one instance can be reused across sessions.
type statusHandler struct{}

// Setup logs the claims assigned to the new session.
func (h *statusHandler) Setup(sess sarama.ConsumerGroupSession) error {
	log.Info().Msgf("[notifier] assigned %v", sess.Claims())
	return nil
}

// Cleanup is a no-op; the handler holds no per-session resources.
func (h *statusHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
	return nil
}

// ConsumeClaim logs every message delivered on claim and marks it as consumed.
//
// It returns nil when the claim's message channel is closed or when the
// session context is done. Watching the session context lets the handler exit
// promptly on a rebalance instead of waiting for the message channel to
// close.
func (h *statusHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				// Channel closed: the claim has been released.
				return nil
			}
			log.Info().Msgf("[order=%s] p=%d off=%d %s\n", string(msg.Key), msg.Partition, msg.Offset, string(msg.Value))
			sess.MarkMessage(msg, "")
		case <-sess.Context().Done():
			// Session is ending (rebalance or shutdown); stop processing.
			return nil
		}
	}
}