mirror of
				https://github.com/3ybactuk/marketplace-go-service-project.git
				synced 2025-10-30 05:53:45 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			118 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			118 lines
		
	
	
		
			2.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package kafka
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"strconv"
 | |
| 	"time"
 | |
| 
 | |
| 	"route256/loms/internal/domain/entity"
 | |
| 	"route256/loms/internal/infra/tracing"
 | |
| 
 | |
| 	"github.com/IBM/sarama"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| 
 | |
| 	pb "route256/pkg/api/loms/v1"
 | |
| )
 | |
| 
 | |
| type statusProducer struct {
 | |
| 	topic    string
 | |
| 	producer sarama.AsyncProducer
 | |
| }
 | |
| 
 | |
| func NewStatusProducer(topic string, producer sarama.AsyncProducer) (*statusProducer, error) {
 | |
| 	p := &statusProducer{topic: topic, producer: producer}
 | |
| 	go p.runCallbacks()
 | |
| 
 | |
| 	return p, nil
 | |
| }
 | |
| 
 | |
| func mapOrderStatus(pbStatus string) (string, error) {
 | |
| 	switch pbStatus {
 | |
| 	case pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String():
 | |
| 		return "awaiting payment", nil
 | |
| 	case pb.OrderStatus_ORDER_STATUS_CANCELLED.String():
 | |
| 		return "cancelled", nil
 | |
| 	case pb.OrderStatus_ORDER_STATUS_FAILED.String():
 | |
| 		return "failed", nil
 | |
| 	case pb.OrderStatus_ORDER_STATUS_NEW.String():
 | |
| 		return "new", nil
 | |
| 	case pb.OrderStatus_ORDER_STATUS_PAYED.String():
 | |
| 		return "payed", nil
 | |
| 	default:
 | |
| 		return "", fmt.Errorf("unexpected OrderStatus: %v", pbStatus)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *statusProducer) runCallbacks() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case err, ok := <-p.producer.Errors():
 | |
| 			if !ok {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			log.Error().Err(err).Msgf("kafka publish error")
 | |
| 		case _, ok := <-p.producer.Successes():
 | |
| 			if !ok {
 | |
| 				return
 | |
| 			}
 | |
| 
 | |
| 			// TODO: add msg metrics (latency/partition/offset)
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *statusProducer) Send(ctx context.Context, id entity.ID, status string) error {
 | |
| 	ctx, span := tracing.Tracer().Start(ctx, "Producer.Send")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	log.Debug().Msgf("sending event for id: %d; status: %s", id, status)
 | |
| 
 | |
| 	newStatus, err := mapOrderStatus(status)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	evt := OrderEvent{
 | |
| 		OrderID: int64(id),
 | |
| 		Status:  newStatus,
 | |
| 		Moment:  time.Now().UTC().Format(time.RFC3339Nano),
 | |
| 	}
 | |
| 
 | |
| 	value, err := json.Marshal(evt)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("marshal event: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return p.SendRaw(ctx, id, value)
 | |
| }
 | |
| 
 | |
| func (p *statusProducer) SendRaw(ctx context.Context, id entity.ID, value []byte) error {
 | |
| 	ctx, span := tracing.Tracer().Start(ctx, "Producer.SendRaw")
 | |
| 	defer span.End()
 | |
| 
 | |
| 	if len(value) == 0 {
 | |
| 		return fmt.Errorf("empty message value")
 | |
| 	}
 | |
| 
 | |
| 	msg := &sarama.ProducerMessage{
 | |
| 		Topic:     p.topic,
 | |
| 		Key:       sarama.StringEncoder(strconv.FormatInt(int64(id), 10)),
 | |
| 		Value:     sarama.ByteEncoder(value),
 | |
| 		Timestamp: time.Now(),
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case p.producer.Input() <- msg:
 | |
| 		return nil
 | |
| 	case <-ctx.Done():
 | |
| 		return ctx.Err()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *statusProducer) Close() error {
 | |
| 	return p.producer.Close()
 | |
| }
 | 
