// Package app wires the LOMS service together: configuration, logging,
// PostgreSQL pools, the Kafka producer, the gRPC server and its HTTP gateway.
package app

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"os"
	"time"

	"github.com/IBM/sarama"
	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/reflection"

	"route256/loms/internal/app/server"
	ordersRepository "route256/loms/internal/domain/repository/orders/sqlc"
	stocksRepository "route256/loms/internal/domain/repository/stocks/sqlc"
	"route256/loms/internal/domain/service"
	"route256/loms/internal/infra/config"
	"route256/loms/internal/infra/db/postgres"
	mw "route256/loms/internal/infra/grpc/middleware"
	"route256/loms/internal/infra/messaging/kafka"
	pb "route256/pkg/api/loms/v1"
)

// App holds the loaded configuration, the gRPC controller and the network
// servers that are created lazily by ListenAndServe.
type App struct {
	config     *config.Config
	controller *server.Server
	grpcServer *grpc.Server
	httpServer *http.Server
	gwConn     *grpc.ClientConn
}

// NewApp loads the configuration from configPath, configures the global
// zerolog logger, opens the PostgreSQL pools and the Kafka producer, and
// builds the service controller.
//
// On failure every resource that was already opened by NewApp is released
// before the error is returned.
func NewApp(configPath string) (*App, error) {
	c, err := config.LoadConfig(configPath)
	if err != nil {
		return nil, fmt.Errorf("unable to load config: %w", err)
	}

	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
	zerolog.SetGlobalLevel(zerolog.InfoLevel)
	if c.Service.LogLevel != "" {
		level, logErr := zerolog.ParseLevel(c.Service.LogLevel)
		if logErr != nil {
			return nil, fmt.Errorf("unknown log level `%s` provided: %w", c.Service.LogLevel, logErr)
		}
		zerolog.SetGlobalLevel(level)
	}
	log.WithLevel(zerolog.GlobalLevel()).Msgf("using logging level=`%s`", zerolog.GlobalLevel().String())

	masterPool, replicaPool, err := getPostgresPools(c)
	if err != nil {
		return nil, err
	}

	// NOTE(review): the broker setting is passed as a single list entry; if the
	// config value can hold several comma-separated brokers it is not split
	// here — confirm against the config format.
	producer, err := setupSaramaAsyncConn([]string{c.Kafka.Brokers})
	if err != nil {
		closePostgresPools(masterPool, replicaPool)
		return nil, err
	}

	stockRepo := stocksRepository.NewStockRepository(masterPool, replicaPool)
	orderRepo := ordersRepository.NewOrderRepository(masterPool)
	txManager := postgres.NewTxManager(masterPool, replicaPool)

	kafkaProducer, err := kafka.NewStatusProducer(c.Kafka.OrderTopic, producer)
	if err != nil {
		// NewApp created the producer and nothing else holds it yet, so it is
		// closed here on a best-effort basis.
		if closeErr := producer.Close(); closeErr != nil {
			log.Error().Err(closeErr).Msg("failed kafka producer close")
		}
		closePostgresPools(masterPool, replicaPool)
		return nil, err
	}

	// Named lomsService so the imported service package is not shadowed.
	lomsService := service.NewLomsService(orderRepo, stockRepo, txManager, kafkaProducer)
	controller := server.NewServer(lomsService)

	return &App{
		config:     c,
		controller: controller,
	}, nil
}

// Shutdown stops the HTTP gateway, then the gRPC server, then closes the
// gateway's client connection. Components that were never started are skipped.
//
// The gRPC server is stopped gracefully; if ctx is done first it is stopped
// forcibly. Every failure is logged and the first one encountered is returned.
func (app *App) Shutdown(ctx context.Context) (err error) {
	if app.httpServer != nil {
		if err = app.httpServer.Shutdown(ctx); err != nil {
			log.Error().Err(err).Msgf("failed http gateway server shutdown")
		}
	}

	// Only wait when there is a server to stop; otherwise nothing would ever
	// close the done channel and the select would block until ctx expires.
	if app.grpcServer != nil {
		done := make(chan struct{})
		go func() {
			app.grpcServer.GracefulStop()
			close(done)
		}()

		select {
		case <-done:
		case <-ctx.Done():
			app.grpcServer.Stop()
		}
	}

	if app.gwConn != nil {
		if closeErr := app.gwConn.Close(); closeErr != nil {
			log.Error().Err(closeErr).Msgf("failed gateway connection close")
			// Keep an earlier HTTP shutdown error instead of overwriting it.
			if err == nil {
				err = closeErr
			}
		}
	}

	return err
}

// ListenAndServe starts the gRPC server in a background goroutine, then
// serves the HTTP gateway (plus the /metrics endpoint) in the calling
// goroutine. It returns whatever http.Server.ListenAndServe returns, or an
// earlier setup error.
func (app *App) ListenAndServe(ctx context.Context) error {
	grpcAddr := fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.GRPCPort)

	l, err := net.Listen("tcp", grpcAddr)
	if err != nil {
		return fmt.Errorf("listen on %s: %w", grpcAddr, err)
	}

	grpcServer := grpc.NewServer(
		grpc.ChainUnaryInterceptor(
			mw.WithTracing,
			mw.Logging,
			mw.WithMetrics,
			mw.Validate,
		),
	)
	app.grpcServer = grpcServer

	reflection.Register(grpcServer)
	pb.RegisterLOMSServer(grpcServer, app.controller)

	go func() {
		// Use a goroutine-local error: assigning to the enclosing err would
		// race with the gateway setup below, which reuses that variable.
		if serveErr := grpcServer.Serve(l); serveErr != nil {
			log.Fatal().Err(serveErr).Msg("failed to serve")
		}
	}()

	log.Info().Msgf("Serving grpc loms at grpc://%s", l.Addr())

	// Setup HTTP gateway
	conn, err := grpc.NewClient(
		grpcAddr,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		return fmt.Errorf("grpc.NewClient: %w", err)
	}
	app.gwConn = conn

	gwmux := runtime.NewServeMux()
	if err = pb.RegisterLOMSHandler(ctx, gwmux, conn); err != nil {
		return fmt.Errorf("pb.RegisterLOMSHandler: %w", err)
	}

	root := http.NewServeMux()
	root.Handle("/metrics", promhttp.Handler())
	root.Handle("/", gwmux)

	app.httpServer = &http.Server{
		Addr:        fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.HTTPPort),
		Handler:     root,
		ReadTimeout: 10 * time.Second,
	}

	log.Info().Msgf("Serving http loms at http://%s", app.httpServer.Addr)

	return app.httpServer.ListenAndServe()
}

// getPostgresPools builds the master and replica connection strings from the
// config and opens both pools. Exactly two pools are expected back from
// postgres.NewPools; any other count is an error and the returned pools are
// closed.
func getPostgresPools(c *config.Config) (masterPool, replicaPool *pgxpool.Pool, err error) {
	// NOTE(review): credentials are interpolated into the URL without
	// escaping; a password with URL-reserved characters may break parsing —
	// confirm whether config values are pre-escaped.
	masterConn := fmt.Sprintf(
		"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
		c.DatabaseMaster.User,
		c.DatabaseMaster.Password,
		c.DatabaseMaster.Host,
		c.DatabaseMaster.Port,
		c.DatabaseMaster.DBName,
	)
	replicaConn := fmt.Sprintf(
		"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
		c.DatabaseReplica.User,
		c.DatabaseReplica.Password,
		c.DatabaseReplica.Host,
		c.DatabaseReplica.Port,
		c.DatabaseReplica.DBName,
	)

	pools, err := postgres.NewPools(context.Background(), masterConn, replicaConn)
	if err != nil {
		return nil, nil, err
	}
	if len(pools) != 2 {
		// Do not leak whatever was opened before reporting the mismatch.
		closePostgresPools(pools...)
		return nil, nil, fmt.Errorf("got wrong number of pools when establishing postgres connection")
	}

	return pools[0], pools[1], nil
}

// closePostgresPools closes every non-nil pool it is given.
func closePostgresPools(pools ...*pgxpool.Pool) {
	for _, p := range pools {
		if p != nil {
			p.Close()
		}
	}
}

// setupSaramaAsyncConn creates an idempotent sarama async producer for the
// given brokers: all in-sync replicas must acknowledge, up to five retries,
// one in-flight request per connection.
func setupSaramaAsyncConn(brokers []string) (sarama.AsyncProducer, error) {
	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Idempotent = true
	// Successes are reported on the producer's Successes channel.
	// NOTE(review): presumably drained by the kafka status producer — verify.
	cfg.Producer.Return.Successes = true
	cfg.Producer.Retry.Max = 5
	cfg.Net.MaxOpenRequests = 1

	producer, err := sarama.NewAsyncProducer(brokers, cfg)
	if err != nil {
		return nil, fmt.Errorf("create async producer: %w", err)
	}

	return producer, nil
}