@@ -16,19 +16,24 @@ package pricing
16
16
17
17
import (
18
18
"context"
19
+ "encoding/csv"
19
20
"encoding/json"
20
21
"fmt"
22
+ "io"
21
23
"net/http"
22
24
"os"
23
25
"path/filepath"
24
26
"runtime"
27
+ "strconv"
25
28
"sync"
29
+ "time"
26
30
27
31
"k8s.io/klog/v2"
28
32
)
29
33
30
34
const (
31
35
initialOnDemandPricesFile = "initial-on-demand-prices.json"
36
+ pricingCSVURL = "https://gcloud-compute.com/machine-types-regions.csv"
32
37
)
33
38
34
39
type Provider interface {
@@ -46,12 +51,16 @@ type DefaultProvider struct {
46
51
muSpot sync.RWMutex
47
52
region string
48
53
prices map [string ]map [string ]float64
54
+ spotPrices map [string ]map [string ]float64
55
+ lastUpdated time.Time
49
56
}
50
57
51
58
func NewDefaultProvider (ctx context.Context , region string ) * DefaultProvider {
52
59
p := & DefaultProvider {
53
- region : region ,
54
- prices : make (map [string ]map [string ]float64 ),
60
+ region : region ,
61
+ prices : make (map [string ]map [string ]float64 ),
62
+ spotPrices : make (map [string ]map [string ]float64 ),
63
+ lastUpdated : time .Now (),
55
64
}
56
65
57
66
if err := p .loadInitialPrices (); err != nil {
@@ -113,7 +122,6 @@ func (p *DefaultProvider) OnDemandPrice(instanceType string) (float64, bool) {
113
122
114
123
regionPrices , ok := p .prices [p .region ]
115
124
if ! ok {
116
- fmt .Println (p .prices )
117
125
return 0 , false
118
126
}
119
127
@@ -122,16 +130,124 @@ func (p *DefaultProvider) OnDemandPrice(instanceType string) (float64, bool) {
122
130
}
123
131
124
132
func (p * DefaultProvider ) SpotPrice (instanceType string , zone string ) (float64 , bool ) {
125
- // Currently, we don't have spot price information
126
- return 0 , false
133
+ p .muSpot .RLock ()
134
+ defer p .muSpot .RUnlock ()
135
+
136
+ regionPrices , ok := p .spotPrices [p .region ]
137
+ if ! ok {
138
+ return 0 , false
139
+ }
140
+
141
+ price , ok := regionPrices [instanceType ]
142
+ return price , ok
127
143
}
128
144
129
- func (p * DefaultProvider ) UpdateOnDemandPricing (ctx context.Context ) error {
130
- // For now, we only use static pricing data
145
+ type priceUpdateConfig struct {
146
+ priceColumn string
147
+ targetMap * map [string ]map [string ]float64
148
+ mu * sync.RWMutex
149
+ }
150
+
151
+ func (p * DefaultProvider ) updatePricing (ctx context.Context , config priceUpdateConfig ) error {
152
+ config .mu .Lock ()
153
+ defer config .mu .Unlock ()
154
+
155
+ // Download the CSV file
156
+ resp , err := http .Get (pricingCSVURL )
157
+ if err != nil {
158
+ return fmt .Errorf ("downloading CSV: %w" , err )
159
+ }
160
+ defer resp .Body .Close ()
161
+
162
+ // Read the CSV data
163
+ reader := csv .NewReader (resp .Body )
164
+ reader .Comma = ','
165
+
166
+ // Read header
167
+ header , err := reader .Read ()
168
+ if err != nil {
169
+ return fmt .Errorf ("reading CSV header: %w" , err )
170
+ }
171
+
172
+ // Find the required column indices
173
+ priceColIndex := - 1
174
+ regionColIndex := - 1
175
+ machineTypeColIndex := - 1
176
+
177
+ for i , col := range header {
178
+ switch col {
179
+ case config .priceColumn :
180
+ priceColIndex = i
181
+ case "region" :
182
+ regionColIndex = i
183
+ case "name" :
184
+ machineTypeColIndex = i
185
+ }
186
+ }
187
+
188
+ if priceColIndex == - 1 || regionColIndex == - 1 || machineTypeColIndex == - 1 {
189
+ return fmt .Errorf ("could not find required columns in CSV" )
190
+ }
191
+
192
+ // Process the data
193
+ newPrices := make (map [string ]map [string ]float64 )
194
+
195
+ for {
196
+ record , err := reader .Read ()
197
+ if err == io .EOF {
198
+ break
199
+ }
200
+ if err != nil {
201
+ klog .Errorf ("Error reading CSV record: %v" , err )
202
+ continue
203
+ }
204
+
205
+ // Get machine type, region and price
206
+ machineType := record [machineTypeColIndex ]
207
+ region := record [regionColIndex ]
208
+ priceStr := record [priceColIndex ]
209
+
210
+ // Parse price
211
+ price , err := strconv .ParseFloat (priceStr , 64 )
212
+ if err != nil {
213
+ klog .Errorf ("Error parsing price for %s in region %s: %v" , machineType , region , err )
214
+ continue
215
+ }
216
+
217
+ // Initialize region map if it doesn't exist
218
+ if _ , exists := newPrices [region ]; ! exists {
219
+ newPrices [region ] = make (map [string ]float64 )
220
+ }
221
+
222
+ // Store the price
223
+ newPrices [region ][machineType ] = price
224
+ }
225
+
226
+ // Check if we got any prices
227
+ if len (newPrices ) == 0 {
228
+ return fmt .Errorf ("no prices found during price update for %s pricing" , config .priceColumn )
229
+ }
230
+
231
+ // Update the prices
232
+ * config .targetMap = newPrices
233
+ p .lastUpdated = time .Now ()
234
+
235
+ klog .V (2 ).Infof ("Updated prices for %d regions" , len (newPrices ))
131
236
return nil
132
237
}
133
238
239
+ func (p * DefaultProvider ) UpdateOnDemandPricing (ctx context.Context ) error {
240
+ return p .updatePricing (ctx , priceUpdateConfig {
241
+ priceColumn : "hour" ,
242
+ targetMap : & p .prices ,
243
+ mu : & p .muOnDemand ,
244
+ })
245
+ }
246
+
134
247
func (p * DefaultProvider ) UpdateSpotPricing (ctx context.Context ) error {
135
- // Currently, we don't have spot price information
136
- return nil
248
+ return p .updatePricing (ctx , priceUpdateConfig {
249
+ priceColumn : "hourSpot" ,
250
+ targetMap : & p .spotPrices ,
251
+ mu : & p .muSpot ,
252
+ })
137
253
}
0 commit comments