mirror of
				https://github.com/3ybactuk/marketplace-go-service-project.git
				synced 2025-10-31 06:23:44 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			84 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			84 lines
		
	
	
		
			2.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package app
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 
 | |
| 	"route256/notifier/internal/app/controller"
 | |
| 	"route256/notifier/internal/domain/service"
 | |
| 	"route256/notifier/internal/infra/config"
 | |
| 	"route256/notifier/internal/infra/messaging/kafka"
 | |
| 
 | |
| 	"github.com/IBM/sarama"
 | |
| 	"github.com/rs/zerolog"
 | |
| 	"github.com/rs/zerolog/log"
 | |
| )
 | |
| 
 | |
| type App struct {
 | |
| 	config *config.Config
 | |
| 
 | |
| 	controller *controller.Controller
 | |
| }
 | |
| 
 | |
| func NewApp(configPath string) (*App, error) {
 | |
| 	cfg, err := config.LoadConfig(configPath)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("unable to load config: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
 | |
| 	zerolog.SetGlobalLevel(zerolog.InfoLevel)
 | |
| 
 | |
| 	if cfg.Service.LogLevel != "" {
 | |
| 		level, logErr := zerolog.ParseLevel(cfg.Service.LogLevel)
 | |
| 		if logErr != nil {
 | |
| 			return nil, fmt.Errorf("unknown log level `%s` provided: %w", cfg.Service.LogLevel, logErr)
 | |
| 		}
 | |
| 
 | |
| 		zerolog.SetGlobalLevel(level)
 | |
| 	}
 | |
| 
 | |
| 	log.WithLevel(zerolog.GlobalLevel()).Msgf("using logging level=`%s`", zerolog.GlobalLevel().String())
 | |
| 
 | |
| 	consumer, err := setupSaramaConsumerGroup([]string{cfg.Kafka.Brokers}, cfg.Kafka.ConsumerGroupID)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	kafkaConsumer, err := kafka.NewStatusConsumer(cfg.Kafka.OrderTopic, consumer)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("NewKafkaStatusConsumer: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	notifierService := service.NewNotifierService(kafkaConsumer)
 | |
| 	controller := controller.NewController(notifierService)
 | |
| 
 | |
| 	return &App{
 | |
| 		config:     cfg,
 | |
| 		controller: controller,
 | |
| 	}, err
 | |
| }
 | |
| 
 | |
| func (a *App) Run(ctx context.Context) {
 | |
| 	a.controller.Run(ctx)
 | |
| }
 | |
| 
 | |
| func (a *App) Shutdown(ctx context.Context) error {
 | |
| 	return a.controller.Stop(ctx)
 | |
| }
 | |
| 
 | |
| func setupSaramaConsumerGroup(brokers []string, groupID string) (sarama.ConsumerGroup, error) {
 | |
| 	cfg := sarama.NewConfig()
 | |
| 	cfg.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange()
 | |
| 	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
 | |
| 	cfg.Metadata.AllowAutoTopicCreation = false
 | |
| 
 | |
| 	group, err := sarama.NewConsumerGroup(brokers, groupID, cfg)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("sarama.NewConsumerGroup: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	return group, nil
 | |
| }
 | 
