mirror of
https://github.com/3ybactuk/marketplace-go-service-project.git
synced 2025-10-30 14:03:45 +03:00
[hw-6] add notifier service, kafka
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog"
|
||||
@@ -22,6 +23,7 @@ import (
|
||||
"route256/loms/internal/domain/service"
|
||||
"route256/loms/internal/infra/config"
|
||||
mw "route256/loms/internal/infra/grpc/middleware"
|
||||
"route256/loms/internal/infra/messaging/kafka"
|
||||
"route256/loms/internal/infra/postgres"
|
||||
|
||||
pb "route256/pkg/api/loms/v1"
|
||||
@@ -61,11 +63,20 @@ func NewApp(configPath string) (*App, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
producer, err := setupSaramaAsyncConn([]string{c.Kafka.Brokers})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stockRepo := stocksRepository.NewStockRepository(masterPool, replicaPool)
|
||||
orderRepo := ordersRepository.NewOrderRepository(masterPool)
|
||||
txManager := postgres.NewTxManager(masterPool, replicaPool)
|
||||
kafkaProducer, err := kafka.NewStatusProducer(c.Kafka.OrderTopic, producer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
service := service.NewLomsService(orderRepo, stockRepo, txManager)
|
||||
service := service.NewLomsService(orderRepo, stockRepo, txManager, kafkaProducer)
|
||||
controller := server.NewServer(service)
|
||||
|
||||
app := &App{
|
||||
@@ -79,7 +90,6 @@ func NewApp(configPath string) (*App, error) {
|
||||
func (app *App) Shutdown(ctx context.Context) (err error) {
|
||||
if app.httpServer != nil {
|
||||
err = app.httpServer.Shutdown(ctx)
|
||||
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("failed http gateway server shutdown")
|
||||
}
|
||||
@@ -197,3 +207,19 @@ func getPostgresPools(c *config.Config) (masterPool, replicaPool *pgxpool.Pool,
|
||||
|
||||
return pools[0], pools[1], nil
|
||||
}
|
||||
|
||||
func setupSaramaAsyncConn(brokers []string) (sarama.AsyncProducer, error) {
|
||||
cfg := sarama.NewConfig()
|
||||
cfg.Producer.RequiredAcks = sarama.WaitForAll
|
||||
cfg.Producer.Idempotent = true
|
||||
cfg.Producer.Return.Successes = true
|
||||
cfg.Producer.Retry.Max = 5
|
||||
cfg.Net.MaxOpenRequests = 1
|
||||
|
||||
producer, err := sarama.NewAsyncProducer(brokers, cfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create async producer: %w", err)
|
||||
}
|
||||
|
||||
return producer, nil
|
||||
}
|
||||
|
||||
11
loms/internal/domain/repository/outbox/outbox_event.go
Normal file
11
loms/internal/domain/repository/outbox/outbox_event.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package outbox
|
||||
|
||||
import "route256/loms/internal/domain/entity"
|
||||
|
||||
type Event struct {
|
||||
ID int64
|
||||
OrderID entity.ID
|
||||
Topic string
|
||||
Key string
|
||||
Payload []byte
|
||||
}
|
||||
32
loms/internal/domain/repository/outbox/sqlc/db.go
Normal file
32
loms/internal/domain/repository/outbox/sqlc/db.go
Normal file
@@ -0,0 +1,32 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type DBTX interface {
|
||||
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
|
||||
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
|
||||
QueryRow(context.Context, string, ...interface{}) pgx.Row
|
||||
}
|
||||
|
||||
func New(db DBTX) *Queries {
|
||||
return &Queries{db: db}
|
||||
}
|
||||
|
||||
type Queries struct {
|
||||
db DBTX
|
||||
}
|
||||
|
||||
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
|
||||
return &Queries{
|
||||
db: tx,
|
||||
}
|
||||
}
|
||||
5
loms/internal/domain/repository/outbox/sqlc/models.go
Normal file
5
loms/internal/domain/repository/outbox/sqlc/models.go
Normal file
@@ -0,0 +1,5 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
|
||||
package sqlc
|
||||
18
loms/internal/domain/repository/outbox/sqlc/querier.go
Normal file
18
loms/internal/domain/repository/outbox/sqlc/querier.go
Normal file
@@ -0,0 +1,18 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type Querier interface {
|
||||
OutboxInsert(ctx context.Context, arg *OutboxInsertParams) error
|
||||
OutboxMarkError(ctx context.Context, dollar_1 []int64) (int64, error)
|
||||
OutboxMarkSent(ctx context.Context, dollar_1 []int64) (int64, error)
|
||||
OutboxSelectForPublish(ctx context.Context, limit int32) ([]*OutboxSelectForPublishRow, error)
|
||||
}
|
||||
|
||||
var _ Querier = (*Queries)(nil)
|
||||
22
loms/internal/domain/repository/outbox/sqlc/query.sql
Normal file
22
loms/internal/domain/repository/outbox/sqlc/query.sql
Normal file
@@ -0,0 +1,22 @@
|
||||
-- name: OutboxInsert :exec
|
||||
INSERT INTO outbox (order_id, topic, "key", payload)
|
||||
VALUES ($1, $2, $3, $4::jsonb);
|
||||
|
||||
-- name: OutboxSelectForPublish :many
|
||||
SELECT id, order_id, topic, "key", payload
|
||||
FROM outbox
|
||||
WHERE status = 'new'
|
||||
ORDER BY created_at
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED;
|
||||
|
||||
-- name: OutboxMarkSent :execrows
|
||||
UPDATE outbox
|
||||
SET status = 'sent',
|
||||
sent_at = now()
|
||||
WHERE id = ANY($1::bigint[]);
|
||||
|
||||
-- name: OutboxMarkError :execrows
|
||||
UPDATE outbox
|
||||
SET status = 'error'
|
||||
WHERE id = ANY($1::bigint[]);
|
||||
104
loms/internal/domain/repository/outbox/sqlc/query.sql.go
Normal file
104
loms/internal/domain/repository/outbox/sqlc/query.sql.go
Normal file
@@ -0,0 +1,104 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
// source: query.sql
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
const outboxInsert = `-- name: OutboxInsert :exec
|
||||
INSERT INTO outbox (order_id, topic, "key", payload)
|
||||
VALUES ($1, $2, $3, $4::jsonb)
|
||||
`
|
||||
|
||||
type OutboxInsertParams struct {
|
||||
OrderID int64
|
||||
Topic string
|
||||
Key *string
|
||||
Column4 []byte
|
||||
}
|
||||
|
||||
func (q *Queries) OutboxInsert(ctx context.Context, arg *OutboxInsertParams) error {
|
||||
_, err := q.db.Exec(ctx, outboxInsert,
|
||||
arg.OrderID,
|
||||
arg.Topic,
|
||||
arg.Key,
|
||||
arg.Column4,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
const outboxMarkError = `-- name: OutboxMarkError :execrows
|
||||
UPDATE outbox
|
||||
SET status = 'error'
|
||||
WHERE id = ANY($1::bigint[])
|
||||
`
|
||||
|
||||
func (q *Queries) OutboxMarkError(ctx context.Context, dollar_1 []int64) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, outboxMarkError, dollar_1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const outboxMarkSent = `-- name: OutboxMarkSent :execrows
|
||||
UPDATE outbox
|
||||
SET status = 'sent',
|
||||
sent_at = now()
|
||||
WHERE id = ANY($1::bigint[])
|
||||
`
|
||||
|
||||
func (q *Queries) OutboxMarkSent(ctx context.Context, dollar_1 []int64) (int64, error) {
|
||||
result, err := q.db.Exec(ctx, outboxMarkSent, dollar_1)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected(), nil
|
||||
}
|
||||
|
||||
const outboxSelectForPublish = `-- name: OutboxSelectForPublish :many
|
||||
SELECT id, order_id, topic, "key", payload
|
||||
FROM outbox
|
||||
WHERE status = 'new'
|
||||
ORDER BY created_at
|
||||
LIMIT $1
|
||||
FOR UPDATE SKIP LOCKED
|
||||
`
|
||||
|
||||
type OutboxSelectForPublishRow struct {
|
||||
ID int64
|
||||
OrderID int64
|
||||
Topic string
|
||||
Key *string
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
func (q *Queries) OutboxSelectForPublish(ctx context.Context, limit int32) ([]*OutboxSelectForPublishRow, error) {
|
||||
rows, err := q.db.Query(ctx, outboxSelectForPublish, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []*OutboxSelectForPublishRow
|
||||
for rows.Next() {
|
||||
var i OutboxSelectForPublishRow
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.OrderID,
|
||||
&i.Topic,
|
||||
&i.Key,
|
||||
&i.Payload,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, &i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
85
loms/internal/domain/repository/outbox/sqlc/repository.go
Normal file
85
loms/internal/domain/repository/outbox/sqlc/repository.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
|
||||
"route256/loms/internal/domain/entity"
|
||||
"route256/loms/internal/domain/repository/outbox"
|
||||
"route256/loms/internal/infra/postgres"
|
||||
)
|
||||
|
||||
type outboxRepo struct {
|
||||
db *pgxpool.Pool
|
||||
}
|
||||
|
||||
func NewOutboxRepository(masterPool *pgxpool.Pool) *outboxRepo {
|
||||
return &outboxRepo{
|
||||
db: masterPool,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *outboxRepo) GetQuerier(ctx context.Context) *Queries {
|
||||
tx, ok := postgres.TxFromCtx(ctx)
|
||||
if ok {
|
||||
return New(tx)
|
||||
}
|
||||
|
||||
return New(r.db)
|
||||
}
|
||||
|
||||
func (r *outboxRepo) AddEvent(ctx context.Context, evt outbox.Event) error {
|
||||
querier := r.GetQuerier(ctx)
|
||||
|
||||
return querier.OutboxInsert(ctx, &OutboxInsertParams{
|
||||
OrderID: int64(evt.OrderID),
|
||||
Topic: evt.Topic,
|
||||
Key: &evt.Key,
|
||||
Column4: evt.Payload,
|
||||
})
|
||||
}
|
||||
|
||||
func (r *outboxRepo) WithNewEvents(ctx context.Context, limit int32, handler func(context.Context, outbox.Event) error) error {
|
||||
querier := r.GetQuerier(ctx)
|
||||
|
||||
rows, err := querier.OutboxSelectForPublish(ctx, int32(limit))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(rows) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var sentIDs, errIDs []int64
|
||||
|
||||
for _, row := range rows {
|
||||
ev := outbox.Event{
|
||||
ID: row.ID,
|
||||
OrderID: entity.ID(row.OrderID),
|
||||
Topic: row.Topic,
|
||||
Key: *row.Key,
|
||||
Payload: row.Payload,
|
||||
}
|
||||
|
||||
if err := handler(ctx, ev); err != nil {
|
||||
errIDs = append(errIDs, row.ID)
|
||||
} else {
|
||||
sentIDs = append(sentIDs, row.ID)
|
||||
}
|
||||
}
|
||||
|
||||
if len(sentIDs) > 0 {
|
||||
if _, err := querier.OutboxMarkSent(ctx, sentIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(errIDs) > 0 {
|
||||
if _, err := querier.OutboxMarkError(ctx, errIDs); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -35,17 +35,23 @@ type txManager interface {
|
||||
ReadWithRepeatableRead(ctx context.Context, fn func(ctx context.Context) error) (err error)
|
||||
}
|
||||
|
||||
type LomsService struct {
|
||||
orders OrderRepository
|
||||
stocks StockRepository
|
||||
txManager txManager
|
||||
type StatusProducer interface {
|
||||
Send(ctx context.Context, id entity.ID, status string) error
|
||||
}
|
||||
|
||||
func NewLomsService(orderRepo OrderRepository, stockRepo StockRepository, txManager txManager) *LomsService {
|
||||
type LomsService struct {
|
||||
orders OrderRepository
|
||||
stocks StockRepository
|
||||
txManager txManager
|
||||
statusProducer StatusProducer
|
||||
}
|
||||
|
||||
func NewLomsService(orderRepo OrderRepository, stockRepo StockRepository, txManager txManager, statusProducer StatusProducer) *LomsService {
|
||||
return &LomsService{
|
||||
orders: orderRepo,
|
||||
stocks: stockRepo,
|
||||
txManager: txManager,
|
||||
orders: orderRepo,
|
||||
stocks: stockRepo,
|
||||
txManager: txManager,
|
||||
statusProducer: statusProducer,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,17 +63,24 @@ func (s *LomsService) rollbackStocks(ctx context.Context, stocks []*entity.Stock
|
||||
}
|
||||
}
|
||||
|
||||
func (s *LomsService) OrderCreate(ctx context.Context, orderReq *pb.OrderCreateRequest) (entity.ID, error) {
|
||||
if orderReq == nil || orderReq.UserId <= 0 || len(orderReq.Items) == 0 {
|
||||
return 0, model.ErrInvalidInput
|
||||
// Wraps writing status to DB and status topic.
|
||||
// Should use this function for status updates.
|
||||
// Guarantees that status upd event will be sent only if DB write is successful.
|
||||
func (s *LomsService) setStatus(ctx context.Context, id entity.ID, status string) error {
|
||||
log.Trace().Msgf("running status update for %d with status %s", id, status)
|
||||
|
||||
if err := s.orders.OrderSetStatus(ctx, id, status); err != nil {
|
||||
return fmt.Errorf("orders.OrderSetStatus: %w", err)
|
||||
}
|
||||
|
||||
for _, item := range orderReq.Items {
|
||||
if item.Sku <= 0 || item.Count == 0 {
|
||||
return 0, model.ErrInvalidInput
|
||||
}
|
||||
if err := s.statusProducer.Send(ctx, id, status); err != nil {
|
||||
log.Error().Err(err).Msg("statusProducer.Send")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *LomsService) createInitial(ctx context.Context, orderReq *pb.OrderCreateRequest) (*entity.Order, error) {
|
||||
order := &entity.Order{
|
||||
OrderID: 0,
|
||||
Status: pb.OrderStatus_ORDER_STATUS_NEW.String(),
|
||||
@@ -86,44 +99,82 @@ func (s *LomsService) OrderCreate(ctx context.Context, orderReq *pb.OrderCreateR
|
||||
return int(a.ID - b.ID)
|
||||
})
|
||||
|
||||
var (
|
||||
orderID entity.ID
|
||||
resErr error
|
||||
)
|
||||
|
||||
err := s.txManager.WriteWithTransaction(ctx, func(txCtx context.Context) error {
|
||||
id, err := s.orders.OrderCreate(txCtx, order)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
order.OrderID = id
|
||||
orderID = id
|
||||
|
||||
order.OrderID = id
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func (s *LomsService) OrderCreate(ctx context.Context, orderReq *pb.OrderCreateRequest) (entity.ID, error) {
|
||||
if orderReq == nil || orderReq.UserId <= 0 || len(orderReq.Items) == 0 {
|
||||
return 0, model.ErrInvalidInput
|
||||
}
|
||||
|
||||
for _, item := range orderReq.Items {
|
||||
if item.Sku <= 0 || item.Count == 0 {
|
||||
return 0, model.ErrInvalidInput
|
||||
}
|
||||
}
|
||||
|
||||
order, err := s.createInitial(ctx, orderReq)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if statErr := s.setStatus(ctx, order.OrderID, order.Status); statErr != nil {
|
||||
return 0, statErr
|
||||
}
|
||||
|
||||
var resErr error
|
||||
|
||||
err = s.txManager.WriteWithTransaction(ctx, func(txCtx context.Context) error {
|
||||
committed := make([]*entity.Stock, 0, len(order.Items))
|
||||
for _, it := range order.Items {
|
||||
st := &entity.Stock{Item: it, Reserved: it.Count}
|
||||
if err := s.stocks.StockReserve(txCtx, st); err != nil {
|
||||
if resErr = s.stocks.StockReserve(txCtx, st); resErr != nil {
|
||||
s.rollbackStocks(txCtx, committed)
|
||||
|
||||
_ = s.orders.OrderSetStatus(txCtx, id,
|
||||
pb.OrderStatus_ORDER_STATUS_FAILED.String())
|
||||
resErr = fmt.Errorf("stocks.StockReserve: %w", resErr)
|
||||
|
||||
resErr = fmt.Errorf("stocks.StockReserve: %w", err)
|
||||
return nil
|
||||
}
|
||||
committed = append(committed, st)
|
||||
}
|
||||
|
||||
return s.orders.OrderSetStatus(txCtx, id,
|
||||
pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String())
|
||||
return nil
|
||||
})
|
||||
|
||||
finalStatus := pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String()
|
||||
|
||||
defer func() {
|
||||
if statErr := s.setStatus(ctx, order.OrderID, finalStatus); statErr != nil {
|
||||
log.Error().Err(statErr).Msgf("failed to setStatus to %s", finalStatus)
|
||||
}
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
finalStatus = pb.OrderStatus_ORDER_STATUS_FAILED.String()
|
||||
|
||||
return 0, err
|
||||
}
|
||||
if resErr != nil {
|
||||
finalStatus = pb.OrderStatus_ORDER_STATUS_FAILED.String()
|
||||
|
||||
return 0, resErr
|
||||
}
|
||||
return orderID, nil
|
||||
|
||||
return order.OrderID, nil
|
||||
}
|
||||
|
||||
func (s *LomsService) OrderInfo(ctx context.Context, orderID entity.ID) (*entity.Order, error) {
|
||||
@@ -157,7 +208,7 @@ func (s *LomsService) OrderPay(ctx context.Context, orderID entity.ID) error {
|
||||
log.Error().Err(err).Msg("failed to free stock reservation")
|
||||
}
|
||||
}
|
||||
return s.orders.OrderSetStatus(txCtx, orderID,
|
||||
return s.setStatus(txCtx, orderID,
|
||||
pb.OrderStatus_ORDER_STATUS_PAYED.String())
|
||||
default:
|
||||
return model.ErrOrderInvalidStatus
|
||||
@@ -192,7 +243,7 @@ func (s *LomsService) OrderCancel(ctx context.Context, orderID entity.ID) error
|
||||
return err
|
||||
}
|
||||
}
|
||||
return s.orders.OrderSetStatus(txCtx, orderID,
|
||||
return s.setStatus(txCtx, orderID,
|
||||
pb.OrderStatus_ORDER_STATUS_CANCELLED.String())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -40,6 +40,12 @@ func (t *mockTxManager) ReadWithRepeatableRead(ctx context.Context, fn func(ctx
|
||||
return fn(ctx)
|
||||
}
|
||||
|
||||
type mockKafkaProducer struct{}
|
||||
|
||||
func (kp mockKafkaProducer) Send(_ context.Context, _ entity.ID, _ string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
||||
@@ -138,8 +144,7 @@ func TestLomsService_OrderCreate(t *testing.T) {
|
||||
orders: mock.NewOrderRepositoryMock(mc).
|
||||
OrderCreateMock.Return(1, nil).
|
||||
OrderSetStatusMock.Return(errors.New("status update error")),
|
||||
stocks: mock.NewStockRepositoryMock(mc).
|
||||
StockReserveMock.Return(errors.New("reservation error")),
|
||||
stocks: mock.NewStockRepositoryMock(mc),
|
||||
},
|
||||
args: args{
|
||||
req: goodReq,
|
||||
@@ -152,8 +157,7 @@ func TestLomsService_OrderCreate(t *testing.T) {
|
||||
orders: mock.NewOrderRepositoryMock(mc).
|
||||
OrderCreateMock.Return(1, nil).
|
||||
OrderSetStatusMock.Return(errors.New("unexpected error")),
|
||||
stocks: mock.NewStockRepositoryMock(mc).
|
||||
StockReserveMock.Return(nil),
|
||||
stocks: mock.NewStockRepositoryMock(mc),
|
||||
},
|
||||
args: args{
|
||||
req: goodReq,
|
||||
@@ -167,7 +171,7 @@ func TestLomsService_OrderCreate(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{})
|
||||
svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{})
|
||||
_, err := svc.OrderCreate(ctx, tt.args.req)
|
||||
tt.wantErr(t, err)
|
||||
})
|
||||
@@ -278,7 +282,7 @@ func TestLomsService_OrderPay(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{})
|
||||
svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{})
|
||||
err := svc.OrderPay(ctx, tt.args.id)
|
||||
tt.wantErr(t, err)
|
||||
})
|
||||
@@ -292,6 +296,7 @@ func TestLomsService_OrderInfoBadInput(t *testing.T) {
|
||||
nil,
|
||||
nil,
|
||||
&mockTxManager{},
|
||||
&mockKafkaProducer{},
|
||||
)
|
||||
|
||||
_, err := svc.OrderInfo(context.Background(), 0)
|
||||
@@ -313,6 +318,7 @@ func TestLomsService_OrderInfoSuccess(t *testing.T) {
|
||||
mock.NewOrderRepositoryMock(mc).OrderGetByIDMock.Return(order, nil),
|
||||
nil,
|
||||
&mockTxManager{},
|
||||
&mockKafkaProducer{},
|
||||
)
|
||||
|
||||
gotOrder, err := svc.OrderInfo(context.Background(), 123)
|
||||
@@ -414,7 +420,7 @@ func TestLomsService_OrderCancel(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{})
|
||||
svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{})
|
||||
err := svc.OrderCancel(ctx, tt.args.id)
|
||||
tt.wantErr(t, err)
|
||||
})
|
||||
@@ -481,7 +487,7 @@ func TestLomsService_StocksInfo(t *testing.T) {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
svc := NewLomsService(nil, tt.fields.stocks, &mockTxManager{})
|
||||
svc := NewLomsService(nil, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{})
|
||||
got, err := svc.StocksInfo(ctx, tt.args.sku)
|
||||
tt.wantErr(t, err)
|
||||
if err == nil {
|
||||
|
||||
7
loms/internal/infra/messaging/kafka/order_event.go
Normal file
7
loms/internal/infra/messaging/kafka/order_event.go
Normal file
@@ -0,0 +1,7 @@
|
||||
package kafka
|
||||
|
||||
type OrderEvent struct {
|
||||
OrderID int64 `json:"order_id"`
|
||||
Status string `json:"status"`
|
||||
Moment string `json:"moment"`
|
||||
}
|
||||
110
loms/internal/infra/messaging/kafka/producer.go
Normal file
110
loms/internal/infra/messaging/kafka/producer.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package kafka
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"route256/loms/internal/domain/entity"
|
||||
|
||||
"github.com/IBM/sarama"
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
pb "route256/pkg/api/loms/v1"
|
||||
)
|
||||
|
||||
type statusProducer struct {
|
||||
topic string
|
||||
producer sarama.AsyncProducer
|
||||
}
|
||||
|
||||
func NewStatusProducer(topic string, producer sarama.AsyncProducer) (*statusProducer, error) {
|
||||
p := &statusProducer{topic: topic, producer: producer}
|
||||
go p.runCallbacks()
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func mapOrderStatus(pbStatus string) (string, error) {
|
||||
switch pbStatus {
|
||||
case pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String():
|
||||
return "awaiting payment", nil
|
||||
case pb.OrderStatus_ORDER_STATUS_CANCELLED.String():
|
||||
return "cancelled", nil
|
||||
case pb.OrderStatus_ORDER_STATUS_FAILED.String():
|
||||
return "failed", nil
|
||||
case pb.OrderStatus_ORDER_STATUS_NEW.String():
|
||||
return "new", nil
|
||||
case pb.OrderStatus_ORDER_STATUS_PAYED.String():
|
||||
return "payed", nil
|
||||
default:
|
||||
return "", fmt.Errorf("unexpected OrderStatus: %v", pbStatus)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *statusProducer) runCallbacks() {
|
||||
for {
|
||||
select {
|
||||
case err, ok := <-p.producer.Errors():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
log.Error().Err(err).Msgf("kafka publish error")
|
||||
case _, ok := <-p.producer.Successes():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: add msg metrics (latency/partition/offset)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *statusProducer) Send(ctx context.Context, id entity.ID, status string) error {
|
||||
log.Debug().Msgf("sending event for id: %d; status: %s", id, status)
|
||||
|
||||
newStatus, err := mapOrderStatus(status)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
evt := OrderEvent{
|
||||
OrderID: int64(id),
|
||||
Status: newStatus,
|
||||
Moment: time.Now().UTC().Format(time.RFC3339Nano),
|
||||
}
|
||||
|
||||
value, err := json.Marshal(evt)
|
||||
if err != nil {
|
||||
return fmt.Errorf("marshal event: %w", err)
|
||||
}
|
||||
|
||||
return p.SendRaw(ctx, id, value)
|
||||
}
|
||||
|
||||
func (p *statusProducer) SendRaw(ctx context.Context, id entity.ID, value []byte) error {
|
||||
if len(value) == 0 {
|
||||
return fmt.Errorf("empty message value")
|
||||
}
|
||||
|
||||
msg := &sarama.ProducerMessage{
|
||||
Topic: p.topic,
|
||||
Key: sarama.StringEncoder(strconv.FormatInt(int64(id), 10)),
|
||||
Value: sarama.ByteEncoder(value),
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
|
||||
select {
|
||||
case p.producer.Input() <- msg:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (p *statusProducer) Close() error {
|
||||
return p.producer.Close()
|
||||
}
|
||||
54
loms/internal/infra/messaging/kafka_outbox/dispatcher.go
Normal file
54
loms/internal/infra/messaging/kafka_outbox/dispatcher.go
Normal file
@@ -0,0 +1,54 @@
|
||||
package kafkaoutbox
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"route256/loms/internal/domain/entity"
|
||||
"route256/loms/internal/domain/repository/outbox"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type producer interface {
|
||||
SendRaw(ctx context.Context, id entity.ID, value []byte) error
|
||||
}
|
||||
|
||||
type outboxRepo interface {
|
||||
WithNewEvents(ctx context.Context, limit int, h func(c context.Context, e outbox.Event) error) error
|
||||
}
|
||||
|
||||
type Dispatcher struct {
|
||||
repo outboxRepo
|
||||
producer producer
|
||||
limit int
|
||||
interval time.Duration
|
||||
}
|
||||
|
||||
func NewDispatcher(repo outboxRepo, producer producer, batch int, pollEvery time.Duration) *Dispatcher {
|
||||
return &Dispatcher{
|
||||
repo: repo,
|
||||
producer: producer,
|
||||
limit: batch,
|
||||
interval: pollEvery,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *Dispatcher) Run(ctx context.Context) {
|
||||
t := time.NewTicker(d.interval)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
if err := d.repo.WithNewEvents(ctx, d.limit, func(c context.Context, e outbox.Event) error {
|
||||
return d.producer.SendRaw(c, e.OrderID, e.Payload)
|
||||
}); err != nil {
|
||||
log.Error().Err(err).Msg("d.repo.WithNewEvents")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user