mirror of
https://github.com/3ybactuk/marketplace-go-service-project.git
synced 2025-10-30 14:03:45 +03:00
233 lines
5.5 KiB
Go
233 lines
5.5 KiB
Go
package app
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/IBM/sarama"
|
|
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/prometheus/client_golang/prometheus/promhttp"
|
|
"github.com/rs/zerolog"
|
|
"github.com/rs/zerolog/log"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/reflection"
|
|
|
|
"route256/loms/internal/app/server"
|
|
ordersRepository "route256/loms/internal/domain/repository/orders/sqlc"
|
|
stocksRepository "route256/loms/internal/domain/repository/stocks/sqlc"
|
|
"route256/loms/internal/domain/service"
|
|
"route256/loms/internal/infra/config"
|
|
"route256/loms/internal/infra/db/postgres"
|
|
mw "route256/loms/internal/infra/grpc/middleware"
|
|
"route256/loms/internal/infra/messaging/kafka"
|
|
|
|
pb "route256/pkg/api/loms/v1"
|
|
)
|
|
|
|
type App struct {
|
|
config *config.Config
|
|
controller *server.Server
|
|
|
|
grpcServer *grpc.Server
|
|
httpServer *http.Server
|
|
gwConn *grpc.ClientConn
|
|
}
|
|
|
|
func NewApp(configPath string) (*App, error) {
|
|
c, err := config.LoadConfig(configPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to load config: %w", err)
|
|
}
|
|
|
|
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
|
|
zerolog.SetGlobalLevel(zerolog.InfoLevel)
|
|
|
|
if c.Service.LogLevel != "" {
|
|
level, logErr := zerolog.ParseLevel(c.Service.LogLevel)
|
|
if logErr != nil {
|
|
return nil, fmt.Errorf("unknown log level `%s` provided: %w", c.Service.LogLevel, logErr)
|
|
}
|
|
|
|
zerolog.SetGlobalLevel(level)
|
|
}
|
|
|
|
log.WithLevel(zerolog.GlobalLevel()).Msgf("using logging level=`%s`", zerolog.GlobalLevel().String())
|
|
|
|
masterPool, replicaPool, err := getPostgresPools(c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
producer, err := setupSaramaAsyncConn([]string{c.Kafka.Brokers})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
stockRepo := stocksRepository.NewStockRepository(masterPool, replicaPool)
|
|
orderRepo := ordersRepository.NewOrderRepository(masterPool)
|
|
txManager := postgres.NewTxManager(masterPool, replicaPool)
|
|
kafkaProducer, err := kafka.NewStatusProducer(c.Kafka.OrderTopic, producer)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
service := service.NewLomsService(orderRepo, stockRepo, txManager, kafkaProducer)
|
|
controller := server.NewServer(service)
|
|
|
|
app := &App{
|
|
config: c,
|
|
controller: controller,
|
|
}
|
|
|
|
return app, nil
|
|
}
|
|
|
|
func (app *App) Shutdown(ctx context.Context) (err error) {
|
|
if app.httpServer != nil {
|
|
err = app.httpServer.Shutdown(ctx)
|
|
if err != nil {
|
|
log.Error().Err(err).Msgf("failed http gateway server shutdown")
|
|
}
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
if app.grpcServer != nil {
|
|
go func() {
|
|
app.grpcServer.GracefulStop()
|
|
|
|
close(done)
|
|
}()
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
case <-ctx.Done():
|
|
if app.grpcServer != nil {
|
|
app.grpcServer.Stop()
|
|
}
|
|
}
|
|
|
|
if app.gwConn != nil {
|
|
err2 := app.gwConn.Close()
|
|
|
|
if err2 != nil {
|
|
err = err2
|
|
|
|
log.Error().Err(err).Msgf("failed gateway connection close")
|
|
}
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func (app *App) ListenAndServe(ctx context.Context) error {
|
|
grpcAddr := fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.GRPCPort)
|
|
l, err := net.Listen("tcp", grpcAddr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
app.grpcServer = grpc.NewServer(
|
|
grpc.ChainUnaryInterceptor(
|
|
mw.WithTracing,
|
|
mw.Logging,
|
|
mw.WithMetrics,
|
|
mw.Validate,
|
|
),
|
|
)
|
|
reflection.Register(app.grpcServer)
|
|
|
|
pb.RegisterLOMSServer(app.grpcServer, app.controller)
|
|
|
|
go func() {
|
|
if err = app.grpcServer.Serve(l); err != nil {
|
|
log.Fatal().Err(err).Msg("failed to serve")
|
|
}
|
|
}()
|
|
|
|
log.Info().Msgf("Serving grpc loms at grpc://%s", l.Addr())
|
|
|
|
// Setup HTTP gateway
|
|
|
|
conn, err := grpc.NewClient(
|
|
grpcAddr,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("grpc.NewClient: %w", err)
|
|
}
|
|
app.gwConn = conn
|
|
|
|
gwmux := runtime.NewServeMux()
|
|
if err = pb.RegisterLOMSHandler(ctx, gwmux, conn); err != nil {
|
|
return fmt.Errorf("pb.RegisterLOMSHandler: %w", err)
|
|
}
|
|
|
|
root := http.NewServeMux()
|
|
root.Handle("/metrics", promhttp.Handler())
|
|
root.Handle("/", gwmux)
|
|
|
|
app.httpServer = &http.Server{
|
|
Addr: fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.HTTPPort),
|
|
Handler: root,
|
|
ReadTimeout: 10 * time.Second,
|
|
}
|
|
|
|
log.Info().Msgf("Serving http loms at http://%s", app.httpServer.Addr)
|
|
|
|
return app.httpServer.ListenAndServe()
|
|
}
|
|
|
|
func getPostgresPools(c *config.Config) (masterPool, replicaPool *pgxpool.Pool, err error) {
|
|
masterConn := fmt.Sprintf(
|
|
"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
|
|
c.DatabaseMaster.User,
|
|
c.DatabaseMaster.Password,
|
|
c.DatabaseMaster.Host,
|
|
c.DatabaseMaster.Port,
|
|
c.DatabaseMaster.DBName,
|
|
)
|
|
|
|
replicaConn := fmt.Sprintf(
|
|
"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
|
|
c.DatabaseReplica.User,
|
|
c.DatabaseReplica.Password,
|
|
c.DatabaseReplica.Host,
|
|
c.DatabaseReplica.Port,
|
|
c.DatabaseReplica.DBName,
|
|
)
|
|
|
|
pools, err := postgres.NewPools(context.Background(), masterConn, replicaConn)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
if len(pools) != 2 {
|
|
return nil, nil, fmt.Errorf("got wrong number of pools when establishing postgres connection")
|
|
}
|
|
|
|
return pools[0], pools[1], nil
|
|
}
|
|
|
|
func setupSaramaAsyncConn(brokers []string) (sarama.AsyncProducer, error) {
|
|
cfg := sarama.NewConfig()
|
|
cfg.Producer.RequiredAcks = sarama.WaitForAll
|
|
cfg.Producer.Idempotent = true
|
|
cfg.Producer.Return.Successes = true
|
|
cfg.Producer.Retry.Max = 5
|
|
cfg.Net.MaxOpenRequests = 1
|
|
|
|
producer, err := sarama.NewAsyncProducer(brokers, cfg)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create async producer: %w", err)
|
|
}
|
|
|
|
return producer, nil
|
|
}
|