Skip to content

Commit 9b91d90

Browse files
authored
feat: expose submit and cancel job methods as public in scheduler (#1260)
* expose schedule and cancel job methods as public * address code review comment
1 parent 6aa7588 commit 9b91d90

File tree

4 files changed

+51
-55
lines changed

4 files changed

+51
-55
lines changed

ballista/scheduler/src/scheduler_server/grpc.rs

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -457,14 +457,13 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
457457
plan.display_indent()
458458
);
459459

460-
let job_id = self.state.task_manager.generate_job_id();
461-
462460
log::trace!("setting job name: {}", job_name);
463-
self.submit_job(&job_id, &job_name, session_ctx, &plan)
461+
let job_id = self
462+
.submit_job(&job_name, session_ctx, &plan)
464463
.await
465464
.map_err(|e| {
466465
let msg =
467-
format!("Failed to send JobQueued event for {job_id}: {e:?}");
466+
format!("Failed to send JobQueued event for {job_name}: {e:?}");
468467
error!("{}", msg);
469468

470469
Status::internal(msg)
@@ -534,20 +533,12 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
534533
let job_id = request.into_inner().job_id;
535534
info!("Received cancellation request for job {}", job_id);
536535

537-
self.query_stage_event_loop
538-
.get_sender()
539-
.map_err(|e| {
540-
let msg = format!("Get query stage event loop error due to {e:?}");
541-
error!("{}", msg);
542-
Status::internal(msg)
543-
})?
544-
.post_event(QueryStageSchedulerEvent::JobCancel(job_id))
545-
.await
546-
.map_err(|e| {
547-
let msg = format!("Post to query stage event loop error due to {e:?}");
548-
error!("{}", msg);
549-
Status::internal(msg)
550-
})?;
536+
self.cancel_job(job_id).await.map_err(|e| {
537+
let msg = format!("Post to query stage event loop error due to {e:?}");
538+
error!("{}", msg);
539+
Status::internal(msg)
540+
})?;
541+
551542
Ok(Response::new(CancelJobResult { cancelled: true }))
552543
}
553544

ballista/scheduler/src/scheduler_server/mod.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,28 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
163163
pub(crate) fn metrics_collector(&self) -> &dyn SchedulerMetricsCollector {
164164
self.query_stage_scheduler.metrics_collector()
165165
}
166+
/// Cancels job for given job_id
167+
pub async fn cancel_job(&self, job_id: String) -> Result<()> {
168+
log::debug!("Received cancellation request for job {}", job_id);
166169

167-
pub(crate) async fn submit_job(
170+
self.query_stage_event_loop
171+
.get_sender()?
172+
.post_event(QueryStageSchedulerEvent::JobCancel(job_id.to_owned()))
173+
.await?;
174+
175+
Ok(())
176+
}
177+
178+
/// Submits a job to executor returning job_id
179+
pub async fn submit_job(
168180
&self,
169-
job_id: &str,
170181
job_name: &str,
171182
ctx: Arc<SessionContext>,
172183
plan: &LogicalPlan,
173-
) -> Result<()> {
184+
) -> Result<String> {
185+
log::debug!("Received submit request for job {}", job_name);
186+
let job_id = self.state.task_manager.generate_job_id();
187+
174188
self.query_stage_event_loop
175189
.get_sender()?
176190
.post_event(QueryStageSchedulerEvent::JobQueued {
@@ -180,7 +194,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerServer<T
180194
plan: Box::new(plan.clone()),
181195
queued_at: timestamp_millis(),
182196
})
183-
.await
197+
.await?;
198+
199+
Ok(job_id)
184200
}
185201

186202
/// It just send task status update event to the channel,
@@ -505,7 +521,7 @@ mod test {
505521
)
506522
.await?;
507523

508-
let status = test.run("job", "", &plan).await.expect("running plan");
524+
let (status, job_id) = test.run("", &plan).await.expect("running plan");
509525

510526
match status.status {
511527
Some(job_status::Status::Successful(SuccessfulJob {
@@ -519,8 +535,8 @@ mod test {
519535
}
520536
}
521537

522-
assert_submitted_event("job", &metrics_collector);
523-
assert_completed_event("job", &metrics_collector);
538+
assert_submitted_event(&job_id, &metrics_collector);
539+
assert_completed_event(&job_id, &metrics_collector);
524540

525541
Ok(())
526542
}
@@ -580,7 +596,7 @@ mod test {
580596
)
581597
.await?;
582598

583-
let status = test.run("job", "", &plan).await.expect("running plan");
599+
let (status, job_id) = test.run("", &plan).await.expect("running plan");
584600

585601
assert!(
586602
matches!(
@@ -594,8 +610,8 @@ mod test {
594610
"Expected job status to be failed but it was {status:?}"
595611
);
596612

597-
assert_submitted_event("job", &metrics_collector);
598-
assert_failed_event("job", &metrics_collector);
613+
assert_submitted_event(&job_id, &metrics_collector);
614+
assert_failed_event(&job_id, &metrics_collector);
599615

600616
Ok(())
601617
}
@@ -625,7 +641,7 @@ mod test {
625641
.into_optimized_plan()?;
626642

627643
// This should fail when we try and create the physical plan
628-
let status = test.run("job", "", &plan).await?;
644+
let (status, job_id) = test.run("", &plan).await?;
629645

630646
assert!(
631647
matches!(
@@ -639,8 +655,8 @@ mod test {
639655
"Expected job status to be failed but it was {status:?}"
640656
);
641657

642-
assert_no_submitted_event("job", &metrics_collector);
643-
assert_failed_event("job", &metrics_collector);
658+
assert_no_submitted_event(&job_id, &metrics_collector);
659+
assert_failed_event(&job_id, &metrics_collector);
644660

645661
Ok(())
646662
}

ballista/scheduler/src/scheduler_server/query_stage_scheduler.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,7 @@ mod tests {
376376
)
377377
.await?;
378378

379-
let job_id = "job-1";
380-
381-
test.submit(job_id, "", &plan).await?;
379+
let job_id = test.submit("", &plan).await?;
382380

383381
test.tick().await?;
384382

@@ -398,7 +396,7 @@ mod tests {
398396
expected, running_jobs
399397
);
400398

401-
test.cancel(job_id).await?;
399+
test.cancel(&job_id).await?;
402400

403401
let expected = 0usize;
404402
let success = await_condition(Duration::from_millis(10), 20, || {

ballista/scheduler/src/test_utils.rs

Lines changed: 11 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -480,12 +480,7 @@ impl SchedulerTest {
480480
.await
481481
}
482482

483-
pub async fn submit(
484-
&mut self,
485-
job_id: &str,
486-
job_name: &str,
487-
plan: &LogicalPlan,
488-
) -> Result<()> {
483+
pub async fn submit(&mut self, job_name: &str, plan: &LogicalPlan) -> Result<String> {
489484
println!("{:?}", self.session_config);
490485
let ctx = self
491486
.scheduler
@@ -494,11 +489,9 @@ impl SchedulerTest {
494489
.create_session(&self.session_config)
495490
.await?;
496491

497-
self.scheduler
498-
.submit_job(job_id, job_name, ctx, plan)
499-
.await?;
492+
let job_id = self.scheduler.submit_job(job_name, ctx, plan).await?;
500493

501-
Ok(())
494+
Ok(job_id)
502495
}
503496

504497
pub async fn post_scheduler_event(
@@ -606,22 +599,20 @@ impl SchedulerTest {
606599
final_status
607600
}
608601

602+
/// Returns job status and job_id
609603
pub async fn run(
610604
&mut self,
611-
job_id: &str,
612605
job_name: &str,
613606
plan: &LogicalPlan,
614-
) -> Result<JobStatus> {
607+
) -> Result<(JobStatus, String)> {
615608
let ctx = self
616609
.scheduler
617610
.state
618611
.session_manager
619612
.create_session(&self.session_config)
620613
.await?;
621614

622-
self.scheduler
623-
.submit_job(job_id, job_name, ctx, plan)
624-
.await?;
615+
let job_id = self.scheduler.submit_job(job_name, ctx, plan).await?;
625616

626617
let mut receiver = self.status_receiver.take().unwrap();
627618

@@ -640,7 +631,7 @@ impl SchedulerTest {
640631
.scheduler
641632
.state
642633
.task_manager
643-
.get_job_status(job_id)
634+
.get_job_status(&job_id)
644635
.await?;
645636

646637
if let Some(JobStatus {
@@ -659,7 +650,7 @@ impl SchedulerTest {
659650
tokio::time::sleep(Duration::from_millis(100)).await
660651
};
661652

662-
final_status
653+
final_status.map(|s| (s, job_id))
663654
}
664655
}
665656

@@ -746,7 +737,7 @@ pub fn assert_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
746737
.iter()
747738
.any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
748739

749-
assert!(found, "{}", "Expected submitted event for job {job_id}");
740+
assert!(found, "Expected submitted event for job {job_id}");
750741
}
751742

752743
pub fn assert_no_submitted_event(job_id: &str, collector: &TestMetricsCollector) {
@@ -755,7 +746,7 @@ pub fn assert_no_submitted_event(job_id: &str, collector: &TestMetricsCollector)
755746
.iter()
756747
.any(|ev| matches!(ev, MetricEvent::Submitted(_, _, _)));
757748

758-
assert!(!found, "{}", "Expected no submitted event for job {job_id}");
749+
assert!(!found, "Expected no submitted event for job {job_id}");
759750
}
760751

761752
pub fn assert_completed_event(job_id: &str, collector: &TestMetricsCollector) {
@@ -782,7 +773,7 @@ pub fn assert_failed_event(job_id: &str, collector: &TestMetricsCollector) {
782773
.iter()
783774
.any(|ev| matches!(ev, MetricEvent::Failed(_, _, _)));
784775

785-
assert!(found, "{}", "Expected failed event for job {job_id}");
776+
assert!(found, "Expected failed event for job {job_id}");
786777
}
787778

788779
pub fn revive_graph_and_complete_next_stage(graph: &mut ExecutionGraph) -> Result<usize> {

0 commit comments

Comments
 (0)