Commit 2418329

chore: revert synapse staging table with max varchar length (#5817)
Reverts #5775
1 parent 2a24e59 · commit 2418329

File tree

3 files changed: +25 -185 lines


warehouse/integrations/azure-synapse/azure-synapse.go

+25 -108
@@ -82,10 +82,6 @@ var stringColumns = lo.Keys(lo.PickBy(azureSynapseDataTypesMapToRudder, func(_,
 	return value == "string"
 }))
 
-var azureSynapseColsWithVariableLength = lo.Keys(lo.PickBy(azureSynapseDataTypesMapToRudder, func(key, _ string) bool {
-	return key == "varchar" || key == "nvarchar" || key == "char" || key == "nchar"
-}))
-
 type AzureSynapse struct {
 	db        *sqlmw.DB
 	namespace string
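For reference, the removed azureSynapseColsWithVariableLength variable collects the keys of the warehouse type map that name variable-length character types. Below is a minimal, self-contained sketch of the same lo.Keys/lo.PickBy pattern, using a hypothetical stand-in map rather than the real azureSynapseDataTypesMapToRudder:

package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	// Hypothetical stand-in for azureSynapseDataTypesMapToRudder:
	// Synapse data type -> Rudder data type.
	typeMap := map[string]string{
		"varchar":  "string",
		"nvarchar": "string",
		"char":     "string",
		"nchar":    "string",
		"bigint":   "int",
		"bit":      "boolean",
	}

	// lo.PickBy keeps only entries whose key is a variable-length character
	// type; lo.Keys then turns the filtered map into a slice of type names.
	colsWithVariableLength := lo.Keys(lo.PickBy(typeMap, func(key, _ string) bool {
		return key == "varchar" || key == "nvarchar" || key == "char" || key == "nchar"
	}))

	fmt.Println(colsWithVariableLength) // map key order is not guaranteed
}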
@@ -214,67 +210,6 @@ func (*AzureSynapse) IsEmpty(_ context.Context, _ model.Warehouse) (empty bool,
 	return
 }
 
-func (as *AzureSynapse) createStagingTable(ctx context.Context, tableName string) (string, error) {
-	stagingTableName := warehouseutils.StagingTableName(
-		provider,
-		tableName,
-		tableNameLimit,
-	)
-
-	cols := warehouseutils.SortColumnKeysFromColumnMap(as.uploader.GetTableSchemaInWarehouse(tableName))
-
-	varcharCols, err := as.getStringColumnsWithVariableLength(ctx, tableName)
-	if err != nil {
-		return "", fmt.Errorf("getting varchar columns info: %w", err)
-	}
-
-	columnDefs := lo.Map(cols, func(name string, _ int) string {
-		if dataType, ok := varcharCols[name]; ok {
-			var targetType string
-			// use varchar for char and nvarchar for nchar
-			// this is because char and nchar have a max length of 8000 and 4000 respectively.
-			// row size limit is 8060 in Azure Synapse. To fix this, we use varchar and nvarchar with max length
-			switch dataType {
-			case "char":
-				targetType = "varchar"
-			case "nchar":
-				targetType = "nvarchar"
-			default:
-				targetType = string(dataType)
-			}
-			return fmt.Sprintf(`CAST('' AS %[1]s(max)) as %[2]s`, targetType, name)
-		}
-		return name
-	})
-
-	// The use of prepared statements for creating temporary tables is not suitable in this context.
-	// Temporary tables in SQL Server have a limited scope and are automatically purged after the transaction commits.
-	// Therefore, creating normal tables is chosen as an alternative.
-	//
-	// For more information on this behavior:
-	// - See the discussion at https://github.com/denisenkom/go-mssqldb/issues/149 regarding prepared statements.
-	// - Refer to Microsoft's documentation on temporary tables at
-	//   https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008-r2/ms175528(v=sql.105)?redirectedfrom=MSDN.
-	createStagingTableStmt := fmt.Sprintf(`
-		SELECT TOP 0 %[3]s
-		INTO
-			%[1]s.%[2]s
-		FROM
-			%[1]s.%[4]s
-	`,
-		as.namespace,
-		stagingTableName,
-		strings.Join(columnDefs, ", "),
-		tableName,
-	)
-
-	if _, err := as.db.ExecContext(ctx, createStagingTableStmt); err != nil {
-		return "", fmt.Errorf("creating staging table: %w", err)
-	}
-
-	return stagingTableName, nil
-}
-
 func (as *AzureSynapse) loadTable(
 	ctx context.Context,
 	tableName string,
@@ -300,10 +235,32 @@ func (as *AzureSynapse) loadTable(
 		misc.RemoveFilePaths(fileNames...)
 	}()
 
+	stagingTableName := warehouseutils.StagingTableName(
+		provider,
+		tableName,
+		tableNameLimit,
+	)
+
+	// The use of prepared statements for creating temporary tables is not suitable in this context.
+	// Temporary tables in SQL Server have a limited scope and are automatically purged after the transaction commits.
+	// Therefore, creating normal tables is chosen as an alternative.
+	//
+	// For more information on this behavior:
+	// - See the discussion at https://github.com/denisenkom/go-mssqldb/issues/149 regarding prepared statements.
+	// - Refer to Microsoft's documentation on temporary tables at
+	//   https://docs.microsoft.com/en-us/previous-versions/sql/sql-server-2008-r2/ms175528(v=sql.105)?redirectedfrom=MSDN.
 	log.Debugw("creating staging table")
-	stagingTableName, err := as.createStagingTable(ctx, tableName)
-	if err != nil {
-		return nil, "", fmt.Errorf("creating staging table: %w", err)
+	createStagingTableStmt := fmt.Sprintf(`
+		SELECT
+			TOP 0 * INTO %[1]s.%[2]s
+		FROM
+			%[1]s.%[3]s;`,
+		as.namespace,
+		stagingTableName,
+		tableName,
+	)
+	if _, err = as.db.ExecContext(ctx, createStagingTableStmt); err != nil {
+		return nil, "", fmt.Errorf("creating temporary table: %w", err)
 	}
 
 	if !skipTempTableDelete {
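The restored loadTable builds the staging table with SELECT TOP 0 * INTO, which copies the source table's column definitions verbatim (a varchar(512) stays varchar(512)), whereas the reverted createStagingTable widened variable-length string columns to varchar(max)/nvarchar(max) via CAST. A minimal sketch of the restored pattern in isolation follows; the function name, parameters, and error handling are illustrative, not the integration's actual API:

package example

import (
	"context"
	"database/sql"
	"fmt"
)

// createStagingTableSketch creates an empty copy of the source table by
// selecting zero rows into a new table. SELECT TOP 0 * INTO inherits the
// source columns as declared, so no column is widened to (max).
func createStagingTableSketch(ctx context.Context, db *sql.DB, namespace, tableName, stagingTableName string) error {
	stmt := fmt.Sprintf(`
		SELECT
			TOP 0 * INTO %[1]s.%[2]s
		FROM
			%[1]s.%[3]s;`,
		namespace,
		stagingTableName,
		tableName,
	)
	if _, err := db.ExecContext(ctx, stmt); err != nil {
		return fmt.Errorf("creating staging table: %w", err)
	}
	return nil
}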
@@ -395,46 +352,6 @@ func (as *AzureSynapse) loadTable(
 	}, stagingTableName, nil
 }
 
-// dataType is the columnn data type in Azure Synapse
-type dataType string
-
-// getStringColumnsWithVariableLength returns the column name and data type for all columns that are of type varchar, nvarchar, char, or nchar
-func (as *AzureSynapse) getStringColumnsWithVariableLength(ctx context.Context, tableName string) (map[string]dataType, error) {
-	dataTypes := "'" + strings.Join(azureSynapseColsWithVariableLength, "', '") + "'"
-	query := fmt.Sprintf(`
-		SELECT column_name, DATA_TYPE
-		FROM INFORMATION_SCHEMA.COLUMNS
-		WHERE TABLE_SCHEMA = @schema AND TABLE_NAME = @tableName AND DATA_TYPE IN (%s);
-	`,
-		dataTypes,
-	)
-
-	rows, err := as.db.QueryContext(ctx, query,
-		sql.Named("schema", as.namespace),
-		sql.Named("tableName", tableName),
-	)
-	if err != nil {
-		return nil, fmt.Errorf("querying string columns: %w", err)
-	}
-	defer func() { _ = rows.Close() }()
-
-	stringColumns := make(map[string]dataType)
-	for rows.Next() {
-		var (
-			columnName string
-			dataType   dataType
-		)
-		if err := rows.Scan(&columnName, &dataType); err != nil {
-			return nil, fmt.Errorf("scanning string columns: %w", err)
-		}
-		stringColumns[columnName] = dataType
-	}
-	if err := rows.Err(); err != nil {
-		return nil, fmt.Errorf("iterating string columns: %w", err)
-	}
-	return stringColumns, nil
-}
-
 // getVarcharLengthMap retrieves the maximum allowed length for varchar columns in a given table.
 // A `CHARACTER_MAXIMUM_LENGTH` of `-1` indicates that the column has the maximum possible length (i.e., `varchar(max)`).
 func (as *AzureSynapse) getVarcharLengthMap(ctx context.Context, tableName string) (map[string]int, error) {
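The context lines above reference getVarcharLengthMap, whose body is not part of this diff. Below is a hedged sketch of the kind of INFORMATION_SCHEMA lookup that comment describes, assuming a *sql.DB opened with the go-mssqldb driver (which accepts sql.Named parameters); the function name and exact query are assumptions, not the repository's implementation:

package example

import (
	"context"
	"database/sql"
	"fmt"
)

// varcharLengthMapSketch returns CHARACTER_MAXIMUM_LENGTH per character
// column of a table; a value of -1 means the column is declared as (max).
func varcharLengthMapSketch(ctx context.Context, db *sql.DB, schema, table string) (map[string]int, error) {
	const query = `
		SELECT column_name, CHARACTER_MAXIMUM_LENGTH
		FROM INFORMATION_SCHEMA.COLUMNS
		WHERE TABLE_SCHEMA = @schema
		  AND TABLE_NAME = @tableName
		  AND CHARACTER_MAXIMUM_LENGTH IS NOT NULL;`

	rows, err := db.QueryContext(ctx, query,
		sql.Named("schema", schema),
		sql.Named("tableName", table),
	)
	if err != nil {
		return nil, fmt.Errorf("querying varchar lengths: %w", err)
	}
	defer func() { _ = rows.Close() }()

	lengths := make(map[string]int)
	for rows.Next() {
		var (
			name      string
			maxLength int
		)
		if err := rows.Scan(&name, &maxLength); err != nil {
			return nil, fmt.Errorf("scanning varchar lengths: %w", err)
		}
		lengths[name] = maxLength // -1 => (max)
	}
	return lengths, rows.Err()
}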

warehouse/integrations/azure-synapse/azure_synapse_test.go

-77
@@ -774,83 +774,6 @@ func TestIntegration(t *testing.T) {
 				{"7274e5db-f918-4efe-5322-872f66e235c5", "2022-12-15T06:53:49Z", "", "2022-12-15T06:53:49Z", "", "", ""},
 			})
 		})
-
-		t.Run("should truncate varchar, nvarchar, char, nchar columns", func(t *testing.T) {
-			tableName := "load_table_with_variable_character_columns"
-
-			// Schema for upload and warehouse
-			schemaInUpload := model.TableSchema{
-				"id":               "string",
-				"received_at":      "datetime",
-				"t_c_varchar_col":  "string",
-				"t_d_nvarchar_col": "string",
-				"t_e_char_col":     "string",
-				"t_f_nchar_col":    "string",
-			}
-			schemaInWarehouse := schemaInUpload
-			// t_c_varchar_col, t_d_nvarchar_col, t_e_char_col, t_f_nchar_col have more than 512 characters in the file
-			uploadOutput := whth.UploadLoadFile(t, fm, "testdata/loadcharcolumns.csv.gz", tableName)
-
-			loadFiles := []whutils.LoadFile{{Location: uploadOutput.Location}}
-			mockUploader := newMockUploader(t, loadFiles, tableName, schemaInUpload, schemaInWarehouse)
-
-			az := azuresynapse.New(config.New(), logger.NOP, stats.NOP)
-			err := az.Setup(ctx, warehouse, mockUploader)
-			require.NoError(t, err)
-
-			err = az.CreateSchema(ctx)
-			require.NoError(t, err)
-
-			// Create table with all character columns having length 512
-			_, err = db.ExecContext(ctx, fmt.Sprintf(`
-				CREATE TABLE %s.%s (
-					id varchar(64),
-					received_at datetimeoffset,
-					t_c_varchar_col varchar(512),
-					t_d_nvarchar_col nvarchar(512),
-					t_e_char_col char(512),
-					t_f_nchar_col nchar(512)
-				)
-			`, namespace, tableName))
-			require.NoError(t, err)
-
-			// Load data which should trigger truncation for all character columns
-			loadTableStat, err := az.LoadTable(ctx, tableName)
-			require.NoError(t, err)
-			require.Equal(t, loadTableStat.RowsInserted, int64(1))
-			require.Equal(t, loadTableStat.RowsUpdated, int64(0))
-
-			// Verify all character columns are truncated to 512 chars
-			records := whth.RetrieveRecordsFromWarehouse(t, db,
-				fmt.Sprintf(`
-					SELECT
-						id,
-						received_at,
-						t_c_varchar_col,
-						t_d_nvarchar_col,
-						t_e_char_col,
-						t_f_nchar_col
-					FROM %q.%q
-					ORDER BY id;
-				`,
-					namespace,
-					tableName,
-				),
-			)
-
-			// Each character column should contain exactly 512 'a' characters
-			expected512Chars := strings.Repeat("a", 512)
-			require.Equal(t, records, [][]string{
-				{
-					"6734e5db-f918-4efe-1421-872f66e235c5",
-					"2022-12-15T06:53:49Z",
-					expected512Chars,
-					expected512Chars,
-					expected512Chars,
-					expected512Chars,
-				},
-			})
-		})
 	})
 }
 
Binary file not shown.

0 commit comments