mirror of
https://github.com/3ybactuk/marketplace-go-service-project.git
synced 2025-10-30 14:03:45 +03:00
[hw-5] concurrency, graceful shutdown, concurrent tests
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package app
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
@@ -68,7 +69,11 @@ func NewApp(configPath string) (*App, error) {
|
||||
return app, nil
|
||||
}
|
||||
|
||||
func (app *App) ListenAndServe() error {
|
||||
func (app *App) Shutdown(ctx context.Context) error {
|
||||
return app.server.Shutdown(ctx)
|
||||
}
|
||||
|
||||
func (app *App) ListenAndServe(_ context.Context) error {
|
||||
address := fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.Port)
|
||||
|
||||
l, err := net.Listen("tcp", address)
|
||||
@@ -96,6 +101,9 @@ func (app *App) setupCartService() (*service.CartService, error) {
|
||||
httpClient,
|
||||
app.config.ProductService.Token,
|
||||
fmt.Sprintf("%s:%s", app.config.ProductService.Host, app.config.ProductService.Port),
|
||||
app.config.ProductService.Limit,
|
||||
app.config.ProductService.Burst,
|
||||
app.config.Service.Workers,
|
||||
)
|
||||
|
||||
// LOMS service client
|
||||
|
||||
@@ -10,23 +10,40 @@ import (
|
||||
|
||||
"route256/cart/internal/domain/entity"
|
||||
"route256/cart/internal/domain/model"
|
||||
"route256/cart/internal/infra/errgroup"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type ProductService struct {
|
||||
httpClient http.Client
|
||||
token string
|
||||
address string
|
||||
|
||||
limiter *rate.Limiter
|
||||
|
||||
workers int
|
||||
}
|
||||
|
||||
func NewProductService(httpClient http.Client, token string, address string) *ProductService {
|
||||
func NewProductService(httpClient http.Client, token, address string, limitRPS, burst, workers int) *ProductService {
|
||||
log.Debug().Msgf("creating product server with %d worker limit", workers)
|
||||
|
||||
return &ProductService{
|
||||
httpClient: httpClient,
|
||||
token: token,
|
||||
address: address,
|
||||
|
||||
limiter: rate.NewLimiter(rate.Limit(limitRPS), burst),
|
||||
workers: workers,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ProductService) GetProductBySku(ctx context.Context, sku entity.Sku) (*model.Product, error) {
|
||||
if err := s.limiter.Wait(ctx); err != nil {
|
||||
return nil, fmt.Errorf("limiter.Wait: %w", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
@@ -73,3 +90,32 @@ type GetProductResponse struct {
|
||||
Price int32 `json:"price"`
|
||||
Sku int64 `json:"sku"`
|
||||
}
|
||||
|
||||
func (s *ProductService) GetProducts(ctx context.Context, skus []entity.Sku) ([]*model.Product, error) {
|
||||
if len(skus) == 0 {
|
||||
return []*model.Product{}, nil
|
||||
}
|
||||
|
||||
results := make([]*model.Product, len(skus))
|
||||
|
||||
g, _ := errgroup.WithContext(ctx, s.workers)
|
||||
|
||||
for i, sku := range skus {
|
||||
g.Go(func(groupCtx context.Context) error {
|
||||
p, err := s.GetProductBySku(groupCtx, sku)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
results[i] = p
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return results, nil
|
||||
}
|
||||
@@ -2,15 +2,21 @@ package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"route256/cart/internal/domain/entity"
|
||||
"route256/cart/internal/domain/model"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"route256/cart/internal/domain/entity"
|
||||
"route256/cart/internal/domain/model"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
||||
|
||||
func TestAddItem(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
@@ -267,3 +273,135 @@ func TestDeleteItemsByUserID(t *testing.T) {
|
||||
|
||||
assert.Len(t, repo.storage, 0, "check storage length")
|
||||
}
|
||||
|
||||
func TestConcurrent_AddItemSameSku(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const (
|
||||
goroutines = 100
|
||||
userID = entity.UID(42)
|
||||
sku = entity.Sku(777)
|
||||
)
|
||||
|
||||
repo := NewInMemoryRepository(1)
|
||||
ctx := context.Background()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(goroutines)
|
||||
|
||||
for i := 0; i < goroutines; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
item := &model.Item{
|
||||
Product: &model.Product{Sku: sku},
|
||||
Count: 1,
|
||||
}
|
||||
require.NoError(t, repo.AddItem(ctx, userID, item))
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
cart, err := repo.GetItemsByUserID(ctx, userID)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Len(t, cart.Items, 1, "only one item should exist")
|
||||
assert.Equal(t, uint32(goroutines), cart.ItemCount[sku], "wrong count")
|
||||
}
|
||||
|
||||
func TestConcurrent_AddItemDifferentUsers(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
const (
|
||||
users = 50
|
||||
addsPerUser = 10
|
||||
)
|
||||
|
||||
repo := NewInMemoryRepository(users)
|
||||
ctx := context.Background()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for u := 0; u < users; u++ {
|
||||
uid := entity.UID(u)
|
||||
wg.Add(1)
|
||||
|
||||
go func(id entity.UID) {
|
||||
defer wg.Done()
|
||||
|
||||
for i := 0; i < addsPerUser; i++ {
|
||||
item := &model.Item{
|
||||
Product: &model.Product{Sku: entity.Sku(i)},
|
||||
Count: 1,
|
||||
}
|
||||
require.NoError(t, repo.AddItem(ctx, id, item))
|
||||
}
|
||||
}(uid)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
for u := 0; u < users; u++ {
|
||||
uid := entity.UID(u)
|
||||
cart, err := repo.GetItemsByUserID(ctx, uid)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Lenf(t, cart.Items, addsPerUser, "user %d has wrong item count", u)
|
||||
for i := 0; i < addsPerUser; i++ {
|
||||
sku := entity.Sku(i)
|
||||
assert.Equalf(t, uint32(1), cart.ItemCount[sku], "user %d: sku %d count", u, sku)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrent_AddAndRead(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
repo := NewInMemoryRepository(1)
|
||||
ctx := context.Background()
|
||||
uid := entity.UID(1)
|
||||
sku := entity.Sku(9)
|
||||
|
||||
const (
|
||||
writerGoroutines = 20
|
||||
readerGoroutines = 20
|
||||
iterations = 100
|
||||
)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for i := 0; i < writerGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
item := &model.Item{
|
||||
Product: &model.Product{Sku: sku},
|
||||
Count: 1,
|
||||
}
|
||||
require.NoError(t, repo.AddItem(ctx, uid, item))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < readerGoroutines; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < iterations; j++ {
|
||||
_, err := repo.GetItemsByUserID(ctx, uid)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
cart, err := repo.GetItemsByUserID(ctx, uid)
|
||||
require.NoError(t, err)
|
||||
expected := uint32(writerGoroutines * iterations)
|
||||
|
||||
assert.Equal(t, expected, cart.ItemCount[sku], "wrong item count")
|
||||
}
|
||||
|
||||
@@ -4,9 +4,6 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
|
||||
"route256/cart/internal/domain/entity"
|
||||
"route256/cart/internal/domain/model"
|
||||
@@ -22,6 +19,7 @@ type Repository interface {
|
||||
|
||||
type ProductService interface {
|
||||
GetProductBySku(ctx context.Context, sku entity.Sku) (*model.Product, error)
|
||||
GetProducts(ctx context.Context, skus []entity.Sku) ([]*model.Product, error)
|
||||
}
|
||||
|
||||
type LomsService interface {
|
||||
@@ -77,9 +75,6 @@ func (s *CartService) AddItem(ctx context.Context, userID entity.UID, item *mode
|
||||
// GetUserCart gets all user cart's item ids, gets the item description from the product-service
|
||||
// and return a list of the collected items.
|
||||
// In case of failed request to product-service, return nothing and error.
|
||||
//
|
||||
// TODO: add worker group, BUT it's OK for now,
|
||||
// assuming user does not have hundreds of different items in his cart.
|
||||
func (s *CartService) GetItemsByUserID(ctx context.Context, userID entity.UID) (*model.Cart, error) {
|
||||
if userID <= 0 {
|
||||
return nil, fmt.Errorf("userID invalid")
|
||||
@@ -99,65 +94,31 @@ func (s *CartService) GetItemsByUserID(ctx context.Context, userID entity.UID) (
|
||||
|
||||
resultCart := &model.Cart{
|
||||
UserID: userID,
|
||||
Items: make([]*model.Item, len(cart.Items)),
|
||||
Items: make([]*model.Item, 0, len(cart.Items)),
|
||||
TotalPrice: 0,
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
sumMutex sync.Mutex
|
||||
)
|
||||
|
||||
for idx, sku := range cart.Items {
|
||||
wg.Add(1)
|
||||
|
||||
go func(sku entity.Sku, count uint32, idx int) {
|
||||
defer wg.Done()
|
||||
|
||||
product, err := s.productService.GetProductBySku(ctx, sku)
|
||||
if err != nil {
|
||||
select {
|
||||
case errCh <- fmt.Errorf("productService.GetProductBySku: %w", err):
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
log.Error().Err(err).Msg("productService.GetProductBySku")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
resultCart.Items[idx] = &model.Item{
|
||||
Product: product,
|
||||
Count: count,
|
||||
}
|
||||
|
||||
sumMutex.Lock()
|
||||
resultCart.TotalPrice += uint32(product.Price) * count
|
||||
sumMutex.Unlock()
|
||||
}(sku, cart.ItemCount[sku], idx)
|
||||
products, err := s.productService.GetProducts(ctx, cart.Items)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
for _, product := range products {
|
||||
cnt := cart.ItemCount[product.Sku]
|
||||
|
||||
close(doneCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
cancel()
|
||||
|
||||
return nil, err
|
||||
case <-doneCh:
|
||||
slices.SortStableFunc(resultCart.Items, func(a, b *model.Item) int {
|
||||
return int(a.Product.Sku - b.Product.Sku)
|
||||
resultCart.Items = append(resultCart.Items, &model.Item{
|
||||
Product: product,
|
||||
Count: cnt,
|
||||
})
|
||||
|
||||
return resultCart, nil
|
||||
resultCart.TotalPrice += cnt * uint32(product.Price)
|
||||
}
|
||||
|
||||
slices.SortStableFunc(resultCart.Items, func(a, b *model.Item) int {
|
||||
return int(a.Product.Sku - b.Product.Sku)
|
||||
})
|
||||
|
||||
return resultCart, nil
|
||||
}
|
||||
|
||||
func (s *CartService) DeleteItem(ctx context.Context, userID entity.UID, sku entity.Sku) error {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/gojuno/minimock/v3"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/goleak"
|
||||
|
||||
"route256/cart/internal/domain/entity"
|
||||
"route256/cart/internal/domain/model"
|
||||
@@ -33,6 +34,21 @@ func (f *productServiceFake) GetProductBySku(_ context.Context, sku entity.Sku)
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (f *productServiceFake) GetProducts(ctx context.Context, skus []entity.Sku) ([]*model.Product, error) {
|
||||
res := make([]*model.Product, len(skus))
|
||||
|
||||
for i, sku := range skus {
|
||||
prod, err := f.GetProductBySku(ctx, sku)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res[i] = prod
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
type lomsServiceFake struct{}
|
||||
|
||||
func (f *lomsServiceFake) StocksInfo(_ context.Context, sku entity.Sku) (uint32, error) {
|
||||
@@ -51,6 +67,10 @@ func (f *lomsServiceFake) OrderCreate(_ context.Context, cart *model.Cart) (int6
|
||||
return 1234, nil
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
goleak.VerifyTestMain(m)
|
||||
}
|
||||
|
||||
func TestCartService_AddItem(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
||||
62
cart/internal/infra/errgroup/errgroup.go
Normal file
62
cart/internal/infra/errgroup/errgroup.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package errgroup
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type Group struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
wg sync.WaitGroup
|
||||
|
||||
firstErr chan error
|
||||
|
||||
// Used to limit number of goroutines (workers) for group.
|
||||
workerLimCh chan struct{}
|
||||
}
|
||||
|
||||
func WithContext(parent context.Context, workers int) (*Group, context.Context) {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
return &Group{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
firstErr: make(chan error, 1),
|
||||
workerLimCh: make(chan struct{}, workers),
|
||||
}, ctx
|
||||
}
|
||||
|
||||
func (g *Group) Go(f func(ctx context.Context) error) {
|
||||
// Wait for worker to be freed.
|
||||
g.workerLimCh <- struct{}{}
|
||||
|
||||
g.wg.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
g.wg.Done()
|
||||
|
||||
<-g.workerLimCh // release worker
|
||||
}()
|
||||
|
||||
if err := f(g.ctx); err != nil {
|
||||
select {
|
||||
case g.firstErr <- err:
|
||||
g.cancel()
|
||||
default: // got error from other goroutine
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (g *Group) Wait() error {
|
||||
g.wg.Wait()
|
||||
g.cancel()
|
||||
|
||||
select {
|
||||
case err := <-g.firstErr:
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user