@@ -869,7 +869,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
869
869
parentNode = nodes [parentWfNode .ID ]
870
870
}
871
871
872
- children := []* dagNode {}
872
+ children := make ( []* dagNode , 0 )
873
873
874
874
for _ , childID := range wfNode .Children {
875
875
childNode , ok := nodes [childID ]
@@ -882,7 +882,7 @@ func newWorkflowsDag(wf *wfv1.Workflow) ([]*dagNode, error) {
882
882
nodes [wfNode .ID ].children = children
883
883
}
884
884
885
- values := []* dagNode {}
885
+ values := make ( []* dagNode , 0 )
886
886
for _ , v := range nodes {
887
887
values = append (values , v )
888
888
}
@@ -904,13 +904,17 @@ func singularPath(nodes []*dagNode, toNode string) ([]*dagNode, error) {
904
904
}
905
905
}
906
906
907
+ if root == nil {
908
+ return nil , fmt .Errorf ("was unable to find root" )
909
+ }
910
+
907
911
if leaf == nil {
908
912
return nil , fmt .Errorf ("was unable to find %s" , toNode )
909
913
}
910
914
911
915
curr := leaf
912
916
913
- reverseNodes := []* dagNode {}
917
+ reverseNodes := make ( []* dagNode , 0 )
914
918
for {
915
919
reverseNodes = append (reverseNodes , curr )
916
920
if curr .n .ID == root .n .ID {
@@ -948,37 +952,32 @@ func getChildren(n *dagNode) map[string]bool {
948
952
949
953
type resetFn func (string )
950
954
type deleteFn func (string )
955
+ type matchFn func (* dagNode ) bool
951
956
952
- // untilFn is a function that returns two variables, the first indicates
953
- // a `found` boolean while the second indicates if reset should be called.
954
- type untilFn func (* dagNode ) (bool , bool )
955
-
956
- func getUntilFnNodeType (nodeType wfv1.NodeType ) untilFn {
957
- return func (n * dagNode ) (bool , bool ) {
958
- return n .n .Type == nodeType , true
957
+ func matchNodeType (nodeType wfv1.NodeType ) matchFn {
958
+ return func (n * dagNode ) bool {
959
+ return n .n .Type == nodeType
959
960
}
960
961
}
961
962
962
- func resetUntil (n * dagNode , should untilFn , resetFunc resetFn ) (* dagNode , error ) {
963
+ func resetUntil (n * dagNode , matchFunc matchFn , resetFunc resetFn ) (* dagNode , error ) {
963
964
curr := n
964
965
for {
965
966
if curr == nil {
966
967
return nil , fmt .Errorf ("was seeking node but ran out of nodes to explore" )
967
968
}
968
969
969
- if foundNode , shouldReset := should (curr ); foundNode {
970
- if shouldReset {
971
- resetFunc (curr .n .ID )
972
- }
970
+ if match := matchFunc (curr ); match {
971
+ resetFunc (curr .n .ID )
973
972
return curr , nil
974
973
}
975
974
curr = curr .parent
976
975
}
977
976
}
978
977
979
- func getTillBoundaryFn (boundaryID string ) untilFn {
980
- return func (n * dagNode ) ( bool , bool ) {
981
- return n .n .ID == boundaryID , n . n . BoundaryID != ""
978
+ func matchBoundaryID (boundaryID string ) matchFn {
979
+ return func (n * dagNode ) bool {
980
+ return n .n .ID == boundaryID
982
981
}
983
982
}
984
983
@@ -988,6 +987,10 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
988
987
if curr == nil {
989
988
return curr , nil
990
989
}
990
+ if curr .parent != nil && curr .parent .n .Type == wfv1 .NodeTypeRetry {
991
+ resetFunc (curr .parent .n .ID )
992
+ curr = curr .parent
993
+ }
991
994
if curr .parent != nil && curr .parent .n .Type == wfv1 .NodeTypeStepGroup {
992
995
resetFunc (curr .parent .n .ID )
993
996
}
@@ -996,41 +999,17 @@ func resetBoundaries(n *dagNode, resetFunc resetFn) (*dagNode, error) {
996
999
return curr .parent , nil
997
1000
}
998
1001
var err error
999
- curr , err = resetUntil (curr , getTillBoundaryFn (seekingBoundaryID ), resetFunc )
1002
+ curr , err = resetUntil (curr , matchBoundaryID (seekingBoundaryID ), resetFunc )
1000
1003
if err != nil {
1001
1004
return nil , err
1002
1005
}
1003
1006
}
1004
1007
}
1005
1008
1006
- func resetStepGroup (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1007
- return resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeStepGroup ), resetFunc )
1008
- }
1009
-
1010
- func resetSteps (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1011
- n , err := resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeSteps ), resetFunc )
1012
- if err != nil {
1013
- return nil , err
1014
- }
1015
- return resetBoundaries (n , resetFunc )
1016
- }
1017
-
1018
- func resetTaskGroup (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1019
- return resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeTaskGroup ), resetFunc )
1020
- }
1021
-
1022
- func resetDAG (n * dagNode , resetFunc resetFn ) (* dagNode , error ) {
1023
- n , err := resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypeDAG ), resetFunc )
1024
- if err != nil {
1025
- return nil , err
1026
- }
1027
- return resetBoundaries (n , resetFunc )
1028
- }
1029
-
1030
1009
// resetPod is only called in the event a Container was found. This implies that there is a parent pod.
1031
1010
func resetPod (n * dagNode , resetFunc resetFn , addToDelete deleteFn ) (* dagNode , error ) {
1032
1011
// this sets to reset but resets are overridden by deletes in the final FormulateRetryWorkflow logic.
1033
- curr , err := resetUntil (n , getUntilFnNodeType (wfv1 .NodeTypePod ), resetFunc )
1012
+ curr , err := resetUntil (n , matchNodeType (wfv1 .NodeTypePod ), resetFunc )
1034
1013
if err != nil {
1035
1014
return nil , err
1036
1015
}
@@ -1074,79 +1053,26 @@ func resetPath(allNodes []*dagNode, startNode string) (map[string]bool, map[stri
1074
1053
nodesToDelete [nodeID ] = true
1075
1054
}
1076
1055
1077
- var mustFind wfv1.NodeType
1078
- mustFind = ""
1079
-
1080
- if curr .n .Type == wfv1 .NodeTypeContainer {
1081
- // special case where the retry node is the container of a containerSet
1082
- mustFind = wfv1 .NodeTypePod
1083
- }
1084
-
1085
- findBoundaries := false
1086
1056
for curr != nil {
1087
-
1088
- switch curr .n .Type {
1089
- case wfv1 .NodeTypePod :
1090
- //ignore
1091
- case wfv1 .NodeTypeContainer :
1092
- //ignore
1093
- case wfv1 .NodeTypeSteps :
1094
- addToReset (curr .n .ID )
1095
- findBoundaries = true
1096
- case wfv1 .NodeTypeStepGroup :
1097
- addToReset (curr .n .ID )
1098
- findBoundaries = true
1099
- case wfv1 .NodeTypeDAG :
1057
+ switch {
1058
+ case isGroupNodeType (curr .n .Type ):
1100
1059
addToReset (curr .n .ID )
1101
- findBoundaries = true
1102
- case wfv1 .NodeTypeTaskGroup :
1103
- addToReset (curr .n .ID )
1104
- findBoundaries = true
1105
- case wfv1 .NodeTypeRetry :
1106
- addToReset (curr .n .ID )
1107
- case wfv1 .NodeTypeSkipped :
1108
- // ignore -> doesn't make sense to reach this
1109
- case wfv1 .NodeTypeSuspend :
1110
- // ignore
1111
- case wfv1 .NodeTypeHTTP :
1112
- // ignore
1113
- case wfv1 .NodeTypePlugin :
1114
- addToReset (curr .n .ID )
1115
- }
1116
-
1117
- if mustFind == "" && ! findBoundaries {
1118
- curr = curr .parent
1119
- continue
1120
- }
1121
-
1122
- if findBoundaries {
1123
1060
curr , err = resetBoundaries (curr , addToReset )
1124
1061
if err != nil {
1125
1062
return nil , nil , err
1126
1063
}
1127
- findBoundaries = false
1128
1064
continue
1129
- }
1130
-
1131
- switch mustFind {
1132
- case wfv1 .NodeTypePod :
1065
+ case curr .n .Type == wfv1 .NodeTypeRetry :
1066
+ addToReset (curr .n .ID )
1067
+ case curr .n .Type == wfv1 .NodeTypeContainer :
1133
1068
curr , err = resetPod (curr , addToReset , addToDelete )
1134
- case wfv1 .NodeTypeSteps :
1135
- curr , err = resetSteps (curr , addToReset )
1136
- case wfv1 .NodeTypeStepGroup :
1137
- curr , err = resetStepGroup (curr , addToReset )
1138
- case wfv1 .NodeTypeDAG :
1139
- curr , err = resetDAG (curr , addToReset )
1140
- case wfv1 .NodeTypeTaskGroup :
1141
- curr , err = resetTaskGroup (curr , addToReset )
1142
- default :
1143
- return nil , nil , fmt .Errorf ("invalid mustFind of %s supplied" , mustFind )
1144
- }
1145
- mustFind = ""
1146
- if err != nil {
1147
- return nil , nil , err
1069
+ if err != nil {
1070
+ return nil , nil , err
1071
+ }
1072
+ continue
1148
1073
}
1149
1074
1075
+ curr = curr .parent
1150
1076
}
1151
1077
return nodesToReset , nodesToDelete , nil
1152
1078
}
@@ -1165,11 +1091,13 @@ func setUnion[T comparable](m1 map[T]bool, m2 map[T]bool) map[T]bool {
1165
1091
}
1166
1092
return res
1167
1093
}
1168
- func shouldRetryFailedType (nodeTyp wfv1.NodeType ) bool {
1169
- if nodeTyp == wfv1 .NodeTypePod || nodeTyp == wfv1 .NodeTypeContainer {
1170
- return true
1171
- }
1172
- return false
1094
+
1095
+ func isGroupNodeType (nodeType wfv1.NodeType ) bool {
1096
+ return nodeType == wfv1 .NodeTypeDAG || nodeType == wfv1 .NodeTypeTaskGroup || nodeType == wfv1 .NodeTypeStepGroup || nodeType == wfv1 .NodeTypeSteps
1097
+ }
1098
+
1099
+ func isExecutionNodeType (nodeType wfv1.NodeType ) bool {
1100
+ return nodeType == wfv1 .NodeTypeContainer || nodeType == wfv1 .NodeTypePod || nodeType == wfv1 .NodeTypeHTTP || nodeType == wfv1 .NodeTypePlugin
1173
1101
}
1174
1102
1175
1103
// dagSortedNodes sorts the nodes based on topological order, omits onExitNode
@@ -1236,8 +1164,16 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
1236
1164
1237
1165
failed := make (map [string ]bool )
1238
1166
for nodeID , node := range wf .Status .Nodes {
1239
- if node .Phase .FailedOrError () && shouldRetryFailedType (node .Type ) && ! isDescendantNodeSucceeded (wf , node , deleteNodesMap ) {
1240
- failed [nodeID ] = true
1167
+ if node .FailedOrError () && isExecutionNodeType (node .Type ) {
1168
+ // Check its parent if current node is retry node
1169
+ if node .NodeFlag != nil && node .NodeFlag .Retried {
1170
+ node = * wf .Status .Nodes .Find (func (nodeStatus wfv1.NodeStatus ) bool {
1171
+ return nodeStatus .HasChild (node .ID )
1172
+ })
1173
+ }
1174
+ if ! isDescendantNodeSucceeded (wf , node , deleteNodesMap ) {
1175
+ failed [nodeID ] = true
1176
+ }
1241
1177
}
1242
1178
}
1243
1179
for failedNode := range failed {
@@ -1290,7 +1226,7 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
1290
1226
}
1291
1227
1292
1228
for nodeID := range toReset {
1293
- // avoid reseting nodes that are marked for deletion
1229
+ // avoid resetting nodes that are marked for deletion
1294
1230
if in := toDelete [nodeID ]; in {
1295
1231
continue
1296
1232
}
@@ -1335,9 +1271,6 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
1335
1271
queue .Remove (currNode )
1336
1272
}
1337
1273
}
1338
- if n .Name == wf .Name && ! shouldRetryFailedType (n .Type ) {
1339
- newWf .Status .Nodes .Set (id , resetNode (* n .DeepCopy ()))
1340
- }
1341
1274
}
1342
1275
for id , oldWfNode := range wf .Status .Nodes {
1343
1276
0 commit comments