mirror of
https://github.com/3ybactuk/marketplace-go-service-project.git
synced 2025-10-30 14:03:45 +03:00
[hw-8] add: repo layer
This commit is contained in:
87
comments/infra/db/postgres/tx.go
Normal file
87
comments/infra/db/postgres/tx.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package postgres
|
||||
|
||||
// From https://gitlab.ozon.dev/go/classroom-18/students/week-4-workshop/-/blob/master/internal/infra/postgres/tx.go
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
// Tx транзакция.
|
||||
type Tx pgx.Tx
|
||||
|
||||
type txKey struct{}
|
||||
|
||||
func ctxWithTx(ctx context.Context, tx pgx.Tx) context.Context {
|
||||
return context.WithValue(ctx, txKey{}, tx)
|
||||
}
|
||||
|
||||
func TxFromCtx(ctx context.Context) (pgx.Tx, bool) {
|
||||
tx, ok := ctx.Value(txKey{}).(pgx.Tx)
|
||||
|
||||
return tx, ok
|
||||
}
|
||||
|
||||
type TxManager struct {
|
||||
write *pgxpool.Pool
|
||||
read *pgxpool.Pool
|
||||
}
|
||||
|
||||
func NewTxManager(write, read *pgxpool.Pool) *TxManager {
|
||||
return &TxManager{
|
||||
write: write,
|
||||
read: read,
|
||||
}
|
||||
}
|
||||
|
||||
// WithTransaction выполняет fn в транзакции с дефолтным уровнем изоляции.
|
||||
func (m *TxManager) WriteWithTransaction(ctx context.Context, fn func(ctx context.Context) error) (err error) {
|
||||
return m.withTx(ctx, m.write, pgx.TxOptions{}, fn)
|
||||
}
|
||||
|
||||
func (m *TxManager) ReadWithTransaction(ctx context.Context, fn func(ctx context.Context) error) (err error) {
|
||||
return m.withTx(ctx, m.read, pgx.TxOptions{}, fn)
|
||||
}
|
||||
|
||||
// WithTransaction выполняет fn в транзакции с уровнем изоляции RepeatableRead.
|
||||
func (m *TxManager) WriteWithRepeatableRead(ctx context.Context, fn func(ctx context.Context) error) (err error) {
|
||||
return m.withTx(ctx, m.write, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, fn)
|
||||
}
|
||||
|
||||
func (m *TxManager) ReadWithRepeatableRead(ctx context.Context, fn func(ctx context.Context) error) (err error) {
|
||||
return m.withTx(ctx, m.read, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, fn)
|
||||
}
|
||||
|
||||
// WithTx выполняет fn в транзакции.
|
||||
func (m *TxManager) withTx(ctx context.Context, pool *pgxpool.Pool, options pgx.TxOptions, fn func(ctx context.Context) error) (err error) {
|
||||
var span opentracing.Span
|
||||
span, ctx = opentracing.StartSpanFromContext(ctx, "Transaction")
|
||||
defer span.Finish()
|
||||
|
||||
tx, err := pool.BeginTx(ctx, options)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ctx = ctxWithTx(ctx, tx)
|
||||
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
// a panic occurred, rollback and repanic
|
||||
_ = tx.Rollback(ctx)
|
||||
panic(p)
|
||||
} else if err != nil {
|
||||
// something went wrong, rollback
|
||||
_ = tx.Rollback(ctx)
|
||||
} else {
|
||||
// all good, commit
|
||||
err = tx.Commit(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
err = fn(ctx)
|
||||
|
||||
return
|
||||
}
|
||||
32
comments/infra/repository/sqlc/db.go
Normal file
32
comments/infra/repository/sqlc/db.go
Normal file
@@ -0,0 +1,32 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgconn"
|
||||
)
|
||||
|
||||
type DBTX interface {
|
||||
Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error)
|
||||
Query(context.Context, string, ...interface{}) (pgx.Rows, error)
|
||||
QueryRow(context.Context, string, ...interface{}) pgx.Row
|
||||
}
|
||||
|
||||
func New(db DBTX) *Queries {
|
||||
return &Queries{db: db}
|
||||
}
|
||||
|
||||
type Queries struct {
|
||||
db DBTX
|
||||
}
|
||||
|
||||
func (q *Queries) WithTx(tx pgx.Tx) *Queries {
|
||||
return &Queries{
|
||||
db: tx,
|
||||
}
|
||||
}
|
||||
17
comments/infra/repository/sqlc/models.go
Normal file
17
comments/infra/repository/sqlc/models.go
Normal file
@@ -0,0 +1,17 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"github.com/jackc/pgx/v5/pgtype"
|
||||
)
|
||||
|
||||
type Comment struct {
|
||||
ID int64
|
||||
UserID int64
|
||||
Sku int64
|
||||
Text string
|
||||
CreatedAt pgtype.Timestamp
|
||||
}
|
||||
19
comments/infra/repository/sqlc/querier.go
Normal file
19
comments/infra/repository/sqlc/querier.go
Normal file
@@ -0,0 +1,19 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type Querier interface {
|
||||
GetCommentByID(ctx context.Context, id int64) (*Comment, error)
|
||||
InsertComment(ctx context.Context, arg *InsertCommentParams) (*Comment, error)
|
||||
ListCommentsBySku(ctx context.Context, sku int64) ([]*Comment, error)
|
||||
ListCommentsByUser(ctx context.Context, userID int64) ([]*Comment, error)
|
||||
UpdateComment(ctx context.Context, arg *UpdateCommentParams) (*Comment, error)
|
||||
}
|
||||
|
||||
var _ Querier = (*Queries)(nil)
|
||||
24
comments/infra/repository/sqlc/query.sql
Normal file
24
comments/infra/repository/sqlc/query.sql
Normal file
@@ -0,0 +1,24 @@
|
||||
-- name: InsertComment :one
|
||||
INSERT INTO comments (user_id, sku, text) VALUES ($1, $2, $3)
|
||||
RETURNING id, user_id, sku, text, created_at;
|
||||
|
||||
-- name: GetCommentByID :one
|
||||
SELECT id, user_id, sku, text, created_at FROM comments WHERE id = $1;
|
||||
|
||||
-- name: UpdateComment :one
|
||||
UPDATE comments
|
||||
SET text = $2
|
||||
WHERE id = $1
|
||||
RETURNING id, user_id, sku, text, created_at;
|
||||
|
||||
-- name: ListCommentsBySku :many
|
||||
SELECT id, user_id, sku, text, created_at
|
||||
FROM comments
|
||||
WHERE sku = $1
|
||||
ORDER BY created_at DESC, user_id ASC;
|
||||
|
||||
-- name: ListCommentsByUser :many
|
||||
SELECT id, user_id, sku, text, created_at
|
||||
FROM comments
|
||||
WHERE user_id = $1
|
||||
ORDER BY created_at DESC;
|
||||
142
comments/infra/repository/sqlc/query.sql.go
Normal file
142
comments/infra/repository/sqlc/query.sql.go
Normal file
@@ -0,0 +1,142 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.29.0
|
||||
// source: query.sql
|
||||
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
const getCommentByID = `-- name: GetCommentByID :one
|
||||
SELECT id, user_id, sku, text, created_at FROM comments WHERE id = $1
|
||||
`
|
||||
|
||||
func (q *Queries) GetCommentByID(ctx context.Context, id int64) (*Comment, error) {
|
||||
row := q.db.QueryRow(ctx, getCommentByID, id)
|
||||
var i Comment
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.UserID,
|
||||
&i.Sku,
|
||||
&i.Text,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const insertComment = `-- name: InsertComment :one
|
||||
INSERT INTO comments (user_id, sku, text) VALUES ($1, $2, $3)
|
||||
RETURNING id, user_id, sku, text, created_at
|
||||
`
|
||||
|
||||
type InsertCommentParams struct {
|
||||
UserID int64
|
||||
Sku int64
|
||||
Text string
|
||||
}
|
||||
|
||||
func (q *Queries) InsertComment(ctx context.Context, arg *InsertCommentParams) (*Comment, error) {
|
||||
row := q.db.QueryRow(ctx, insertComment, arg.UserID, arg.Sku, arg.Text)
|
||||
var i Comment
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.UserID,
|
||||
&i.Sku,
|
||||
&i.Text,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return &i, err
|
||||
}
|
||||
|
||||
const listCommentsBySku = `-- name: ListCommentsBySku :many
|
||||
SELECT id, user_id, sku, text, created_at
|
||||
FROM comments
|
||||
WHERE sku = $1
|
||||
ORDER BY created_at DESC, user_id ASC
|
||||
`
|
||||
|
||||
func (q *Queries) ListCommentsBySku(ctx context.Context, sku int64) ([]*Comment, error) {
|
||||
rows, err := q.db.Query(ctx, listCommentsBySku, sku)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []*Comment
|
||||
for rows.Next() {
|
||||
var i Comment
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.UserID,
|
||||
&i.Sku,
|
||||
&i.Text,
|
||||
&i.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, &i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const listCommentsByUser = `-- name: ListCommentsByUser :many
|
||||
SELECT id, user_id, sku, text, created_at
|
||||
FROM comments
|
||||
WHERE user_id = $1
|
||||
ORDER BY created_at DESC
|
||||
`
|
||||
|
||||
func (q *Queries) ListCommentsByUser(ctx context.Context, userID int64) ([]*Comment, error) {
|
||||
rows, err := q.db.Query(ctx, listCommentsByUser, userID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var items []*Comment
|
||||
for rows.Next() {
|
||||
var i Comment
|
||||
if err := rows.Scan(
|
||||
&i.ID,
|
||||
&i.UserID,
|
||||
&i.Sku,
|
||||
&i.Text,
|
||||
&i.CreatedAt,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, &i)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
const updateComment = `-- name: UpdateComment :one
|
||||
UPDATE comments
|
||||
SET text = $2
|
||||
WHERE id = $1
|
||||
RETURNING id, user_id, sku, text, created_at
|
||||
`
|
||||
|
||||
type UpdateCommentParams struct {
|
||||
ID int64
|
||||
Text string
|
||||
}
|
||||
|
||||
func (q *Queries) UpdateComment(ctx context.Context, arg *UpdateCommentParams) (*Comment, error) {
|
||||
row := q.db.QueryRow(ctx, updateComment, arg.ID, arg.Text)
|
||||
var i Comment
|
||||
err := row.Scan(
|
||||
&i.ID,
|
||||
&i.UserID,
|
||||
&i.Sku,
|
||||
&i.Text,
|
||||
&i.CreatedAt,
|
||||
)
|
||||
return &i, err
|
||||
}
|
||||
148
comments/infra/repository/sqlc/repository.go
Normal file
148
comments/infra/repository/sqlc/repository.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package sqlc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
|
||||
"route256/comments/internal/domain/entity"
|
||||
"route256/comments/internal/domain/model"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type commentsRepo struct {
|
||||
shard1 *pgxpool.Pool
|
||||
shard2 *pgxpool.Pool
|
||||
}
|
||||
|
||||
func NewCommentsRepository(shard1, shard2 *pgxpool.Pool) *commentsRepo {
|
||||
return &commentsRepo{
|
||||
shard1: shard1,
|
||||
shard2: shard2,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *commentsRepo) pickShard(sku int64) *pgxpool.Pool {
|
||||
if sku%2 == 0 {
|
||||
return r.shard1
|
||||
}
|
||||
|
||||
return r.shard2
|
||||
}
|
||||
|
||||
func (r *commentsRepo) GetCommentByID(ctx context.Context, id int64) (*Comment, error) {
|
||||
q1 := New(r.shard1)
|
||||
c, err := q1.GetCommentByID(ctx, id)
|
||||
switch {
|
||||
case err == nil:
|
||||
return c, nil
|
||||
case errors.Is(err, pgx.ErrNoRows):
|
||||
log.Trace().Msgf("comment with id %d not found in shard 1", id)
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q2 := New(r.shard2)
|
||||
c2, err2 := q2.GetCommentByID(ctx, id)
|
||||
switch {
|
||||
case err2 == nil:
|
||||
return c2, nil
|
||||
case errors.Is(err2, pgx.ErrNoRows):
|
||||
return nil, model.ErrCommentNotFound
|
||||
default:
|
||||
return nil, err2
|
||||
}
|
||||
}
|
||||
|
||||
func (r *commentsRepo) InsertComment(ctx context.Context, comment *entity.Comment) (*Comment, error) {
|
||||
shard := r.pickShard(comment.SKU)
|
||||
q := New(shard)
|
||||
|
||||
req := &InsertCommentParams{
|
||||
UserID: comment.UserID,
|
||||
Sku: comment.SKU,
|
||||
Text: comment.Text,
|
||||
}
|
||||
|
||||
c, err := q.InsertComment(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (r *commentsRepo) ListCommentsBySku(ctx context.Context, sku int64) ([]*Comment, error) {
|
||||
shard := r.pickShard(sku)
|
||||
q := New(shard)
|
||||
|
||||
list, err := q.ListCommentsBySku(ctx, sku)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make([]*Comment, len(list))
|
||||
copy(out, list)
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (r *commentsRepo) ListCommentsByUser(ctx context.Context, userID int64) ([]*Comment, error) {
|
||||
q1 := New(r.shard1)
|
||||
l1, err1 := q1.ListCommentsByUser(ctx, userID)
|
||||
if err1 != nil {
|
||||
return nil, err1
|
||||
}
|
||||
|
||||
q2 := New(r.shard2)
|
||||
l2, err2 := q2.ListCommentsByUser(ctx, userID)
|
||||
if err2 != nil {
|
||||
return nil, err2
|
||||
}
|
||||
|
||||
merged := make([]*Comment, 0, len(l1)+len(l2))
|
||||
merged = append(merged, l1...)
|
||||
merged = append(merged, l2...)
|
||||
|
||||
sort.Slice(merged, func(i, j int) bool {
|
||||
if merged[i].CreatedAt.Time.Equal(merged[j].CreatedAt.Time) {
|
||||
return merged[i].UserID < merged[j].UserID
|
||||
}
|
||||
|
||||
return merged[i].CreatedAt.Time.After(merged[j].CreatedAt.Time)
|
||||
})
|
||||
|
||||
return merged, nil
|
||||
}
|
||||
|
||||
func (r *commentsRepo) UpdateComment(ctx context.Context, comment *entity.Comment) (*Comment, error) {
|
||||
req := &UpdateCommentParams{
|
||||
ID: comment.ID,
|
||||
Text: comment.Text,
|
||||
}
|
||||
|
||||
q1 := New(r.shard1)
|
||||
c, err := q1.UpdateComment(ctx, req)
|
||||
switch {
|
||||
case err == nil:
|
||||
return c, nil
|
||||
case errors.Is(err, pgx.ErrNoRows):
|
||||
log.Trace().Msgf("comment with id %d not found in shard 1", req.ID)
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q2 := New(r.shard2)
|
||||
c2, err2 := q2.UpdateComment(ctx, req)
|
||||
switch {
|
||||
case err2 == nil:
|
||||
return c2, nil
|
||||
case errors.Is(err2, pgx.ErrNoRows):
|
||||
return nil, model.ErrCommentNotFound
|
||||
default:
|
||||
return nil, err2
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user