add new features, OneTimeJob and Job.RunNow() (#646)

This commit is contained in:
John Roesler
2023-12-18 21:13:37 -06:00
committed by GitHub
parent f7cd2bcf04
commit 6e15f16d77
8 changed files with 483 additions and 55 deletions
+2
View File
@@ -87,6 +87,8 @@ Jobs can be run every x days at specific times.
Jobs can be run every x weeks on specific days of the week and at specific times. Jobs can be run every x weeks on specific days of the week and at specific times.
- [**Monthly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonthlyJob): - [**Monthly**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#MonthlyJob):
Jobs can be run every x months on specific days of the month and at specific times. Jobs can be run every x months on specific days of the month and at specific times.
- [**One time**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#OneTimeJob):
Jobs can be run once at a specific time. These are non-recurring jobs.
### Concurrency Limits ### Concurrency Limits
Jobs can be limited individually or across the entire scheduler. Jobs can be limited individually or across the entire scheduler.
+2
View File
@@ -12,6 +12,7 @@ var (
ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration")
ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil")
ErrJobNotFound = fmt.Errorf("gocron: job not found") ErrJobNotFound = fmt.Errorf("gocron: job not found")
ErrJobRunNowFailed = fmt.Errorf("gocron: Job: RunNow: scheduler unreachable")
ErrMonthlyJobDays = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must be between 31 and -31 inclusive, and not 0") ErrMonthlyJobDays = fmt.Errorf("gocron: MonthlyJob: daysOfTheMonth must be between 31 and -31 inclusive, and not 0")
ErrMonthlyJobAtTimeNil = fmt.Errorf("gocron: MonthlyJob: atTime within atTimes must not be nil") ErrMonthlyJobAtTimeNil = fmt.Errorf("gocron: MonthlyJob: atTime within atTimes must not be nil")
ErrMonthlyJobAtTimesNil = fmt.Errorf("gocron: MonthlyJob: atTimes must not be nil") ErrMonthlyJobAtTimesNil = fmt.Errorf("gocron: MonthlyJob: atTimes must not be nil")
@@ -22,6 +23,7 @@ var (
ErrNewJobTaskNotFunc = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func") ErrNewJobTaskNotFunc = fmt.Errorf("gocron: NewJob: Task.Function must be of kind reflect.Func")
ErrNewJobWrongNumberOfParameters = fmt.Errorf("gocron: NewJob: Number of provided parameters does not match expected") ErrNewJobWrongNumberOfParameters = fmt.Errorf("gocron: NewJob: Number of provided parameters does not match expected")
ErrNewJobWrongTypeOfParameters = fmt.Errorf("gocron: NewJob: Type of provided parameters does not match expected") ErrNewJobWrongTypeOfParameters = fmt.Errorf("gocron: NewJob: Type of provided parameters does not match expected")
ErrOneTimeJobStartDateTimePast = fmt.Errorf("gocron: OneTimeJob: start must not be in the past")
ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop") ErrStopExecutorTimedOut = fmt.Errorf("gocron: timed out waiting for executor to stop")
ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish") ErrStopJobsTimedOut = fmt.Errorf("gocron: timed out waiting for jobs to finish")
ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop") ErrStopSchedulerTimedOut = fmt.Errorf("gocron: timed out waiting for scheduler to stop")
+48
View File
@@ -196,6 +196,28 @@ func ExampleJob_NextRun() {
fmt.Println(j.NextRun()) fmt.Println(j.NextRun())
} }
func ExampleJob_RunNow() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
j, _ := s.NewJob(
MonthlyJob(
1,
NewDaysOfTheMonth(3, -5, -1),
NewAtTimes(
NewAtTime(10, 30, 0),
NewAtTime(11, 15, 0),
),
),
NewTask(
func() {},
),
)
s.Start()
// Runs the job one time now, without impacting the schedule
_ = j.RunNow()
}
func ExampleMonthlyJob() { func ExampleMonthlyJob() {
s, _ := NewScheduler() s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }() defer func() { _ = s.Shutdown() }()
@@ -222,6 +244,32 @@ func ExampleNewScheduler() {
fmt.Println(s.Jobs()) fmt.Println(s.Jobs())
} }
func ExampleOneTimeJob() {
s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }()
// run a job once, immediately
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartImmediately(),
),
NewTask(
func() {},
),
)
// run a job once in 10 seconds
_, _ = s.NewJob(
OneTimeJob(
OneTimeJobStartDateTime(time.Now().Add(10*time.Second)),
),
NewTask(
func() {},
),
)
s.Start()
}
func ExampleScheduler_NewJob() { func ExampleScheduler_NewJob() {
s, _ := NewScheduler() s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }() defer func() { _ = s.Shutdown() }()
+47 -32
View File
@@ -14,7 +14,7 @@ type executor struct {
cancel context.CancelFunc cancel context.CancelFunc
logger Logger logger Logger
stopCh chan struct{} stopCh chan struct{}
jobsIDsIn chan uuid.UUID jobsIn chan jobIn
jobIDsOut chan uuid.UUID jobIDsOut chan uuid.UUID
jobOutRequest chan jobOutRequest jobOutRequest chan jobOutRequest
stopTimeout time.Duration stopTimeout time.Duration
@@ -25,8 +25,13 @@ type executor struct {
locker Locker locker Locker
} }
type jobIn struct {
id uuid.UUID
shouldSendOut bool
}
type singletonRunner struct { type singletonRunner struct {
in chan uuid.UUID in chan jobIn
rescheduleLimiter chan struct{} rescheduleLimiter chan struct{}
} }
@@ -35,7 +40,7 @@ type limitModeConfig struct {
mode LimitMode mode LimitMode
limit uint limit uint
rescheduleLimiter chan struct{} rescheduleLimiter chan struct{}
in chan uuid.UUID in chan jobIn
// singletonJobs is used to track singleton jobs that are running // singletonJobs is used to track singleton jobs that are running
// in the limit mode runner. This is used to prevent the same job // in the limit mode runner. This is used to prevent the same job
// from running multiple times across limit mode runners when both // from running multiple times across limit mode runners when both
@@ -72,7 +77,7 @@ func (e *executor) start() {
// are run immediately. // are run immediately.
// 2. sent from time.AfterFuncs in which job schedules // 2. sent from time.AfterFuncs in which job schedules
// are spun up by the scheduler // are spun up by the scheduler
case id := <-e.jobsIDsIn: case jIn := <-e.jobsIn:
select { select {
case <-e.stopCh: case <-e.stopCh:
e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg) e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
@@ -111,23 +116,25 @@ func (e *executor) start() {
// the executor from building up a waiting queue // the executor from building up a waiting queue
// and forces rescheduling // and forces rescheduling
case e.limitMode.rescheduleLimiter <- struct{}{}: case e.limitMode.rescheduleLimiter <- struct{}{}:
e.limitMode.in <- id e.limitMode.in <- jIn
default: default:
// all runners are busy, reschedule the work for later // all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric // TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut {
select { select {
case e.jobIDsOut <- id: case e.jobIDsOut <- jIn.id:
default: default:
} }
} }
}
} else { } else {
// since we're not using LimitModeReschedule, but instead using LimitModeWait // since we're not using LimitModeReschedule, but instead using LimitModeWait
// we do want to queue up the work to the limit mode runners and allow them // we do want to queue up the work to the limit mode runners and allow them
// to work through the channel backlog. A hard limit of 1000 is in place // to work through the channel backlog. A hard limit of 1000 is in place
// at which point this call would block. // at which point this call would block.
// TODO when metrics are added, this should increment a wait metric // TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- id e.limitMode.in <- jIn
} }
} else { } else {
// no limit mode, so we're either running a regular job or // no limit mode, so we're either running a regular job or
@@ -135,7 +142,7 @@ func (e *executor) start() {
// //
// get the job, so we can figure out what kind it is and how // get the job, so we can figure out what kind it is and how
// to execute it // to execute it
j := requestJobCtx(ctx, id, e.jobOutRequest) j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
if j == nil { if j == nil {
// safety check as it'd be strange bug if this occurred // safety check as it'd be strange bug if this occurred
return return
@@ -143,15 +150,15 @@ func (e *executor) start() {
if j.singletonMode { if j.singletonMode {
// for singleton mode, get the existing runner for the job // for singleton mode, get the existing runner for the job
// or spin up a new one // or spin up a new one
runner, ok := e.singletonRunners[id] runner, ok := e.singletonRunners[jIn.id]
if !ok { if !ok {
runner.in = make(chan uuid.UUID, 1000) runner.in = make(chan jobIn, 1000)
if j.singletonLimitMode == LimitModeReschedule { if j.singletonLimitMode == LimitModeReschedule {
runner.rescheduleLimiter = make(chan struct{}, 1) runner.rescheduleLimiter = make(chan struct{}, 1)
} }
e.singletonRunners[id] = runner e.singletonRunners[jIn.id] = runner
singletonJobsWg.Add(1) singletonJobsWg.Add(1)
go e.singletonModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
} }
if j.singletonLimitMode == LimitModeReschedule { if j.singletonLimitMode == LimitModeReschedule {
@@ -159,19 +166,21 @@ func (e *executor) start() {
// for a running job and reschedules if the channel is full. // for a running job and reschedules if the channel is full.
select { select {
case runner.rescheduleLimiter <- struct{}{}: case runner.rescheduleLimiter <- struct{}{}:
runner.in <- id runner.in <- jIn
default: default:
// runner is busy, reschedule the work for later // runner is busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric // TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut {
select { select {
case e.jobIDsOut <- id: case e.jobIDsOut <- jIn.id:
default: default:
} }
} }
}
} else { } else {
// wait mode, fill up that queue (buffered channel, so it's ok) // wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- id runner.in <- jIn
} }
} else { } else {
select { select {
@@ -187,7 +196,7 @@ func (e *executor) start() {
// complete. // complete.
standardJobsWg.Add(1) standardJobsWg.Add(1)
go func(j internalJob) { go func(j internalJob) {
e.runJob(j) e.runJob(j, jIn.shouldSendOut)
standardJobsWg.Done() standardJobsWg.Done()
}(*j) }(*j)
} }
@@ -200,11 +209,11 @@ func (e *executor) start() {
} }
} }
func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name) e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for { for {
select { select {
case id := <-in: case jIn := <-in:
select { select {
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name) e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
@@ -214,16 +223,17 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
} }
ctx, cancel := context.WithCancel(e.ctx) ctx, cancel := context.WithCancel(e.ctx)
j := requestJobCtx(ctx, id, e.jobOutRequest) j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel() cancel()
if j != nil { if j != nil {
if j.singletonMode { if j.singletonMode {
e.limitMode.singletonJobsMu.Lock() e.limitMode.singletonJobsMu.Lock()
_, ok := e.limitMode.singletonJobs[id] _, ok := e.limitMode.singletonJobs[jIn.id]
if ok { if ok {
// this job is already running, so don't run it // this job is already running, so don't run it
// but instead reschedule it // but instead reschedule it
e.limitMode.singletonJobsMu.Unlock() e.limitMode.singletonJobsMu.Unlock()
if jIn.shouldSendOut {
select { select {
case <-e.ctx.Done(): case <-e.ctx.Done():
return return
@@ -231,7 +241,10 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
return return
case e.jobIDsOut <- j.id: case e.jobIDsOut <- j.id:
} }
// remove the limiter block to allow another job to be scheduled }
// remove the limiter block, as this particular job
// was a singleton already running, and we want to
// allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { select {
case <-rescheduleLimiter: case <-rescheduleLimiter:
@@ -240,14 +253,14 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
} }
continue continue
} }
e.limitMode.singletonJobs[id] = struct{}{} e.limitMode.singletonJobs[jIn.id] = struct{}{}
e.limitMode.singletonJobsMu.Unlock() e.limitMode.singletonJobsMu.Unlock()
} }
e.runJob(*j) e.runJob(*j, jIn.shouldSendOut)
if j.singletonMode { if j.singletonMode {
e.limitMode.singletonJobsMu.Lock() e.limitMode.singletonJobsMu.Lock()
delete(e.limitMode.singletonJobs, id) delete(e.limitMode.singletonJobs, jIn.id)
e.limitMode.singletonJobsMu.Unlock() e.limitMode.singletonJobsMu.Unlock()
} }
} }
@@ -267,24 +280,24 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
} }
} }
func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name) e.logger.Debug("gocron: singletonModeRunner starting", "name", name)
for { for {
select { select {
case id := <-in: case jIn := <-in:
select { select {
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name) e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name)
wg.Done() wg.Done()
return return
default: default:
} }
ctx, cancel := context.WithCancel(e.ctx) ctx, cancel := context.WithCancel(e.ctx)
j := requestJobCtx(ctx, id, e.jobOutRequest) j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel() cancel()
if j != nil { if j != nil {
e.runJob(*j) e.runJob(*j, jIn.shouldSendOut)
} }
// remove the limiter block to allow another job to be scheduled // remove the limiter block to allow another job to be scheduled
@@ -295,14 +308,14 @@ func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitG
} }
} }
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name) e.logger.Debug("singletonModeRunner shutting down", "name", name)
wg.Done() wg.Done()
return return
} }
} }
} }
func (e *executor) runJob(j internalJob) { func (e *executor) runJob(j internalJob, shouldSendOut bool) {
if j.ctx == nil { if j.ctx == nil {
return return
} }
@@ -327,6 +340,7 @@ func (e *executor) runJob(j internalJob) {
} }
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
if shouldSendOut {
select { select {
case <-e.ctx.Done(): case <-e.ctx.Done():
return return
@@ -334,6 +348,7 @@ func (e *executor) runJob(j internalJob) {
return return
case e.jobIDsOut <- j.id: case e.jobIDsOut <- j.id:
} }
}
err := callJobFuncWithParams(j.function, j.parameters...) err := callJobFuncWithParams(j.function, j.parameters...)
if err != nil { if err != nil {
+78
View File
@@ -426,6 +426,48 @@ func MonthlyJob(interval uint, daysOfTheMonth DaysOfTheMonth, atTimes AtTimes) J
} }
} }
var _ JobDefinition = (*oneTimeJobDefinition)(nil)
type oneTimeJobDefinition struct {
startAt OneTimeJobStartAtOption
}
func (o oneTimeJobDefinition) setup(j *internalJob, _ *time.Location) error {
j.jobSchedule = oneTimeJob{}
return o.startAt(j)
}
// OneTimeJobStartAtOption defines when the one time job is run
type OneTimeJobStartAtOption func(*internalJob) error
// OneTimeJobStartImmediately tells the scheduler to run the one time job immediately.
func OneTimeJobStartImmediately() OneTimeJobStartAtOption {
return func(j *internalJob) error {
j.startImmediately = true
return nil
}
}
// OneTimeJobStartDateTime sets the date & time at which the job should run.
// This datetime must be in the future.
func OneTimeJobStartDateTime(start time.Time) OneTimeJobStartAtOption {
return func(j *internalJob) error {
if start.IsZero() || start.Before(time.Now()) {
return ErrOneTimeJobStartDateTimePast
}
j.startTime = start
return nil
}
}
// OneTimeJob is to run a job once at a specified time and not on
// any regular schedule.
func OneTimeJob(startAt OneTimeJobStartAtOption) JobDefinition {
return oneTimeJobDefinition{
startAt: startAt,
}
}
// ----------------------------------------------- // -----------------------------------------------
// ----------------------------------------------- // -----------------------------------------------
// ----------------- Job Options ----------------- // ----------------- Job Options -----------------
@@ -772,6 +814,14 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
return time.Time{} return time.Time{}
} }
var _ jobSchedule = (*oneTimeJob)(nil)
type oneTimeJob struct{}
func (o oneTimeJob) next(_ time.Time) time.Time {
return time.Time{}
}
// ----------------------------------------------- // -----------------------------------------------
// ----------------------------------------------- // -----------------------------------------------
// ---------------- Job Interface ---------------- // ---------------- Job Interface ----------------
@@ -786,6 +836,7 @@ type Job interface {
Name() string Name() string
NextRun() (time.Time, error) NextRun() (time.Time, error)
Tags() []string Tags() []string
RunNow() error
} }
var _ Job = (*job)(nil) var _ Job = (*job)(nil)
@@ -799,6 +850,7 @@ type job struct {
name string name string
tags []string tags []string
jobOutRequest chan jobOutRequest jobOutRequest chan jobOutRequest
runJobRequest chan runJobRequest
} }
// ID returns the job's unique identifier. // ID returns the job's unique identifier.
@@ -833,3 +885,29 @@ func (j job) NextRun() (time.Time, error) {
func (j job) Tags() []string { func (j job) Tags() []string {
return j.tags return j.tags
} }
// RunNow runs the job once, now. This does not alter
// the existing run schedule, and will respect all job
// and scheduler limits.
func (j job) RunNow() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp := make(chan error, 1)
select {
case j.runJobRequest <- runJobRequest{
id: j.id,
outChan: resp,
}:
case <-time.After(100 * time.Millisecond):
return ErrJobRunNowFailed
}
var err error
select {
case <-ctx.Done():
return ErrJobRunNowFailed
case errReceived := <-resp:
err = errReceived
}
return err
}
+19
View File
@@ -319,6 +319,25 @@ func TestDurationRandomJob_next(t *testing.T) {
} }
} }
func TestOneTimeJob_next(t *testing.T) {
otj := oneTimeJob{}
assert.Zero(t, otj.next(time.Time{}))
}
func TestJob_RunNow_Error(t *testing.T) {
s := newTestScheduler(t)
j, err := s.NewJob(
DurationJob(time.Second),
NewTask(func() {}),
)
require.NoError(t, err)
require.NoError(t, s.Shutdown())
assert.EqualError(t, j.RunNow(), ErrJobRunNowFailed.Error())
}
func TestJob_LastRun(t *testing.T) { func TestJob_LastRun(t *testing.T) {
testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local) testTime := time.Date(2000, 1, 1, 0, 0, 0, 0, time.Local)
fakeClock := clockwork.NewFakeClockAt(testTime) fakeClock := clockwork.NewFakeClockAt(testTime)
+61 -7
View File
@@ -49,6 +49,7 @@ type scheduler struct {
stopErrCh chan error stopErrCh chan error
allJobsOutRequest chan allJobsOutRequest allJobsOutRequest chan allJobsOutRequest
jobOutRequestCh chan jobOutRequest jobOutRequestCh chan jobOutRequest
runJobRequestCh chan runJobRequest
newJobCh chan internalJob newJobCh chan internalJob
removeJobCh chan uuid.UUID removeJobCh chan uuid.UUID
removeJobsByTagsCh chan []string removeJobsByTagsCh chan []string
@@ -59,6 +60,11 @@ type jobOutRequest struct {
outChan chan internalJob outChan chan internalJob
} }
type runJobRequest struct {
id uuid.UUID
outChan chan error
}
type allJobsOutRequest struct { type allJobsOutRequest struct {
outChan chan []Job outChan chan []Job
} }
@@ -77,7 +83,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
singletonRunners: make(map[uuid.UUID]singletonRunner), singletonRunners: make(map[uuid.UUID]singletonRunner),
logger: &noOpLogger{}, logger: &noOpLogger{},
jobsIDsIn: make(chan uuid.UUID), jobsIn: make(chan jobIn),
jobIDsOut: make(chan uuid.UUID), jobIDsOut: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000), jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error), done: make(chan error),
@@ -100,6 +106,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stopErrCh: make(chan error, 1), stopErrCh: make(chan error, 1),
jobOutRequestCh: make(chan jobOutRequest), jobOutRequestCh: make(chan jobOutRequest),
runJobRequestCh: make(chan runJobRequest),
allJobsOutRequest: make(chan allJobsOutRequest), allJobsOutRequest: make(chan allJobsOutRequest),
} }
@@ -135,6 +142,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
case out := <-s.allJobsOutRequest: case out := <-s.allJobsOutRequest:
s.selectAllJobsOutRequest(out) s.selectAllJobsOutRequest(out)
case run := <-s.runJobRequestCh:
s.selectRunJobRequest(run)
case <-s.startCh: case <-s.startCh:
s.selectStart() s.selectStart()
@@ -204,6 +214,31 @@ func (s *scheduler) selectAllJobsOutRequest(out allJobsOutRequest) {
} }
} }
func (s *scheduler) selectRunJobRequest(run runJobRequest) {
j, ok := s.jobs[run.id]
if !ok {
select {
case run.outChan <- ErrJobNotFound:
default:
}
}
select {
case <-s.shutdownCtx.Done():
select {
case run.outChan <- ErrJobRunNowFailed:
default:
}
case s.exec.jobsIn <- jobIn{
id: j.id,
shouldSendOut: false,
}:
select {
case run.outChan <- nil:
default:
}
}
}
func (s *scheduler) selectRemoveJob(id uuid.UUID) { func (s *scheduler) selectRemoveJob(id uuid.UUID) {
j, ok := s.jobs[id] j, ok := s.jobs[id]
if !ok { if !ok {
@@ -232,12 +267,18 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
} }
next := j.next(j.lastRun) next := j.next(j.lastRun)
if next.IsZero() {
return
}
j.nextRun = next j.nextRun = next
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
select { select {
case <-s.shutdownCtx.Done(): case <-s.shutdownCtx.Done():
return return
case s.exec.jobsIDsIn <- id: case s.exec.jobsIn <- jobIn{
id: j.id,
shouldSendOut: true,
}:
} }
}) })
s.jobs[id] = j s.jobs[id] = j
@@ -260,7 +301,10 @@ func (s *scheduler) selectNewJob(j internalJob) {
next = s.now() next = s.now()
select { select {
case <-s.shutdownCtx.Done(): case <-s.shutdownCtx.Done():
case s.exec.jobsIDsIn <- j.id: case s.exec.jobsIn <- jobIn{
id: j.id,
shouldSendOut: true,
}:
} }
} else { } else {
if next.IsZero() { if next.IsZero() {
@@ -271,7 +315,10 @@ func (s *scheduler) selectNewJob(j internalJob) {
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
select { select {
case <-s.shutdownCtx.Done(): case <-s.shutdownCtx.Done():
case s.exec.jobsIDsIn <- id: case s.exec.jobsIn <- jobIn{
id: id,
shouldSendOut: true,
}:
} }
}) })
} }
@@ -304,7 +351,10 @@ func (s *scheduler) selectStart() {
next = s.now() next = s.now()
select { select {
case <-s.shutdownCtx.Done(): case <-s.shutdownCtx.Done():
case s.exec.jobsIDsIn <- id: case s.exec.jobsIn <- jobIn{
id: id,
shouldSendOut: true,
}:
} }
} else { } else {
if next.IsZero() { if next.IsZero() {
@@ -315,7 +365,10 @@ func (s *scheduler) selectStart() {
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
select { select {
case <-s.shutdownCtx.Done(): case <-s.shutdownCtx.Done():
case s.exec.jobsIDsIn <- jobID: case s.exec.jobsIn <- jobIn{
id: jobID,
shouldSendOut: true,
}:
} }
}) })
} }
@@ -453,6 +506,7 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
name: j.name, name: j.name,
tags: slices.Clone(j.tags), tags: slices.Clone(j.tags),
jobOutRequest: s.jobOutRequestCh, jobOutRequest: s.jobOutRequestCh,
runJobRequest: s.runJobRequestCh,
}, nil }, nil
} }
@@ -632,7 +686,7 @@ func WithLimitConcurrentJobs(limit uint, mode LimitMode) SchedulerOption {
s.exec.limitMode = &limitModeConfig{ s.exec.limitMode = &limitModeConfig{
mode: mode, mode: mode,
limit: limit, limit: limit,
in: make(chan uuid.UUID, 1000), in: make(chan jobIn, 1000),
singletonJobs: make(map[uuid.UUID]struct{}), singletonJobs: make(map[uuid.UUID]struct{}),
} }
if mode == LimitModeReschedule { if mode == LimitModeReschedule {
+210
View File
@@ -719,6 +719,18 @@ func TestScheduler_NewJobErrors(t *testing.T) {
[]JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))}, []JobOption{WithStartAt(WithStartDateTime(time.Now().Add(-time.Second)))},
ErrWithStartDateTimePast, ErrWithStartDateTimePast,
}, },
{
"oneTimeJob start at is zero",
OneTimeJob(OneTimeJobStartDateTime(time.Time{})),
nil,
ErrOneTimeJobStartDateTimePast,
},
{
"oneTimeJob start at is in past",
OneTimeJob(OneTimeJobStartDateTime(time.Now().Add(-time.Second))),
nil,
ErrOneTimeJobStartDateTimePast,
},
} }
for _, tt := range tests { for _, tt := range tests {
@@ -1425,3 +1437,201 @@ func TestScheduler_ManyJobs(t *testing.T) {
assert.GreaterOrEqual(t, count, 9900) assert.GreaterOrEqual(t, count, 9900)
assert.LessOrEqual(t, count, 11000) assert.LessOrEqual(t, count, 11000)
} }
func TestScheduler_RunJobNow(t *testing.T) {
chDuration := make(chan struct{}, 10)
chMonthly := make(chan struct{}, 10)
chDurationImmediate := make(chan struct{}, 10)
chDurationSingleton := make(chan struct{}, 10)
chOneTime := make(chan struct{}, 10)
tests := []struct {
name string
ch chan struct{}
j JobDefinition
fun any
opts []JobOption
expectedDiff func() time.Duration
expectedRuns int
}{
{
"duration job",
chDuration,
DurationJob(time.Second * 10),
func() {
chDuration <- struct{}{}
},
nil,
func() time.Duration {
return 0
},
1,
},
{
"monthly job",
chMonthly,
MonthlyJob(1, NewDaysOfTheMonth(1), NewAtTimes(NewAtTime(0, 0, 0))),
func() {
chMonthly <- struct{}{}
},
nil,
func() time.Duration {
return 0
},
1,
},
{
"duration job - start immediately",
chDurationImmediate,
DurationJob(time.Second * 10),
func() {
chDurationImmediate <- struct{}{}
},
[]JobOption{
WithStartAt(
WithStartImmediately(),
),
},
func() time.Duration {
return 10 * time.Second
},
2,
},
{
"duration job - singleton",
chDurationSingleton,
DurationJob(time.Second * 10),
func() {
chDurationSingleton <- struct{}{}
time.Sleep(200 * time.Millisecond)
},
[]JobOption{
WithStartAt(
WithStartImmediately(),
),
WithSingletonMode(LimitModeReschedule),
},
func() time.Duration {
return 10 * time.Second
},
1,
},
{
"one time job",
chOneTime,
OneTimeJob(OneTimeJobStartImmediately()),
func() {
chOneTime <- struct{}{}
},
nil,
nil,
2,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t)
j, err := s.NewJob(tt.j, NewTask(tt.fun), tt.opts...)
require.NoError(t, err)
s.Start()
var nextRunBefore time.Time
if tt.expectedDiff != nil {
for ; nextRunBefore.IsZero() || err != nil; nextRunBefore, err = j.NextRun() { //nolint:revive
}
}
assert.NoError(t, err)
time.Sleep(100 * time.Millisecond)
require.NoError(t, j.RunNow())
var runCount int
select {
case <-tt.ch:
runCount++
case <-time.After(time.Second):
t.Fatal("timed out waiting for job to run")
}
timeout := time.Now().Add(time.Second)
for time.Now().Before(timeout) {
select {
case <-tt.ch:
runCount++
default:
}
}
assert.Equal(t, tt.expectedRuns, runCount)
nextRunAfter, err := j.NextRun()
if tt.expectedDiff != nil && tt.expectedDiff() > 0 {
for ; nextRunBefore.IsZero() || nextRunAfter.Equal(nextRunBefore); nextRunAfter, err = j.NextRun() { //nolint:revive
}
}
assert.NoError(t, err)
assert.NoError(t, s.Shutdown())
if tt.expectedDiff != nil {
assert.Equal(t, tt.expectedDiff(), nextRunAfter.Sub(nextRunBefore))
}
})
}
}
func TestScheduler_OneTimeJob(t *testing.T) {
tests := []struct {
name string
startAt func() OneTimeJobStartAtOption
}{
{
"start now",
func() OneTimeJobStartAtOption {
return OneTimeJobStartImmediately()
},
},
{
"start in 100 ms",
func() OneTimeJobStartAtOption {
return OneTimeJobStartDateTime(time.Now().Add(100 * time.Millisecond))
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
jobRan := make(chan struct{}, 2)
s := newTestScheduler(t)
j, err := s.NewJob(
OneTimeJob(tt.startAt()),
NewTask(func() {
jobRan <- struct{}{}
}),
)
require.NoError(t, err)
s.Start()
select {
case <-jobRan:
case <-time.After(500 * time.Millisecond):
t.Fatal("timed out waiting for job to run")
}
var nextRun time.Time
for ; nextRun.IsZero(); nextRun, err = j.NextRun() { //nolint:revive
}
assert.NoError(t, err)
assert.True(t, nextRun.Before(time.Now()))
assert.NoError(t, s.Shutdown())
})
}
}