Skip to content

Commit 9e2377f

Browse files
committed
adapters: always loop until explicitly closed
1 parent 684b0d3 commit 9e2377f

File tree

4 files changed

+52
-23
lines changed

4 files changed

+52
-23
lines changed

adapters/apex/apex.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type Handler struct {
7171
ingestOptions []ingest.Option
7272

7373
eventCh chan axiom.Event
74+
stopCh chan struct{}
7475
closeCh chan struct{}
7576
closeOnce sync.Once
7677
}
@@ -93,6 +94,7 @@ type Handler struct {
9394
func New(options ...Option) (*Handler, error) {
9495
handler := &Handler{
9596
eventCh: make(chan axiom.Event, defaultBatchSize),
97+
stopCh: make(chan struct{}),
9698
closeCh: make(chan struct{}),
9799
}
98100

@@ -127,13 +129,20 @@ func New(options ...Option) (*Handler, error) {
127129

128130
logger := stdlog.New(os.Stderr, "[AXIOM|APEX]", 0)
129131

130-
res, err := handler.client.IngestChannel(context.Background(), handler.datasetName, handler.eventCh, handler.ingestOptions...)
131-
if err != nil {
132-
logger.Printf("failed to ingest events: %s\n", err)
133-
} else if res.Failed > 0 {
134-
// Best effort on notifying the user about the ingest failure.
135-
logger.Printf("event at %s failed to ingest: %s\n",
136-
res.Failures[0].Timestamp, res.Failures[0].Error)
132+
for {
133+
if res, err := handler.client.IngestChannel(context.Background(), handler.datasetName, handler.eventCh, handler.ingestOptions...); err != nil {
134+
logger.Printf("failed to ingest events: %s\n", err)
135+
} else if res.Failed > 0 {
136+
// Best effort on notifying the user about the ingest failure.
137+
logger.Printf("event at %s failed to ingest: %s\n",
138+
res.Failures[0].Timestamp, res.Failures[0].Error)
139+
}
140+
141+
select {
142+
case <-handler.stopCh:
143+
return
144+
default:
145+
}
137146
}
138147
}()
139148

@@ -144,6 +153,7 @@ func New(options ...Option) (*Handler, error) {
144153
// renders it unusable for further use.
145154
func (h *Handler) Close() {
146155
h.closeOnce.Do(func() {
156+
close(h.stopCh)
147157
close(h.eventCh)
148158
<-h.closeCh
149159
})

adapters/logrus/logrus.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ type Hook struct {
8181
levels []logrus.Level
8282

8383
eventCh chan axiom.Event
84+
stopCh chan struct{}
8485
closeCh chan struct{}
8586
closeOnce sync.Once
8687
}
@@ -105,6 +106,7 @@ func New(options ...Option) (*Hook, error) {
105106
levels: logrus.AllLevels,
106107

107108
eventCh: make(chan axiom.Event, defaultBatchSize),
109+
stopCh: make(chan struct{}),
108110
closeCh: make(chan struct{}),
109111
}
110112

@@ -139,13 +141,20 @@ func New(options ...Option) (*Hook, error) {
139141

140142
logger := log.New(os.Stderr, "[AXIOM|LOGRUS]", 0)
141143

142-
res, err := hook.client.IngestChannel(context.Background(), hook.datasetName, hook.eventCh, hook.ingestOptions...)
143-
if err != nil {
144-
logger.Printf("failed to ingest events: %s\n", err)
145-
} else if res.Failed > 0 {
146-
// Best effort on notifying the user about the ingest failure.
147-
logger.Printf("event at %s failed to ingest: %s\n",
148-
res.Failures[0].Timestamp, res.Failures[0].Error)
144+
for {
145+
if res, err := hook.client.IngestChannel(context.Background(), hook.datasetName, hook.eventCh, hook.ingestOptions...); err != nil {
146+
logger.Printf("failed to ingest events: %s\n", err)
147+
} else if res.Failed > 0 {
148+
// Best effort on notifying the user about the ingest failure.
149+
logger.Printf("event at %s failed to ingest: %s\n",
150+
res.Failures[0].Timestamp, res.Failures[0].Error)
151+
}
152+
153+
select {
154+
case <-hook.stopCh:
155+
return
156+
default:
157+
}
149158
}
150159
}()
151160

@@ -157,6 +166,7 @@ func New(options ...Option) (*Hook, error) {
157166
// unusable for further use.
158167
func (h *Hook) Close() {
159168
h.closeOnce.Do(func() {
169+
close(h.stopCh)
160170
close(h.eventCh)
161171
<-h.closeCh
162172
})

adapters/slog/slog.go

+17-7
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type rootHandler struct {
7777
ingestOptions []ingest.Option
7878

7979
eventCh chan axiom.Event
80+
stopCh chan struct{}
8081
closeCh chan struct{}
8182
closeOnce sync.Once
8283
}
@@ -108,6 +109,7 @@ type Handler struct {
108109
func New(options ...Option) (*Handler, error) {
109110
root := &rootHandler{
110111
eventCh: make(chan axiom.Event, defaultBatchSize),
112+
stopCh: make(chan struct{}),
111113
closeCh: make(chan struct{}),
112114
}
113115

@@ -146,13 +148,20 @@ func New(options ...Option) (*Handler, error) {
146148

147149
logger := log.New(os.Stderr, "[AXIOM|SLOG]", 0)
148150

149-
res, err := root.client.IngestChannel(context.Background(), root.datasetName, root.eventCh, root.ingestOptions...)
150-
if err != nil {
151-
logger.Printf("failed to ingest events: %s\n", err)
152-
} else if res.Failed > 0 {
153-
// Best effort on notifying the user about the ingest failure.
154-
logger.Printf("event at %s failed to ingest: %s\n",
155-
res.Failures[0].Timestamp, res.Failures[0].Error)
151+
for {
152+
if res, err := root.client.IngestChannel(context.Background(), root.datasetName, root.eventCh, root.ingestOptions...); err != nil {
153+
logger.Printf("failed to ingest events: %s\n", err)
154+
} else if res.Failed > 0 {
155+
// Best effort on notifying the user about the ingest failure.
156+
logger.Printf("event at %s failed to ingest: %s\n",
157+
res.Failures[0].Timestamp, res.Failures[0].Error)
158+
}
159+
160+
select {
161+
case <-root.stopCh:
162+
return
163+
default:
164+
}
156165
}
157166
}()
158167

@@ -163,6 +172,7 @@ func New(options ...Option) (*Handler, error) {
163172
// renders it unusable for further use.
164173
func (h *Handler) Close() {
165174
h.closeOnce.Do(func() {
175+
close(h.stopCh)
166176
close(h.eventCh)
167177
<-h.closeCh
168178
})

axiom/datasets.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -512,16 +512,15 @@ func (s *DatasetsService) IngestChannel(ctx context.Context, id string, events <
512512
return fmt.Errorf("failed to ingest events: %w", err)
513513
}
514514
ingestStatus.Add(res)
515-
t.Reset(flushInterval) // Reset the ticker.
516515
batch = batch[:0] // Clear the batch.
516+
t.Reset(flushInterval) // Reset the ticker.
517517

518518
return nil
519519
}
520520

521521
for {
522522
select {
523523
case <-ctx.Done():
524-
525524
return &ingestStatus, spanError(span, context.Cause(ctx))
526525
case event, ok := <-events:
527526
if !ok {

0 commit comments

Comments
 (0)