mirror of
				https://github.com/3ybactuk/marketplace-go-service-project.git
				synced 2025-10-31 06:23:44 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			173 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			173 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package sqlc
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"errors"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/jackc/pgx/v5"
 | |
| 	"github.com/jackc/pgx/v5/pgxpool"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| 
 | |
| 	"route256/comments/internal/domain/entity"
 | |
| 	"route256/comments/internal/domain/model"
 | |
| )
 | |
| 
 | |
| type commentsRepo struct {
 | |
| 	shard1 *pgxpool.Pool
 | |
| 	shard2 *pgxpool.Pool
 | |
| 
 | |
| 	curID int64
 | |
| 	idMx  sync.Mutex
 | |
| }
 | |
| 
 | |
| func NewCommentsRepository(shard1, shard2 *pgxpool.Pool) *commentsRepo {
 | |
| 	return &commentsRepo{
 | |
| 		shard1: shard1,
 | |
| 		shard2: shard2,
 | |
| 		curID:  1,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *commentsRepo) pickShard(sku int64) (*pgxpool.Pool, int64) {
 | |
| 	if sku%2 == 0 {
 | |
| 		return r.shard1, 0
 | |
| 	}
 | |
| 
 | |
| 	return r.shard2, 1
 | |
| }
 | |
| 
 | |
| func mapComment(c *Comment) *entity.Comment {
 | |
| 	return &entity.Comment{
 | |
| 		ID:        c.ID,
 | |
| 		UserID:    c.UserID,
 | |
| 		SKU:       c.Sku,
 | |
| 		CreatedAt: c.CreatedAt.Time,
 | |
| 		Text:      c.Text,
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *commentsRepo) GetCommentByID(ctx context.Context, id int64) (*entity.Comment, error) {
 | |
| 	q1 := New(r.shard1)
 | |
| 	c, err := q1.GetCommentByID(ctx, id)
 | |
| 	switch {
 | |
| 	case err == nil:
 | |
| 		return mapComment(c), nil
 | |
| 	case errors.Is(err, pgx.ErrNoRows):
 | |
| 		log.Trace().Msgf("comment with id %d not found in shard 1", id)
 | |
| 	default:
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	q2 := New(r.shard2)
 | |
| 	c2, err2 := q2.GetCommentByID(ctx, id)
 | |
| 	switch {
 | |
| 	case err2 == nil:
 | |
| 		return mapComment(c2), nil
 | |
| 	case errors.Is(err2, pgx.ErrNoRows):
 | |
| 		return nil, model.ErrCommentNotFound
 | |
| 	default:
 | |
| 		return nil, err2
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (r *commentsRepo) InsertComment(ctx context.Context, comment *entity.Comment) (*entity.Comment, error) {
 | |
| 	shard, shardID := r.pickShard(comment.SKU)
 | |
| 	q := New(shard)
 | |
| 
 | |
| 	r.idMx.Lock()
 | |
| 	id := r.curID*1000 + shardID
 | |
| 
 | |
| 	req := &InsertCommentParams{
 | |
| 		UserID: comment.UserID,
 | |
| 		Sku:    comment.SKU,
 | |
| 		Text:   comment.Text,
 | |
| 		ID:     id,
 | |
| 	}
 | |
| 
 | |
| 	c, err := q.InsertComment(ctx, req)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	r.curID++
 | |
| 	r.idMx.Unlock()
 | |
| 
 | |
| 	return mapComment(c), nil
 | |
| }
 | |
| 
 | |
| func (r *commentsRepo) ListCommentsBySku(ctx context.Context, sku int64) ([]*entity.Comment, error) {
 | |
| 	shard, _ := r.pickShard(sku)
 | |
| 	q := New(shard)
 | |
| 
 | |
| 	list, err := q.ListCommentsBySku(ctx, sku)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	out := make([]*entity.Comment, 0, len(list))
 | |
| 	for _, com := range list {
 | |
| 		out = append(out, mapComment(com))
 | |
| 	}
 | |
| 
 | |
| 	return out, nil
 | |
| }
 | |
| 
 | |
| func (r *commentsRepo) ListCommentsByUser(ctx context.Context, userID int64) ([]*entity.Comment, error) {
 | |
| 	q1 := New(r.shard1)
 | |
| 	l1, err1 := q1.ListCommentsByUser(ctx, userID)
 | |
| 	if err1 != nil {
 | |
| 		return nil, err1
 | |
| 	}
 | |
| 
 | |
| 	q2 := New(r.shard2)
 | |
| 	l2, err2 := q2.ListCommentsByUser(ctx, userID)
 | |
| 	if err2 != nil {
 | |
| 		return nil, err2
 | |
| 	}
 | |
| 
 | |
| 	alreadyMerged := make(map[int64]struct{}, len(l1)+len(l2))
 | |
| 	merged := make([]*entity.Comment, 0, len(l1)+len(l2))
 | |
| 	for _, com := range l1 {
 | |
| 		merged = append(merged, mapComment(com))
 | |
| 		alreadyMerged[com.Sku] = struct{}{}
 | |
| 	}
 | |
| 	for _, com := range l2 {
 | |
| 		if _, ok := alreadyMerged[com.Sku]; !ok {
 | |
| 			merged = append(merged, mapComment(com))
 | |
| 			alreadyMerged[com.Sku] = struct{}{}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return merged, nil
 | |
| }
 | |
| 
 | |
| func (r *commentsRepo) UpdateComment(ctx context.Context, comment *entity.Comment) (*entity.Comment, error) {
 | |
| 	req := &UpdateCommentParams{
 | |
| 		ID:   comment.ID,
 | |
| 		Text: comment.Text,
 | |
| 	}
 | |
| 
 | |
| 	q1 := New(r.shard1)
 | |
| 	c, err := q1.UpdateComment(ctx, req)
 | |
| 	switch {
 | |
| 	case err == nil:
 | |
| 		return mapComment(c), nil
 | |
| 	case errors.Is(err, pgx.ErrNoRows):
 | |
| 		log.Trace().Msgf("comment with id %d not found in shard 1", req.ID)
 | |
| 	default:
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	q2 := New(r.shard2)
 | |
| 	c2, err2 := q2.UpdateComment(ctx, req)
 | |
| 	switch {
 | |
| 	case err2 == nil:
 | |
| 		return mapComment(c2), nil
 | |
| 	case errors.Is(err2, pgx.ErrNoRows):
 | |
| 		return nil, model.ErrCommentNotFound
 | |
| 	default:
 | |
| 		return nil, err2
 | |
| 	}
 | |
| }
 | 
