Skip to content

[Improvement-17010][DataX] Change datax_channel_count to a custom parameter #17032

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.dolphinscheduler.plugin.task.datax;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ResourceType;
import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
Expand All @@ -30,9 +29,12 @@
import java.util.List;
import java.util.Objects;

import lombok.Data;

/**
* DataX parameter
*/
@Data
public class DataxParameters extends AbstractParameters {

/**
Expand Down Expand Up @@ -95,6 +97,11 @@ public class DataxParameters extends AbstractParameters {
*/
private int jobSpeedRecord;

/**
* channel count
*/
private int channelCount = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to expose this into UI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the following documents, the channel can be directly configured in the JSON.
ds-doc-datax
hdfswriter

What do you think about it ?

Thanks.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, this PR seems meanless, it doesn't change the usage.

Copy link
Contributor Author

@BruceWong96 BruceWong96 Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, this PR seems meanless, it doesn't change the usage.

Sorry, I might have misunderstood this issue.
After reviewing the issue, it’s clear that custom configurations are fully handled through JSON.
Modifying this parameter is only required when using DS’s default configuration.

To address this, we should add a channel configuration option in the UI to let users adjust the channel count setting.

Is my understanding correct?
Thanks.


/**
* Xms memory
*/
Expand All @@ -110,126 +117,6 @@ public class DataxParameters extends AbstractParameters {
*/
private List<ResourceInfo> resourceList;

public int getCustomConfig() {
return customConfig;
}

public void setCustomConfig(int customConfig) {
this.customConfig = customConfig;
}

public String getJson() {
return json;
}

public void setJson(String json) {
this.json = json;
}

public String getDsType() {
return dsType;
}

public void setDsType(String dsType) {
this.dsType = dsType;
}

public int getDataSource() {
return dataSource;
}

public void setDataSource(int dataSource) {
this.dataSource = dataSource;
}

public String getDtType() {
return dtType;
}

public void setDtType(String dtType) {
this.dtType = dtType;
}

public int getDataTarget() {
return dataTarget;
}

public void setDataTarget(int dataTarget) {
this.dataTarget = dataTarget;
}

public String getSql() {
return sql;
}

public void setSql(String sql) {
this.sql = sql;
}

public String getTargetTable() {
return targetTable;
}

public void setTargetTable(String targetTable) {
this.targetTable = targetTable;
}

public List<String> getPreStatements() {
return preStatements;
}

public void setPreStatements(List<String> preStatements) {
this.preStatements = preStatements;
}

public List<String> getPostStatements() {
return postStatements;
}

public void setPostStatements(List<String> postStatements) {
this.postStatements = postStatements;
}

public int getJobSpeedByte() {
return jobSpeedByte;
}

public void setJobSpeedByte(int jobSpeedByte) {
this.jobSpeedByte = jobSpeedByte;
}

public int getJobSpeedRecord() {
return jobSpeedRecord;
}

public void setJobSpeedRecord(int jobSpeedRecord) {
this.jobSpeedRecord = jobSpeedRecord;
}

public int getXms() {
return xms;
}

public void setXms(int xms) {
this.xms = xms;
}

public int getXmx() {
return xmx;
}

public void setXmx(int xmx) {
this.xmx = xmx;
}

public List<ResourceInfo> getResourceList() {
return resourceList;
}

public void setResourceList(List<ResourceInfo> resourceList) {
this.resourceList = resourceList;
}

@Override
public boolean checkParameters() {
if (customConfig == Flag.NO.ordinal()) {
Expand All @@ -247,27 +134,6 @@ public List<ResourceInfo> getResourceFilesList() {
return resourceList;
}

@Override
public String toString() {
return "DataxParameters{"
+ "customConfig=" + customConfig
+ ", json='" + json + '\''
+ ", dsType='" + dsType + '\''
+ ", dataSource=" + dataSource
+ ", dtType='" + dtType + '\''
+ ", dataTarget=" + dataTarget
+ ", sql='" + sql + '\''
+ ", targetTable='" + targetTable + '\''
+ ", preStatements=" + preStatements
+ ", postStatements=" + postStatements
+ ", jobSpeedByte=" + jobSpeedByte
+ ", jobSpeedRecord=" + jobSpeedRecord
+ ", xms=" + xms
+ ", xmx=" + xmx
+ ", resourceList=" + JSONUtils.toJsonString(resourceList)
+ '}';
}

@Override
public ResourceParametersHelper getResources() {
ResourceParametersHelper resources = super.getResources();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,11 @@ public class DataxTask extends AbstractTask {
* post jdbc info regex
*/
private static final String POST_JDBC_INFO_REGEX = "(?<=(post jdbc info:)).*(?=)";

/**
* datax path
*/
private static final String DATAX_LAUNCHER = "${DATAX_LAUNCHER}";
/**
* datax channel count
*/
private static final int DATAX_CHANNEL_COUNT = 1;

/**
* datax parameters
Expand Down Expand Up @@ -327,8 +324,9 @@ private List<ObjectNode> buildDataxJobContentJson() {
private ObjectNode buildDataxJobSettingJson() {

ObjectNode speed = JSONUtils.createObjectNode();

speed.put("channel", DATAX_CHANNEL_COUNT);
if (dataXParameters.getChannelCount() > 0) {
speed.put("channel", dataXParameters.getChannelCount());
}

if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
Expand All @@ -352,8 +350,6 @@ private ObjectNode buildDataxJobSettingJson() {
private ObjectNode buildDataxCoreJson() {

ObjectNode speed = JSONUtils.createObjectNode();
speed.put("channel", DATAX_CHANNEL_COUNT);

if (dataXParameters.getJobSpeedByte() > 0) {
speed.put("byte", dataXParameters.getJobSpeedByte());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,27 +70,30 @@ public void testToString() {
dataxParameters.setDtType("MYSQL");
dataxParameters.setJobSpeedByte(1);
dataxParameters.setJobSpeedRecord(1);
dataxParameters.setChannelCount(1);
dataxParameters.setJson("json");
dataxParameters.setResourceList(resourceInfoList);

String expected = "DataxParameters"
+ "{"
+ "("
+ "customConfig=0, "
+ "json='json', "
+ "dsType='MYSQL', "
+ "json=json, "
+ "dsType=MYSQL, "
+ "dataSource=1, "
+ "dtType='MYSQL', "
+ "dtType=MYSQL, "
+ "dataTarget=1, "
+ "sql='null', "
+ "targetTable='null', "
+ "sql=null, "
+ "targetTable=null, "
+ "preStatements=null, "
+ "postStatements=null, "
+ "jobSpeedByte=1, "
+ "jobSpeedRecord=1, "
+ "channelCount=1, "
+ "xms=0, "
+ "xmx=-100, "
+ "resourceList=[{\"id\":null,\"resourceName\":\"/hdfs.keytab\",\"res\":null}]"
+ "}";
+ "resourceList=[ResourceInfo(id=null, "
+ "resourceName=/hdfs.keytab, res=null)]"
+ ")";

Assertions.assertEquals(expected, dataxParameters.toString());
}
Expand Down
Loading