Skip to content

Commit

Permalink
Merge pull request #51 from hellofresh/amqp-listen-publish
Browse files Browse the repository at this point in the history
Add Amqp listener
  • Loading branch information
bilaljaved authored Nov 1, 2019
2 parents b1bfa9d + d425ec4 commit 4896794
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 14 deletions.
37 changes: 34 additions & 3 deletions extension/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@ package amqp
import (
"io"

"github.com/hellofresh/goengine"
"github.com/streadway/amqp"
)

// NotificationChannel represents a channel for notifications
type NotificationChannel interface {
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
Qos(prefetchCount, prefetchSize int, global bool) error
}

// setup returns a connection and channel to be used for the Queue setup
func setup(url, queue string) (io.Closer, NotificationChannel, error) {

conn, err := amqp.Dial(url)
if err != nil {
return nil, nil, err
Expand All @@ -25,7 +34,29 @@ func setup(url, queue string) (io.Closer, NotificationChannel, error) {
return conn, ch, nil
}

// NotificationChannel represents a channel for notifications
type NotificationChannel interface {
Publish(exchange, queue string, mandatory, immediate bool, msg amqp.Publishing) error
// DirectQueueConsume returns a Consume func that will connect to the provided AMQP server and create a queue for direct message delivery
func DirectQueueConsume(amqpDSN, queue string) (Consume, error) {
if _, err := amqp.ParseURI(amqpDSN); err != nil {
return nil, goengine.InvalidArgumentError("amqpDSN")
}
if len(queue) == 0 {
return nil, goengine.InvalidArgumentError("queue")
}

return func() (io.Closer, <-chan amqp.Delivery, error) {
conn, ch, err := setup(amqpDSN, queue)
if err != nil {
return nil, nil, err
}

// Indicate we only want 1 message to be acknowledge at a time.
if err := ch.Qos(1, 0, false); err != nil {
return nil, nil, err
}

// Since there can be multiple consumers, fair distribution of deliveries is required
deliveries, err := ch.Consume(queue, "", false, false, false, false, nil)

return conn, deliveries, err
}, nil
}
58 changes: 57 additions & 1 deletion extension/amqp/amqp_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
// +build unit

package amqp_test

import "github.com/streadway/amqp"
import (
"testing"

"github.com/hellofresh/goengine"
goengineAmqp "github.com/hellofresh/goengine/extension/amqp"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
)

type mockAcknowledger struct {
}

func (ack mockAcknowledger) Ack(tag uint64, multiple bool) error {
return nil
}

func (ack mockAcknowledger) Nack(tag uint64, multiple bool, requeue bool) error {
return nil
}

func (ack mockAcknowledger) Reject(tag uint64, requeue bool) error {
return nil
}

type mockConnection struct {
}
Expand All @@ -21,3 +45,35 @@ func (ch mockChannel) Publish(
) error {
return nil
}

func (ch mockChannel) Consume(
queue,
consumer string,
autoAck,
exclusive,
noLocal,
noWait bool,
args amqp.Table,
) (<-chan amqp.Delivery, error) {
return make(chan amqp.Delivery), nil
}
func (ch mockChannel) Qos(prefetchCount, prefetchSize int, global bool) error {
return nil
}

func TestDirectQueueConsume(t *testing.T) {
t.Run("Invalid arguments", func(t *testing.T) {
_, err := goengineAmqp.DirectQueueConsume("http://localhost:5672/", "my-queue")
assert.Equal(t, goengine.InvalidArgumentError("amqpDSN"), err)

_, err = goengineAmqp.DirectQueueConsume("amqp://localhost:5672/", "")
assert.Equal(t, goengine.InvalidArgumentError("queue"), err)
})

t.Run("Returns amqp.Consume", func(t *testing.T) {
c, err := goengineAmqp.DirectQueueConsume("amqp://localhost:5672/", "my-queue")
assert.NoError(t, err)
assert.NotNil(t, c)
assert.IsType(t, (goengineAmqp.Consume)(nil), c)
})
}
142 changes: 142 additions & 0 deletions extension/amqp/listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package amqp

import (
"context"
"io"
"time"

"github.com/hellofresh/goengine"
"github.com/hellofresh/goengine/driver/sql"
"github.com/mailru/easyjson"
"github.com/streadway/amqp"
)

// Ensure Listener implements sql.Listener
var _ sql.Listener = &Listener{}

type (
// Consume returns a channel of amqp.Delivery's and a related closer or an error
Consume func() (io.Closer, <-chan amqp.Delivery, error)

// Listener consumes messages from an queue
Listener struct {
consume Consume
minReconnectInterval time.Duration
maxReconnectInterval time.Duration
logger goengine.Logger
}
)

// NewListener returns a new Listener
func NewListener(
consume Consume,
minReconnectInterval time.Duration,
maxReconnectInterval time.Duration,
logger goengine.Logger,
) (*Listener, error) {
switch {
case consume == nil:
return nil, goengine.InvalidArgumentError("consume")
}

if logger == nil {
logger = goengine.NopLogger
}

return &Listener{
consume: consume,
minReconnectInterval: minReconnectInterval,
maxReconnectInterval: maxReconnectInterval,
logger: logger,
}, nil
}

// Listen receives messages from a queue, transforms them into a sql.ProjectionNotification and calls the trigger
func (l *Listener) Listen(ctx context.Context, trigger sql.ProjectionTrigger) error {
var nextReconnect time.Time
reconnectInterval := l.minReconnectInterval
for {
select {
case <-ctx.Done():
return context.Canceled
default:
}

conn, deliveries, err := l.consume()
if err != nil {
l.logger.Error("failed to start consuming amqp messages", func(entry goengine.LoggerEntry) {
entry.Error(err)
entry.String("reconnect_in", reconnectInterval.String())
})

time.Sleep(reconnectInterval)
reconnectInterval *= 2
if reconnectInterval > l.maxReconnectInterval {
reconnectInterval = l.maxReconnectInterval
}
continue
}
reconnectInterval = l.minReconnectInterval
nextReconnect = time.Now().Add(reconnectInterval)

l.consumeMessages(ctx, conn, deliveries, trigger)

select {
case <-ctx.Done():
return context.Canceled
default:
time.Sleep(time.Until(nextReconnect))
}
}
}

func (l *Listener) consumeMessages(ctx context.Context, conn io.Closer, deliveries <-chan amqp.Delivery, trigger sql.ProjectionTrigger) {
defer func() {
if conn == nil {
return
}

if err := conn.Close(); err != nil {
l.logger.Error("failed to close amqp connection", func(entry goengine.LoggerEntry) {
entry.Error(err)
})
}
}()

for {
select {
case <-ctx.Done():
return
case msg, ok := <-deliveries:
if !ok {
return
}

notification := &sql.ProjectionNotification{}
if err := easyjson.Unmarshal(msg.Body, notification); err != nil {
l.logger.Error("failed to unmarshal delivery, dropping message", func(entry goengine.LoggerEntry) {
entry.Error(err)
})
continue
}

if err := msg.Ack(false); err != nil {
l.logger.Error("failed to acknowledge notification delivery", func(entry goengine.LoggerEntry) {
entry.Error(err)
entry.Int64("notification.no", notification.No)
entry.String("notification.aggregate_id", notification.AggregateID)
})
continue
}

if err := trigger(ctx, notification); err != nil {
l.logger.Error("failed to project notification", func(entry goengine.LoggerEntry) {
entry.Error(err)
entry.Int64("notification.no", notification.No)
entry.String("notification.aggregate_id", notification.AggregateID)
})
}

}
}
}
Loading

0 comments on commit 4896794

Please sign in to comment.