Skip to content

feat: add support for processing of upload_v2 job type by slave #5796

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/joho/godotenv v1.5.1
github.com/json-iterator/go v1.1.12
github.com/k3a/html2text v1.2.1
github.com/klauspost/compress v1.18.0
github.com/lib/pq v1.10.9
github.com/linkedin/goavro/v2 v2.13.1
github.com/marcboeker/go-duckdb v1.8.5
Expand Down Expand Up @@ -248,7 +249,6 @@ require (
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/kr/fs v0.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
Expand Down
108 changes: 61 additions & 47 deletions integration_test/warehouse/warehouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,57 +83,71 @@ func TestMain(m *testing.M) {

func TestUploads(t *testing.T) {
t.Run("tracks loading", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)

var (
ctx = context.Background()
events = 100
jobs = 1
)
testCases := []struct {
batchStagingFiles bool
}{
{batchStagingFiles: false},
{batchStagingFiles: true},
}
for _, tc := range testCases {
if tc.batchStagingFiles {
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.queryWithUploadID.enable"), "true")
}
db, minioResource, whClient := setupServer(t, false, nil, nil)

eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
uuid.New().String(),
uuid.New().String(),
var (
ctx = context.Background()
events = 100
jobs = 1
)
}), "\n")
eventsPayload := strings.Join(lo.RepeatBy(events, func(int) string {
return fmt.Sprintf(`{"data":{"id":%q,"user_id":%q,"received_at":"2023-05-12T04:36:50.199Z"},"metadata":{"columns":{"id":"string","user_id":"string","received_at":"datetime"}, "table": "tracks"}}`,
uuid.New().String(),
uuid.New().String(),
)
}), "\n")

require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
SourceID: sourceID,
DestinationID: destinationID,
Location: prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
DestinationRevisionID: destinationID,
Schema: map[string]map[string]string{
"tracks": {
"id": "string",
"user_id": "string",
"received_at": "datetime",
require.NoError(t, whClient.Process(ctx, whclient.StagingFile{
WorkspaceID: workspaceID,
SourceID: sourceID,
DestinationID: destinationID,
Location: prepareStagingFile(t, ctx, minioResource, eventsPayload).ObjectName,
TotalEvents: events,
FirstEventAt: time.Now().Format(misc.RFC3339Milli),
LastEventAt: time.Now().Add(time.Minute * 30).Format(misc.RFC3339Milli),
UseRudderStorage: false,
BytesPerTable: map[string]int64{
"tracks": int64(len(eventsPayload)),
},
},
}))
requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
{A: "source_id", B: sourceID},
{A: "destination_id", B: destinationID},
{A: "status", B: succeeded},
}...)
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
{A: "status", B: exportedData},
{A: "wh_uploads.source_id", B: sourceID},
{A: "wh_uploads.destination_id", B: destinationID},
{A: "wh_uploads.namespace", B: namespace},
}...)
requireUploadJobsCount(t, ctx, db, jobs, []lo.Tuple2[string, any]{
{A: "source_id", B: sourceID},
{A: "destination_id", B: destinationID},
{A: "namespace", B: namespace},
{A: "status", B: exportedData},
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
DestinationRevisionID: destinationID,
Schema: map[string]map[string]string{
"tracks": {
"id": "string",
"user_id": "string",
"received_at": "datetime",
},
},
}))
requireStagingFileEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
{A: "source_id", B: sourceID},
{A: "destination_id", B: destinationID},
{A: "status", B: succeeded},
}...)
requireTableUploadEventsCount(t, ctx, db, events, []lo.Tuple2[string, any]{
{A: "status", B: exportedData},
{A: "wh_uploads.source_id", B: sourceID},
{A: "wh_uploads.destination_id", B: destinationID},
{A: "wh_uploads.namespace", B: namespace},
}...)
requireUploadJobsCount(t, ctx, db, jobs, []lo.Tuple2[string, any]{
{A: "source_id", B: sourceID},
{A: "destination_id", B: destinationID},
{A: "namespace", B: namespace},
{A: "status", B: exportedData},
}...)
requireDownstreamEventsCount(t, ctx, db, fmt.Sprintf("%s.%s", namespace, "tracks"), events)
}
})
t.Run("user and identifies loading", func(t *testing.T) {
db, minioResource, whClient := setupServer(t, false, nil, nil)
Expand Down
2 changes: 1 addition & 1 deletion services/notifier/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func scanJob(scan scanFn, job *Job) error {
}
if jobTypeRaw.Valid {
switch jobTypeRaw.String {
case string(JobTypeUpload), string(JobTypeAsync):
case string(JobTypeUpload), string(JobTypeAsync), string(JobTypeUploadV2):
job.Type = JobType(jobTypeRaw.String)
default:
return fmt.Errorf("scanning: unknown job type: %s", jobTypeRaw.String)
Expand Down
6 changes: 6 additions & 0 deletions warehouse/integrations/datalake/datalake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func TestIntegration(t *testing.T) {
destType string
conf map[string]interface{}
schemaTTLInMinutes int
batchStagingFiles bool
prerequisite func(t testing.TB, ctx context.Context)
configOverride map[string]any
verifySchema func(*testing.T, filemanager.FileManager, string)
Expand All @@ -109,6 +110,7 @@ func TestIntegration(t *testing.T) {
"syncFrequency": "30",
},
schemaTTLInMinutes: 0,
batchStagingFiles: true,
prerequisite: func(t testing.TB, ctx context.Context) {
t.Helper()
createMinioBucket(t, ctx, s3EndPoint, s3AccessKeyID, s3AccessKey, s3BucketName, s3Region)
Expand Down Expand Up @@ -360,6 +362,10 @@ func TestIntegration(t *testing.T) {
t.Setenv("STORAGE_EMULATOR_HOST", fmt.Sprintf("localhost:%d", c.Port("gcs", 4443)))
t.Setenv("RSERVER_WORKLOAD_IDENTITY_TYPE", "GKE")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.schemaTTLInMinutes"), strconv.Itoa(tc.schemaTTLInMinutes))
if tc.batchStagingFiles {
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.enableV2NotifierJob"), "true")
t.Setenv(config.ConfigKeyToEnv(config.DefaultEnvPrefix, "Warehouse.loadFiles.queryWithUploadID.enable"), "true")
}

whth.BootstrapSvc(t, workspaceConfig, httpPort, jobsDBPort)

Expand Down
9 changes: 9 additions & 0 deletions warehouse/integrations/testhelper/staging.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,14 @@ func prepareStagingPayload(t testing.TB, testConfig *TestConfig, stagingFile str
stagingFileInfo, err := os.Stat(stagingFile)
require.NoError(t, err)

bytesPerTable := make(map[string]int64)
for _, event := range stagingEvents {
tableName := event.Metadata.Table
eventJSON, err := jsonrs.Marshal(event.Data)
require.NoError(t, err)
bytesPerTable[tableName] += int64(len(eventJSON))
}

payload := warehouseclient.StagingFile{
WorkspaceID: testConfig.WorkspaceID,
Schema: schemaMap,
Expand All @@ -275,6 +283,7 @@ func prepareStagingPayload(t testing.TB, testConfig *TestConfig, stagingFile str
SourceTaskRunID: testConfig.TaskRunID,
SourceJobRunID: testConfig.JobRunID,
TimeWindow: warehouseutils.GetTimeWindow(receivedAt),
BytesPerTable: bytesPerTable,
}
return payload
}
30 changes: 24 additions & 6 deletions warehouse/internal/loadfiles/loadfiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,10 @@

uniqueLoadGenID := misc.FastUUID().String()

lf.Logger.Infof("[WH]: Starting batch processing %v stage files for %s:%s", publishBatchSize, destType, destID)
lf.Logger.Infon("[WH]: Starting batch processing stage files",
logger.NewIntField("publishBatchSize", int64(publishBatchSize)),
obskit.DestinationType(destType),
obskit.DestinationID(destID))

job.LoadFileGenStartTime = timeutil.Now()

Expand Down Expand Up @@ -211,6 +214,8 @@
}()

if !lf.AllowUploadV2JobCreation(job) {
lf.Logger.Infon("V2 job creation disabled. Processing staging files",
logger.NewIntField("count", int64(len(toProcessStagingFiles))))
err = lf.createUploadJobs(ctx, job, toProcessStagingFiles, publishBatchSize, uniqueLoadGenID)
if err != nil {
return 0, 0, fmt.Errorf("creating upload jobs: %w", err)
Expand All @@ -232,6 +237,9 @@
// but that should be fine, since we don't expect many instances where a batch has both v1 and v2 files.
// Usually a batch will contain only v1 files or only v2 files.
g, gCtx := errgroup.WithContext(ctx)
lf.Logger.Infon("V2 job creation enabled. Processing staging files",
logger.NewIntField("v1Files", int64(len(v1Files))),
logger.NewIntField("v2Files", int64(len(v2Files))))
if len(v1Files) > 0 {
g.Go(func() error {
return lf.createUploadJobs(gCtx, job, v1Files, publishBatchSize, uniqueLoadGenID)
Expand Down Expand Up @@ -329,7 +337,10 @@

destID := job.Upload.DestinationID
destType := job.Upload.DestinationType
lf.Logger.Infof("[WH]: Publishing %d staging files for %s:%s to notifier", len(messages), destType, destID)
lf.Logger.Infon("[WH]: Publishing jobs to notifier",
logger.NewIntField("count", int64(len(messages))),
obskit.DestinationType(destType),
obskit.DestinationID(destID))

ch, err := lf.Notifier.Publish(ctx, &notifier.PublishRequest{
Payloads: messages,
Expand Down Expand Up @@ -401,7 +412,6 @@
return nil
}

// Unlike upload type job, for v2 we are not setting the status of staging files
func (lf *LoadFileGenerator) processNotifierResponseV2(ctx context.Context, ch <-chan *notifier.PublishResponse, job *model.UploadJob, chunk []*model.StagingFile) error {
responses, ok := <-ch
if !ok {
Expand Down Expand Up @@ -437,6 +447,10 @@
if err := lf.LoadRepo.Insert(ctx, loadFiles); err != nil {
return fmt.Errorf("inserting load files: %w", err)
}
stagingFileIds := repo.StagingFileIDs(chunk)
if err := lf.StageRepo.SetStatuses(ctx, stagingFileIds, warehouseutils.StagingFileSucceededState); err != nil {
return fmt.Errorf("setting staging file status to succeeded: %w", err)
}

Check warning on line 453 in warehouse/internal/loadfiles/loadfiles.go

View check run for this annotation

Codecov / codecov/patch

warehouse/internal/loadfiles/loadfiles.go#L452-L453

Added lines #L452 - L453 were not covered by tests
return nil
}

Expand Down Expand Up @@ -479,8 +493,12 @@
}
g, gCtx := errgroup.WithContext(ctx)
stagingFileGroups := lf.GroupStagingFiles(stagingFiles, lf.Conf.GetInt("Warehouse.loadFiles.maxSizeInMB", 128))
for _, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
for _, group := range fileGroups {
for i, fileGroups := range lo.Chunk(stagingFileGroups, publishBatchSize) {
for j, group := range fileGroups {
lf.Logger.Infon("Processing chunk and group",
logger.NewIntField("chunk", int64(i)),
logger.NewIntField("group", int64(j)),
logger.NewIntField("size", int64(len(group))))
baseReq := lf.prepareBaseJobRequest(job, uniqueLoadGenID, group[0], destinationRevisionIDMap)

stagingFileInfos := make([]StagingFileInfo, len(group))
Expand Down Expand Up @@ -642,7 +660,7 @@
sizes: make(map[string]int64),
})

lf.Logger.Infof("maxTable: %s, maxTableSize: %d", maxTable.name, maxTable.size)
lf.Logger.Infon("[groupBySize]", logger.NewStringField("maxTableName", maxTable.name), logger.NewIntField("maxTableSizeInBytes", maxTable.size))

// Sorting ensures that minimum batches are created
slices.SortFunc(files, func(a, b *model.StagingFile) int {
Expand Down
15 changes: 8 additions & 7 deletions warehouse/internal/loadfiles/loadfiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,7 +935,6 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
ctx := context.Background()

t.Run("worker partial failure", func(t *testing.T) {
t.Skip("enable the test once partial failure is implemented/handled as part of processing upload_v2 job")
notifier := &mockNotifier{
t: t,
tables: tables,
Expand Down Expand Up @@ -965,6 +964,12 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
}
}

timeWindow := time.Now().Add(time.Hour)
// Use the same time window for both files so that they are grouped together into a single upload_v2 job
stagingFiles[0].TimeWindow = timeWindow
stagingFiles[1].TimeWindow = timeWindow

// Batch 1 should fail, batch 2 should succeed
stagingFiles[0].Location = "abort"

startID, endID, err := lf.ForceCreateLoadFiles(ctx, &model.UploadJob{
Expand All @@ -973,15 +978,11 @@ func TestV2CreateLoadFiles_Failure(t *testing.T) {
StagingFiles: stagingFiles,
})
require.NoError(t, err)
require.Equal(t, int64(1), startID)

require.Len(t,
loadRepo.store,
len(tables)*(len(stagingFiles)-1),
)

require.Len(t, loadRepo.store, len(tables))
require.Equal(t, loadRepo.store[0].ID, startID)
require.Equal(t, loadRepo.store[len(loadRepo.store)-1].ID, endID)
require.Equal(t, loadRepo.store[0].TotalRows, 8)
})

t.Run("worker failures for all", func(t *testing.T) {
Expand Down
3 changes: 1 addition & 2 deletions warehouse/internal/loadfiles/mock_notifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ func (n *mockNotifier) publishV2(payload *notifier.PublishRequest) (<-chan *noti
loadFileUploads = append(loadFileUploads, loadfiles.LoadFileUpload{
TableName: tableName,
Location: req.UniqueLoadGenID + "/" + tableName,
TotalRows: 10,
ContentLength: 1000,
TotalRows: len(req.StagingFiles),
DestinationRevisionID: destinationRevisionID,
UseRudderStorage: req.UseRudderStorage,
})
Expand Down
Loading
Loading