Skip to content

Add support for argo workflow targets #3287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions changelog.d/3113.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for argo `Workflow` as targets with an extra option to specify template.
42 changes: 41 additions & 1 deletion mirrord-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1843,7 +1843,7 @@
"additionalProperties": false
},
"Target": {
"description": "<!--${internal}--> ## path\n\nSpecifies the running pod (or deployment) to mirror.\n\nSupports: - `targetless` - `pod/{pod-name}[/container/{container-name}]`; - `deployment/{deployment-name}[/container/{container-name}]`; - `rollout/{rollout-name}[/container/{container-name}]`; - `job/{job-name}[/container/{container-name}]`; - `cronjob/{cronjob-name}[/container/{container-name}]`; - `statefulset/{statefulset-name}[/container/{container-name}]`; - `service/{service-name}[/container/{container-name}]`; - `replicaset/{replicaset-name}[/container/{container-name}]`;",
"description": "<!--${internal}--> ## path\n\nSpecifies the running pod (or deployment) to mirror.\n\nSupports: - `targetless` - `pod/{pod-name}[/container/{container-name}]`; - `deployment/{deployment-name}[/container/{container-name}]`; - `rollout/{rollout-name}[/container/{container-name}]`; - `job/{job-name}[/container/{container-name}]`; - `cronjob/{cronjob-name}[/container/{container-name}]`; - `statefulset/{statefulset-name}[/container/{container-name}]`; - `service/{service-name}[/container/{container-name}]`; - `replicaset/{replicaset-name}[/container/{container-name}]`; - `workflow/{workflow-name}[/template/{template-name}][/step/{step-name}][/container/ {container-name}]`;",
"anyOf": [
{
"description": "<!--${internal}--> [Deployment](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/).",
Expand Down Expand Up @@ -1908,6 +1908,14 @@
"$ref": "#/definitions/ReplicaSetTarget"
}
]
},
{
"description": "<!--${internal}--> [Argo Workflow](https://argo-workflows.readthedocs.io/en/latest/).",
"allOf": [
{
"$ref": "#/definitions/WorkflowTarget"
}
]
}
]
},
Expand Down Expand Up @@ -2052,6 +2060,38 @@
}
]
},
"WorkflowTarget": {
"description": "<!--${internal}--> Mirror the workflow specified by [`WorkflowTarget::workflow`].",
"type": "object",
"required": [
"workflow"
],
"properties": {
"container": {
"type": [
"string",
"null"
]
},
"step": {
"type": [
"string",
"null"
]
},
"template": {
"type": [
"string",
"null"
]
},
"workflow": {
"description": "<!--${internal}--> Workflow to mirror.",
"type": "string"
}
},
"additionalProperties": false
},
"io.k8s.api.core.v1.ResourceClaim": {
"description": "ResourceClaim references one entry in PodSpec.ResourceClaims.",
"type": "object",
Expand Down
12 changes: 10 additions & 2 deletions mirrord/cli/src/verify_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use mirrord_config::{
target::{
cron_job::CronJobTarget, deployment::DeploymentTarget, job::JobTarget, pod::PodTarget,
replica_set::ReplicaSetTarget, rollout::RolloutTarget, service::ServiceTarget,
stateful_set::StatefulSetTarget, Target, TargetConfig,
stateful_set::StatefulSetTarget, workflow::WorkflowTarget, Target, TargetConfig,
},
LayerConfig,
};
Expand Down Expand Up @@ -53,6 +53,9 @@ enum VerifiedTarget {

#[serde(untagged)]
ReplicaSet(ReplicaSetTarget),

#[serde(untagged)]
Workflow(WorkflowTarget),
}

impl From<Target> for VerifiedTarget {
Expand All @@ -66,6 +69,7 @@ impl From<Target> for VerifiedTarget {
Target::StatefulSet(target) => Self::StatefulSet(target),
Target::Service(target) => Self::Service(target),
Target::ReplicaSet(target) => Self::ReplicaSet(target),
Target::Workflow(target) => Self::Workflow(target),
Target::Targetless => Self::Targetless,
}
}
Expand All @@ -83,6 +87,7 @@ impl From<VerifiedTarget> for TargetType {
VerifiedTarget::StatefulSet(_) => TargetType::StatefulSet,
VerifiedTarget::Service(_) => TargetType::Service,
VerifiedTarget::ReplicaSet(_) => TargetType::ReplicaSet,
VerifiedTarget::Workflow(_) => TargetType::Workflow,
}
}
}
Expand Down Expand Up @@ -115,6 +120,7 @@ enum TargetType {
StatefulSet,
Service,
ReplicaSet,
Workflow,
}

impl core::fmt::Display for TargetType {
Expand All @@ -129,6 +135,7 @@ impl core::fmt::Display for TargetType {
TargetType::StatefulSet => "statefulset",
TargetType::Service => "service",
TargetType::ReplicaSet => "replicaset",
TargetType::Workflow => "workflow",
};

f.write_str(stringifed)
Expand All @@ -147,6 +154,7 @@ impl TargetType {
Self::StatefulSet,
Self::Service,
Self::ReplicaSet,
Self::Workflow,
]
.into_iter()
}
Expand All @@ -157,7 +165,7 @@ impl TargetType {
Self::Pod => !(config.copy_target.enabled && config.copy_target.scale_down),
Self::Job | Self::CronJob => config.copy_target.enabled,
Self::Service => !config.copy_target.enabled,
Self::Deployment | Self::StatefulSet | Self::ReplicaSet => true,
Self::Deployment | Self::StatefulSet | Self::ReplicaSet | Self::Workflow => true,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come workflow targets don't require a copy_target? Since they actually target a specific node of the workflow graph that ends, like a job?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit hesitant in requiring it being copy-target just yet because of the amount of customization included in workflows, Right now we can add support for "catching" a running workflow

}
}
}
Expand Down
104 changes: 94 additions & 10 deletions mirrord/config/src/target.rs
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing user-facing docs for workflow targets.

Also, we might want to clarify the container/... part of the target in those docs, because people might assume that part means - choose a template with that container name, but it doesn't, with the current code what it means is - once you found the pod that was created for the target template, choose this container of that pod.

Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::{Deserialize, Serialize};

use self::{
deployment::DeploymentTarget, job::JobTarget, pod::PodTarget, rollout::RolloutTarget,
service::ServiceTarget, stateful_set::StatefulSetTarget,
service::ServiceTarget, stateful_set::StatefulSetTarget, workflow::WorkflowTarget,
};
use crate::{
config::{
Expand All @@ -27,6 +27,7 @@ pub mod replica_set;
pub mod rollout;
pub mod service;
pub mod stateful_set;
pub mod workflow;

#[derive(Deserialize, PartialEq, Eq, Clone, Debug, JsonSchema)]
#[serde(untagged, rename_all = "lowercase", deny_unknown_fields)]
Expand Down Expand Up @@ -242,6 +243,7 @@ mirrord-layer failed to parse the provided target!
>> `statefulset/{statefulset-name}[/container/{container-name}]`;
>> `service/{service-name}[/container/{container-name}]`;
>> `replicaset/{replicaset-name}[/container/{container-name}]`;
>> `workflow/{workflow-name}[/template/{template-name}][/step/{step-name}][/container/{container-name}]`;

- Note:
>> specifying container name is optional, defaults to a container chosen by mirrord
Expand All @@ -268,6 +270,8 @@ mirrord-layer failed to parse the provided target!
/// - `statefulset/{statefulset-name}[/container/{container-name}]`;
/// - `service/{service-name}[/container/{container-name}]`;
/// - `replicaset/{replicaset-name}[/container/{container-name}]`;
/// - `workflow/{workflow-name}[/template/{template-name}][/step/{step-name}][/container/
/// {container-name}]`;
#[warn(clippy::wildcard_enum_match_arm)]
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug, JsonSchema)]
#[serde(untagged, deny_unknown_fields)]
Expand Down Expand Up @@ -308,6 +312,10 @@ pub enum Target {
/// [ReplicaSet](https://kubernetes.io/docs/concepts/workloads/controllers/replicaset/).
ReplicaSet(replica_set::ReplicaSetTarget),

/// <!--${internal}-->
/// [Argo Workflow](https://argo-workflows.readthedocs.io/en/latest/).
Workflow(workflow::WorkflowTarget),

/// <!--${internal}-->
/// Spawn a new pod.
#[schemars(skip)]
Expand All @@ -333,6 +341,7 @@ impl FromStr for Target {
Some("statefulset") => stateful_set::StatefulSetTarget::from_split(&mut split).map(Target::StatefulSet),
Some("service") => service::ServiceTarget::from_split(&mut split).map(Target::Service),
Some("replicaset") => replica_set::ReplicaSetTarget::from_split(&mut split).map(Target::ReplicaSet),
Some("workflow") => workflow::WorkflowTarget::from_split(&mut split).map(Target::Workflow),
_ => Err(ConfigError::InvalidTarget(format!(
"Provided target: {target} is unsupported. Did you remember to add a prefix, e.g. pod/{target}? \n{FAIL_PARSE_DEPLOYMENT_OR_POD}",
))),
Expand Down Expand Up @@ -375,7 +384,7 @@ pub trait TargetDisplay {
fn container(&self) -> Option<&String>;
}

/// Implements the [`TargetDisplay`] and [`fmt::Display`] traits for a target type.
/// Implements the [`TargetDisplay`] only.
macro_rules! impl_target_display {
($struct_name:ident, $target_type:ident, $target_type_display:literal) => {
impl TargetDisplay for $struct_name {
Expand All @@ -391,6 +400,13 @@ macro_rules! impl_target_display {
self.container.as_ref()
}
}
};
}

/// Implements the [`TargetDisplay`] and [`fmt::Display`] traits for a target type.
macro_rules! impl_display_and_target_display {
($struct_name:ident, $target_type:ident, $target_type_display:literal) => {
impl_target_display!($struct_name, $target_type, $target_type_display);

impl fmt::Display for $struct_name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -408,14 +424,15 @@ macro_rules! impl_target_display {
};
}

impl_target_display!(PodTarget, pod, "pod");
impl_target_display!(DeploymentTarget, deployment, "deployment");
impl_target_display!(RolloutTarget, rollout, "rollout");
impl_target_display!(JobTarget, job, "job");
impl_target_display!(CronJobTarget, cron_job, "cronjob");
impl_target_display!(StatefulSetTarget, stateful_set, "statefulset");
impl_target_display!(ServiceTarget, service, "service");
impl_target_display!(ReplicaSetTarget, replica_set, "replicaset");
impl_display_and_target_display!(PodTarget, pod, "pod");
impl_display_and_target_display!(DeploymentTarget, deployment, "deployment");
impl_display_and_target_display!(RolloutTarget, rollout, "rollout");
impl_display_and_target_display!(JobTarget, job, "job");
impl_display_and_target_display!(CronJobTarget, cron_job, "cronjob");
impl_display_and_target_display!(StatefulSetTarget, stateful_set, "statefulset");
impl_display_and_target_display!(ServiceTarget, service, "service");
impl_display_and_target_display!(ReplicaSetTarget, replica_set, "replicaset");
impl_target_display!(WorkflowTarget, workflow, "workflow");

impl fmt::Display for Target {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand All @@ -429,6 +446,7 @@ impl fmt::Display for Target {
Target::StatefulSet(target) => target.fmt(f),
Target::Service(target) => target.fmt(f),
Target::ReplicaSet(target) => target.fmt(f),
Target::Workflow(target) => target.fmt(f),
}
}
}
Expand All @@ -445,6 +463,7 @@ impl TargetDisplay for Target {
Target::StatefulSet(target) => target.type_(),
Target::Service(target) => target.type_(),
Target::ReplicaSet(target) => target.type_(),
Target::Workflow(target) => target.type_(),
}
}

Expand All @@ -459,6 +478,7 @@ impl TargetDisplay for Target {
Target::StatefulSet(target) => target.name(),
Target::Service(target) => target.name(),
Target::ReplicaSet(target) => target.name(),
Target::Workflow(target) => target.name(),
}
}

Expand All @@ -473,6 +493,7 @@ impl TargetDisplay for Target {
Target::StatefulSet(target) => target.container(),
Target::Service(target) => target.container(),
Target::ReplicaSet(target) => target.container(),
Target::Workflow(target) => target.container(),
}
}
}
Expand All @@ -491,6 +512,7 @@ bitflags::bitflags! {
const STATEFUL_SET = 128;
const SERVICE = 256;
const REPLICA_SET = 512;
const WORKFLOW = 1024;
}
}

Expand Down Expand Up @@ -550,6 +572,12 @@ impl CollectAnalytics for &TargetConfig {
flags |= TargetAnalyticFlags::CONTAINER;
}
}
Target::Workflow(target) => {
flags |= TargetAnalyticFlags::WORKFLOW;
if target.container.is_some() {
flags |= TargetAnalyticFlags::CONTAINER;
}
}
Target::Targetless => {
// Targetless is essentially 0, so no need to set any flags.
}
Expand Down Expand Up @@ -619,6 +647,32 @@ mod tests {
namespace: None
}
)] // Rollout specified.
#[case(
Some("workflow/foo/template/bar"),
None,
TargetConfig{
path: Some(Target::Workflow(WorkflowTarget {
workflow: "foo".to_string(),
template: Some("bar".to_string()),
step: None,
container: None
})),
namespace: None
}
)] // Workflow with template
#[case(
Some("workflow/foo/template/bar/step/2000"),
None,
TargetConfig{
path: Some(Target::Workflow(WorkflowTarget {
workflow: "foo".to_string(),
template: Some("bar".to_string()),
step: Some("2000".to_string()),
container: None
})),
namespace: None
}
)] // Workflow with template and step
fn default(
#[case] path_env: Option<&str>,
#[case] namespace_env: Option<&str>,
Expand Down Expand Up @@ -699,4 +753,34 @@ mod tests {
.unwrap();
assert_eq!(target_config, expected_target_config);
}

#[rstest]
#[case("pod/foo")]
#[case("pod/foo/container/bar")]
#[case("deployment/foo")]
#[case("deployment/foo/container/bar")]
#[case("rollout/foo")]
#[case("rollout/foo/container/bar")]
#[case("job/foo")]
#[case("job/foo/container/bar")]
#[case("cronjob/foo")]
#[case("cronjob/foo/container/bar")]
#[case("statefulset/foo")]
#[case("statefulset/foo/container/bar")]
#[case("service/foo")]
#[case("service/foo/container/bar")]
#[case("replicaset/foo")]
#[case("replicaset/foo/container/bar")]
#[case("workflow/foo")]
#[case("workflow/foo/container/bar")]
#[case("workflow/foo/template/bar")]
#[case("workflow/foo/template/bar/container/main")]
#[case("workflow/foo/template/bar/step/uno")]
#[case("workflow/foo/template/bar/step/uno/container/main")]
fn parse_and_to_string_are_same(#[case] target_path: &str) {
let target =
Target::from_str(target_path).expect("target should be parsed from target_path");

assert_eq!(target.to_string(), target_path)
}
}
Loading
Loading