// Package sqlc holds the comments repository, which spreads comments across
// two PostgreSQL pools ("shards").
package sqlc

import (
	"context"
	"errors"
	"sync"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/rs/zerolog/log"

	"route256/comments/internal/domain/entity"
	"route256/comments/internal/domain/model"
)

// errNilComment is returned by the write methods when they are handed a nil
// comment instead of dereferencing it and panicking.
var errNilComment = errors.New("comment is nil")

// commentsRepo stores comments in two pgx connection pools and hands out
// comment IDs from an in-process counter.
type commentsRepo struct {
	shard1 *pgxpool.Pool
	shard2 *pgxpool.Pool

	// curID is the next counter value used to build a comment ID; it is
	// guarded by idMx.
	curID int64
	idMx  sync.Mutex
}

// NewCommentsRepository builds a repository over the two given pools.
//
// The ID counter starts at 1 on every call.
// NOTE(review): because the counter lives only in memory, IDs produced after
// a restart (or by a second instance) repeat earlier ones — confirm how the
// schema / deployment is expected to deal with that.
func NewCommentsRepository(shard1, shard2 *pgxpool.Pool) *commentsRepo {
	return &commentsRepo{
		shard1: shard1,
		shard2: shard2,
		curID:  1,
	}
}

// pickShard returns the pool responsible for sku together with the numeric
// shard suffix that InsertComment embeds into generated IDs: even SKUs map to
// shard1 (suffix 0), every other SKU maps to shard2 (suffix 1).
func (r *commentsRepo) pickShard(sku int64) (*pgxpool.Pool, int64) {
	if sku%2 == 0 {
		return r.shard1, 0
	}
	return r.shard2, 1
}

// mapComment converts a storage row into the domain entity. c must be non-nil.
func mapComment(c *Comment) *entity.Comment {
	return &entity.Comment{
		ID:        c.ID,
		UserID:    c.UserID,
		SKU:       c.Sku,
		CreatedAt: c.CreatedAt.Time,
		Text:      c.Text,
	}
}

// firstShardHit runs op against shard1 and, only if that reports
// pgx.ErrNoRows, against shard2. The first successful row is mapped to the
// domain entity. When both shards report pgx.ErrNoRows the result is
// model.ErrCommentNotFound; any other error is returned unchanged and stops
// the search. id is used only for the trace log line.
func (r *commentsRepo) firstShardHit(
	id int64,
	op func(shard *pgxpool.Pool) (*Comment, error),
) (*entity.Comment, error) {
	row, err := op(r.shard1)
	switch {
	case err == nil:
		return mapComment(row), nil
	case errors.Is(err, pgx.ErrNoRows):
		// Not an error yet: the row may live in the other shard.
		log.Trace().Msgf("comment with id %d not found in shard 1", id)
	default:
		return nil, err
	}

	row, err = op(r.shard2)
	switch {
	case err == nil:
		return mapComment(row), nil
	case errors.Is(err, pgx.ErrNoRows):
		return nil, model.ErrCommentNotFound
	default:
		return nil, err
	}
}

// GetCommentByID looks the comment up in shard1 and then in shard2.
// It returns model.ErrCommentNotFound when neither shard has the row; other
// query errors are returned as-is.
func (r *commentsRepo) GetCommentByID(ctx context.Context, id int64) (*entity.Comment, error) {
	return r.firstShardHit(id, func(shard *pgxpool.Pool) (*Comment, error) {
		return New(shard).GetCommentByID(ctx, id)
	})
}

// InsertComment stores comment in the shard selected by its SKU and returns
// the stored row as a domain entity.
//
// The row ID is generated here as counter*1000 + shard suffix; the ID field
// of the incoming comment is not used. A nil comment yields an error.
func (r *commentsRepo) InsertComment(ctx context.Context, comment *entity.Comment) (*entity.Comment, error) {
	if comment == nil {
		return nil, errNilComment
	}

	shard, shardID := r.pickShard(comment.SKU)

	// Keep the critical section to the counter only; the query runs unlocked.
	r.idMx.Lock()
	id := r.curID*1000 + shardID
	r.curID++
	r.idMx.Unlock()

	row, err := New(shard).InsertComment(ctx, &InsertCommentParams{
		UserID: comment.UserID,
		Sku:    comment.SKU,
		Text:   comment.Text,
		ID:     id,
	})
	if err != nil {
		return nil, err
	}
	return mapComment(row), nil
}

// ListCommentsBySku returns the comments for sku, querying only the shard
// that pickShard assigns to that SKU. The result is non-nil even when empty.
func (r *commentsRepo) ListCommentsBySku(ctx context.Context, sku int64) ([]*entity.Comment, error) {
	shard, _ := r.pickShard(sku)

	rows, err := New(shard).ListCommentsBySku(ctx, sku)
	if err != nil {
		return nil, err
	}

	out := make([]*entity.Comment, 0, len(rows))
	for _, row := range rows {
		out = append(out, mapComment(row))
	}
	return out, nil
}

// ListCommentsByUser returns the user's comments from both shards: shard1
// rows first, then shard2 rows, each in the order the query returned them.
// A failure on either shard aborts the call. The result is non-nil even when
// empty.
func (r *commentsRepo) ListCommentsByUser(ctx context.Context, userID int64) ([]*entity.Comment, error) {
	fromFirst, err := New(r.shard1).ListCommentsByUser(ctx, userID)
	if err != nil {
		return nil, err
	}

	fromSecond, err := New(r.shard2).ListCommentsByUser(ctx, userID)
	if err != nil {
		return nil, err
	}

	merged := make([]*entity.Comment, 0, len(fromFirst)+len(fromSecond))
	for _, part := range [][]*Comment{fromFirst, fromSecond} {
		for _, row := range part {
			merged = append(merged, mapComment(row))
		}
	}
	return merged, nil
}

// UpdateComment replaces the text of the comment with comment.ID, trying
// shard1 and then shard2. It returns the updated row,
// model.ErrCommentNotFound when neither shard has the row, or the underlying
// query error. A nil comment yields an error.
func (r *commentsRepo) UpdateComment(ctx context.Context, comment *entity.Comment) (*entity.Comment, error) {
	if comment == nil {
		return nil, errNilComment
	}

	req := &UpdateCommentParams{
		ID:   comment.ID,
		Text: comment.Text,
	}
	return r.firstShardHit(req.ID, func(shard *pgxpool.Pool) (*Comment, error) {
		return New(shard).UpdateComment(ctx, req)
	})
}