Skip to content

Commit

Permalink
Merge pull request #42 from hellofresh/patch/enhance-delays
Browse files Browse the repository at this point in the history
Enhance delays
  • Loading branch information
cwygoda authored Sep 4, 2019
2 parents 67783fa + d98e192 commit e45f508
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 91 deletions.
49 changes: 20 additions & 29 deletions driver/sql/projection_notification_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"runtime"
"sync"
"time"

"github.com/hellofresh/goengine"
"github.com/pkg/errors"
Expand Down Expand Up @@ -111,39 +110,31 @@ func (b *ProjectionNotificationProcessor) Queue(ctx context.Context, notificatio
}

func (b *ProjectionNotificationProcessor) startProcessor(ctx context.Context, handler ProcessHandler) {
ProcessorLoop:
for {
select {
case <-b.done:
notification, stopped := b.notificationQueue.Next(ctx)
if stopped {
return
case <-ctx.Done():
return
case notification := <-b.notificationQueue.Channel():
var queueFunc ProjectionTrigger
if notification == nil {
queueFunc = b.notificationQueue.Queue
} else {
queueFunc = b.notificationQueue.ReQueue

if notification.ValidAfter.After(time.Now()) {
b.notificationQueue.PutBack(notification)
continue ProcessorLoop
}
}
}

// Execute the notification
b.metrics.StartNotificationProcessing(notification)
if err := handler(ctx, notification, queueFunc); err != nil {
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
e.Error(err)
e.Any("notification", notification)
})
var queueFunc ProjectionTrigger
if notification == nil {
queueFunc = b.notificationQueue.Queue
} else {
queueFunc = b.notificationQueue.ReQueue
}

b.metrics.FinishNotificationProcessing(notification, false)
// Execute the notification
b.metrics.StartNotificationProcessing(notification)
if err := handler(ctx, notification, queueFunc); err != nil {
b.logger.Error("the ProcessHandler produced an error", func(e goengine.LoggerEntry) {
e.Error(err)
e.Any("notification", notification)
})

} else {
b.metrics.FinishNotificationProcessing(notification, true)
}
b.metrics.FinishNotificationProcessing(notification, false)

} else {
b.metrics.FinishNotificationProcessing(notification, true)
}
}
}
Expand Down
41 changes: 22 additions & 19 deletions driver/sql/projection_notification_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,40 +51,43 @@ func TestStartProcessor(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.title, func(t *testing.T) {
ctrl := gomock.NewController(t)

nqMock := mocks.NewNotificationQueuer(ctrl)
queueBufferSize := 1
queueProcessorsCount := 1
ctx := context.Background()
notification := testCase.notification()

e := nqMock.EXPECT()
queueCallCount := 0
reQueueCallCount := 0
notificationQueue := mocks.NewNotificationQueuer(ctrl)
expect := notificationQueue.EXPECT()

switch testCase.queueFunc {
case "Queue":
queueCallCount++
expect.Queue(gomock.Eq(ctx), gomock.Eq(notification)).Times(1)
case "ReQueue":
reQueueCallCount++
expect.ReQueue(gomock.Eq(ctx), gomock.Eq(notification)).Times(1)
}

e.Queue(gomock.Eq(ctx), gomock.Eq(notification)).Times(queueCallCount)
e.ReQueue(gomock.Eq(ctx), gomock.Eq(notification)).Times(reQueueCallCount)
done := make(chan struct{})
e.Open().DoAndReturn(func() chan struct{} {
channel := make(chan *sql.ProjectionNotification, queueBufferSize)
channel <- notification
called := false

expect.Open().DoAndReturn(func() chan struct{} {
return done
}).AnyTimes()
channel := make(chan *sql.ProjectionNotification, 1)
channel <- notification
e.Channel().Return(channel).AnyTimes()
e.PutBack(gomock.Eq(notification)).Do(func(notification *sql.ProjectionNotification) {
channel <- notification

expect.Next(gomock.Eq(ctx)).DoAndReturn(func(ctx context.Context) (*sql.ProjectionNotification, bool) {
if called {
return nil, true
}
called = true
return notification, false
}).AnyTimes()
e.Close().Do(func() {

expect.Close().Do(func() {
close(channel)
})

bufferSize := 1
queueProcessorsCount := 1
processor, err := sql.NewBackgroundProcessor(queueProcessorsCount, bufferSize, nil, nil, nqMock)
processor, err := sql.NewBackgroundProcessor(queueProcessorsCount, queueBufferSize, nil, nil, notificationQueue)
require.NoError(t, err)

var wg sync.WaitGroup
Expand Down
43 changes: 26 additions & 17 deletions driver/sql/projection_notification_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ var _ ProjectionTrigger = (&NotificationQueue{}).ReQueue
type (
// NotificationQueuer describes a smart queue for projection notifications
NotificationQueuer interface {
Channel() chan *ProjectionNotification
Open() chan struct{}
Close()

Empty() bool
Open() chan struct{}
PutBack(*ProjectionNotification)
Next(context.Context) (*ProjectionNotification, bool)

Queue(context.Context, *ProjectionNotification) error
ReQueue(context.Context, *ProjectionNotification) error
}
Expand All @@ -46,9 +47,12 @@ func newNotificationQueue(queueBuffer int, retryDelay time.Duration, metrics Met
}
}

// Channel returns the queue channel
func (nq *NotificationQueue) Channel() chan *ProjectionNotification {
return nq.queue
// Open enables the queue for business
func (nq *NotificationQueue) Open() chan struct{} {
nq.done = make(chan struct{})
nq.queue = make(chan *ProjectionNotification, nq.queueBuffer)

return nq.done
}

// Close closes the queue channel
Expand All @@ -61,9 +65,22 @@ func (nq *NotificationQueue) Empty() bool {
return len(nq.queue) == 0
}

// PutBack sends a notification to the queue channel without further ado
func (nq *NotificationQueue) PutBack(notification *ProjectionNotification) {
nq.queue <- notification
// Next yields the next notification on the queue or stopped when processor has stopped
func (nq *NotificationQueue) Next(ctx context.Context) (*ProjectionNotification, bool) {
for {
select {
case <-nq.done:
return nil, true
case <-ctx.Done():
return nil, true
case notification := <-nq.queue:
if notification != nil && notification.ValidAfter.After(time.Now()) {
nq.queue <- notification
continue
}
return notification, false
}
}
}

// Queue sends a notification to the queue
Expand All @@ -88,11 +105,3 @@ func (nq *NotificationQueue) ReQueue(ctx context.Context, notification *Projecti

return nq.Queue(ctx, notification)
}

// Open enables the queue for business
func (nq *NotificationQueue) Open() chan struct{} {
nq.done = make(chan struct{})
nq.queue = make(chan *ProjectionNotification, nq.queueBuffer)

return nq.done
}
41 changes: 15 additions & 26 deletions mocks/driver/sql/notification_queue.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit e45f508

Please sign in to comment.