mirror of
				https://github.com/3ybactuk/marketplace-go-service-project.git
				synced 2025-10-31 06:23:44 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			49 lines
		
	
	
		
			1.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			49 lines
		
	
	
		
			1.0 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package kafka
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 
 | |
| 	"github.com/IBM/sarama"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
| type StatusConsumer struct {
 | |
| 	group sarama.ConsumerGroup
 | |
| 	topic string
 | |
| }
 | |
| 
 | |
| func NewStatusConsumer(topic string, consumerGroup sarama.ConsumerGroup) (*StatusConsumer, error) {
 | |
| 	return &StatusConsumer{
 | |
| 		group: consumerGroup,
 | |
| 		topic: topic,
 | |
| 	}, nil
 | |
| }
 | |
| 
 | |
| func (c *StatusConsumer) FetchEvents(ctx context.Context) error {
 | |
| 	h := &statusHandler{}
 | |
| 
 | |
| 	return c.group.Consume(ctx, []string{c.topic}, h)
 | |
| }
 | |
| 
 | |
| type statusHandler struct{}
 | |
| 
 | |
| func (h *statusHandler) Setup(sess sarama.ConsumerGroupSession) error {
 | |
| 	log.Info().Msgf("[notifier] assigned %v", sess.Claims())
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (h *statusHandler) Cleanup(_ sarama.ConsumerGroupSession) error {
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (h *statusHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
 | |
| 	for msg := range claim.Messages() {
 | |
| 		log.Info().Msgf("[order=%s] p=%d off=%d %s\n", string(msg.Key), msg.Partition, msg.Offset, string(msg.Value))
 | |
| 
 | |
| 		sess.MarkMessage(msg, "")
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | 
