-
Notifications
You must be signed in to change notification settings - Fork 20
/
Copy pathpipeline.go
104 lines (87 loc) · 2.13 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"time"
)
// StreamResult is the unit passed between pipeline stages. It carries either
// a computed value or the error raised while producing it, so that errors
// travel through the same channel as regular results.
type StreamResult struct {
// err is non-nil when this result represents a failure.
err error
// value is the integer payload; the source stage in main sets it to -1
// whenever it attaches an error.
value int
}
// THRESHOLD is the largest value the source stage in main accepts; any
// generated value strictly greater than it is turned into an error result.
const THRESHOLD = 50
// main wires a three-stage pipeline (source -> processor -> sink), lets the
// sink consume up to 100 results while tolerating a bounded number of errors,
// then cancels the context and waits for the stage goroutines to stop.
func main() {
	// source emits random integers in [0, 100) until ctx is cancelled. Values
	// above THRESHOLD are reported as errors with value set to -1. The
	// returned channel is closed when the generator goroutine exits.
	source := func(ctx context.Context) <-chan StreamResult {
		intStream := make(chan StreamResult)
		go func() {
			defer close(intStream)
			for {
				result := StreamResult{
					value: rand.Intn(100),
				}
				if result.value > THRESHOLD {
					result.err = errors.New("exceeded threshold")
					result.value = -1
				}
				select {
				case <-ctx.Done():
					return
				case intStream <- result:
				}
			}
		}()
		return intStream
	}

	// processor multiplies every error-free value by 100 and forwards error
	// results untouched. It keeps reading until intStream is closed, so its
	// own output channel is closed only after the upstream stage has stopped
	// producing.
	processor := func(ctx context.Context, intStream <-chan StreamResult) <-chan StreamResult {
		multiplierStream := make(chan StreamResult)
		go func() {
			defer close(multiplierStream)
			for res := range intStream {
				if res.err == nil {
					res.value *= 100
				}
				select {
				case multiplierStream <- res:
				case <-ctx.Done():
					// Nobody is guaranteed to be reading any more; drop the
					// result instead of blocking forever on the send.
				}
			}
		}()
		return multiplierStream
	}

	// sink consumes up to upperBoundary results from source. It tolerates at
	// most ceil(upperBoundary * thresholdPercentage) errors and stops early
	// once that budget is exceeded or the channel is closed.
	sink := func(source <-chan StreamResult, upperBoundary int, thresholdPercentage float64) {
		maxAllowedError := int(math.Ceil(float64(upperBoundary) * thresholdPercentage))
		count := 0
		for i := 0; i < upperBoundary; i++ {
			result, ok := <-source
			if !ok {
				// Upstream shut down early; there is nothing more to consume.
				return
			}
			if result.err != nil {
				count++
				if count > maxAllowedError {
					fmt.Printf("error threshold exceeded: %d, completed: %d/%d\n", count, i, upperBoundary)
					return
				}
				// Still within the error budget: report it and keep going.
				fmt.Printf("tolerated error: %s\n", result.err.Error())
				continue
			}
			// Print out the value for debugging.
			fmt.Printf("got %d, completed: %d/%d\n", result.value, i, upperBoundary)
		}
	}

	ctx, cancel := context.WithCancel(context.Background())

	// The source generates random integers in [0, 100) and marks those above
	// THRESHOLD as errors.
	p1 := source(ctx)
	// Valid values (<= THRESHOLD) are multiplied by 100.
	p2 := processor(ctx, p1)
	// Consume up to 100 results, tolerating errors on at most 25% of them.
	sink(p2, 100, 0.25)

	// Terminate the pipeline, then wait (bounded) for the stages to exit. The
	// processor closes p2 on exit, which happens only after the source has
	// closed its own channel.
	cancel()
	timeout := time.After(1 * time.Second)
wait:
	for {
		select {
		case _, ok := <-p2:
			if !ok {
				break wait
			}
		case <-timeout:
			fmt.Println("timed out waiting for pipeline to stop")
			break wait
		}
	}
	fmt.Println("done")
}