fix case where OneTimeJob with concurrent limit and limited runs fails to run (#703)

This commit is contained in:
John Roesler
2024-03-26 16:29:25 -05:00
committed by GitHub
parent 9ae7545c27
commit dcd4edae17
2 changed files with 93 additions and 17 deletions
+18 -17
View File
@@ -300,23 +300,6 @@ func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
}
j.lastScheduledRun = j.nextScheduled
// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
if j.limitRunsTo != nil {
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
go func() {
select {
case <-s.shutdownCtx.Done():
return
case s.removeJobCh <- id:
}
}()
return
}
}
next := j.next(j.lastScheduledRun)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
@@ -356,6 +339,24 @@ func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
if !ok {
return
}
// if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this
// job if it has reached the limit.
if j.limitRunsTo != nil {
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
go func() {
select {
case <-s.shutdownCtx.Done():
return
case s.removeJobCh <- id:
}
}()
return
}
}
j.lastRun = s.now()
s.jobs[id] = j
}
+75
View File
@@ -1809,6 +1809,81 @@ func TestScheduler_OneTimeJob(t *testing.T) {
}
}
func TestScheduler_WithLimitedRuns(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
schedulerOpts []SchedulerOption
job JobDefinition
jobOpts []JobOption
runLimit uint
expectedRuns int
}{
{
"simple",
nil,
DurationJob(time.Millisecond * 100),
nil,
1,
1,
},
{
"OneTimeJob, WithLimitConcurrentJobs",
[]SchedulerOption{
WithLimitConcurrentJobs(1, LimitModeWait),
},
OneTimeJob(OneTimeJobStartImmediately()),
nil,
1,
1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, tt.schedulerOpts...)
jobRan := make(chan struct{}, 10)
jobOpts := []JobOption{
WithLimitedRuns(tt.runLimit),
}
jobOpts = append(jobOpts, tt.jobOpts...)
_, err := s.NewJob(
tt.job,
NewTask(func() {
jobRan <- struct{}{}
}),
jobOpts...,
)
require.NoError(t, err)
s.Start()
time.Sleep(time.Millisecond * 150)
assert.NoError(t, s.Shutdown())
var runCount int
for runCount < tt.expectedRuns {
select {
case <-jobRan:
runCount++
case <-time.After(time.Second):
t.Fatal("timed out waiting for job to run")
}
}
select {
case <-jobRan:
t.Fatal("job ran more than expected")
default:
}
assert.Equal(t, tt.expectedRuns, runCount)
})
}
}
func TestScheduler_Jobs(t *testing.T) {
tests := []struct {
name string