Merge remote-tracking branch 'upstream/v2' into v2
This commit is contained in:
@@ -39,7 +39,7 @@ func main() {
|
|||||||
gocron.NewTask(
|
gocron.NewTask(
|
||||||
func(a string, b int) {
|
func(a string, b int) {
|
||||||
// do things
|
// do things
|
||||||
},
|
},
|
||||||
"hello",
|
"hello",
|
||||||
1,
|
1,
|
||||||
),
|
),
|
||||||
|
|||||||
@@ -228,6 +228,17 @@ func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
|
|||||||
outJobs[counter] = s.jobFromInternalJob(j)
|
outJobs[counter] = s.jobFromInternalJob(j)
|
||||||
counter++
|
counter++
|
||||||
}
|
}
|
||||||
|
slices.SortFunc(outJobs, func(a, b Job) int {
|
||||||
|
aID, bID := a.ID().String(), b.ID().String()
|
||||||
|
switch {
|
||||||
|
case aID < bID:
|
||||||
|
return -1
|
||||||
|
case aID > bID:
|
||||||
|
return 1
|
||||||
|
default:
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
})
|
||||||
select {
|
select {
|
||||||
case <-s.shutdownCtx.Done():
|
case <-s.shutdownCtx.Done():
|
||||||
case out.outChan <- outJobs:
|
case out.outChan <- outJobs:
|
||||||
@@ -268,10 +279,15 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {
|
|||||||
delete(s.jobs, id)
|
delete(s.jobs, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Jobs coming back from the executor to the scheduler that
|
||||||
|
// need to evaluated for rescheduling.
|
||||||
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
|
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
|
||||||
j := s.jobs[id]
|
j := s.jobs[id]
|
||||||
j.lastRun = j.nextRun
|
j.lastRun = j.nextRun
|
||||||
|
|
||||||
|
// if the job has a limited number of runs set, we need to
|
||||||
|
// check how many runs have occurred and stop running this
|
||||||
|
// job if it has reached the limit.
|
||||||
if j.limitRunsTo != nil {
|
if j.limitRunsTo != nil {
|
||||||
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
|
j.limitRunsTo.runCount = j.limitRunsTo.runCount + 1
|
||||||
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
|
if j.limitRunsTo.runCount == j.limitRunsTo.limit {
|
||||||
@@ -288,10 +304,25 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
|
|||||||
|
|
||||||
next := j.next(j.lastRun)
|
next := j.next(j.lastRun)
|
||||||
if next.IsZero() {
|
if next.IsZero() {
|
||||||
|
// the job's next function will return zero for OneTime jobs.
|
||||||
|
// since they are one time only, they do not need rescheduling.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if next.Before(s.now()) {
|
||||||
|
// in some cases the next run time can be in the past, for example:
|
||||||
|
// - the time on the machine was incorrect and has been synced with ntp
|
||||||
|
// - the machine went to sleep, and woke up some time later
|
||||||
|
// in those cases, we want to increment to the next run in the future
|
||||||
|
// and schedule the job for that time.
|
||||||
|
for next.Before(s.now()) {
|
||||||
|
next = j.next(next)
|
||||||
|
}
|
||||||
|
}
|
||||||
j.nextRun = next
|
j.nextRun = next
|
||||||
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
|
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
|
||||||
|
// set the actual timer on the job here and listen for
|
||||||
|
// shut down events so that the job doesn't attempt to
|
||||||
|
// run if the scheduler has been shutdown.
|
||||||
select {
|
select {
|
||||||
case <-s.shutdownCtx.Done():
|
case <-s.shutdownCtx.Done():
|
||||||
return
|
return
|
||||||
@@ -301,6 +332,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
|
|||||||
}:
|
}:
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
// update the job with its new next and last run times and timer.
|
||||||
s.jobs[id] = j
|
s.jobs[id] = j
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1635,3 +1635,32 @@ func TestScheduler_OneTimeJob(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestScheduler_Jobs(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
"order is equal",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
s := newTestScheduler(t)
|
||||||
|
|
||||||
|
for i := 0; i <= 20; i++ {
|
||||||
|
_, err := s.NewJob(
|
||||||
|
DurationJob(time.Second),
|
||||||
|
NewTask(func() {}),
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobsFirst := s.Jobs()
|
||||||
|
jobsSecond := s.Jobs()
|
||||||
|
|
||||||
|
assert.Equal(t, jobsFirst, jobsSecond)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user