
feat: add support for processing of upload_v2 job type by slave #5796


Merged · merged 3 commits on May 7, 2025
Showing changes from 2 of the 3 commits.
2 changes: 1 addition & 1 deletion go.mod
@@ -60,6 +60,7 @@ require (
 	github.com/joho/godotenv v1.5.1
 	github.com/json-iterator/go v1.1.12
 	github.com/k3a/html2text v1.2.1
+	github.com/klauspost/compress v1.18.0
 	github.com/lib/pq v1.10.9
 	github.com/linkedin/goavro/v2 v2.13.1
 	github.com/marcboeker/go-duckdb v1.8.5
@@ -248,7 +249,6 @@ require (
 	github.com/jcmturner/rpc/v2 v2.0.3 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
 	github.com/klauspost/asmfmt v1.3.2 // indirect
-	github.com/klauspost/compress v1.18.0 // indirect
 	github.com/klauspost/cpuid/v2 v2.2.10 // indirect
 	github.com/kr/fs v0.1.0 // indirect
 	github.com/leodido/go-urn v1.4.0 // indirect
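The github.com/klauspost/compress entry moves from the indirect section into the direct require block. Go tooling drops the // indirect marker once the module's own code imports the package, so the new upload_v2 processing path presumably imports it directly — an inference from the dependency change; the diff itself does not show where it is used.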
108 changes: 61 additions & 47 deletions integration_test/warehouse/warehouse_test.go
@@ -83,57 +83,71 @@ func TestMain(m *testing.M) {
 
 func TestUploads(t *testing.T) {
 	t.Run("tracks loading", func(t *testing.T) {
-		db, minioResource, whClient := setupServer(t, false, nil, nil)
-
-		var (
-			ctx    = context.Background()
-			events = 100
-			jobs   = 1
-		)
-
-		eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
-			return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
-				uuid.New().String(),
-				uuid.New().String(),
-			)
-		}), "\n")
-
-		require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
-			WorkspaceID:           workspaceID,
-			SourceID:              sourceID,
-			DestinationID:         destinationID,
-			Location:              prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
-			TotalEvents:           events,
-			FirstEventAt:          time.Now().Format(misc.RFC3339Milli),
-			LastEventAt:           time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
-			UseRudderStorage:      false,
-			DestinationRevisionID: destinationID,
-			Schema: map[string]map[string]string{
-				"tracks": {
-					"id":          "string",
-					"user_id":     "string",
-					"received_at": "datetime",
-				},
-			},
-		}))
-		requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
-			{A: "source_id", B: sourceID},
-			{A: "destination_id", B: destinationID},
-			{A: "status", B: succeeded},
-		}...)
-		requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
-			{A: "status", B: exportedData},
-			{A: "wh_uploads.source_id", B: sourceID},
-			{A: "wh_uploads.destination_id", B: destinationID},
-			{A: "wh_uploads.namespace", B: namespace},
-		}...)
-		requireUploadJobsCount(t, ctx, db, jobs, []lo.Tuple2[string, any]{
-			{A: "source_id", B: sourceID},
-			{A: "destination_id", B: destinationID},
-			{A: "namespace", B: namespace},
-			{A: "status", B: exportedData},
-		}...)
-		requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
+		testCases := []struct {
+			batchStagingFiles bool
+		}{
+			{batchStagingFiles: false},
+			{batchStagingFiles: true},
+		}
+		for _, tc := range testCases {
+			if tc.batchStagingFiles {
+				t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
+				t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.queryWithUploadID.enable"), "true")
+			}
+			db, minioResource, whClient := setupServer(t, false, nil, nil)
+
+			var (
+				ctx    = context.Background()
+				events = 100
+				jobs   = 1
+			)
+			eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
+				return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
+					uuid.New().String(),
+					uuid.New().String(),
+				)
+			}), "\n")
+
+			require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
+				WorkspaceID:           workspaceID,
+				SourceID:              sourceID,
+				DestinationID:         destinationID,
+				Location:              prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
+				TotalEvents:           events,
+				FirstEventAt:          time.Now().Format(misc.RFC3339Milli),
+				LastEventAt:           time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
+				UseRudderStorage:      false,
+				BytesPerTable: map[string]int64{
+					"tracks": int64(len(eventsPayload)),
+				},
+				DestinationRevisionID: destinationID,
+				Schema: map[string]map[string]string{
+					"tracks": {
+						"id":          "string",
+						"user_id":     "string",
+						"received_at": "datetime",
+					},
+				},
+			}))
+			requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
+				{A: "source_id", B: sourceID},
+				{A: "destination_id", B: destinationID},
+				{A: "status", B: succeeded},
+			}...)
+			requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
+				{A: "status", B: exportedData},
+				{A: "wh_uploads.source_id", B: sourceID},
+				{A: "wh_uploads.destination_id", B: destinationID},
+				{A: "wh_uploads.namespace", B: namespace},
+			}...)
+			requireUploadJobsCount(t, ctx, db, jobs, []lo.Tuple2[string, any]{
+				{A: "source_id", B: sourceID},
+				{A: "destination_id", B: destinationID},
+				{A: "namespace", B: namespace},
+				{A: "status", B: exportedData},
+			}...)
+			requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
+		}
 	})
 	t.Run("user and identifies loading", func(t *testing.T) {
 		db, minioResource, whClient := setupServer(t, false, nil, nil)
2 changes: 1 addition & 1 deletion services/notifier/repo.go
@@ -275,7 +275,7 @@ func scanJob(scan scanFn, job *Job) error {
 	}
 	if jobTypeRaw.Valid {
 		switch jobTypeRaw.String {
-		case string(JobTypeUpload), string(JobTypeAsync):
+		case string(JobTypeUpload), string(JobTypeAsync), string(JobTypeUploadV2):
			job.Type = JobType(jobTypeRaw.String)
		default:
			return fmt.Errorf("scanning: unknown job type: %s", jobTypeRaw.String)
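This is the only notifier-side change: scanJob now accepts the new job type when reading rows back from the notifier queue. As a minimal sketch of how the types fit together, assuming JobType is a string-backed type — the three constant names come from the diff, but the underlying string values shown here are illustrative guesses, not taken from the repository:

package notifier

// JobType distinguishes the kinds of work a notifier slave can claim.
type JobType string

const (
	JobTypeUpload   JobType = "upload"    // one staging file per job (v1)
	JobTypeAsync    JobType = "async_job" // async source jobs
	JobTypeUploadV2 JobType = "upload_v2" // a batch of staging files per job (v2)
)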
6 changes: 6 additions & 0 deletions warehouse/integrations/datalake/datalake_test.go
@@ -87,6 +87,7 @@ func TestIntegration(t *testing.T) {
 		destType           string
 		conf               map[string]interface{}
 		schemaTTLInMinutes int
+		batchStagingFiles  bool
 		prerequisite       func(t testing.TB, ctx context.Context)
 		configOverride     map[string]any
 		verifySchema       func(*testing.T, filemanager.FileManager, string)
@@ -109,6 +110,7 @@ func TestIntegration(t *testing.T) {
 				"syncFrequency": "30",
 			},
 			schemaTTLInMinutes: 0,
+			batchStagingFiles:  true,
 			prerequisite: func(t testing.TB, ctx context.Context) {
 				t.Helper()
 				createMinioBucket(t, ctx, s3EndPoint, s3AccessKeyID, s3AccessKey, s3BucketName, s3Region)
@@ -360,6 +362,10 @@ func TestIntegration(t *testing.T) {
 			t.Setenv("STORAGE_EMULATOR_HOST", fmt.Sprintf("localhost:%d", c.Port("gcs", 4443)))
 			t.Setenv("RSERVER_WORKLOAD_IDENTITY_TYPE", "GKE")
 			t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.schemaTTLInMinutes"), strconv.Itoa(tc.schemaTTLInMinutes))
+			if tc.batchStagingFiles {
+				t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
+				t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.queryWithUploadID.enable"), "true")
+			}
 
 			whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort)
 
9 changes: 9 additions & 0 deletions warehouse/integrations/testhelper/staging.go
@@ -261,6 +261,14 @@ func prepareStagingPayload(t testing.TB, testConfig *TestConfig, stagingFile str
 	stagingFileInfo, err := os.Stat(stagingFile)
 	require.NoError(t, err)
 
+	bytesPerTable := make(map[string]int64)
+	for _, event := range stagingEvents {
+		tableName := event.Metadata.Table
+		eventJSON, err := jsonrs.Marshal(event.Data)
+		require.NoError(t, err)
+		bytesPerTable[tableName] += int64(len(eventJSON))
+	}
+
 	payload := warehouseclient.StagingFile{
 		WorkspaceID: testConfig.WorkspaceID,
 		Schema:      schemaMap,
@@ -275,6 +283,7 @@ func prepareStagingPayload(t testing.TB, testConfig *TestConfig, stagingFile str
 		SourceTaskRunID: testConfig.TaskRunID,
 		SourceJobRunID:  testConfig.JobRunID,
 		TimeWindow:      warehouseutils.GetTimeWindow(receivedAt),
+		BytesPerTable:   bytesPerTable,
 	}
 	return payload
 }
19 changes: 14 additions & 5 deletions warehouse/internal/loadfiles/loadfiles.go
@@ -211,6 +211,7 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
 	}()
 
 	if !lf.AllowUploadV2JobCreation(job) {
+		lf.Logger.Infof("V2 job creation disabled. Processing %d staging files", len(toProcessStagingFiles))
 		err = lf.createUploadJobs(ctx, job, toProcessStagingFiles, publishBatchSize, uniqueLoadGenID)
 		if err != nil {
 			return 0, 0, fmt.Errorf("creating upload jobs: %w", err)
@@ -234,11 +235,13 @@ func (lf *LoadFileGenerator) createFromStaging(ctx context.Context, job *model.U
 	g, gCtx := errgroup.WithContext(ctx)
 	if len(v1Files) > 0 {
 		g.Go(func() error {
+			lf.Logger.Infof("V2 job creation enabled. Processing %d v1 staging files", len(v1Files))
 			return lf.createUploadJobs(gCtx, job, v1Files, publishBatchSize, uniqueLoadGenID)
 		})
 	}
 	if len(v2Files) > 0 {
 		g.Go(func() error {
+			lf.Logger.Infof("V2 job creation enabled. Processing %d v2 staging files", len(v2Files))
 			return lf.createUploadV2Jobs(gCtx, job, v2Files, publishBatchSize, uniqueLoadGenID)
 		})
 	}
@@ -329,7 +332,7 @@ func (lf *LoadFileGenerator) publishToNotifier(
 
 	destID := job.Upload.DestinationID
 	destType := job.Upload.DestinationType
-	lf.Logger.Infof("[WH]: Publishing %d staging files for %s:%s to notifier", len(messages), destType, destID)
+	lf.Logger.Infof("[WH]: Publishing %d jobs for %s:%s to notifier", len(messages), obskit.DestinationType(destType), obskit.DestinationID(destID))
 
 	ch, err := lf.Notifier.Publish(ctx, &notifier.PublishRequest{
 		Payloads: messages,
@@ -401,7 +404,6 @@ func (lf *LoadFileGenerator) processNotifierResponse(ctx context.Context, ch <-c
 	return nil
 }
 
-// Unlike upload type job, for v2 we are not setting the status of staging files
 func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <-chan *notifier.PublishResponse, job *model.UploadJob, chunk []*model.StagingFile) error {
 	responses, ok := <-ch
 	if !ok {
@@ -437,6 +439,12 @@ func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <
 	if err := lf.LoadRepo.Insert(ctx, loadFiles); err != nil {
 		return fmt.Errorf("inserting load files: %w", err)
 	}
+	stagingFileIds := lo.Map(chunk, func(file *model.StagingFile, _ int) int64 {
+		return file.ID
+	})
+	if err := lf.StageRepo.SetStatuses(ctx, stagingFileIds, warehouseutils.StagingFileSucceededState); err != nil {
+		return fmt.Errorf("setting staging file status to succeeded: %w", err)
+	}
 	return nil
 }
 
@@ -479,8 +487,9 @@ func (lf *LoadFileGenerator) createUploadV2Jobs(ctx context.Context, job *model.
 	}
 	g, gCtx := errgroup.WithContext(ctx)
 	stagingFileGroups := lf.GroupStagingFiles(stagingFiles, lf.Conf.GetInt("Warehouse.loadFiles.maxSizeInMB", 128))
-	for _, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
-		for _, group := range fileGroups {
+	for i, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
+		for j, group := range fileGroups {
+			lf.Logger.Infof("chunk %d, group %d, size %d", i, j, len(group))
 			baseReq := lf.prepareBaseJobRequest(job, uniqueLoadGenID, group[0], destinationRevisionIDMap)
 
 			stagingFileInfos := make([]StagingFileInfo, len(group))
@@ -642,7 +651,7 @@ func (lf *LoadFileGenerator) groupBySize(files []*model.StagingFile, maxSizeMB i
 		sizes: make(map[string]int64),
 	})
 
-	lf.Logger.Infof("maxTable: %s, maxTableSize: %d", maxTable.name, maxTable.size)
+	lf.Logger.Infon("[groupBySize]", logger.NewStringField("maxTableName", maxTable.name), logger.NewIntField("maxTableSizeInBytes", maxTable.size))
 
 	// Sorting ensures that minimum batches are created
 	slices.SortFunc(files, func(a, b *model.StagingFile) int {
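The heart of the change is createUploadV2Jobs: GroupStagingFiles packs staging files into groups under a per-table size budget (Warehouse.loadFiles.maxSizeInMB, default 128), the groups are chunked by publishBatchSize, and each group is published as one upload_v2 notifier job. The loadfiles_test change below suggests files must also share attributes such as TimeWindow to land in the same group. Here is a self-contained sketch of the size-based packing idea, assuming a first-fit pass over files sorted by their largest table, with per-table byte counts coming from BytesPerTable; the type and function bodies are illustrative stand-ins, not the repository's implementation:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// stagingFile is a pared-down stand-in for model.StagingFile, keeping only
// the fields the grouping decision needs.
type stagingFile struct {
	ID            int64
	BytesPerTable map[string]int64
}

// groupBySize packs files into groups so that, within a group, no single
// table accumulates more than maxBytes.
func groupBySize(files []*stagingFile, maxBytes int64) [][]*stagingFile {
	largestTable := func(f *stagingFile) int64 {
		var biggest int64
		for _, size := range f.BytesPerTable {
			if size > biggest {
				biggest = size
			}
		}
		return biggest
	}
	// Sort descending by each file's largest table so big files seed groups
	// early and small files fill the remaining headroom.
	slices.SortFunc(files, func(a, b *stagingFile) int {
		return cmp.Compare(largestTable(b), largestTable(a))
	})

	type group struct {
		files []*stagingFile
		sizes map[string]int64 // running per-table byte totals
	}
	var groups []*group

nextFile:
	for _, f := range files {
		// First fit: place the file into the first group where every one of
		// its tables still has room.
		for _, g := range groups {
			fits := true
			for table, size := range f.BytesPerTable {
				if g.sizes[table]+size > maxBytes {
					fits = false
					break
				}
			}
			if fits {
				g.files = append(g.files, f)
				for table, size := range f.BytesPerTable {
					g.sizes[table] += size
				}
				continue nextFile
			}
		}
		// No group has room: start a new one seeded with this file.
		sizes := make(map[string]int64, len(f.BytesPerTable))
		for table, size := range f.BytesPerTable {
			sizes[table] = size
		}
		groups = append(groups, &group{files: []*stagingFile{f}, sizes: sizes})
	}

	result := make([][]*stagingFile, len(groups))
	for i, g := range groups {
		result[i] = g.files
	}
	return result
}

func main() {
	files := []*stagingFile{
		{ID: 1, BytesPerTable: map[string]int64{"tracks": 90 << 20}},
		{ID: 2, BytesPerTable: map[string]int64{"tracks": 60 << 20, "users": 10 << 20}},
		{ID: 3, BytesPerTable: map[string]int64{"users": 20 << 20}},
	}
	// With a 128 MiB budget, files 1 and 3 share a group; file 2 starts its own.
	for i, g := range groupBySize(files, 128<<20) {
		fmt.Printf("group %d: %d file(s)\n", i, len(g))
	}
}

Under this strategy, sorting descending by the largest table is what makes the in-diff comment "Sorting ensures that minimum batches are created" plausible: large files open groups first, and smaller files fill the leftover per-table headroom instead of forcing new groups.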
15 changes: 8 additions & 7 deletions warehouse/internal/loadfiles/loadfiles_test.go
@@ -935,7 +935,6 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
 	ctx := context.Background()
 
 	t.Run("worker partial failure", func(t *testing.T) {
-		t.Skip("enable the test once partial failure is implemented/handled as part of processing upload_v2 job")
 		notifier := &mockNotifier{
 			t:      t,
 			tables: tables,
@@ -965,6 +964,12 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
 			}
 		}
 
+		timeWindow := time.Now().Add(time.Hour)
+		// Setting time window so that these 2 files are grouped together in a single upload_v2 job
+		stagingFiles[0].TimeWindow = timeWindow
+		stagingFiles[1].TimeWindow = timeWindow
+
+		// Batch 1 should fail, batch 2 should succeed
 		stagingFiles[0].Location = "abort"
 
 		startID, endID, err := lf.ForceCreateLoadFiles(ctx, &model.UploadJob{
@@ -973,15 +978,11 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
 			StagingFiles: stagingFiles,
 		})
 		require.NoError(t, err)
-		require.Equal(t, int64(1), startID)
-
-		require.Len(t,
-			loadRepo.store,
-			len(tables)*(len(stagingFiles)-1),
-		)
-
+		require.Len(t, loadRepo.store, len(tables))
+		require.Equal(t, loadRepo.store[0].ID, startID)
+		require.Equal(t, loadRepo.store[len(loadRepo.store)-1].ID, endID)
+		require.Equal(t, loadRepo.store[0].TotalRows, 8)
 	})
 
 	t.Run("worker failures for all", func(t *testing.T) {
3 changes: 1 addition & 2 deletions warehouse/internal/loadfiles/mock_notifier_test.go
@@ -96,8 +96,7 @@ func (n *mockNotifier) publishV2(payload *notifier.PublishRequest) (<-chan *noti
 		loadFileUploads = append(loadFileUploads, loadfiles.LoadFileUpload{
 			TableName:             tableName,
 			Location:              req.UniqueLoadGenID + "/" + tableName,
-			TotalRows:             10,
-			ContentLength:         1000,
+			TotalRows:             len(req.StagingFiles),
 			DestinationRevisionID: destinationRevisionID,
 			UseRudderStorage:      req.UseRudderStorage,
 		})