diff --git a/docker-compose.yaml b/docker-compose.yaml index eca82af..ccc6c37 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -18,12 +18,25 @@ services: context: . dockerfile: loms/Dockerfile depends_on: - - postgres-master + postgres-master: + condition: service_started + init-kafka: + condition: service_completed_successfully ports: - "8083:8083" - "8084:8084" - "8085:8085" + notifier: + build: + context: . + dockerfile: notifier/Dockerfile + depends_on: + init-kafka: + condition: service_completed_successfully + deploy: + replicas: 3 + postgres-master: image: gitlab-registry.ozon.dev/go/classroom-18/students/base/postgres:16 container_name: postgres-master @@ -57,3 +70,52 @@ services: - POSTGRESQL_NUM_SYNCHRONOUS_REPLICAS=1 ports: - "5434:5432" + + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8095:8080 + environment: + KAFKA_CLUSTERS_0_NAME: local + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka0:29092 + DYNAMIC_CONFIG_ENABLED: "true" + + kafka0: + container_name: kafka + image: confluentinc/cp-kafka:7.7.1.arm64 + ports: + - 9092:9092 + expose: + - '29092' + environment: + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka0:29092,PLAINTEXT_HOST://localhost:9092 + KAFKA_LISTENERS: PLAINTEXT://kafka0:29092,CONTROLLER://kafka0:29093,PLAINTEXT_HOST://:9092 + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka0:29093" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" + CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk' + + init-kafka: + image: confluentinc/cp-kafka:7.7.1.arm64 + depends_on: + - kafka0 + entrypoint: [ '/bin/sh', '-c' ] + command: | + " + # blocks until kafka is reachable + kafka-topics --bootstrap-server kafka:29092 --list + + echo -e 'Creating kafka topics' + kafka-topics --create --topic loms.order-events --bootstrap-server kafka:29092 --partitions 2 --replication-factor 1 + + echo -e 'Successfully created the following topics:' + kafka-topics --bootstrap-server kafka:29092 --list + " + diff --git a/docs/homework-3/loms.http b/docs/homework-3/loms.http index fb52805..b346cac 100644 --- a/docs/homework-3/loms.http +++ b/docs/homework-3/loms.http @@ -1,5 +1,5 @@ ### create normal order -POST http://192.168.64.4:8084/order/create +POST http://localhost:8084/order/create Content-Type: application/json { @@ -19,14 +19,14 @@ Content-Type: application/json ### get info, assert status="awaiting payment" -GET http://192.168.64.4:8084/order/info?orderId=1 +GET http://localhost:8084/order/info?orderId=1 Content-Type: application/json ### expected: 200 (OK) {"status":"awaiting payment","user":31337,"Items":[{"sku":1076963,"count":3},{"sku":135717466,"count":2}]} ### pay order -POST http://192.168.64.4:8084/order/pay +POST http://localhost:8084/order/pay Content-Type: application/json { @@ -36,14 +36,14 @@ Content-Type: application/json ### check actual status is "payed" -GET http://192.168.64.4:8084/order/info?orderId=1 +GET http://localhost:8084/order/info?orderId=1 Content-Type: application/json ### expected: 200 (OK) {"status":"payed","user":31337,"Items":[{"sku":1076963,"count":3},{"sku":135717466,"count":2}]} ### unable to cancel payed order -POST http://192.168.64.4:8084/order/cancel +POST http://localhost:8084/order/cancel Content-Type: application/json { @@ -53,14 +53,14 @@ Content-Type: application/json ### get unknown order -GET http://192.168.64.4:8084/order/info?orderId=404 +GET http://localhost:8084/order/info?orderId=404 Content-Type: application/json ### expected: 404 (Not Found) {"code": 5, ... } ### cancel order not exists -POST http://192.168.64.4:8084/order/cancel +POST http://localhost:8084/order/cancel Content-Type: application/json { @@ -70,7 +70,7 @@ Content-Type: application/json ### create order with item that has no stocks info -POST http://192.168.64.4:8084/order/create +POST http://localhost:8084/order/create Content-Type: application/json { @@ -86,13 +86,13 @@ Content-Type: application/json ### check order status is failed (not necessary, because no orderId after creation if any fails) -GET http://192.168.64.4:8084/order/info?orderId=2 +GET http://localhost:8084/order/info?orderId=2 Content-Type: application/json ### expected: 200 (OK) {"status":"failed","userId":31337,"Items":[{"sku":404,"count":3}]} ### cancel failed order -POST http://192.168.64.4:8084/order/cancel +POST http://localhost:8084/order/cancel Content-Type: application/json { @@ -102,21 +102,21 @@ Content-Type: application/json ### stock info for unknown sku -GET http://192.168.64.4:8084/stock/info?sku=404 +GET http://localhost:8084/stock/info?sku=404 Content-Type: application/json ### expected: 404 Not Found {"code":5, ... } ### stock info for normal sku -GET http://192.168.64.4:8084/stock/info?sku=135717466 +GET http://localhost:8084/stock/info?sku=135717466 Content-Type: application/json ### expected: 200 (OK) {"count":78} ### create order with count for sku more than stock -POST http://192.168.64.4:8084/order/create +POST http://localhost:8084/order/create Content-Type: application/json { @@ -131,13 +131,13 @@ Content-Type: application/json ### expected: 400 (Bad Request) {"code":9, ... } ### no change in stock info after failed order creation -GET http://192.168.64.4:8084/stock/info?sku=135717466 +GET http://localhost:8084/stock/info?sku=135717466 Content-Type: application/json ### expected: 200 (OK) {"count":78} ### create normal order for cancellation -POST http://192.168.64.4:8084/order/create +POST http://localhost:8084/order/create Content-Type: application/json { @@ -153,7 +153,7 @@ Content-Type: application/json ### cancel order -POST http://192.168.64.4:8084/order/cancel +POST http://localhost:8084/order/cancel Content-Type: application/json { @@ -163,14 +163,14 @@ Content-Type: application/json ### check canceled order status -GET http://192.168.64.4:8084/order/info?orderId=4 +GET http://localhost:8084/order/info?orderId=4 Content-Type: application/json ### expected: {"status":"cancelled","user":31337,"Items":[{"sku":1076963,"count":2}]} ### check stocks returns -GET http://192.168.64.4:8084/stock/info?sku=135717466 +GET http://localhost:8084/stock/info?sku=135717466 Content-Type: application/json ### expected: {"count":78}; 200 OK diff --git a/go.work.sum b/go.work.sum index 6eaefba..aa9f049 100644 --- a/go.work.sum +++ b/go.work.sum @@ -7,10 +7,10 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cncf/xds/go v0.0.0-20250326154945-ae57f3c0d45f/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8= github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/elastic/go-sysinfo v1.15.3/go.mod h1:K/cNrqYTDrSoMh2oDkYEMS2+a72GRxMvNP+GC+vRIlo= github.com/elastic/go-windows v1.0.2/go.mod h1:bGcDpBzXgYSqM0Gx3DM4+UxFj300SZLixie9u9ixLM8= github.com/envoyproxy/go-control-plane v0.13.4/go.mod h1:kDfuBlDVsSj2MjrLEtRWtHlsWIFcGyB2RMO44Dc5GZA= @@ -36,9 +36,10 @@ github.com/lyft/protoc-gen-star/v2 v2.0.4-0.20230330145011-496ad1ac90a4/go.mod h github.com/mfridman/xflag v0.1.0/go.mod h1:/483ywM5ZO5SuMVjrIGquYNE5CzLrj5Ux/LxWWnjRaE= github.com/microsoft/go-mssqldb v1.8.0/go.mod h1:6znkekS3T2vp0waiMhen4GPU1BiAsrP+iXHcE7a7rFo= github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= -github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8= +github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= +github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/russross/blackfriday v1.6.0/go.mod h1:ti0ldHuxg49ri4ksnFxlkCfN+hvslNlmVHqNRXXJNAY= @@ -47,18 +48,14 @@ github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/spf13/afero v1.10.0/go.mod h1:UBogFpq8E9Hx+xc5CNTTEpTnuHVmXDwZcZcE1eb/UhQ= github.com/spiffe/go-spiffe/v2 v2.5.0/go.mod h1:P+NxobPc6wXhVtINNtFjNWGBTreew1GBUCwT2wPmb7g= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/tursodatabase/libsql-client-go v0.0.0-20240902231107-85af5b9d094d/go.mod h1:l8xTsYB90uaVdMHXMCxKKLSgw5wLYBwBKKefNIUnm9s= github.com/vertica/vertica-sql-go v1.3.3/go.mod h1:jnn2GFuv+O2Jcjktb7zyc4Utlbu9YVqpHH/lx63+1M4= github.com/ydb-platform/ydb-go-genproto v0.0.0-20241112172322-ea1f63298f77/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I= github.com/ydb-platform/ydb-go-sdk/v3 v3.108.1/go.mod h1:l5sSv153E18VvYcsmr51hok9Sjc16tEC8AXGbwrk+ho= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0= go.opentelemetry.io/contrib/detectors/gcp v1.35.0/go.mod h1:qGWP8/+ILwMRIUf9uIVLloR1uo5ZYAslM4O6OqUi1DA= go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= -go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= -go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.36.0/go.mod h1:Y4J0ReaxCR1IMaabaSMugxJES1EpwhBHhv2bDHklZvc= golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= diff --git a/loms/Makefile b/loms/Makefile index ca66db0..5e5c661 100644 --- a/loms/Makefile +++ b/loms/Makefile @@ -2,7 +2,7 @@ BINDIR=${CURDIR}/../bin PACKAGE=route256/loms MIGRATIONS_FOLDER := ./db/migrations/ LOCAL_DB_NAME := route256 -LOCAL_DB_DSN := postgresql://user:password@192.168.64.4:5433/route256?sslmode=disable +LOCAL_DB_DSN := postgresql://user:password@localhost:5433/route256?sslmode=disable PROD_USER := loms-user PROD_PASS := loms-password diff --git a/loms/configs/values_local.yaml b/loms/configs/values_local.yaml index f8781db..b7be8e0 100644 --- a/loms/configs/values_local.yaml +++ b/loms/configs/values_local.yaml @@ -23,7 +23,7 @@ db_replica: db_name: route256 kafka: - host: localhost + host: kafka port: 29092 order_topic: loms.order-events - brokers: localhost:9092 + brokers: kafka:29092 diff --git a/loms/db/migrations/00004_create_outbox.sql b/loms/db/migrations/00004_create_outbox.sql new file mode 100644 index 0000000..4c0c22d --- /dev/null +++ b/loms/db/migrations/00004_create_outbox.sql @@ -0,0 +1,17 @@ +-- +goose Up +CREATE TABLE if not exists outbox +( + id bigserial PRIMARY KEY, + order_id BIGINT NOT NULL, + topic text NOT NULL, + key text, + payload jsonb NOT NULL, + status text NOT NULL DEFAULT 'new', -- new | sent | error + created_at timestamptz NOT NULL DEFAULT now(), + sent_at timestamptz +); +CREATE INDEX ON outbox (status, created_at); + + +-- +goose Down +DROP TABLE outbox; diff --git a/loms/go.mod b/loms/go.mod index ff64545..fedbbda 100644 --- a/loms/go.mod +++ b/loms/go.mod @@ -5,7 +5,6 @@ go 1.23.9 require ( github.com/gojuno/minimock/v3 v3.4.5 github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 - github.com/jackc/pgx v3.6.2+incompatible github.com/jackc/pgx/v5 v5.7.5 github.com/opentracing/opentracing-go v1.2.0 github.com/ozontech/allure-go/pkg/framework v0.6.34 @@ -24,7 +23,6 @@ require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect - github.com/cockroachdb/apd v1.1.0 // indirect github.com/containerd/log v0.1.0 // indirect github.com/containerd/platforms v0.2.1 // indirect github.com/cpuguy83/dockercfg v0.3.2 // indirect @@ -32,16 +30,25 @@ require ( github.com/docker/docker v28.0.1+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/ebitengine/purego v0.8.4 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect - github.com/gofrs/uuid v4.4.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.18.0 // indirect - github.com/lib/pq v1.10.9 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/mfridman/interpolate v0.0.2 // indirect @@ -54,7 +61,9 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.1 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect github.com/sethvargo/go-retry v0.3.0 // indirect github.com/shirou/gopsutil/v4 v4.25.1 // indirect github.com/sirupsen/logrus v1.9.3 // indirect @@ -73,6 +82,7 @@ require ( ) require ( + github.com/IBM/sarama v1.45.2 github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/loms/go.sum b/loms/go.sum index a6233e3..82e9e11 100644 --- a/loms/go.sum +++ b/loms/go.sum @@ -4,12 +4,12 @@ github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9 github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8= github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E= +github.com/IBM/sarama v1.45.2 h1:8m8LcMCu3REcwpa7fCP6v2fuPuzVwXDAM2DOv3CBrKw= +github.com/IBM/sarama v1.45.2/go.mod h1:ppaoTcVdGv186/z6MEKsMm70A5fwJfRTpstI37kVn3Y= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= -github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/containerd/platforms v0.2.1 h1:zvwtM3rz2YHPQsF2CHYM8+KtB5dvhISiXh5ZpSBQv6A= @@ -32,10 +32,18 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -44,33 +52,50 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA= -github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/gojuno/minimock/v3 v3.4.5 h1:Jcb0tEYZvVlQNtAAYpg3jCOoSwss2c1/rNugYTzj304= github.com/gojuno/minimock/v3 v3.4.5/go.mod h1:o9F8i2IT8v3yirA7mmdpNGzh1WNesm6iQakMtQV6KiE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= -github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= -github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= -github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= github.com/jackc/pgx/v5 v5.7.5 h1:JHGfMnQY+IEtGM63d+NGMjoRpysB2JBwDr5fsngwmJs= github.com/jackc/pgx/v5 v5.7.5/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= @@ -79,8 +104,6 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/magiconair/properties v1.8.10 h1:s31yESBquKXCV9a/ScB3ESkOjUYYv+X0rg8SYxI99mE= @@ -119,6 +142,8 @@ github.com/ozontech/allure-go/pkg/allure v0.6.14 h1:lDamtSF+WtHQLg2+qQYijtC4Fk3K github.com/ozontech/allure-go/pkg/allure v0.6.14/go.mod h1:4oEG2yq+DGOzJS/ZjPc87C/mx3tAnlYpYonk77Ru/vQ= github.com/ozontech/allure-go/pkg/framework v0.6.34 h1:IjM65Y3JP7ale7Ug3aBnFV4+c1NYYBCrgl/VrtEd/FY= github.com/ozontech/allure-go/pkg/framework v0.6.34/go.mod h1:oISDLE6Tfww35TBQz+1nrtbLtyBqR6ELxOtJ+MVjHOw= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -127,6 +152,8 @@ github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/pressly/goose/v3 v3.24.3 h1:DSWWNwwggVUsYZ0X2VitiAa9sKuqtBfe+Jr9zFGwWlM= github.com/pressly/goose/v3 v3.24.3/go.mod h1:v9zYL4xdViLHCUUJh/mhjnm6JrK7Eul8AS93IxiZM4E= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= @@ -138,15 +165,19 @@ github.com/sethvargo/go-retry v0.3.0 h1:EEt31A35QhrcRZtrYFDTBg91cqZVnFL2navjDrah github.com/sethvargo/go-retry v0.3.0/go.mod h1:mNX17F0C/HguQMyMyJxcnU471gOZGxCLyYaFyAZraas= github.com/shirou/gopsutil/v4 v4.25.1 h1:QSWkTc+fu9LTAWfkZwZ6j8MSUk4A2LV7rbH0ZqmLjXs= github.com/shirou/gopsutil/v4 v4.25.1/go.mod h1:RoUCUpndaJFtT+2zsZzzmhvbfGoDCJ7nFXKJf8GqJbI= -github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= -github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/testcontainers/testcontainers-go v0.37.0 h1:L2Qc0vkTw2EHWQ08djon0D2uw7Z/PtHS/QzZZ5Ra/hg= @@ -157,6 +188,7 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -186,41 +218,60 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 h1:y5zboxd6LQAqYIhHnB48p0ByQ/GnQx2BE33L8BOHQkI= golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6/go.mod h1:U6Lno4MTRCDY+Ba7aCcauB9T60gsv5s4ralQzP72ZoQ= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= golang.org/x/time v0.0.0-20220210224613-90d013bbcef8 h1:vVKdlvoWBphwdxWKrFZEuM0kGgGLxUOYcY4U/2Vjg44= @@ -229,6 +280,7 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -244,6 +296,7 @@ google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/loms/internal/app/app.go b/loms/internal/app/app.go index 20c72aa..4f44c83 100644 --- a/loms/internal/app/app.go +++ b/loms/internal/app/app.go @@ -8,6 +8,7 @@ import ( "os" "time" + "github.com/IBM/sarama" "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/jackc/pgx/v5/pgxpool" "github.com/rs/zerolog" @@ -22,6 +23,7 @@ import ( "route256/loms/internal/domain/service" "route256/loms/internal/infra/config" mw "route256/loms/internal/infra/grpc/middleware" + "route256/loms/internal/infra/messaging/kafka" "route256/loms/internal/infra/postgres" pb "route256/pkg/api/loms/v1" @@ -61,11 +63,20 @@ func NewApp(configPath string) (*App, error) { return nil, err } + producer, err := setupSaramaAsyncConn([]string{c.Kafka.Brokers}) + if err != nil { + return nil, err + } + stockRepo := stocksRepository.NewStockRepository(masterPool, replicaPool) orderRepo := ordersRepository.NewOrderRepository(masterPool) txManager := postgres.NewTxManager(masterPool, replicaPool) + kafkaProducer, err := kafka.NewStatusProducer(c.Kafka.OrderTopic, producer) + if err != nil { + return nil, err + } - service := service.NewLomsService(orderRepo, stockRepo, txManager) + service := service.NewLomsService(orderRepo, stockRepo, txManager, kafkaProducer) controller := server.NewServer(service) app := &App{ @@ -79,7 +90,6 @@ func NewApp(configPath string) (*App, error) { func (app *App) Shutdown(ctx context.Context) (err error) { if app.httpServer != nil { err = app.httpServer.Shutdown(ctx) - if err != nil { log.Error().Err(err).Msgf("failed http gateway server shutdown") } @@ -197,3 +207,19 @@ func getPostgresPools(c *config.Config) (masterPool, replicaPool *pgxpool.Pool, return pools[0], pools[1], nil } + +func setupSaramaAsyncConn(brokers []string) (sarama.AsyncProducer, error) { + cfg := sarama.NewConfig() + cfg.Producer.RequiredAcks = sarama.WaitForAll + cfg.Producer.Idempotent = true + cfg.Producer.Return.Successes = true + cfg.Producer.Retry.Max = 5 + cfg.Net.MaxOpenRequests = 1 + + producer, err := sarama.NewAsyncProducer(brokers, cfg) + if err != nil { + return nil, fmt.Errorf("create async producer: %w", err) + } + + return producer, nil +} diff --git a/loms/internal/domain/repository/outbox/outbox_event.go b/loms/internal/domain/repository/outbox/outbox_event.go new file mode 100644 index 0000000..ca0e963 --- /dev/null +++ b/loms/internal/domain/repository/outbox/outbox_event.go @@ -0,0 +1,11 @@ +package outbox + +import "route256/loms/internal/domain/entity" + +type Event struct { + ID int64 + OrderID entity.ID + Topic string + Key string + Payload []byte +} diff --git a/loms/internal/domain/repository/outbox/sqlc/db.go b/loms/internal/domain/repository/outbox/sqlc/db.go new file mode 100644 index 0000000..2725108 --- /dev/null +++ b/loms/internal/domain/repository/outbox/sqlc/db.go @@ -0,0 +1,32 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 + +package sqlc + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/loms/internal/domain/repository/outbox/sqlc/models.go b/loms/internal/domain/repository/outbox/sqlc/models.go new file mode 100644 index 0000000..dca13da --- /dev/null +++ b/loms/internal/domain/repository/outbox/sqlc/models.go @@ -0,0 +1,5 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 + +package sqlc diff --git a/loms/internal/domain/repository/outbox/sqlc/querier.go b/loms/internal/domain/repository/outbox/sqlc/querier.go new file mode 100644 index 0000000..e255529 --- /dev/null +++ b/loms/internal/domain/repository/outbox/sqlc/querier.go @@ -0,0 +1,18 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 + +package sqlc + +import ( + "context" +) + +type Querier interface { + OutboxInsert(ctx context.Context, arg *OutboxInsertParams) error + OutboxMarkError(ctx context.Context, dollar_1 []int64) (int64, error) + OutboxMarkSent(ctx context.Context, dollar_1 []int64) (int64, error) + OutboxSelectForPublish(ctx context.Context, limit int32) ([]*OutboxSelectForPublishRow, error) +} + +var _ Querier = (*Queries)(nil) diff --git a/loms/internal/domain/repository/outbox/sqlc/query.sql b/loms/internal/domain/repository/outbox/sqlc/query.sql new file mode 100644 index 0000000..5f795bd --- /dev/null +++ b/loms/internal/domain/repository/outbox/sqlc/query.sql @@ -0,0 +1,22 @@ +-- name: OutboxInsert :exec +INSERT INTO outbox (order_id, topic, "key", payload) +VALUES ($1, $2, $3, $4::jsonb); + +-- name: OutboxSelectForPublish :many +SELECT id, order_id, topic, "key", payload +FROM outbox +WHERE status = 'new' +ORDER BY created_at +LIMIT $1 +FOR UPDATE SKIP LOCKED; + +-- name: OutboxMarkSent :execrows +UPDATE outbox +SET status = 'sent', + sent_at = now() +WHERE id = ANY($1::bigint[]); + +-- name: OutboxMarkError :execrows +UPDATE outbox +SET status = 'error' +WHERE id = ANY($1::bigint[]); diff --git a/loms/internal/domain/repository/outbox/sqlc/query.sql.go b/loms/internal/domain/repository/outbox/sqlc/query.sql.go new file mode 100644 index 0000000..9ff03d3 --- /dev/null +++ b/loms/internal/domain/repository/outbox/sqlc/query.sql.go @@ -0,0 +1,104 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.29.0 +// source: query.sql + +package sqlc + +import ( + "context" +) + +const outboxInsert = `-- name: OutboxInsert :exec +INSERT INTO outbox (order_id, topic, "key", payload) +VALUES ($1, $2, $3, $4::jsonb) +` + +type OutboxInsertParams struct { + OrderID int64 + Topic string + Key *string + Column4 []byte +} + +func (q *Queries) OutboxInsert(ctx context.Context, arg *OutboxInsertParams) error { + _, err := q.db.Exec(ctx, outboxInsert, + arg.OrderID, + arg.Topic, + arg.Key, + arg.Column4, + ) + return err +} + +const outboxMarkError = `-- name: OutboxMarkError :execrows +UPDATE outbox +SET status = 'error' +WHERE id = ANY($1::bigint[]) +` + +func (q *Queries) OutboxMarkError(ctx context.Context, dollar_1 []int64) (int64, error) { + result, err := q.db.Exec(ctx, outboxMarkError, dollar_1) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} + +const outboxMarkSent = `-- name: OutboxMarkSent :execrows +UPDATE outbox +SET status = 'sent', + sent_at = now() +WHERE id = ANY($1::bigint[]) +` + +func (q *Queries) OutboxMarkSent(ctx context.Context, dollar_1 []int64) (int64, error) { + result, err := q.db.Exec(ctx, outboxMarkSent, dollar_1) + if err != nil { + return 0, err + } + return result.RowsAffected(), nil +} + +const outboxSelectForPublish = `-- name: OutboxSelectForPublish :many +SELECT id, order_id, topic, "key", payload +FROM outbox +WHERE status = 'new' +ORDER BY created_at +LIMIT $1 +FOR UPDATE SKIP LOCKED +` + +type OutboxSelectForPublishRow struct { + ID int64 + OrderID int64 + Topic string + Key *string + Payload []byte +} + +func (q *Queries) OutboxSelectForPublish(ctx context.Context, limit int32) ([]*OutboxSelectForPublishRow, error) { + rows, err := q.db.Query(ctx, outboxSelectForPublish, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*OutboxSelectForPublishRow + for rows.Next() { + var i OutboxSelectForPublishRow + if err := rows.Scan( + &i.ID, + &i.OrderID, + &i.Topic, + &i.Key, + &i.Payload, + ); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/loms/internal/domain/repository/outbox/sqlc/repository.go b/loms/internal/domain/repository/outbox/sqlc/repository.go new file mode 100644 index 0000000..1821b94 --- /dev/null +++ b/loms/internal/domain/repository/outbox/sqlc/repository.go @@ -0,0 +1,85 @@ +package sqlc + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + + "route256/loms/internal/domain/entity" + "route256/loms/internal/domain/repository/outbox" + "route256/loms/internal/infra/postgres" +) + +type outboxRepo struct { + db *pgxpool.Pool +} + +func NewOutboxRepository(masterPool *pgxpool.Pool) *outboxRepo { + return &outboxRepo{ + db: masterPool, + } +} + +func (r *outboxRepo) GetQuerier(ctx context.Context) *Queries { + tx, ok := postgres.TxFromCtx(ctx) + if ok { + return New(tx) + } + + return New(r.db) +} + +func (r *outboxRepo) AddEvent(ctx context.Context, evt outbox.Event) error { + querier := r.GetQuerier(ctx) + + return querier.OutboxInsert(ctx, &OutboxInsertParams{ + OrderID: int64(evt.OrderID), + Topic: evt.Topic, + Key: &evt.Key, + Column4: evt.Payload, + }) +} + +func (r *outboxRepo) WithNewEvents(ctx context.Context, limit int32, handler func(context.Context, outbox.Event) error) error { + querier := r.GetQuerier(ctx) + + rows, err := querier.OutboxSelectForPublish(ctx, int32(limit)) + if err != nil { + return err + } + if len(rows) == 0 { + return nil + } + + var sentIDs, errIDs []int64 + + for _, row := range rows { + ev := outbox.Event{ + ID: row.ID, + OrderID: entity.ID(row.OrderID), + Topic: row.Topic, + Key: *row.Key, + Payload: row.Payload, + } + + if err := handler(ctx, ev); err != nil { + errIDs = append(errIDs, row.ID) + } else { + sentIDs = append(sentIDs, row.ID) + } + } + + if len(sentIDs) > 0 { + if _, err := querier.OutboxMarkSent(ctx, sentIDs); err != nil { + return err + } + } + + if len(errIDs) > 0 { + if _, err := querier.OutboxMarkError(ctx, errIDs); err != nil { + return err + } + } + + return nil +} diff --git a/loms/internal/domain/service/service.go b/loms/internal/domain/service/service.go index 663b87b..ffb77eb 100644 --- a/loms/internal/domain/service/service.go +++ b/loms/internal/domain/service/service.go @@ -35,17 +35,23 @@ type txManager interface { ReadWithRepeatableRead(ctx context.Context, fn func(ctx context.Context) error) (err error) } -type LomsService struct { - orders OrderRepository - stocks StockRepository - txManager txManager +type StatusProducer interface { + Send(ctx context.Context, id entity.ID, status string) error } -func NewLomsService(orderRepo OrderRepository, stockRepo StockRepository, txManager txManager) *LomsService { +type LomsService struct { + orders OrderRepository + stocks StockRepository + txManager txManager + statusProducer StatusProducer +} + +func NewLomsService(orderRepo OrderRepository, stockRepo StockRepository, txManager txManager, statusProducer StatusProducer) *LomsService { return &LomsService{ - orders: orderRepo, - stocks: stockRepo, - txManager: txManager, + orders: orderRepo, + stocks: stockRepo, + txManager: txManager, + statusProducer: statusProducer, } } @@ -57,17 +63,24 @@ func (s *LomsService) rollbackStocks(ctx context.Context, stocks []*entity.Stock } } -func (s *LomsService) OrderCreate(ctx context.Context, orderReq *pb.OrderCreateRequest) (entity.ID, error) { - if orderReq == nil || orderReq.UserId <= 0 || len(orderReq.Items) == 0 { - return 0, model.ErrInvalidInput +// Wraps writing status to DB and status topic. +// Should use this function for status updates. +// Guarantees that status upd event will be sent only if DB write is successful. +func (s *LomsService) setStatus(ctx context.Context, id entity.ID, status string) error { + log.Trace().Msgf("running status update for %d with status %s", id, status) + + if err := s.orders.OrderSetStatus(ctx, id, status); err != nil { + return fmt.Errorf("orders.OrderSetStatus: %w", err) } - for _, item := range orderReq.Items { - if item.Sku <= 0 || item.Count == 0 { - return 0, model.ErrInvalidInput - } + if err := s.statusProducer.Send(ctx, id, status); err != nil { + log.Error().Err(err).Msg("statusProducer.Send") } + return nil +} + +func (s *LomsService) createInitial(ctx context.Context, orderReq *pb.OrderCreateRequest) (*entity.Order, error) { order := &entity.Order{ OrderID: 0, Status: pb.OrderStatus_ORDER_STATUS_NEW.String(), @@ -86,44 +99,82 @@ func (s *LomsService) OrderCreate(ctx context.Context, orderReq *pb.OrderCreateR return int(a.ID - b.ID) }) - var ( - orderID entity.ID - resErr error - ) - err := s.txManager.WriteWithTransaction(ctx, func(txCtx context.Context) error { id, err := s.orders.OrderCreate(txCtx, order) if err != nil { return err } - order.OrderID = id - orderID = id + order.OrderID = id + + return nil + }) + if err != nil { + return nil, err + } + + return order, nil +} + +func (s *LomsService) OrderCreate(ctx context.Context, orderReq *pb.OrderCreateRequest) (entity.ID, error) { + if orderReq == nil || orderReq.UserId <= 0 || len(orderReq.Items) == 0 { + return 0, model.ErrInvalidInput + } + + for _, item := range orderReq.Items { + if item.Sku <= 0 || item.Count == 0 { + return 0, model.ErrInvalidInput + } + } + + order, err := s.createInitial(ctx, orderReq) + if err != nil { + return 0, err + } + + if statErr := s.setStatus(ctx, order.OrderID, order.Status); statErr != nil { + return 0, statErr + } + + var resErr error + + err = s.txManager.WriteWithTransaction(ctx, func(txCtx context.Context) error { committed := make([]*entity.Stock, 0, len(order.Items)) for _, it := range order.Items { st := &entity.Stock{Item: it, Reserved: it.Count} - if err := s.stocks.StockReserve(txCtx, st); err != nil { + if resErr = s.stocks.StockReserve(txCtx, st); resErr != nil { s.rollbackStocks(txCtx, committed) - _ = s.orders.OrderSetStatus(txCtx, id, - pb.OrderStatus_ORDER_STATUS_FAILED.String()) + resErr = fmt.Errorf("stocks.StockReserve: %w", resErr) - resErr = fmt.Errorf("stocks.StockReserve: %w", err) return nil } committed = append(committed, st) } - return s.orders.OrderSetStatus(txCtx, id, - pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String()) + return nil }) + + finalStatus := pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String() + + defer func() { + if statErr := s.setStatus(ctx, order.OrderID, finalStatus); statErr != nil { + log.Error().Err(statErr).Msgf("failed to setStatus to %s", finalStatus) + } + }() + if err != nil { + finalStatus = pb.OrderStatus_ORDER_STATUS_FAILED.String() + return 0, err } if resErr != nil { + finalStatus = pb.OrderStatus_ORDER_STATUS_FAILED.String() + return 0, resErr } - return orderID, nil + + return order.OrderID, nil } func (s *LomsService) OrderInfo(ctx context.Context, orderID entity.ID) (*entity.Order, error) { @@ -157,7 +208,7 @@ func (s *LomsService) OrderPay(ctx context.Context, orderID entity.ID) error { log.Error().Err(err).Msg("failed to free stock reservation") } } - return s.orders.OrderSetStatus(txCtx, orderID, + return s.setStatus(txCtx, orderID, pb.OrderStatus_ORDER_STATUS_PAYED.String()) default: return model.ErrOrderInvalidStatus @@ -192,7 +243,7 @@ func (s *LomsService) OrderCancel(ctx context.Context, orderID entity.ID) error return err } } - return s.orders.OrderSetStatus(txCtx, orderID, + return s.setStatus(txCtx, orderID, pb.OrderStatus_ORDER_STATUS_CANCELLED.String()) }) } diff --git a/loms/internal/domain/service/service_test.go b/loms/internal/domain/service/service_test.go index 9478dd0..9055a07 100644 --- a/loms/internal/domain/service/service_test.go +++ b/loms/internal/domain/service/service_test.go @@ -40,6 +40,12 @@ func (t *mockTxManager) ReadWithRepeatableRead(ctx context.Context, fn func(ctx return fn(ctx) } +type mockKafkaProducer struct{} + +func (kp mockKafkaProducer) Send(_ context.Context, _ entity.ID, _ string) error { + return nil +} + func TestMain(m *testing.M) { goleak.VerifyTestMain(m) } @@ -138,8 +144,7 @@ func TestLomsService_OrderCreate(t *testing.T) { orders: mock.NewOrderRepositoryMock(mc). OrderCreateMock.Return(1, nil). OrderSetStatusMock.Return(errors.New("status update error")), - stocks: mock.NewStockRepositoryMock(mc). - StockReserveMock.Return(errors.New("reservation error")), + stocks: mock.NewStockRepositoryMock(mc), }, args: args{ req: goodReq, @@ -152,8 +157,7 @@ func TestLomsService_OrderCreate(t *testing.T) { orders: mock.NewOrderRepositoryMock(mc). OrderCreateMock.Return(1, nil). OrderSetStatusMock.Return(errors.New("unexpected error")), - stocks: mock.NewStockRepositoryMock(mc). - StockReserveMock.Return(nil), + stocks: mock.NewStockRepositoryMock(mc), }, args: args{ req: goodReq, @@ -167,7 +171,7 @@ func TestLomsService_OrderCreate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}) + svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{}) _, err := svc.OrderCreate(ctx, tt.args.req) tt.wantErr(t, err) }) @@ -278,7 +282,7 @@ func TestLomsService_OrderPay(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}) + svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{}) err := svc.OrderPay(ctx, tt.args.id) tt.wantErr(t, err) }) @@ -292,6 +296,7 @@ func TestLomsService_OrderInfoBadInput(t *testing.T) { nil, nil, &mockTxManager{}, + &mockKafkaProducer{}, ) _, err := svc.OrderInfo(context.Background(), 0) @@ -313,6 +318,7 @@ func TestLomsService_OrderInfoSuccess(t *testing.T) { mock.NewOrderRepositoryMock(mc).OrderGetByIDMock.Return(order, nil), nil, &mockTxManager{}, + &mockKafkaProducer{}, ) gotOrder, err := svc.OrderInfo(context.Background(), 123) @@ -414,7 +420,7 @@ func TestLomsService_OrderCancel(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}) + svc := NewLomsService(tt.fields.orders, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{}) err := svc.OrderCancel(ctx, tt.args.id) tt.wantErr(t, err) }) @@ -481,7 +487,7 @@ func TestLomsService_StocksInfo(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - svc := NewLomsService(nil, tt.fields.stocks, &mockTxManager{}) + svc := NewLomsService(nil, tt.fields.stocks, &mockTxManager{}, &mockKafkaProducer{}) got, err := svc.StocksInfo(ctx, tt.args.sku) tt.wantErr(t, err) if err == nil { diff --git a/loms/internal/infra/messaging/kafka/order_event.go b/loms/internal/infra/messaging/kafka/order_event.go new file mode 100644 index 0000000..3e65b76 --- /dev/null +++ b/loms/internal/infra/messaging/kafka/order_event.go @@ -0,0 +1,7 @@ +package kafka + +type OrderEvent struct { + OrderID int64 `json:"order_id"` + Status string `json:"status"` + Moment string `json:"moment"` +} diff --git a/loms/internal/infra/messaging/kafka/producer.go b/loms/internal/infra/messaging/kafka/producer.go new file mode 100644 index 0000000..9688ae0 --- /dev/null +++ b/loms/internal/infra/messaging/kafka/producer.go @@ -0,0 +1,110 @@ +package kafka + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "time" + + "route256/loms/internal/domain/entity" + + "github.com/IBM/sarama" + "github.com/rs/zerolog/log" + + pb "route256/pkg/api/loms/v1" +) + +type statusProducer struct { + topic string + producer sarama.AsyncProducer +} + +func NewStatusProducer(topic string, producer sarama.AsyncProducer) (*statusProducer, error) { + p := &statusProducer{topic: topic, producer: producer} + go p.runCallbacks() + + return p, nil +} + +func mapOrderStatus(pbStatus string) (string, error) { + switch pbStatus { + case pb.OrderStatus_ORDER_STATUS_AWAITING_PAYMENT.String(): + return "awaiting payment", nil + case pb.OrderStatus_ORDER_STATUS_CANCELLED.String(): + return "cancelled", nil + case pb.OrderStatus_ORDER_STATUS_FAILED.String(): + return "failed", nil + case pb.OrderStatus_ORDER_STATUS_NEW.String(): + return "new", nil + case pb.OrderStatus_ORDER_STATUS_PAYED.String(): + return "payed", nil + default: + return "", fmt.Errorf("unexpected OrderStatus: %v", pbStatus) + } +} + +func (p *statusProducer) runCallbacks() { + for { + select { + case err, ok := <-p.producer.Errors(): + if !ok { + return + } + + log.Error().Err(err).Msgf("kafka publish error") + case _, ok := <-p.producer.Successes(): + if !ok { + return + } + + // TODO: add msg metrics (latency/partition/offset) + } + } +} + +func (p *statusProducer) Send(ctx context.Context, id entity.ID, status string) error { + log.Debug().Msgf("sending event for id: %d; status: %s", id, status) + + newStatus, err := mapOrderStatus(status) + if err != nil { + return err + } + + evt := OrderEvent{ + OrderID: int64(id), + Status: newStatus, + Moment: time.Now().UTC().Format(time.RFC3339Nano), + } + + value, err := json.Marshal(evt) + if err != nil { + return fmt.Errorf("marshal event: %w", err) + } + + return p.SendRaw(ctx, id, value) +} + +func (p *statusProducer) SendRaw(ctx context.Context, id entity.ID, value []byte) error { + if len(value) == 0 { + return fmt.Errorf("empty message value") + } + + msg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.StringEncoder(strconv.FormatInt(int64(id), 10)), + Value: sarama.ByteEncoder(value), + Timestamp: time.Now(), + } + + select { + case p.producer.Input() <- msg: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *statusProducer) Close() error { + return p.producer.Close() +} diff --git a/loms/internal/infra/messaging/kafka_outbox/dispatcher.go b/loms/internal/infra/messaging/kafka_outbox/dispatcher.go new file mode 100644 index 0000000..0eddbcf --- /dev/null +++ b/loms/internal/infra/messaging/kafka_outbox/dispatcher.go @@ -0,0 +1,54 @@ +package kafkaoutbox + +import ( + "context" + "time" + + "route256/loms/internal/domain/entity" + "route256/loms/internal/domain/repository/outbox" + + "github.com/rs/zerolog/log" +) + +type producer interface { + SendRaw(ctx context.Context, id entity.ID, value []byte) error +} + +type outboxRepo interface { + WithNewEvents(ctx context.Context, limit int, h func(c context.Context, e outbox.Event) error) error +} + +type Dispatcher struct { + repo outboxRepo + producer producer + limit int + interval time.Duration +} + +func NewDispatcher(repo outboxRepo, producer producer, batch int, pollEvery time.Duration) *Dispatcher { + return &Dispatcher{ + repo: repo, + producer: producer, + limit: batch, + interval: pollEvery, + } +} + +func (d *Dispatcher) Run(ctx context.Context) { + t := time.NewTicker(d.interval) + defer t.Stop() + + for { + if err := d.repo.WithNewEvents(ctx, d.limit, func(c context.Context, e outbox.Event) error { + return d.producer.SendRaw(c, e.OrderID, e.Payload) + }); err != nil { + log.Error().Err(err).Msg("d.repo.WithNewEvents") + } + + select { + case <-ctx.Done(): + return + case <-t.C: + } + } +} diff --git a/loms/sqlc.yaml b/loms/sqlc.yaml index d52d204..99fe1f9 100644 --- a/loms/sqlc.yaml +++ b/loms/sqlc.yaml @@ -26,3 +26,16 @@ sql: emit_result_struct_pointers: true emit_params_struct_pointers: true omit_unused_structs: true + - engine: "postgresql" + queries: "internal/domain/repository/outbox/sqlc/query.sql" + schema: "db/migrations" + gen: + go: + package: "sqlc" + out: "internal/domain/repository/outbox/sqlc" + sql_package: "pgx/v5" + emit_interface: true + emit_pointers_for_null_types: true + emit_result_struct_pointers: true + emit_params_struct_pointers: true + omit_unused_structs: true diff --git a/loms/tests/integration/loms_integration_test.go b/loms/tests/integration/loms_integration_test.go index 5ca588a..8799a4c 100644 --- a/loms/tests/integration/loms_integration_test.go +++ b/loms/tests/integration/loms_integration_test.go @@ -8,6 +8,7 @@ import ( "database/sql" "fmt" "net" + "sync" "testing" "time" @@ -18,9 +19,10 @@ import ( "github.com/pressly/goose/v3" "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" - "go.uber.org/goleak" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/status" "route256/loms/internal/app/server" "route256/loms/internal/domain/entity" @@ -42,6 +44,13 @@ const ( migrationsDir = "../../db/migrations" ) +// TODO: drop, use actual kafka for tests. +type mockKafkaProducer struct{} + +func (kp mockKafkaProducer) Send(_ context.Context, id entity.ID, status string) error { + return nil +} + func startPostgres(ctx context.Context, migrationsDir string) (*pgxpool.Pool, func(), error) { req := testcontainers.ContainerRequest{ Image: "gitlab-registry.ozon.dev/go/classroom-18/students/base/postgres:16", @@ -111,8 +120,6 @@ type LomsIntegrationSuite struct { } func TestLomsIntegrationSuite(t *testing.T) { - defer goleak.VerifyNone(t) - suite.RunSuite(t, new(LomsIntegrationSuite)) } @@ -129,7 +136,7 @@ func (s *LomsIntegrationSuite) BeforeAll(t provider.T) { txManager := postgres.NewTxManager(pool, pool) - svc := lomsService.NewLomsService(orderRepo, stockRepo, txManager) + svc := lomsService.NewLomsService(orderRepo, stockRepo, txManager, &mockKafkaProducer{}) lomsServer := server.NewServer(svc) lis, err := net.Listen("tcp", "127.0.0.1:0") @@ -208,3 +215,122 @@ func (s *LomsIntegrationSuite) TestStocksInfoPositive(t provider.T) { sCtx.Require().Greater(resp.Count, uint32(0)) }) } + +func (s *LomsIntegrationSuite) TestOrderCreate_SuccessAsync(t provider.T) { + t.Title("Успешное создание заказов (async)") + + const ( + sku = 1625903 + count = 1 + + ordersCount = 100 + ) + + var ( + userIDs = []int64{42, 43, 44} + orders = make([]struct { + userID int64 + orderID int64 + }, ordersCount) + + initStocksCount uint64 + + ctx = context.Background() + ) + + t.WithNewStep("Получение изначальных стоков", func(sCtx provider.StepCtx) { + stockCount, err := s.lomsClient.StocksInfo(ctx, &pb.StocksInfoRequest{Sku: sku}) + sCtx.Require().NoError(err) + initStocksCount = uint64(stockCount.GetCount()) + }) + + t.WithNewStep("Создание заказов", func(sCtx provider.StepCtx) { + var wg sync.WaitGroup + + for i := range ordersCount { + wg.Add(1) + go func() { + defer wg.Done() + + userID := userIDs[i%len(userIDs)] + + req := &pb.OrderCreateRequest{ + UserId: userID, + Items: []*pb.OrderItem{ + { + Sku: sku, + Count: count, + }, + }, + } + orderCreateResp, err := s.lomsClient.OrderCreate(ctx, req) + sCtx.Require().NoError(err) + sCtx.Require().Greater(orderCreateResp.OrderId, int64(0)) + + orders[i].userID = userID + orders[i].orderID = orderCreateResp.OrderId + }() + } + wg.Wait() + }) + + t.WithNewStep("Проверка заказов", func(sCtx provider.StepCtx) { + for _, order := range orders { + res, err := s.lomsClient.OrderInfo(ctx, &pb.OrderInfoRequest{ + OrderId: order.orderID, + }) + sCtx.Require().NoError(err) + + expected := entity.Order{ + Status: "awaiting payment", + UserID: entity.ID(order.userID), + Items: []entity.OrderItem{ + { + ID: sku, + Count: count, + }, + }, + } + + sCtx.Require().Equal(expected.Status, res.Status, "Не совпадает статус заказа") + sCtx.Require().Equal(expected.UserID, entity.ID(res.UserId), "Не совпадает пользователь заказа") + sCtx.Require().Equal(len(expected.Items), len(res.Items), "Не совпадает количество товаров в заказе") + } + }) + + t.WithNewStep("Проверка стоков", func(sCtx provider.StepCtx) { + stocksCount, err := s.lomsClient.StocksInfo(ctx, &pb.StocksInfoRequest{ + Sku: sku, + }) + sCtx.Require().NoError(err) + sCtx.Require().Equal(uint32(initStocksCount-ordersCount*count), stocksCount.Count) + }) +} + +func (s *LomsIntegrationSuite) TestOrderCreate_NoStockInfo(t provider.T) { + t.Title("Неуспешное создание заказ из-за отсутствия информации о стоках товара") + + const ( + userID = 42 + sku = 404 + ) + + ctx := context.Background() + + t.WithNewStep("Создание заказа", func(sCtx provider.StepCtx) { + req := &pb.OrderCreateRequest{ + UserId: userID, + Items: []*pb.OrderItem{ + { + Sku: sku, + Count: 1, + }, + }, + } + _, err := s.lomsClient.OrderCreate(ctx, req) + + e, ok := status.FromError(err) + sCtx.Require().True(ok) + sCtx.Require().Equal(codes.FailedPrecondition, e.Code(), "expect 400 (failed precondition) status code, got: %s", err.Error()) + }) +} diff --git a/notifier/Dockerfile b/notifier/Dockerfile new file mode 100644 index 0000000..ba57718 --- /dev/null +++ b/notifier/Dockerfile @@ -0,0 +1,22 @@ +FROM golang:1.23.9-alpine as builder + +WORKDIR /build + +COPY notifier/go.mod go.mod +COPY notifier/go.sum go.sum + +RUN go mod download + +COPY . . + +WORKDIR notifier + +RUN CGO_ENABLED=0 GOOS=linux go build -o /server ./cmd/server/main.go + +FROM scratch +COPY --from=builder server /bin/server +COPY notifier/configs/values_local.yaml /bin/config/values_local.yaml + +ENV CONFIG_FILE=/bin/config/values_local.yaml + +ENTRYPOINT ["/bin/server"] diff --git a/notifier/cmd/server/main.go b/notifier/cmd/server/main.go new file mode 100644 index 0000000..1da5735 --- /dev/null +++ b/notifier/cmd/server/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/rs/zerolog/log" + + "route256/notifier/internal/app" +) + +func main() { + srv, err := app.NewApp(os.Getenv("CONFIG_FILE")) + if err != nil { + log.Fatal().Err(err).Msg("failed creating app") + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + srv.Run(ctx) + + <-ctx.Done() + log.Info().Msg("shutdown signal received") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("graceful shutdown failed") + } else { + log.Info().Msg("server stopped gracefully") + } +} diff --git a/notifier/configs/values_local.yaml b/notifier/configs/values_local.yaml index 5b85d1d..f6f7a1d 100644 --- a/notifier/configs/values_local.yaml +++ b/notifier/configs/values_local.yaml @@ -1,6 +1,9 @@ +service: + log_level: trace + kafka: - host: localhost + host: kafka port: 29092 order_topic: loms.order-events consumer_group_id: notifier-group - brokers: localhost:29092 + brokers: kafka:29092 diff --git a/notifier/go.mod b/notifier/go.mod index ad399c2..4b74d16 100644 --- a/notifier/go.mod +++ b/notifier/go.mod @@ -1,3 +1,35 @@ module route256/notifier go 1.23.1 + +require ( + github.com/IBM/sarama v1.45.2 + github.com/rs/zerolog v1.34.0 + gopkg.in/yaml.v3 v3.0.1 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/kr/text v0.2.0 // indirect + github.com/mattn/go-colorable v0.1.13 // indirect + github.com/mattn/go-isatty v0.0.19 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/rogpeppe/go-internal v1.14.1 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sys v0.33.0 // indirect +) diff --git a/notifier/go.sum b/notifier/go.sum new file mode 100644 index 0000000..abee28b --- /dev/null +++ b/notifier/go.sum @@ -0,0 +1,119 @@ +github.com/IBM/sarama v1.45.2 h1:8m8LcMCu3REcwpa7fCP6v2fuPuzVwXDAM2DOv3CBrKw= +github.com/IBM/sarama v1.45.2/go.mod h1:ppaoTcVdGv186/z6MEKsMm70A5fwJfRTpstI37kVn3Y= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.7.0 h1:n3NRTnBn5N0Cbi/IeOHuQn9s2UwVUH7Ga0ZWcP+9JTA= +github.com/eapache/go-resiliency v1.7.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 h1:Oy0F4ALJ04o5Qqpdz8XLIpNA3WM/iSIXqxtqo7UGVws= +github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh687T8= +github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19 h1:JITubQf0MOLdlGRuRq+jtsDlekdYPia9ZFsB8h/APPA= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= +github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/notifier/internal/app/app.go b/notifier/internal/app/app.go new file mode 100644 index 0000000..204ebdf --- /dev/null +++ b/notifier/internal/app/app.go @@ -0,0 +1,83 @@ +package app + +import ( + "context" + "fmt" + "os" + + "route256/notifier/internal/app/controller" + "route256/notifier/internal/domain/service" + "route256/notifier/internal/infra/config" + "route256/notifier/internal/infra/messaging/kafka" + + "github.com/IBM/sarama" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +type App struct { + config *config.Config + + controller *controller.Controller +} + +func NewApp(configPath string) (*App, error) { + cfg, err := config.LoadConfig(configPath) + if err != nil { + return nil, fmt.Errorf("unable to load config: %w", err) + } + + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + zerolog.SetGlobalLevel(zerolog.InfoLevel) + + if cfg.Service.LogLevel != "" { + level, logErr := zerolog.ParseLevel(cfg.Service.LogLevel) + if logErr != nil { + return nil, fmt.Errorf("unknown log level `%s` provided: %w", cfg.Service.LogLevel, logErr) + } + + zerolog.SetGlobalLevel(level) + } + + log.WithLevel(zerolog.GlobalLevel()).Msgf("using logging level=`%s`", zerolog.GlobalLevel().String()) + + consumer, err := setupSaramaConsumerGroup([]string{cfg.Kafka.Brokers}, cfg.Kafka.ConsumerGroupID) + if err != nil { + return nil, err + } + + kafkaConsumer, err := kafka.NewStatusConsumer(cfg.Kafka.OrderTopic, consumer) + if err != nil { + return nil, fmt.Errorf("NewKafkaStatusConsumer: %w", err) + } + + notifierService := service.NewNotifierService(kafkaConsumer) + controller := controller.NewController(notifierService) + + return &App{ + config: cfg, + controller: controller, + }, err +} + +func (a *App) Run(ctx context.Context) { + a.controller.Run(ctx) +} + +func (a *App) Shutdown(ctx context.Context) error { + return a.controller.Stop(ctx) +} + +func setupSaramaConsumerGroup(brokers []string, groupID string) (sarama.ConsumerGroup, error) { + cfg := sarama.NewConfig() + cfg.Consumer.Group.Rebalance.Strategy = sarama.NewBalanceStrategyRange() + cfg.Consumer.Offsets.Initial = sarama.OffsetNewest + cfg.Metadata.AllowAutoTopicCreation = false + + group, err := sarama.NewConsumerGroup(brokers, groupID, cfg) + if err != nil { + return nil, fmt.Errorf("sarama.NewConsumerGroup: %w", err) + } + + return group, nil +} diff --git a/notifier/internal/app/controller/controller.go b/notifier/internal/app/controller/controller.go new file mode 100644 index 0000000..91a2f03 --- /dev/null +++ b/notifier/internal/app/controller/controller.go @@ -0,0 +1,74 @@ +package controller + +import ( + "context" + "errors" + "fmt" + "sync" +) + +type NotifierService interface { + RunFetchEvents(ctx context.Context) error +} + +type Controller struct { + notifier NotifierService + + wg sync.WaitGroup + errCh chan error + cancel context.CancelFunc +} + +func NewController(notifierService NotifierService) *Controller { + return &Controller{ + notifier: notifierService, + wg: sync.WaitGroup{}, + errCh: make(chan error, 1), + } +} + +// Run service asynchroniously. +func (c *Controller) Run(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + c.cancel = cancel + + c.wg.Add(1) + go func() { + defer c.wg.Done() + + if err := c.notifier.RunFetchEvents(ctx); err != nil && !errors.Is(err, context.Canceled) { + select { + case c.errCh <- fmt.Errorf("RunFetchEvents: %w", err): + default: + } + } + }() +} + +// Gracefully stop service. +func (c *Controller) Stop(ctx context.Context) error { + if c.cancel != nil { + c.cancel() + } + + done := make(chan struct{}) + go func() { + c.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-ctx.Done(): + return ctx.Err() + } + + close(c.errCh) + + select { + case err := <-c.errCh: + return err + default: + return nil + } +} diff --git a/notifier/internal/domain/service/service.go b/notifier/internal/domain/service/service.go new file mode 100644 index 0000000..30d43c3 --- /dev/null +++ b/notifier/internal/domain/service/service.go @@ -0,0 +1,42 @@ +package service + +import ( + "context" + "time" + + "github.com/rs/zerolog/log" +) + +type StatusConsumer interface { + FetchEvents(ctx context.Context) error +} + +type NotifierService struct { + consumer StatusConsumer +} + +func NewNotifierService(consumer StatusConsumer) *NotifierService { + return &NotifierService{ + consumer: consumer, + } +} + +func (s *NotifierService) RunFetchEvents(ctx context.Context) error { + backoff := 1 * time.Second + + for { + if err := s.consumer.FetchEvents(ctx); err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + + log.Error().Err(err).Msgf("consume error (retrying in %d second(s))", backoff) + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + } + } +} diff --git a/notifier/internal/infra/config/config.go b/notifier/internal/infra/config/config.go new file mode 100644 index 0000000..9fe2997 --- /dev/null +++ b/notifier/internal/infra/config/config.go @@ -0,0 +1,51 @@ +package config + +import ( + "fmt" + "os" + "path/filepath" + "strings" + + "gopkg.in/yaml.v3" +) + +type Config struct { + Service struct { + LogLevel string `yaml:"log_level"` + } `yaml:"service"` + + Kafka struct { + Host string `yaml:"host"` + Port string `yaml:"port"` + OrderTopic string `yaml:"order_topic"` + ConsumerGroupID string `yaml:"consumer_group_id"` + Brokers string `yaml:"brokers"` + } `yaml:"kafka"` +} + +func LoadConfig(filename string) (*Config, error) { + workDir, err := os.Getwd() + if err != nil { + return nil, err + } + cfgRoot := filepath.Join(workDir, "configs") + absCfgRoot, _ := filepath.Abs(cfgRoot) + + filePath := filepath.Join(absCfgRoot, filepath.Clean(filename)) + if !strings.HasPrefix(filePath, absCfgRoot) { + return nil, fmt.Errorf("invalid path") + } + + f, err := os.Open(filename) + if err != nil { + return nil, err + } + defer f.Close() + + config := &Config{} + if err := yaml.NewDecoder(f).Decode(config); err != nil { + return nil, err + } + + return config, nil +} diff --git a/notifier/internal/infra/messaging/kafka/consumer.go b/notifier/internal/infra/messaging/kafka/consumer.go new file mode 100644 index 0000000..d353af1 --- /dev/null +++ b/notifier/internal/infra/messaging/kafka/consumer.go @@ -0,0 +1,48 @@ +package kafka + +import ( + "context" + + "github.com/IBM/sarama" + "github.com/rs/zerolog/log" +) + +type StatusConsumer struct { + group sarama.ConsumerGroup + topic string +} + +func NewStatusConsumer(topic string, consumerGroup sarama.ConsumerGroup) (*StatusConsumer, error) { + return &StatusConsumer{ + group: consumerGroup, + topic: topic, + }, nil +} + +func (c *StatusConsumer) FetchEvents(ctx context.Context) error { + h := &statusHandler{} + + return c.group.Consume(ctx, []string{c.topic}, h) +} + +type statusHandler struct{} + +func (h *statusHandler) Setup(sess sarama.ConsumerGroupSession) error { + log.Info().Msgf("[notifier] assigned %v", sess.Claims()) + + return nil +} + +func (h *statusHandler) Cleanup(_ sarama.ConsumerGroupSession) error { + return nil +} + +func (h *statusHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for msg := range claim.Messages() { + log.Info().Msgf("[order=%s] p=%d off=%d %s\n", string(msg.Key), msg.Partition, msg.Offset, string(msg.Value)) + + sess.MarkMessage(msg, "") + } + + return nil +}