// Package kafkaoutbox polls an outbox repository on a fixed interval and
// forwards every new event to a producer.
package kafkaoutbox

import (
	"context"
	"time"

	"route256/loms/internal/domain/entity"
	"route256/loms/internal/domain/repository/outbox"

	"github.com/rs/zerolog/log"
)

// producer is the sink that outbox events are forwarded to.
type producer interface {
	// SendRaw sends value for the given id and reports any failure.
	// NOTE(review): given the package name this is presumably a Kafka
	// producer and id is presumably used as the message key — confirm
	// against the implementation.
	SendRaw(ctx context.Context, id entity.ID, value []byte) error
}

// outboxRepo is the source of events that still have to be dispatched.
type outboxRepo interface {
	// WithNewEvents is handed the handler h, which it is expected to invoke
	// for new events.
	// NOTE(review): limit presumably caps the number of events processed per
	// call, and a non-nil error from h presumably leaves the event pending —
	// confirm against the implementation.
	WithNewEvents(ctx context.Context, limit int, h func(c context.Context, e outbox.Event) error) error
}

// Dispatcher periodically asks the outbox repository for new events and
// passes each event's OrderID and Payload to the producer.
type Dispatcher struct {
	repo     outboxRepo
	producer producer
	limit    int           // forwarded to outboxRepo.WithNewEvents on every poll
	interval time.Duration // pause between two consecutive polls
}

// NewDispatcher builds a Dispatcher that reads from repo and writes to
// producer. batch is forwarded to the repository as the limit argument and
// pollEvery is the pause between polls.
func NewDispatcher(repo outboxRepo, producer producer, batch int, pollEvery time.Duration) *Dispatcher {
	return &Dispatcher{
		repo:     repo,
		producer: producer,
		limit:    batch,
		interval: pollEvery,
	}
}

// Run polls the repository immediately and then once per interval until ctx
// is cancelled. It blocks for the whole lifetime of ctx, so callers normally
// start it in its own goroutine. Errors returned by the repository are logged
// and do not stop the loop.
func (d *Dispatcher) Run(ctx context.Context) {
	// time.NewTicker panics on a non-positive duration; fall back to a sane
	// default instead of crashing on a zero-value or misconfigured Dispatcher.
	const fallbackInterval = time.Second

	interval := d.interval
	if interval <= 0 {
		interval = fallbackInterval
	}

	t := time.NewTicker(interval)
	defer t.Stop()

	for {
		// select chooses randomly when both ctx.Done() and t.C are ready, so
		// check cancellation explicitly to avoid polling with a dead context.
		if ctx.Err() != nil {
			return
		}

		err := d.repo.WithNewEvents(ctx, d.limit, func(c context.Context, e outbox.Event) error {
			return d.producer.SendRaw(c, e.OrderID, e.Payload)
		})
		if err != nil {
			log.Error().Err(err).Msg("d.repo.WithNewEvents")
		}

		select {
		case <-ctx.Done():
			return
		case <-t.C:
		}
	}
}