mirror of
https://github.com/3ybactuk/marketplace-go-service-project.git
synced 2025-10-30 22:13:44 +03:00
118 lines
2.5 KiB
Go
118 lines
2.5 KiB
Go
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"route256/loms/internal/domain/entity"
|
|
"route256/loms/internal/infra/tracing"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/rs/zerolog/log"
|
|
|
|
pb "route256/pkg/api/loms/v1"
|
|
)
|
|
|
|
type statusProducer struct {
|
|
topic string
|
|
producer sarama.AsyncProducer
|
|
}
|
|
|
|
func NewStatusProducer(topic string, producer sarama.AsyncProducer) (*statusProducer, error) {
|
|
p := &statusProducer{topic: topic, producer: producer}
|
|
go p.runCallbacks()
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func mapOrderStatus(pbStatus string) (string, error) {
|
|
switch pbStatus {
|
|
case pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String():
|
|
return "awaiting payment", nil
|
|
case pb.OrderStatus_ORDER_STATUS_CANCELLED.String():
|
|
return "cancelled", nil
|
|
case pb.OrderStatus_ORDER_STATUS_FAILED.String():
|
|
return "failed", nil
|
|
case pb.OrderStatus_ORDER_STATUS_NEW.String():
|
|
return "new", nil
|
|
case pb.OrderStatus_ORDER_STATUS_PAYED.String():
|
|
return "payed", nil
|
|
default:
|
|
return "", fmt.Errorf("unexpected OrderStatus: %v", pbStatus)
|
|
}
|
|
}
|
|
|
|
func (p *statusProducer) runCallbacks() {
|
|
for {
|
|
select {
|
|
case err, ok := <-p.producer.Errors():
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
log.Error().Err(err).Msgf("kafka publish error")
|
|
case _, ok := <-p.producer.Successes():
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// TODO: add msg metrics (latency/partition/offset)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *statusProducer) Send(ctx context.Context, id entity.ID, status string) error {
|
|
ctx, span := tracing.Tracer().Start(ctx, "Producer.Send")
|
|
defer span.End()
|
|
|
|
log.Debug().Msgf("sending event for id: %d; status: %s", id, status)
|
|
|
|
newStatus, err := mapOrderStatus(status)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
evt := OrderEvent{
|
|
OrderID: int64(id),
|
|
Status: newStatus,
|
|
Moment: time.Now().UTC().Format(time.RFC3339Nano),
|
|
}
|
|
|
|
value, err := json.Marshal(evt)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal event: %w", err)
|
|
}
|
|
|
|
return p.SendRaw(ctx, id, value)
|
|
}
|
|
|
|
func (p *statusProducer) SendRaw(ctx context.Context, id entity.ID, value []byte) error {
|
|
ctx, span := tracing.Tracer().Start(ctx, "Producer.SendRaw")
|
|
defer span.End()
|
|
|
|
if len(value) == 0 {
|
|
return fmt.Errorf("empty message value")
|
|
}
|
|
|
|
msg := &sarama.ProducerMessage{
|
|
Topic: p.topic,
|
|
Key: sarama.StringEncoder(strconv.FormatInt(int64(id), 10)),
|
|
Value: sarama.ByteEncoder(value),
|
|
Timestamp: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case p.producer.Input() <- msg:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (p *statusProducer) Close() error {
|
|
return p.producer.Close()
|
|
}
|