package kafka import ( "context" "encoding/json" "fmt" "strconv" "time" "route256/loms/internal/domain/entity" "github.com/IBM/sarama" "github.com/rs/zerolog/log" pb "route256/pkg/api/loms/v1" ) type statusProducer struct { topic string producer sarama.AsyncProducer } func NewStatusProducer(topic string, producer sarama.AsyncProducer) (*statusProducer, error) { p := &statusProducer{topic: topic, producer: producer} go p.runCallbacks() return p, nil } func mapOrderStatus(pbStatus string) (string, error) { switch pbStatus { case pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String(): return "awaiting payment", nil case pb.OrderStatus_ORDER_STATUS_CANCELLED.String(): return "cancelled", nil case pb.OrderStatus_ORDER_STATUS_FAILED.String(): return "failed", nil case pb.OrderStatus_ORDER_STATUS_NEW.String(): return "new", nil case pb.OrderStatus_ORDER_STATUS_PAYED.String(): return "payed", nil default: return "", fmt.Errorf("unexpected OrderStatus: %v", pbStatus) } } func (p *statusProducer) runCallbacks() { for { select { case err, ok := <-p.producer.Errors(): if !ok { return } log.Error().Err(err).Msgf("kafka publish error") case _, ok := <-p.producer.Successes(): if !ok { return } // TODO: add msg metrics (latency/partition/offset) } } } func (p *statusProducer) Send(ctx context.Context, id entity.ID, status string) error { log.Debug().Msgf("sending event for id: %d; status: %s", id, status) newStatus, err := mapOrderStatus(status) if err != nil { return err } evt := OrderEvent{ OrderID: int64(id), Status: newStatus, Moment: time.Now().UTC().Format(time.RFC3339Nano), } value, err := json.Marshal(evt) if err != nil { return fmt.Errorf("marshal event: %w", err) } return p.SendRaw(ctx, id, value) } func (p *statusProducer) SendRaw(ctx context.Context, id entity.ID, value []byte) error { if len(value) == 0 { return fmt.Errorf("empty message value") } msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(strconv.FormatInt(int64(id), 10)), Value: sarama.ByteEncoder(value), Timestamp: time.Now(), } select { case p.producer.Input() <- msg: return nil case <-ctx.Done(): return ctx.Err() } } func (p *statusProducer) Close() error { return p.producer.Close() }