From de8b49d30c762d03f449265101a4248e16990e57 Mon Sep 17 00:00:00 2001 From: Vladimir Garvardt Date: Wed, 25 Sep 2019 16:02:46 +0200 Subject: [PATCH 1/3] Added connection name parameter --- .travis.yml | 6 +- Gopkg.lock | 255 ------------------------------------------- Gopkg.toml | 50 --------- Makefile | 2 - cmd/goengine/main.go | 2 +- go.mod | 21 ++++ go.sum | 47 ++++++++ rabbit/eventbus.go | 52 ++++++--- 8 files changed, 108 insertions(+), 327 deletions(-) delete mode 100644 Gopkg.lock delete mode 100644 Gopkg.toml create mode 100644 go.mod create mode 100644 go.sum diff --git a/.travis.yml b/.travis.yml index 952cda9..78d5139 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,7 @@ services: go: - "1.11" + - "1.12" - "stable" env: @@ -14,11 +15,8 @@ env: before_install: - docker-compose up -d -install: - - make deps - script: - - make $MAKE_TASK + - env GO111MODULE=on make $MAKE_TASK after_script: - docker-compose down -v diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index 983366d..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,255 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - digest = "1:586ea76dbd0374d6fb649a91d70d652b7fe0ccffb8910a77468e7702e7901f3d" - name = "github.com/go-stack/stack" - packages = ["."] - pruneopts = "UT" - revision = "2fee6af1a9795aafbe0253a0cfbdf668e1fb8a9a" - version = "v1.8.0" - -[[projects]] - digest = "1:bed9d72d596f94e65fff37f4d6c01398074a6bb1c3f3ceff963516bd01db6ff5" - name = "github.com/gofrs/uuid" - packages = ["."] - pruneopts = "UT" - revision = "6b08a5c5172ba18946672b49749cde22873dd7c2" - version = "v3.2.0" - -[[projects]] - branch = "master" - digest = "1:4a0c6bb4805508a6287675fac876be2ac1182539ca8a32468d8128882e9d5009" - name = "github.com/golang/snappy" - packages = ["."] - pruneopts = "UT" - revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a" - -[[projects]] - branch = "master" - digest = "1:59392ed8afb901aab4287d4894df8191722e34f3957716f4350c8c133ce99046" - name = "github.com/hpcloud/tail" - packages = [ - ".", - "ratelimiter", - "util", - "watch", - "winfile", - ] - pruneopts = "UT" - revision = "a1dbeea552b7c8df4b542c66073e393de198a800" - -[[projects]] - digest = "1:5f4b78246f0bcb105b1e3b2b9e22b52a57cd02f57a8078572fe27c62f4a75ff7" - name = "github.com/onsi/ginkgo" - packages = [ - ".", - "config", - "internal/codelocation", - "internal/containernode", - "internal/failer", - "internal/leafnodes", - "internal/remote", - "internal/spec", - "internal/spec_iterator", - "internal/specrunner", - "internal/suite", - "internal/testingtproxy", - "internal/writer", - "reporters", - "reporters/stenographer", - "reporters/stenographer/support/go-colorable", - "reporters/stenographer/support/go-isatty", - "types", - ] - pruneopts = "UT" - revision = "2e1be8f7d90e9d3e3e58b0ce470f2f14d075406f" - version = "v1.7.0" - -[[projects]] - digest = "1:7a137fb7718928e473b7d805434ae563ec41790d3d227cdc64e8b14d1cab8a1f" - name = "github.com/onsi/gomega" - packages = [ - ".", - "format", - "internal/assertion", - "internal/asyncassertion", - "internal/oraclematcher", - "internal/testingtsupport", - "matchers", - "matchers/support/goraph/bipartitegraph", - "matchers/support/goraph/edge", - "matchers/support/goraph/node", - "matchers/support/goraph/util", - "types", - ] - pruneopts = "UT" - revision = "65fb64232476ad9046e57c26cd0bff3d3a8dc6cd" - version = "v1.4.3" - -[[projects]] - branch = "master" - digest = "1:525ac3364813b4688df380594e562133e07830dfce0722effda64b37634c13d0" - name = "github.com/streadway/amqp" - packages = ["."] - pruneopts = "UT" - revision = "a314942b2fd9dde7a3f70ba3f1062848ce6eb392" - -[[projects]] - branch = "master" - digest = "1:40fdfd6ab85ca32b6935853bbba35935dcb1d796c8135efd85947566c76e662e" - name = "github.com/xdg/scram" - packages = ["."] - pruneopts = "UT" - revision = "7eeb5667e42c09cb51bf7b7c28aea8c56767da90" - -[[projects]] - branch = "master" - digest = "1:f5c1d04bc09c644c592b45b9f0bad4030521b1a7d11c7dadbb272d9439fa6e8e" - name = "github.com/xdg/stringprep" - packages = ["."] - pruneopts = "UT" - revision = "73f8eece6fdcd902c185bf651de50f3828bed5ed" - -[[projects]] - digest = "1:21f9cb6f1337c4776ed1d2d7b7ed6ffba3269ca12274c065364a5c0edee1f28b" - name = "go.mongodb.org/mongo-driver" - packages = [ - "bson", - "bson/bsoncodec", - "bson/bsonrw", - "bson/bsontype", - "bson/primitive", - "event", - "internal", - "mongo", - "mongo/options", - "mongo/readconcern", - "mongo/readpref", - "mongo/writeconcern", - "tag", - "version", - "x/bsonx", - "x/bsonx/bsoncore", - "x/mongo/driver", - "x/mongo/driver/auth", - "x/mongo/driver/auth/internal/gssapi", - "x/mongo/driver/session", - "x/mongo/driver/topology", - "x/mongo/driver/uuid", - "x/network/address", - "x/network/command", - "x/network/compressor", - "x/network/connection", - "x/network/connstring", - "x/network/description", - "x/network/result", - "x/network/wiremessage", - ] - pruneopts = "UT" - revision = "1c3b9b9a41eecdf056560e07b68e6407b8d598c3" - version = "v1.0.0" - -[[projects]] - branch = "master" - digest = "1:f92f6956e4059f6a3efc14924d2dd58ba90da25cc57fe07ae3779ef2f5e0c5f2" - name = "golang.org/x/crypto" - packages = ["pbkdf2"] - pruneopts = "UT" - revision = "b01c7a72566457eb1420261cdafef86638fc3861" - -[[projects]] - branch = "master" - digest = "1:4939e20b972c22cd512abff0bf6ed8fc0e3e86aeb836d016be3323c6a901d99d" - name = "golang.org/x/net" - packages = [ - "html", - "html/atom", - "html/charset", - ] - pruneopts = "UT" - revision = "d26f9f9a57f3fab6a695bec0d84433c2c50f8bbf" - -[[projects]] - branch = "master" - digest = "1:75515eedc0dc2cb0b40372008b616fa2841d831c63eedd403285ff286c593295" - name = "golang.org/x/sync" - packages = ["semaphore"] - pruneopts = "UT" - revision = "37e7f081c4d4c64e13b10787722085407fe5d15f" - -[[projects]] - branch = "master" - digest = "1:aba2486bbebaadd1f62bde3c363ddbd984efade895833d86ed069f586fd9e899" - name = "golang.org/x/sys" - packages = ["unix"] - pruneopts = "UT" - revision = "aca44879d5644da7c5b8ec6a1115e9b6ea6c40d9" - -[[projects]] - digest = "1:4392fcf42d5cf0e3ff78c96b2acf8223d49e4fdc53eb77c99d2f8dfe4680e006" - name = "golang.org/x/text" - packages = [ - "encoding", - "encoding/charmap", - "encoding/htmlindex", - "encoding/internal", - "encoding/internal/identifier", - "encoding/japanese", - "encoding/korean", - "encoding/simplifiedchinese", - "encoding/traditionalchinese", - "encoding/unicode", - "internal/gen", - "internal/tag", - "internal/triegen", - "internal/ucd", - "internal/utf8internal", - "language", - "runes", - "transform", - "unicode/cldr", - "unicode/norm", - ] - pruneopts = "UT" - revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" - version = "v0.3.0" - -[[projects]] - digest = "1:abeb38ade3f32a92943e5be54f55ed6d6e3b6602761d74b4aab4c9dd45c18abd" - name = "gopkg.in/fsnotify/fsnotify.v1" - packages = ["."] - pruneopts = "UT" - revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" - version = "v1.4.7" - -[[projects]] - digest = "1:3c839a777de0e6da035c9de900b60cbec463b0a89351192c1ea083eaf9e0fce0" - name = "gopkg.in/tomb.v1" - packages = ["."] - pruneopts = "UT" - revision = "c131134a1947e9afd9cecfe11f4c6dff0732ae58" - -[[projects]] - digest = "1:4d2e5a73dc1500038e504a8d78b986630e3626dc027bc030ba5c75da257cdb96" - name = "gopkg.in/yaml.v2" - packages = ["."] - pruneopts = "UT" - revision = "51d6538a90f86fe93ac480b35f37b2be17fef232" - version = "v2.2.2" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = [ - "github.com/gofrs/uuid", - "github.com/onsi/ginkgo", - "github.com/onsi/gomega", - "github.com/streadway/amqp", - "go.mongodb.org/mongo-driver/bson", - "go.mongodb.org/mongo-driver/mongo", - "go.mongodb.org/mongo-driver/mongo/options", - "go.mongodb.org/mongo-driver/x/bsonx", - ] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index 3d312ad..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,50 +0,0 @@ -# Gopkg.toml example -# -# Refer to https://golang.github.io/dep/docs/Gopkg.toml.html -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" -# -# [prune] -# non-go = false -# go-tests = true -# unused-packages = true - - -[[constraint]] - name = "github.com/onsi/ginkgo" - version = "1.7.0" - -[[constraint]] - name = "github.com/onsi/gomega" - version = "1.4.3" - -[[constraint]] - branch = "master" - name = "github.com/streadway/amqp" - -[[constraint]] - name = "github.com/gofrs/uuid" - version = "3.2.0" - -[prune] - go-tests = true - unused-packages = true - -[[constraint]] - name = "go.mongodb.org/mongo-driver" - version = "v1.0.0" diff --git a/Makefile b/Makefile index fb3273b..e9fd7cd 100644 --- a/Makefile +++ b/Makefile @@ -5,10 +5,8 @@ PKG_SRC := github.com/hellofresh/goengine deps: @echo "$(OK_COLOR)==> Installing dependencies$(NO_COLOR)" - @go get -u github.com/golang/dep/cmd/dep @go get -u github.com/onsi/ginkgo/ginkgo @go get -u github.com/onsi/gomega - @dep ensure -v -vendor-only vet: @echo "$(OK_COLOR)==> checking code correctness with 'go vet' tool$(NO_COLOR)" diff --git a/cmd/goengine/main.go b/cmd/goengine/main.go index cd54614..b1af875 100644 --- a/cmd/goengine/main.go +++ b/cmd/goengine/main.go @@ -59,7 +59,7 @@ func main() { registry.RegisterType(&RecipeRated{}) goengine.Log("Setting up the event bus", map[string]interface{}{"dsn": brokerDSN}, nil) - bus := rabbit.NewEventBus(brokerDSN, "events", "events") + bus := rabbit.NewEventBus(brokerDSN, "events", "events", "goengine-test") goengine.Log("Setting up the event store", nil, nil) eventStore := mongodb.NewEventStore(mongoClient.Database("event_store"), registry) diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ee59843 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module github.com/hellofresh/goengine + +go 1.12 + +require ( + github.com/go-stack/stack v1.8.0 // indirect + github.com/gofrs/uuid v3.2.0+incompatible + github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect + github.com/onsi/ginkgo v1.7.0 + github.com/onsi/gomega v1.4.3 + github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9 + github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect + github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect + go.mongodb.org/mongo-driver v1.0.0 + golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 // indirect + golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 // indirect + golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect + golang.org/x/sys v0.0.0-20190130150945-aca44879d564 // indirect + gopkg.in/fsnotify/fsnotify.v1 v1.4.7 // indirect + gopkg.in/yaml.v2 v2.2.2 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..076fade --- /dev/null +++ b/go.sum @@ -0,0 +1,47 @@ +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hpcloud/tail v0.0.0-20180514194441-a1dbeea552b7/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0 h1:WSHQ+IS43OoUrWtD1/bbclrwK8TTH5hzp+umCiuxHgs= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3 h1:RE1xgDvH7imwFD45h+u2SgIfERHlS2yNG4DObb5BSKU= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9 h1:37QTz/gdHBLQcsmgMTnQDSWCtKzJ7YnfI2M2yTdr4BQ= +github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9/go.mod h1:1WNBiOZtZQLpVAyu0iTduoJL9hEsMloAK5XWrtW0xdY= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc h1:n+nNi93yXLkJvKwXNP9d55HC7lGK4H/SRcwB5IaUZLo= +github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= +go.mongodb.org/mongo-driver v1.0.0 h1:KxPRDyfB2xXnDE2My8acoOWBQkfv3tz0SaWTRZjJR0c= +go.mongodb.org/mongo-driver v1.0.0/go.mod h1:u7ryQJ+DOzQmeO7zB6MHyr8jkEQvC8vH7qLUO4lqsUM= +golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664 h1:YbZJ76lQ1BqNhVe7dKTSB67wDrc2VPRR75IyGyyPDX8= +golang.org/x/crypto v0.0.0-20190130090550-b01c7a725664/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3 h1:ulvT7fqt0yHWzpJwI57MezWnYDVpCAYBVuYst/L+fAY= +golang.org/x/net v0.0.0-20190125091013-d26f9f9a57f3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564 h1:o6ENHFwwr1TZ9CUPQcfo1HGvLP1OPsPOTB7xCIOPNmU= +golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= +gopkg.in/fsnotify/fsnotify.v1 v1.4.7/go.mod h1:Fyux9zXlo4rWoMSIzpn9fDAYjalPqJ/K1qJ27s+7ltE= +gopkg.in/tomb.v1 v1.0.0-20140529071818-c131134a1947/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/rabbit/eventbus.go b/rabbit/eventbus.go index 2ee236c..702888c 100644 --- a/rabbit/eventbus.go +++ b/rabbit/eventbus.go @@ -11,6 +11,11 @@ import ( "github.com/streadway/amqp" ) +const ( + defaultHeartbeat = 10 * time.Second + defaultLocale = "en_US" +) + // RawVersionedEvent represents the event that goes into rabbitmq type RawVersionedEvent struct { ID string `bson:"aggregate_id,omitempty"` @@ -22,20 +27,25 @@ type RawVersionedEvent struct { // EventBus ... type EventBus struct { - brokerDSN string - name string - exchange string + brokerDSN string + queue string + exchange string + connectionName string } // NewEventBus ... -func NewEventBus(brokerDSN string, name string, exchange string) *EventBus { - return &EventBus{brokerDSN, name, exchange} +func NewEventBus(brokerDSN, queue, exchange, connectionName string) *EventBus { + return &EventBus{brokerDSN, queue, exchange, connectionName} } // PublishEvents will publish events func (bus *EventBus) PublishEvents(events []*goengine.DomainMessage) error { // Connects opens an AMQP connection from the credentials in the URL. - conn, err := amqp.Dial(bus.brokerDSN) + conn, err := amqp.DialConfig(bus.brokerDSN, amqp.Config{ + Heartbeat: defaultHeartbeat, + Locale: defaultLocale, + Properties: amqp.Table{"connection_name": bus.connectionName + ".events-publisher"}, + }) if err != nil { return err } @@ -103,7 +113,7 @@ func (bus *EventBus) ReceiveEvents(options goengine.VersionedEventReceiverOption select { case ch := <-options.Close: defer conn.Close() - ch <- c.Cancel(bus.name, false) + ch <- c.Cancel(bus.queue, false) return case message, more := <-events: @@ -116,14 +126,18 @@ func (bus *EventBus) ReceiveEvents(options goengine.VersionedEventReceiverOption if nil != err { goengine.Log("EventBus.Cannot find event type", map[string]interface{}{"type": raw.Type}, nil) options.Error <- errors.New("Cannot find event type " + raw.Type) - message.Ack(true) + if err := message.Ack(true); err != nil { + goengine.Log("EventBus.Ack could not ack message", nil, err) + } } else { ackCh := make(chan bool) goengine.Log("EventBus.Dispatching Message", nil, nil) options.ReceiveEvent <- goengine.VersionedEventTransactedAccept{Event: domainEvent, ProcessedSuccessfully: ackCh} result := <-ackCh - message.Ack(result) + if err := message.Ack(result); err != nil { + goengine.Log("EventBus.Ack could not ack dispatched message", nil, err) + } } } } else { @@ -153,7 +167,11 @@ func (bus *EventBus) ReceiveEvents(options goengine.VersionedEventReceiverOption // DeleteQueue will delete a queue func (bus *EventBus) DeleteQueue(name string) error { - conn, err := amqp.Dial(bus.brokerDSN) + conn, err := amqp.DialConfig(bus.brokerDSN, amqp.Config{ + Heartbeat: defaultHeartbeat, + Locale: defaultLocale, + Properties: amqp.Table{"connection_name": bus.connectionName + ".queue-deleter"}, + }) if err != nil { return err } @@ -168,7 +186,11 @@ func (bus *EventBus) DeleteQueue(name string) error { } func (bus *EventBus) consumeEventsQueue(exclusive bool) (*amqp.Connection, *amqp.Channel, <-chan amqp.Delivery, error) { - conn, err := amqp.Dial(bus.brokerDSN) + conn, err := amqp.DialConfig(bus.brokerDSN, amqp.Config{ + Heartbeat: defaultHeartbeat, + Locale: defaultLocale, + Properties: amqp.Table{"connection_name": bus.connectionName + ".events-consumer"}, + }) if err != nil { return nil, nil, nil, err } @@ -187,21 +209,21 @@ func (bus *EventBus) consumeEventsQueue(exclusive bool) (*amqp.Connection, *amqp return nil, nil, nil, fmt.Errorf("exchange.declare: %v", err) } - if _, err = c.QueueDeclare(bus.name, true, false, false, false, nil); err != nil { + if _, err = c.QueueDeclare(bus.queue, true, false, false, false, nil); err != nil { return nil, nil, nil, fmt.Errorf("queue.declare: %v", err) } - if err = c.QueueBind(bus.name, bus.name, bus.exchange, false, nil); err != nil { + if err = c.QueueBind(bus.queue, bus.queue, bus.exchange, false, nil); err != nil { return nil, nil, nil, fmt.Errorf("queue.bind: %v", err) } - events, err := c.Consume(bus.name, bus.name, false, exclusive, false, false, nil) + events, err := c.Consume(bus.queue, bus.queue, false, exclusive, false, false, nil) if err != nil { return nil, nil, nil, fmt.Errorf("basic.consume: %v", err) } if err := c.Qos(1, 0, false); err != nil { - return nil, nil, nil, fmt.Errorf("Qos: %v", err) + return nil, nil, nil, fmt.Errorf("qos: %v", err) } return conn, c, events, nil From 2b02095f5e76d3b5aaaa68055e19b4fb5b9935bc Mon Sep 17 00:00:00 2001 From: Vladimir Garvardt Date: Wed, 25 Sep 2019 16:25:43 +0200 Subject: [PATCH 2/3] Added missing test deps --- .travis.yml | 3 +++ Makefile | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 78d5139..9e59972 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,6 +15,9 @@ env: before_install: - docker-compose up -d +install: + - make deps + script: - env GO111MODULE=on make $MAKE_TASK diff --git a/Makefile b/Makefile index e9fd7cd..003b492 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ OK_COLOR=\033[32;01m PKG_SRC := github.com/hellofresh/goengine deps: - @echo "$(OK_COLOR)==> Installing dependencies$(NO_COLOR)" + @echo "$(OK_COLOR)==> Installing test dependencies$(NO_COLOR)" @go get -u github.com/onsi/ginkgo/ginkgo @go get -u github.com/onsi/gomega From 5867b6903cc1a2671abad99e051751cd1fab0773 Mon Sep 17 00:00:00 2001 From: Vladimir Garvardt Date: Wed, 25 Sep 2019 16:29:54 +0200 Subject: [PATCH 3/3] No more go1.11 support --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 9e59972..2dc38b3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,6 @@ services: - docker go: - - "1.11" - "1.12" - "stable"