mirror of
				https://github.com/3ybactuk/marketplace-go-service-project.git
				synced 2025-10-31 06:23:44 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			233 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			233 lines
		
	
	
		
			5.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package app
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"net"
 | |
| 	"net/http"
 | |
| 	"os"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/IBM/sarama"
 | |
| 	"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
 | |
| 	"github.com/jackc/pgx/v5/pgxpool"
 | |
| 	"github.com/prometheus/client_golang/prometheus/promhttp"
 | |
| 	"github.com/rs/zerolog"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| 	"google.golang.org/grpc"
 | |
| 	"google.golang.org/grpc/credentials/insecure"
 | |
| 	"google.golang.org/grpc/reflection"
 | |
| 
 | |
| 	"route256/loms/internal/app/server"
 | |
| 	ordersRepository "route256/loms/internal/domain/repository/orders/sqlc"
 | |
| 	stocksRepository "route256/loms/internal/domain/repository/stocks/sqlc"
 | |
| 	"route256/loms/internal/domain/service"
 | |
| 	"route256/loms/internal/infra/config"
 | |
| 	"route256/loms/internal/infra/db/postgres"
 | |
| 	mw "route256/loms/internal/infra/grpc/middleware"
 | |
| 	"route256/loms/internal/infra/messaging/kafka"
 | |
| 
 | |
| 	pb "route256/pkg/api/loms/v1"
 | |
| )
 | |
| 
 | |
| type App struct {
 | |
| 	config     *config.Config
 | |
| 	controller *server.Server
 | |
| 
 | |
| 	grpcServer *grpc.Server
 | |
| 	httpServer *http.Server
 | |
| 	gwConn     *grpc.ClientConn
 | |
| }
 | |
| 
 | |
| func NewApp(configPath string) (*App, error) {
 | |
| 	c, err := config.LoadConfig(configPath)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("unable to load config: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
 | |
| 	zerolog.SetGlobalLevel(zerolog.InfoLevel)
 | |
| 
 | |
| 	if c.Service.LogLevel != "" {
 | |
| 		level, logErr := zerolog.ParseLevel(c.Service.LogLevel)
 | |
| 		if logErr != nil {
 | |
| 			return nil, fmt.Errorf("unknown log level `%s` provided: %w", c.Service.LogLevel, logErr)
 | |
| 		}
 | |
| 
 | |
| 		zerolog.SetGlobalLevel(level)
 | |
| 	}
 | |
| 
 | |
| 	log.WithLevel(zerolog.GlobalLevel()).Msgf("using logging level=`%s`", zerolog.GlobalLevel().String())
 | |
| 
 | |
| 	masterPool, replicaPool, err := getPostgresPools(c)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	producer, err := setupSaramaAsyncConn([]string{c.Kafka.Brokers})
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	stockRepo := stocksRepository.NewStockRepository(masterPool, replicaPool)
 | |
| 	orderRepo := ordersRepository.NewOrderRepository(masterPool)
 | |
| 	txManager := postgres.NewTxManager(masterPool, replicaPool)
 | |
| 	kafkaProducer, err := kafka.NewStatusProducer(c.Kafka.OrderTopic, producer)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	service := service.NewLomsService(orderRepo, stockRepo, txManager, kafkaProducer)
 | |
| 	controller := server.NewServer(service)
 | |
| 
 | |
| 	app := &App{
 | |
| 		config:     c,
 | |
| 		controller: controller,
 | |
| 	}
 | |
| 
 | |
| 	return app, nil
 | |
| }
 | |
| 
 | |
| func (app *App) Shutdown(ctx context.Context) (err error) {
 | |
| 	if app.httpServer != nil {
 | |
| 		err = app.httpServer.Shutdown(ctx)
 | |
| 		if err != nil {
 | |
| 			log.Error().Err(err).Msgf("failed http gateway server shutdown")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	done := make(chan struct{})
 | |
| 	if app.grpcServer != nil {
 | |
| 		go func() {
 | |
| 			app.grpcServer.GracefulStop()
 | |
| 
 | |
| 			close(done)
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	select {
 | |
| 	case <-done:
 | |
| 	case <-ctx.Done():
 | |
| 		if app.grpcServer != nil {
 | |
| 			app.grpcServer.Stop()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if app.gwConn != nil {
 | |
| 		err2 := app.gwConn.Close()
 | |
| 
 | |
| 		if err2 != nil {
 | |
| 			err = err2
 | |
| 
 | |
| 			log.Error().Err(err).Msgf("failed gateway connection close")
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return err
 | |
| }
 | |
| 
 | |
| func (app *App) ListenAndServe(ctx context.Context) error {
 | |
| 	grpcAddr := fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.GRPCPort)
 | |
| 	l, err := net.Listen("tcp", grpcAddr)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	app.grpcServer = grpc.NewServer(
 | |
| 		grpc.ChainUnaryInterceptor(
 | |
| 			mw.WithTracing,
 | |
| 			mw.Logging,
 | |
| 			mw.WithMetrics,
 | |
| 			mw.Validate,
 | |
| 		),
 | |
| 	)
 | |
| 	reflection.Register(app.grpcServer)
 | |
| 
 | |
| 	pb.RegisterLOMSServer(app.grpcServer, app.controller)
 | |
| 
 | |
| 	go func() {
 | |
| 		if err = app.grpcServer.Serve(l); err != nil {
 | |
| 			log.Fatal().Err(err).Msg("failed to serve")
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	log.Info().Msgf("Serving grpc loms at grpc://%s", l.Addr())
 | |
| 
 | |
| 	// Setup HTTP gateway
 | |
| 
 | |
| 	conn, err := grpc.NewClient(
 | |
| 		grpcAddr,
 | |
| 		grpc.WithTransportCredentials(insecure.NewCredentials()),
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		return fmt.Errorf("grpc.NewClient: %w", err)
 | |
| 	}
 | |
| 	app.gwConn = conn
 | |
| 
 | |
| 	gwmux := runtime.NewServeMux()
 | |
| 	if err = pb.RegisterLOMSHandler(ctx, gwmux, conn); err != nil {
 | |
| 		return fmt.Errorf("pb.RegisterLOMSHandler: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	root := http.NewServeMux()
 | |
| 	root.Handle("/metrics", promhttp.Handler())
 | |
| 	root.Handle("/", gwmux)
 | |
| 
 | |
| 	app.httpServer = &http.Server{
 | |
| 		Addr:        fmt.Sprintf("%s:%s", app.config.Service.Host, app.config.Service.HTTPPort),
 | |
| 		Handler:     root,
 | |
| 		ReadTimeout: 10 * time.Second,
 | |
| 	}
 | |
| 
 | |
| 	log.Info().Msgf("Serving http loms at http://%s", app.httpServer.Addr)
 | |
| 
 | |
| 	return app.httpServer.ListenAndServe()
 | |
| }
 | |
| 
 | |
| func getPostgresPools(c *config.Config) (masterPool, replicaPool *pgxpool.Pool, err error) {
 | |
| 	masterConn := fmt.Sprintf(
 | |
| 		"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
 | |
| 		c.DatabaseMaster.User,
 | |
| 		c.DatabaseMaster.Password,
 | |
| 		c.DatabaseMaster.Host,
 | |
| 		c.DatabaseMaster.Port,
 | |
| 		c.DatabaseMaster.DBName,
 | |
| 	)
 | |
| 
 | |
| 	replicaConn := fmt.Sprintf(
 | |
| 		"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
 | |
| 		c.DatabaseReplica.User,
 | |
| 		c.DatabaseReplica.Password,
 | |
| 		c.DatabaseReplica.Host,
 | |
| 		c.DatabaseReplica.Port,
 | |
| 		c.DatabaseReplica.DBName,
 | |
| 	)
 | |
| 
 | |
| 	pools, err := postgres.NewPools(context.Background(), masterConn, replicaConn)
 | |
| 	if err != nil {
 | |
| 		return nil, nil, err
 | |
| 	}
 | |
| 
 | |
| 	if len(pools) != 2 {
 | |
| 		return nil, nil, fmt.Errorf("got wrong number of pools when establishing postgres connection")
 | |
| 	}
 | |
| 
 | |
| 	return pools[0], pools[1], nil
 | |
| }
 | |
| 
 | |
| func setupSaramaAsyncConn(brokers []string) (sarama.AsyncProducer, error) {
 | |
| 	cfg := sarama.NewConfig()
 | |
| 	cfg.Producer.RequiredAcks = sarama.WaitForAll
 | |
| 	cfg.Producer.Idempotent = true
 | |
| 	cfg.Producer.Return.Successes = true
 | |
| 	cfg.Producer.Retry.Max = 5
 | |
| 	cfg.Net.MaxOpenRequests = 1
 | |
| 
 | |
| 	producer, err := sarama.NewAsyncProducer(brokers, cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("create async producer: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return producer, nil
 | |
| }
 | 
