Skip to content

Commit 51048ea

Browse files
chore: review comments 2
1 parent 8cc2cc3 commit 51048ea

File tree

5 files changed

+57
-51
lines changed

5 files changed

+57
-51
lines changed

warehouse/internal/loadfiles/loadfiles.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,10 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
178178

179179
uniqueLoadGenID := misc.FastUUID().String()
180180

181-
lf.Logger.Infof("[WH]: Starting batch processing %v stage files for %s:%s", publishBatchSize, destType, destID)
181+
lf.Logger.Infon("[WH]: Starting batch processing stage files",
182+
logger.NewIntField("publishBatchSize", int64(publishBatchSize)),
183+
obskit.DestinationType(destType),
184+
obskit.DestinationID(destID))
182185

183186
job.LoadFileGenStartTime = timeutil.Now()
184187

@@ -211,7 +214,8 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
211214
}()
212215

213216
if !lf.AllowUploadV2JobCreation(job) {
214-
lf.Logger.Infof("V2 job creation disabled. Processing %d staging files", len(toProcessStagingFiles))
217+
lf.Logger.Infon("V2 job creation disabled. Processing staging files",
218+
logger.NewIntField("count", int64(len(toProcessStagingFiles))))
215219
err = lf.createUploadJobs(ctx, job, toProcessStagingFiles, publishBatchSize, uniqueLoadGenID)
216220
if err != nil {
217221
return 0, 0, fmt.Errorf("creating upload jobs: %w", err)
@@ -233,15 +237,16 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
233237
// but that should be fine since we don't expect too many such instances where the batch will have both v1 and v2 files
234238
// Usually a batch will have only v1 or v2 files
235239
g, gCtx := errgroup.WithContext(ctx)
240+
lf.Logger.Infon("V2 job creation enabled. Processing staging files",
241+
logger.NewIntField("v1Files", int64(len(v1Files))),
242+
logger.NewIntField("v2Files", int64(len(v2Files))))
236243
if len(v1Files) > 0 {
237244
g.Go(func() error {
238-
lf.Logger.Infof("V2 job creation enabled. Processing %d v1 staging files", len(v1Files))
239245
return lf.createUploadJobs(gCtx, job, v1Files, publishBatchSize, uniqueLoadGenID)
240246
})
241247
}
242248
if len(v2Files) > 0 {
243249
g.Go(func() error {
244-
lf.Logger.Infof("V2 job creation enabled. Processing %d v2 staging files", len(v2Files))
245250
return lf.createUploadV2Jobs(gCtx, job, v2Files, publishBatchSize, uniqueLoadGenID)
246251
})
247252
}
@@ -332,7 +337,10 @@ func (lf *LoadFileGenerator) publishToNotifier(
332337

333338
destID := job.Upload.DestinationID
334339
destType := job.Upload.DestinationType
335-
lf.Logger.Infof("[WH]: Publishing %d jobs for %s:%s to notifier", len(messages), obskit.DestinationType(destType), obskit.DestinationID(destID))
340+
lf.Logger.Infon("[WH]: Publishing jobs to notifier",
341+
logger.NewIntField("count", int64(len(messages))),
342+
obskit.DestinationType(destType),
343+
obskit.DestinationID(destID))
336344

337345
ch, err := lf.Notifier.Publish(ctx, &notifier.PublishRequest{
338346
Payloads: messages,
@@ -439,9 +447,7 @@ func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <
439447
if err := lf.LoadRepo.Insert(ctx, loadFiles); err != nil {
440448
return fmt.Errorf("inserting load files: %w", err)
441449
}
442-
stagingFileIds := lo.Map(chunk, func(file *model.StagingFile, _ int) int64 {
443-
return file.ID
444-
})
450+
stagingFileIds := repo.StagingFileIDs(chunk)
445451
if err := lf.StageRepo.SetStatuses(ctx, stagingFileIds, warehouseutils.StagingFileSucceededState); err != nil {
446452
return fmt.Errorf("setting staging file status to succeeded: %w", err)
447453
}
@@ -489,7 +495,10 @@ func (lf *LoadFileGenerator) createUploadV2Jobs(ctx context.Context, job *model.
489495
stagingFileGroups := lf.GroupStagingFiles(stagingFiles, lf.Conf.GetInt("Warehouse.loadFiles.maxSizeInMB", 128))
490496
for i, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
491497
for j, group := range fileGroups {
492-
lf.Logger.Infof("chunk %d, group %d, size %d", i, j, len(group))
498+
lf.Logger.Infon("Processing chunk and group",
499+
logger.NewIntField("chunk", int64(i)),
500+
logger.NewIntField("group", int64(j)),
501+
logger.NewIntField("size", int64(len(group))))
493502
baseReq := lf.prepareBaseJobRequest(job, uniqueLoadGenID, group[0], destinationRevisionIDMap)
494503

495504
stagingFileInfos := make([]StagingFileInfo, len(group))

warehouse/router/upload.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -831,7 +831,7 @@ func (job *UploadJob) GetLoadFilesMetadata(ctx context.Context, options whutils.
831831
}
832832
sqlStatement := job.getLoadFilesMetadataQuery(tableFilterSQL, limitSQL)
833833

834-
job.logger.Debugf(`Fetching loadFileLocations: %v`, sqlStatement)
834+
job.logger.Debugn("Fetching loadFileLocations", logger.NewStringField("sqlStatement", sqlStatement))
835835
rows, err := job.db.QueryContext(ctx, sqlStatement)
836836
if err != nil {
837837
return nil, fmt.Errorf("query: %s\nfailed with Error : %w", sqlStatement, err)

warehouse/router/upload_test.go

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -777,24 +777,8 @@ func TestUploadJob_GetLoadFilesMetadata(t *testing.T) {
777777
upload: model.Upload{},
778778
stagingFileIDs: []int64{1, 2, 3},
779779
logger: logger.NOP,
780-
config: struct {
781-
refreshPartitionBatchSize int
782-
retryTimeWindow time.Duration
783-
minRetryAttempts int
784-
disableAlter bool
785-
minUploadBackoff time.Duration
786-
maxUploadBackoff time.Duration
787-
alwaysRegenerateAllLoadFiles bool
788-
reportingEnabled bool
789-
maxParallelLoadsWorkspaceIDs map[string]interface{}
790-
columnsBatchSize int
791-
longRunningUploadStatThresholdInMin time.Duration
792-
skipPreviouslyFailedTables bool
793-
queryLoadFilesWithUploadID config.ValueLoader[bool]
794-
}{
795-
queryLoadFilesWithUploadID: conf.GetReloadableBoolVar(false, "Warehouse.loadFiles.queryWithUploadID.enable"),
796-
},
797780
}
781+
job.config.queryLoadFilesWithUploadID = conf.GetReloadableBoolVar(false, "Warehouse.loadFiles.queryWithUploadID.enable")
798782
var stagingFileId int64
799783
stagingFileId, job.upload.ID = createUpload(t, ctx, db)
800784
loadFiles := []model.LoadFile{
@@ -817,7 +801,7 @@ func TestUploadJob_GetLoadFilesMetadata(t *testing.T) {
817801
}
818802
}
819803

820-
func createUpload(t *testing.T, ctx context.Context, db *sqlmiddleware.DB) (int64, int64) {
804+
func createUpload(t testing.TB, ctx context.Context, db *sqlmiddleware.DB) (int64, int64) {
821805
t.Helper()
822806
stagingFilesRepo := repo.NewStagingFiles(db)
823807
stagingFile := model.StagingFileWithSchema{

warehouse/slave/worker.go

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,22 @@ func (w *worker) start(ctx context.Context, notificationChan <-chan *notifier.Cl
114114
for {
115115
select {
116116
case <-ctx.Done():
117-
w.log.Infof("Slave worker-%d-%s is shutting down", w.workerIdx, slaveID)
117+
w.log.Infon("Slave worker is shutting down",
118+
logger.NewField("workerIdx", w.workerIdx),
119+
logger.NewField("slaveId", slaveID),
120+
)
118121
return
119122
case claimedJob, ok := <-notificationChan:
120123
if !ok {
121124
return
122125
}
123126
w.stats.workerIdleTime.Since(workerIdleTimeStart)
124127

125-
w.log.Debugf("Successfully claimed job:%d by slave worker-%d-%s & job type %s",
126-
claimedJob.Job.ID,
127-
w.workerIdx,
128-
slaveID,
129-
claimedJob.Job.Type,
128+
w.log.Debugn("Successfully claimed job by slave worker",
129+
logger.NewField("jobId", claimedJob.Job.ID),
130+
logger.NewField("workerIdx", w.workerIdx),
131+
logger.NewField("slaveId", slaveID),
132+
logger.NewField("jobType", claimedJob.Job.Type),
130133
)
131134

132135
switch claimedJob.Job.Type {
@@ -136,10 +139,10 @@ func (w *worker) start(ctx context.Context, notificationChan <-chan *notifier.Cl
136139
w.processClaimedUploadJob(ctx, claimedJob)
137140
}
138141

139-
w.log.Infof("Successfully processed job:%d by slave worker-%d-%s",
140-
claimedJob.Job.ID,
141-
w.workerIdx,
142-
slaveID,
142+
w.log.Infon("Successfully processed job",
143+
logger.NewField("jobId", claimedJob.Job.ID),
144+
logger.NewField("workerIdx", w.workerIdx),
145+
logger.NewField("slaveId", slaveID),
143146
)
144147

145148
workerIdleTimeStart = time.Now()
@@ -173,7 +176,9 @@ func (w *worker) processClaimedUploadJob(ctx context.Context, claimedJob *notifi
173176
return
174177
}
175178
job.BatchID = claimedJob.Job.BatchID
176-
w.log.Infof(`Starting processing staging-files from claim:%v`, claimedJob.Job.ID)
179+
w.log.Infon("Starting processing staging-files from claim",
180+
logger.NewField("jobId", claimedJob.Job.ID),
181+
)
177182
job.Output, err = w.processMultiStagingFiles(ctx, &job)
178183
if err != nil {
179184
handleErr(err, claimedJob)
@@ -187,7 +192,10 @@ func (w *worker) processClaimedUploadJob(ctx context.Context, claimedJob *notifi
187192
return
188193
}
189194
job.BatchID = claimedJob.Job.BatchID
190-
w.log.Infof(`Starting processing staging-file:%v from claim:%v`, job.StagingFileID, claimedJob.Job.ID)
195+
w.log.Infon("Starting processing staging-file from claim",
196+
logger.NewField("stagingFileID", job.StagingFileID),
197+
logger.NewField("jobId", claimedJob.Job.ID),
198+
)
191199
job.Output, err = w.processStagingFile(ctx, &job)
192200
if err != nil {
193201
handleErr(err, claimedJob)
@@ -414,7 +422,10 @@ func (w *worker) processSingleStagingFile(
414422
jr.incrementEventCount(tableName)
415423
}
416424

417-
jr.logger.Debugf("Process %v bytes from downloaded staging file: %s", lineBytesCounter, stagingFile.Location)
425+
jr.logger.Debugn("Process bytes from downloaded staging file",
426+
logger.NewField("bytes", lineBytesCounter),
427+
logger.NewField("location", stagingFile.Location),
428+
)
418429
jr.processingStagingFileStat.Since(processingStart)
419430
jr.bytesProcessedStagingFileStat.Count(lineBytesCounter)
420431

@@ -426,10 +437,10 @@ func (w *worker) processStagingFile(ctx context.Context, job *payload) ([]upload
426437

427438
jr := newJobRun(job.basePayload, w.workerIdx, w.conf, w.log, w.statsFactory, w.encodingFactory)
428439

429-
w.log.Debugf("Starting processing staging file: %v at %s for %s",
430-
job.StagingFileID,
431-
job.StagingFileLocation,
432-
jr.identifier,
440+
w.log.Debugn("Starting processing staging file",
441+
logger.NewField("stagingFileID", job.StagingFileID),
442+
logger.NewField("stagingFileLocation", job.StagingFileLocation),
443+
logger.NewField("identifier", jr.identifier),
433444
)
434445

435446
defer func() {
@@ -478,7 +489,9 @@ func (w *worker) processMultiStagingFiles(ctx context.Context, job *payloadV2) (
478489

479490
for _, stagingFile := range job.StagingFiles {
480491
g.Go(func() error {
481-
w.log.Infof(`Processing staging-file:%v for upload_v2 job`, stagingFile.ID)
492+
w.log.Infon("Processing staging-file for upload_v2 job",
493+
logger.NewField("stagingFileID", stagingFile.ID),
494+
)
482495
if err := w.processSingleStagingFile(gCtx, jr, &job.basePayload, stagingFile); err != nil {
483496
return fmt.Errorf("processing staging file %d: %w", stagingFile.ID, err)
484497
}

warehouse/slave/worker_job.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -295,11 +295,11 @@ func (jr *jobRun) downloadStagingFile(ctx context.Context, stagingFileInfo stagi
295295
return fmt.Errorf("downloading staging file: %w", err)
296296
}
297297

298-
jr.logger.Infof("[WH]: Starting processing staging file with revision config for StagingFileID: %d, DestinationRevisionID: %s, StagingDestinationRevisionID: %s, identifier: %s",
299-
stagingFileInfo.ID,
300-
jr.job.DestinationRevisionID,
301-
jr.job.StagingDestinationRevisionID,
302-
jr.identifier,
298+
jr.logger.Infon("[WH]: Starting processing staging file with revision config",
299+
logger.NewField("stagingFileID", stagingFileInfo.ID),
300+
logger.NewField("destinationRevisionID", jr.job.DestinationRevisionID),
301+
logger.NewField("stagingDestinationRevisionID", jr.job.StagingDestinationRevisionID),
302+
logger.NewField("identifier", jr.identifier),
303303
)
304304

305305
if err := doTask(jr.job.StagingDestinationConfig, jr.job.StagingUseRudderStorage); err != nil {

0 commit comments

Comments
 (0)