Skip to content

Commit 4656247

Browse files
devops-github-rudderstackatzoumItsSudipvyeshwanth
authored
chore: sync release v1.48.4 to main branch (#5812)
# Description Syncing patch release v1.48.4 to main branch **↓↓ Please review and edit commit overrides before merging ↓↓** BEGIN_COMMIT_OVERRIDE fix: warehouse cached schema mismatch (#5805) END_COMMIT_OVERRIDE --------- Co-authored-by: Aris Tzoumas <[email protected]> Co-authored-by: Sudip Paul <[email protected]> Co-authored-by: yeshwanth vuppu <[email protected]>
1 parent 2418329 commit 4656247

File tree

3 files changed

+111
-28
lines changed

3 files changed

+111
-28
lines changed

CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
# Changelog
22

3+
## [1.48.4](https://github.com/rudderlabs/rudder-server/compare/v1.48.3...v1.48.4) (2025-05-05)
4+
5+
6+
### Bug Fixes
7+
8+
* warehouse cached schema mismatch ([#5805](https://github.com/rudderlabs/rudder-server/issues/5805)) ([1d3abfd](https://github.com/rudderlabs/rudder-server/commit/1d3abfd961c12bc4a31c97f96b67f049d4a9cdde))
9+
310
## [1.48.3](https://github.com/rudderlabs/rudder-server/compare/v1.48.2...v1.48.3) (2025-05-02)
411

512

warehouse/internal/repo/schema.go

+59-28
Original file line numberDiff line numberDiff line change
@@ -53,36 +53,67 @@ func (sh *WHSchema) Insert(ctx context.Context, whSchema *model.WHSchema) (int64
5353
return id, fmt.Errorf("marshaling schema: %w", err)
5454
}
5555

56-
err = sh.db.QueryRowContext(ctx, `
57-
INSERT INTO `+whSchemaTableName+` (
58-
source_id, namespace, destination_id,
59-
destination_type, schema, created_at,
60-
updated_at, expires_at
56+
err = (*repo)(sh).WithTx(ctx, func(tx *sqlmiddleware.Tx) error {
57+
// update all schemas with the same destination_id and namespace but different source_id
58+
// this is to ensure all the connections for a destination have the same schema copy
59+
_, err = tx.ExecContext(ctx, `
60+
UPDATE `+whSchemaTableName+`
61+
SET
62+
schema = $1,
63+
updated_at = $2,
64+
expires_at = $3
65+
WHERE
66+
destination_id = $4 AND
67+
namespace = $5 AND
68+
source_id != $6;
69+
`,
70+
schemaPayload,
71+
now.UTC(),
72+
whSchema.ExpiresAt,
73+
whSchema.DestinationID,
74+
whSchema.Namespace,
75+
whSchema.SourceID,
6176
)
62-
VALUES
63-
($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (
64-
source_id, destination_id, namespace
65-
) DO
66-
UPDATE
67-
SET
68-
schema = $5,
69-
updated_at = $7,
70-
expires_at = $8 RETURNING id;
71-
`,
72-
whSchema.SourceID,
73-
whSchema.Namespace,
74-
whSchema.DestinationID,
75-
whSchema.DestinationType,
76-
schemaPayload,
77-
now.UTC(),
78-
now.UTC(),
79-
whSchema.ExpiresAt,
80-
).Scan(&id)
81-
if err != nil {
82-
return id, fmt.Errorf("inserting schema: %w", err)
83-
}
77+
if err != nil {
78+
return fmt.Errorf("updating related schemas: %w", err)
79+
}
80+
81+
// Then, insert/update the new schema using the unique constraint
82+
err = tx.QueryRowContext(ctx, `
83+
INSERT INTO `+whSchemaTableName+` (
84+
source_id, namespace, destination_id,
85+
destination_type, schema, created_at,
86+
updated_at, expires_at
87+
)
88+
VALUES
89+
($1, $2, $3, $4, $5, $6, $7, $8)
90+
ON CONFLICT (
91+
source_id, destination_id, namespace
92+
) DO
93+
UPDATE
94+
SET
95+
schema = $5,
96+
updated_at = $7,
97+
expires_at = $8
98+
RETURNING id;
99+
`,
100+
whSchema.SourceID,
101+
whSchema.Namespace,
102+
whSchema.DestinationID,
103+
whSchema.DestinationType,
104+
schemaPayload,
105+
now.UTC(),
106+
now.UTC(),
107+
whSchema.ExpiresAt,
108+
).Scan(&id)
109+
if err != nil {
110+
return fmt.Errorf("inserting schema: %w", err)
111+
}
112+
113+
return nil
114+
})
84115

85-
return id, nil
116+
return id, err
86117
}
87118

88119
func (sh *WHSchema) GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error) {

warehouse/internal/repo/schema_test.go

+45
Original file line numberDiff line numberDiff line change
@@ -178,4 +178,49 @@ func TestWHSchemasRepo(t *testing.T) {
178178
require.NoError(t, err)
179179
require.Equal(t, expiryTime, updatedSchema.ExpiresAt)
180180
})
181+
182+
t.Run("Insert schema propagation to all connections with same destination_id and namespace", func(t *testing.T) {
183+
// Create first connection schema
184+
firstConnectionSchema := schema
185+
firstID, err := r.Insert(ctx, &firstConnectionSchema)
186+
require.NoError(t, err)
187+
firstConnectionSchema.ID = firstID
188+
189+
// Create second connection schema with same destination_id and namespace
190+
secondConnectionSchema := firstConnectionSchema
191+
secondConnectionSchema.SourceID = "other_source_id"
192+
secondConnectionSchema.ID = 0 // Reset ID for new insert
193+
secondID, err := r.Insert(ctx, &secondConnectionSchema)
194+
require.NoError(t, err)
195+
secondConnectionSchema.ID = secondID
196+
197+
// Verify both connections have the same initial schema
198+
firstRetrieved, err := r.GetForNamespace(ctx, firstConnectionSchema.SourceID, firstConnectionSchema.DestinationID, firstConnectionSchema.Namespace)
199+
require.NoError(t, err)
200+
require.Equal(t, firstConnectionSchema.Schema, firstRetrieved.Schema)
201+
202+
secondRetrieved, err := r.GetForNamespace(ctx, secondConnectionSchema.SourceID, secondConnectionSchema.DestinationID, secondConnectionSchema.Namespace)
203+
require.NoError(t, err)
204+
require.Equal(t, firstConnectionSchema.Schema, secondRetrieved.Schema)
205+
206+
// Update the first connection with new schema data
207+
updatedSchema := firstConnectionSchema
208+
updatedSchema.Schema = model.Schema{
209+
"new_table": {
210+
"new_column": "string",
211+
},
212+
}
213+
updatedSchema.ID = 0 // Reset ID for new insert
214+
_, err = r.Insert(ctx, &updatedSchema)
215+
require.NoError(t, err)
216+
217+
// Verify both connections are updated with the new schema
218+
firstRetrieved, err = r.GetForNamespace(ctx, firstConnectionSchema.SourceID, firstConnectionSchema.DestinationID, firstConnectionSchema.Namespace)
219+
require.NoError(t, err)
220+
require.Equal(t, updatedSchema.Schema, firstRetrieved.Schema)
221+
222+
secondRetrieved, err = r.GetForNamespace(ctx, secondConnectionSchema.SourceID, secondConnectionSchema.DestinationID, secondConnectionSchema.Namespace)
223+
require.NoError(t, err)
224+
require.Equal(t, updatedSchema.Schema, secondRetrieved.Schema)
225+
})
181226
}

0 commit comments

Comments
 (0)