diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml index b0dc892..becd412 100644 --- a/.github/workflows/go_test.yml +++ b/.github/workflows/go_test.yml @@ -24,12 +24,8 @@ jobs: with: go-version: ${{ matrix.go-version }} - name: golangci-lint - uses: golangci/golangci-lint-action@v3.7.0 + uses: golangci/golangci-lint-action@v4.0.0 with: version: v1.55.2 - name: test - run: make test_coverage - - name: Upload coverage reports to Codecov - uses: codecov/codecov-action@v3 - env: - CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + run: make test diff --git a/README.md b/README.md index cf8ba35..ea33c15 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,11 @@ func main() { // start the scheduler s.Start() + // block until you are ready to shut down + select { + case <-time.After(time.Minute): + } + // when you're done, shut it down err = s.Shutdown() if err != nil { @@ -61,6 +66,11 @@ func main() { } ``` +## Examples + +- [Go doc examples](https://pkg.go.dev/github.com/andoma-go/gocron/v2#pkg-examples) +- [Examples directory](examples) + ## Concepts - **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then @@ -111,10 +121,12 @@ Multiple instances of gocron can be run. - [**Elector**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#WithDistributedElector): An elector can be used to elect a single instance of gocron to run as the primary with the other instances checking to see if a new leader needs to be elected. -- Implementations: [andoma-go electors](https://github.com/andoma-go?q=-elector&type=all&language=&sort=) + - Implementations: [andoma-go electors](https://github.com/andoma-go?q=-elector&type=all&language=&sort=) + (don't see what you need? request on slack to get a repo created to contribute it!) - [**Locker**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#WithDistributedLocker): A locker can be used to lock each run of a job to a single instance of gocron. -- Implementations: [andoma-go lockers](https://github.com/andoma-go?q=-lock&type=all&language=&sort=) + - Implementations: [andoma-go lockers](https://github.com/andoma-go?q=-lock&type=all&language=&sort=) + (don't see what you need? request on slack to get a repo created to contribute it!) ### Events @@ -146,6 +158,15 @@ Logs can be enabled. The Logger interface can be implemented with your desired logging library. The provided NewLogger uses the standard library's log package. +### Metrics + +Metrics may be collected from the execution of each job. + +- [**Monitor**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#Monitor): + A monitor can be used to collect metrics for each job from a scheduler. + - Implementations: [andoma-go monitors](https://github.com/andoma-go?q=-monitor&type=all&language=&sort=) + (don't see what you need? request on slack to get a repo created to contribute it!) + ### Testing The gocron library is set up to enable testing. diff --git a/SECURITY.md b/SECURITY.md index 8d4e3b1..e266891 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -2,11 +2,12 @@ ## Supported Versions -The current plan is to maintain version 1 as long as possible incorporating any necessary security patches. +The current plan is to maintain version 2 as long as possible incorporating any necessary security patches. Version 1 is deprecated and will no longer be patched. | Version | Supported | | ------- | ------------------ | -| 1.x.x | :white_check_mark: | +| 1.x.x | :heavy_multiplication_x: | +| 2.x.x | :white_check_mark: | ## Reporting a Vulnerability diff --git a/codecov.yml b/codecov.yml deleted file mode 100644 index 8f46d2d..0000000 --- a/codecov.yml +++ /dev/null @@ -1,5 +0,0 @@ -coverage: - status: - project: - default: - if_ci_failed: success diff --git a/errors.go b/errors.go index 2c9f7c2..e280124 100644 --- a/errors.go +++ b/errors.go @@ -9,6 +9,7 @@ var ( ErrDailyJobAtTimesNil = fmt.Errorf("gocron: DailyJob: atTimes must not be nil") ErrDailyJobHours = fmt.Errorf("gocron: DailyJob: atTimes hours must be between 0 and 23 inclusive") ErrDailyJobMinutesSeconds = fmt.Errorf("gocron: DailyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") + ErrDurationJobIntervalZero = fmt.Errorf("gocron: DurationJob: time interval is 0") ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") ErrJobNotFound = fmt.Errorf("gocron: job not found") @@ -38,6 +39,7 @@ var ( ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") + ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil") ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0") diff --git a/example_test.go b/example_test.go index 04524a0..5dab170 100644 --- a/example_test.go +++ b/example_test.go @@ -367,8 +367,6 @@ func ExampleScheduler_removeByTags() { ) fmt.Println(len(s.Jobs())) - time.Sleep(20 * time.Millisecond) - s.RemoveByTags("tag1", "tag2") fmt.Println(len(s.Jobs())) @@ -391,7 +389,6 @@ func ExampleScheduler_removeJob() { ) fmt.Println(len(s.Jobs())) - time.Sleep(20 * time.Millisecond) _ = s.RemoveJob(j.ID()) @@ -519,7 +516,7 @@ func ExampleWithClock() { } func ExampleWithDistributedElector() { - //var _ Elector = (*myElector)(nil) + //var _ gocron.Elector = (*myElector)(nil) // //type myElector struct{} // @@ -527,15 +524,15 @@ func ExampleWithDistributedElector() { // return nil //} // - //elector := myElector{} + //elector := &myElector{} // - //_, _ = NewScheduler( - // WithDistributedElector(elector), + //_, _ = gocron.NewScheduler( + // gocron.WithDistributedElector(elector), //) } func ExampleWithDistributedLocker() { - //var _ Locker = (*myLocker)(nil) + //var _ gocron.Locker = (*myLocker)(nil) // //type myLocker struct{} // @@ -552,10 +549,10 @@ func ExampleWithDistributedLocker() { // return nil //} // - //locker := myLocker{} + //locker := &myLocker{} // - //_, _ = NewScheduler( - // WithDistributedLocker(locker), + //_, _ = gocron.NewScheduler( + // gocron.WithDistributedLocker(locker), //) } @@ -664,8 +661,8 @@ func ExampleWithLimitedRuns() { s.Start() time.Sleep(100 * time.Millisecond) - fmt.Printf("no jobs in scheduler: %v\n", s.Jobs()) _ = s.StopJobs() + fmt.Printf("no jobs in scheduler: %v\n", s.Jobs()) // Output: // one, 2 // no jobs in scheduler: [] @@ -687,6 +684,69 @@ func ExampleWithLogger() { ) } +func ExampleWithMonitor() { + //type exampleMonitor struct { + // mu sync.Mutex + // counter map[string]int + // time map[string][]time.Duration + //} + // + //func newExampleMonitor() *exampleMonitor { + // return &exampleMonitor{ + // counter: make(map[string]int), + // time: make(map[string][]time.Duration), + //} + //} + // + //func (t *exampleMonitor) IncrementJob(_ uuid.UUID, name string, _ []string, _ JobStatus) { + // t.mu.Lock() + // defer t.mu.Unlock() + // _, ok := t.counter[name] + // if !ok { + // t.counter[name] = 0 + // } + // t.counter[name]++ + //} + // + //func (t *exampleMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, name string, _ []string) { + // t.mu.Lock() + // defer t.mu.Unlock() + // _, ok := t.time[name] + // if !ok { + // t.time[name] = make([]time.Duration, 0) + // } + // t.time[name] = append(t.time[name], endTime.Sub(startTime)) + //} + // + //monitor := newExampleMonitor() + //s, _ := NewScheduler( + // WithMonitor(monitor), + //) + //name := "example" + //_, _ = s.NewJob( + // DurationJob( + // time.Second, + // ), + // NewTask( + // func() { + // time.Sleep(1 * time.Second) + // }, + // ), + // WithName(name), + // WithStartAt( + // WithStartImmediately(), + // ), + //) + //s.Start() + //time.Sleep(5 * time.Second) + //_ = s.Shutdown() + // + //fmt.Printf("Job %q total execute count: %d\n", name, monitor.counter[name]) + //for i, val := range monitor.time[name] { + // fmt.Printf("Job %q execute #%d elapsed %.4f seconds\n", name, i+1, val.Seconds()) + //} +} + func ExampleWithName() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() @@ -748,7 +808,6 @@ func ExampleWithStartAt() { ), ) s.Start() - time.Sleep(20 * time.Millisecond) next, _ := j.NextRun() fmt.Println(next) diff --git a/examples/elector/main.go b/examples/elector/main.go new file mode 100644 index 0000000..403fb1a --- /dev/null +++ b/examples/elector/main.go @@ -0,0 +1,73 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "github.com/andoma-go/gocron/v2" +) + +var _ gocron.Elector = (*myElector)(nil) + +type myElector struct { + num int + leader bool +} + +func (m myElector) IsLeader(_ context.Context) error { + if m.leader { + log.Printf("node %d is leader", m.num) + return nil + } + log.Printf("node %d is not leader", m.num) + return fmt.Errorf("not leader") +} + +func main() { + log.SetFlags(log.LstdFlags | log.Lmicroseconds) + + for i := 0; i < 3; i++ { + go func(i int) { + elector := &myElector{ + num: i, + } + if i == 0 { + elector.leader = true + } + + scheduler, err := gocron.NewScheduler( + gocron.WithDistributedElector(elector), + ) + if err != nil { + log.Println(err) + return + } + + _, err = scheduler.NewJob( + gocron.DurationJob(time.Second), + gocron.NewTask(func() { + log.Println("run job") + }), + ) + + if err != nil { + log.Println(err) + return + } + scheduler.Start() + + if i == 0 { + time.Sleep(5 * time.Second) + elector.leader = false + } + if i == 1 { + time.Sleep(5 * time.Second) + elector.leader = true + } + }(i) + } + + select {} // wait forever +} diff --git a/executor.go b/executor.go index 1f013f5..1b51e57 100644 --- a/executor.go +++ b/executor.go @@ -10,19 +10,21 @@ import ( ) type executor struct { - ctx context.Context - cancel context.CancelFunc - logger Logger - stopCh chan struct{} - jobsIn chan jobIn - jobIDsOut chan uuid.UUID - jobOutRequest chan jobOutRequest - stopTimeout time.Duration - done chan error - singletonRunners map[uuid.UUID]singletonRunner - limitMode *limitModeConfig - elector Elector - locker Locker + ctx context.Context + cancel context.CancelFunc + logger Logger + stopCh chan struct{} + jobsIn chan jobIn + jobsOutForRescheduling chan uuid.UUID + jobsOutCompleted chan uuid.UUID + jobOutRequest chan jobOutRequest + stopTimeout time.Duration + done chan error + singletonRunners *sync.Map // map[uuid.UUID]singletonRunner + limitMode *limitModeConfig + elector Elector + locker Locker + monitor Monitor } type jobIn struct { @@ -66,7 +68,7 @@ func (e *executor) start() { limitModeJobsWg := &waitGroupWithMutex{} // create a fresh map for tracking singleton runners - e.singletonRunners = make(map[uuid.UUID]singletonRunner) + e.singletonRunners = &sync.Map{} // start the for leap that is the executor // selecting on channels for work to do @@ -121,12 +123,7 @@ func (e *executor) start() { // all runners are busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - if jIn.shouldSendOut { - select { - case e.jobIDsOut <- jIn.id: - default: - } - } + e.sendOutForRescheduling(&jIn) } } else { // since we're not using LimitModeReschedule, but instead using LimitModeWait @@ -135,6 +132,7 @@ func (e *executor) start() { // at which point this call would block. // TODO when metrics are added, this should increment a wait metric e.limitMode.in <- jIn + e.sendOutForRescheduling(&jIn) } } else { // no limit mode, so we're either running a regular job or @@ -150,15 +148,18 @@ func (e *executor) start() { if j.singletonMode { // for singleton mode, get the existing runner for the job // or spin up a new one - runner, ok := e.singletonRunners[jIn.id] + runner := &singletonRunner{} + runnerSrc, ok := e.singletonRunners.Load(jIn.id) if !ok { runner.in = make(chan jobIn, 1000) if j.singletonLimitMode == LimitModeReschedule { runner.rescheduleLimiter = make(chan struct{}, 1) } - e.singletonRunners[jIn.id] = runner + e.singletonRunners.Store(jIn.id, runner) singletonJobsWg.Add(1) go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) + } else { + runner = runnerSrc.(*singletonRunner) } if j.singletonLimitMode == LimitModeReschedule { @@ -167,20 +168,17 @@ func (e *executor) start() { select { case runner.rescheduleLimiter <- struct{}{}: runner.in <- jIn + e.sendOutForRescheduling(&jIn) default: // runner is busy, reschedule the work for later // which means we just skip it here and do nothing // TODO when metrics are added, this should increment a rescheduled metric - if jIn.shouldSendOut { - select { - case e.jobIDsOut <- jIn.id: - default: - } - } + e.sendOutForRescheduling(&jIn) } } else { // wait mode, fill up that queue (buffered channel, so it's ok) runner.in <- jIn + e.sendOutForRescheduling(&jIn) } } else { select { @@ -196,7 +194,7 @@ func (e *executor) start() { // complete. standardJobsWg.Add(1) go func(j internalJob) { - e.runJob(j, jIn.shouldSendOut) + e.runJob(j, jIn) standardJobsWg.Done() }(*j) } @@ -209,6 +207,20 @@ func (e *executor) start() { } } +func (e *executor) sendOutForRescheduling(jIn *jobIn) { + if jIn.shouldSendOut { + select { + case e.jobsOutForRescheduling <- jIn.id: + case <-e.ctx.Done(): + return + } + } + // we need to set this to false now, because to handle + // non-limit jobs, we send out from the e.runJob function + // and in this case we don't want to send out twice. + jIn.shouldSendOut = false +} + func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { e.logger.Debug("gocron: limitModeRunner starting", "name", name) for { @@ -239,24 +251,21 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith return case <-j.ctx.Done(): return - case e.jobIDsOut <- j.id: + case e.jobsOutForRescheduling <- j.id: } } // remove the limiter block, as this particular job // was a singleton already running, and we want to // allow another job to be scheduled if limitMode == LimitModeReschedule { - select { - case <-rescheduleLimiter: - default: - } + <-rescheduleLimiter } continue } e.limitMode.singletonJobs[jIn.id] = struct{}{} e.limitMode.singletonJobsMu.Unlock() } - e.runJob(*j, jIn.shouldSendOut) + e.runJob(*j, jIn) if j.singletonMode { e.limitMode.singletonJobsMu.Lock() @@ -267,10 +276,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith // remove the limiter block to allow another job to be scheduled if limitMode == LimitModeReschedule { - select { - case <-rescheduleLimiter: - default: - } + <-rescheduleLimiter } case <-e.ctx.Done(): e.logger.Debug("limitModeRunner shutting down", "name", name) @@ -297,15 +303,12 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) cancel() if j != nil { - e.runJob(*j, jIn.shouldSendOut) + e.runJob(*j, jIn) } // remove the limiter block to allow another job to be scheduled if limitMode == LimitModeReschedule { - select { - case <-rescheduleLimiter: - default: - } + <-rescheduleLimiter } case <-e.ctx.Done(): e.logger.Debug("singletonModeRunner shutting down", "name", name) @@ -315,7 +318,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup } } -func (e *executor) runJob(j internalJob, shouldSendOut bool) { +func (e *executor) runJob(j internalJob, jIn jobIn) { if j.ctx == nil { return } @@ -329,32 +332,40 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) { if e.elector != nil { if err := e.elector.IsLeader(j.ctx); err != nil { + e.sendOutForRescheduling(&jIn) return } } else if e.locker != nil { lock, err := e.locker.Lock(j.ctx, j.name) if err != nil { + e.sendOutForRescheduling(&jIn) return } defer func() { _ = lock.Unlock(j.ctx) }() } _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) - if shouldSendOut { - select { - case <-e.ctx.Done(): - return - case <-j.ctx.Done(): - return - case e.jobIDsOut <- j.id: - } + e.sendOutForRescheduling(&jIn) + select { + case e.jobsOutCompleted <- j.id: + case <-e.ctx.Done(): } + startTime := time.Now() err := callJobFuncWithParams(j.function, j.parameters...) + if e.monitor != nil { + e.monitor.RecordJobTiming(startTime, time.Now(), j.id, j.name, j.tags) + } if err != nil { _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) + if e.monitor != nil { + e.monitor.IncrementJob(j.id, j.name, j.tags, Fail) + } } else { _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) + if e.monitor != nil { + e.monitor.IncrementJob(j.id, j.name, j.tags, Success) + } } } diff --git a/go.mod b/go.mod index da9955f..d7b14f9 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/andoma-go/gocron/v2 go 1.20 require ( - github.com/google/uuid v1.5.0 + github.com/google/uuid v1.6.0 github.com/jonboulle/clockwork v0.4.0 github.com/robfig/cron/v3 v3.0.1 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 go.uber.org/goleak v1.3.0 golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 ) diff --git a/go.sum b/go.sum index ee24feb..8cd5c26 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= -github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= @@ -15,8 +15,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= -github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= -github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 h1:+iq7lrkxmFNBM7xx+Rae2W6uyPfhPeDWD+n+JgppptE= diff --git a/job.go b/job.go index d25cab4..7641dbe 100644 --- a/job.go +++ b/job.go @@ -24,7 +24,9 @@ type internalJob struct { name string tags []string jobSchedule - lastRun, nextRun time.Time + lastScheduledRun time.Time + nextScheduled time.Time + lastRun time.Time function any parameters []any timer clockwork.Timer @@ -148,6 +150,9 @@ type durationJobDefinition struct { } func (d durationJobDefinition) setup(j *internalJob, _ *time.Location) error { + if d.duration == 0 { + return ErrDurationJobIntervalZero + } j.jobSchedule = &durationJob{duration: d.duration} return nil } @@ -579,8 +584,8 @@ func WithTags(tags ...string) JobOption { // listeners that can be used to listen for job events. type EventListener func(*internalJob) error -// AfterJobRuns is used to listen for when a job has run regardless -// of any returned error value, and run the provided function. +// AfterJobRuns is used to listen for when a job has run +// without an error, and then run the provided function. func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) EventListener { return func(j *internalJob) error { if eventListenerFunc == nil { @@ -678,18 +683,18 @@ func (d dailyJob) next(lastRun time.Time) time.Time { func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time { for _, at := range d.atTimes { - // sub the at time hour/min/sec onto the lastRun's values + // sub the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next day, it's ok to consider - // the same at time that was last run (as lastRun has been incremented) + // the same at time that was last run (as lastScheduledRun has been incremented) return atDate } } @@ -724,18 +729,18 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim // weekDayDiff is used to add the correct amount to the atDate day below weekDayDiff := wd - lastRun.Weekday() for _, at := range w.atTimes { - // sub the at time hour/min/sec onto the lastRun's values + // sub the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next week, it's ok to consider - // the same at time that was last run (as lastRun has been incremented) + // the same at time that was last run (as lastScheduledRun has been incremented) return atDate } } @@ -756,38 +761,43 @@ type monthlyJob struct { func (m monthlyJob) next(lastRun time.Time) time.Time { daysList := make([]int, len(m.days)) copy(daysList, m.days) - firstDayNextMonth := time.Date(lastRun.Year(), lastRun.Month()+1, 1, 0, 0, 0, 0, lastRun.Location()) - for _, daySub := range m.daysFromEnd { - // getting a combined list of all the daysList and the negative daysList - // which count backwards from the first day of the next month - // -1 == the last day of the month - day := firstDayNextMonth.AddDate(0, 0, daySub).Day() - daysList = append(daysList, day) - } - slices.Sort(daysList) - firstPass := true - next := m.nextMonthDayAtTime(lastRun, daysList, firstPass) + daysFromEnd := m.handleNegativeDays(lastRun, daysList, m.daysFromEnd) + next := m.nextMonthDayAtTime(lastRun, daysFromEnd, true) if !next.IsZero() { return next } - firstPass = false from := time.Date(lastRun.Year(), lastRun.Month()+time.Month(m.interval), 1, 0, 0, 0, 0, lastRun.Location()) for next.IsZero() { - next = m.nextMonthDayAtTime(from, daysList, firstPass) + daysFromEnd = m.handleNegativeDays(from, daysList, m.daysFromEnd) + next = m.nextMonthDayAtTime(from, daysFromEnd, false) from = from.AddDate(0, int(m.interval), 0) } return next } +func (m monthlyJob) handleNegativeDays(from time.Time, days, negativeDays []int) []int { + var out []int + // getting a list of the days from the end of the following month + // -1 == the last day of the month + firstDayNextMonth := time.Date(from.Year(), from.Month()+1, 1, 0, 0, 0, 0, from.Location()) + for _, daySub := range negativeDays { + day := firstDayNextMonth.AddDate(0, 0, daySub).Day() + out = append(out, day) + } + out = append(out, days...) + slices.Sort(out) + return out +} + func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass bool) time.Time { // find the next day in the month that should run and then check for an at time for _, day := range days { if day >= lastRun.Day() { for _, at := range m.atTimes { - // sub the day, and the at time hour/min/sec onto the lastRun's values + // sub the day, and the at time hour/min/sec onto the lastScheduledRun's values // to use in checks to see if we've got our next run time atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) @@ -799,12 +809,12 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass if firstPass && atDate.After(lastRun) { // checking to see if it is after i.e. greater than, - // and not greater or equal as our lastRun day/time + // and not greater or equal as our lastScheduledRun day/time // will be in the loop, and we don't want to select it again return atDate } else if !firstPass && !atDate.Before(lastRun) { // now that we're looking at the next month, it's ok to consider - // the same at time that was lastRun (as lastRun has been incremented) + // the same at time that was lastScheduledRun (as lastScheduledRun has been incremented) return atDate } } @@ -841,7 +851,9 @@ type Job interface { NextRun() (time.Time, error) // RunNow runs the job once, now. This does not alter // the existing run schedule, and will respect all job - // and scheduler limits. + // and scheduler limits. This means that running a job now may + // cause the job's regular interval to be rescheduled due to + // the instance being run by RunNow blocking your run limit. RunNow() error // Tags returns the job's string tags. Tags() []string @@ -882,7 +894,7 @@ func (j job) NextRun() (time.Time, error) { if ij == nil || ij.id == uuid.Nil { return time.Time{}, ErrJobNotFound } - return ij.nextRun, nil + return ij.nextScheduled, nil } func (j job) Tags() []string { diff --git a/job_test.go b/job_test.go index fae68db..84c428a 100644 --- a/job_test.go +++ b/job_test.go @@ -257,6 +257,42 @@ func TestMonthlyJob_next(t *testing.T) { time.Date(2000, 8, 31, 5, 30, 0, 0, time.UTC), 244 * 24 * time.Hour, }, + { + "handle -1 with differing month's day count", + 1, + nil, + []int{-1}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2024, 1, 31, 5, 30, 0, 0, time.UTC), + time.Date(2024, 2, 29, 5, 30, 0, 0, time.UTC), + 29 * 24 * time.Hour, + }, + { + "handle -1 with another differing month's day count", + 1, + nil, + []int{-1}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2024, 2, 29, 5, 30, 0, 0, time.UTC), + time.Date(2024, 3, 31, 5, 30, 0, 0, time.UTC), + 31 * 24 * time.Hour, + }, + { + "handle -1 every 3 months next run in February", + 3, + nil, + []int{-1}, + []time.Time{ + time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC), + }, + time.Date(2023, 11, 30, 5, 30, 0, 0, time.UTC), + time.Date(2024, 2, 29, 5, 30, 0, 0, time.UTC), + 91 * 24 * time.Hour, + }, } for _, tt := range tests { diff --git a/monitor.go b/monitor.go new file mode 100644 index 0000000..5491371 --- /dev/null +++ b/monitor.go @@ -0,0 +1,26 @@ +package gocron + +import ( + "time" + + "github.com/google/uuid" +) + +// JobStatus is the status of job run that should be collected with the metric. +type JobStatus string + +// The different statuses of job that can be used. +const ( + Fail JobStatus = "fail" + Success JobStatus = "success" +) + +// Monitor represents the interface to collect jobs metrics. +type Monitor interface { + // IncrementJob will provide details about the job and expects the underlying implementation + // to handle instantiating and incrementing a value + IncrementJob(id uuid.UUID, name string, tags []string, status JobStatus) + // RecordJobTiming will provide details about the job and the timing and expects the underlying implementation + // to handle instantiating and recording the value + RecordJobTiming(startTime, endTime time.Time, id uuid.UUID, name string, tags []string) +} diff --git a/scheduler.go b/scheduler.go index eebb221..dd1323c 100644 --- a/scheduler.go +++ b/scheduler.go @@ -70,11 +70,17 @@ type scheduler struct { allJobsOutRequest chan allJobsOutRequest jobOutRequestCh chan jobOutRequest runJobRequestCh chan runJobRequest - newJobCh chan internalJob + newJobCh chan newJobIn removeJobCh chan uuid.UUID removeJobsByTagsCh chan []string } +type newJobIn struct { + ctx context.Context + cancel context.CancelFunc + job internalJob +} + type jobOutRequest struct { id uuid.UUID outChan chan internalJob @@ -100,13 +106,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { exec := executor{ stopCh: make(chan struct{}), stopTimeout: time.Second * 10, - singletonRunners: make(map[uuid.UUID]singletonRunner), + singletonRunners: nil, logger: &noOpLogger{}, - jobsIn: make(chan jobIn), - jobIDsOut: make(chan uuid.UUID), - jobOutRequest: make(chan jobOutRequest, 1000), - done: make(chan error), + jobsIn: make(chan jobIn), + jobsOutForRescheduling: make(chan uuid.UUID), + jobsOutCompleted: make(chan uuid.UUID), + jobOutRequest: make(chan jobOutRequest, 1000), + done: make(chan error), } s := &scheduler{ @@ -118,7 +125,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { clock: clockwork.NewRealClock(), logger: &noOpLogger{}, - newJobCh: make(chan internalJob), + newJobCh: make(chan newJobIn), removeJobCh: make(chan uuid.UUID), removeJobsByTagsCh: make(chan []string), startCh: make(chan struct{}), @@ -141,11 +148,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) { s.logger.Info("gocron: new scheduler created") for { select { - case id := <-s.exec.jobIDsOut: - s.selectExecJobIDsOut(id) + case id := <-s.exec.jobsOutForRescheduling: + s.selectExecJobsOutForRescheduling(id) - case j := <-s.newJobCh: - s.selectNewJob(j) + case id := <-s.exec.jobsOutCompleted: + s.selectExecJobsOutCompleted(id) + + case in := <-s.newJobCh: + s.selectNewJob(in) case id := <-s.removeJobCh: s.selectRemoveJob(id) @@ -281,9 +291,54 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) { // Jobs coming back from the executor to the scheduler that // need to evaluated for rescheduling. -func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { - j := s.jobs[id] - j.lastRun = j.nextRun +func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + // the job was removed while it was running, and + // so we don't need to reschedule it. + return + } + j.lastScheduledRun = j.nextScheduled + + next := j.next(j.lastScheduledRun) + if next.IsZero() { + // the job's next function will return zero for OneTime jobs. + // since they are one time only, they do not need rescheduling. + return + } + if next.Before(s.now()) { + // in some cases the next run time can be in the past, for example: + // - the time on the machine was incorrect and has been synced with ntp + // - the machine went to sleep, and woke up some time later + // in those cases, we want to increment to the next run in the future + // and schedule the job for that time. + for next.Before(s.now()) { + next = j.next(next) + } + } + j.nextScheduled = next + j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { + // set the actual timer on the job here and listen for + // shut down events so that the job doesn't attempt to + // run if the scheduler has been shutdown. + select { + case <-s.shutdownCtx.Done(): + return + case s.exec.jobsIn <- jobIn{ + id: j.id, + shouldSendOut: true, + }: + } + }) + // update the job with its new next and last run times and timer. + s.jobs[id] = j +} + +func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) { + j, ok := s.jobs[id] + if !ok { + return + } // if the job has a limited number of runs set, we need to // check how many runs have occurred and stop running this @@ -302,37 +357,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { } } - next := j.next(j.lastRun) - if next.IsZero() { - // the job's next function will return zero for OneTime jobs. - // since they are one time only, they do not need rescheduling. - return - } - if next.Before(s.now()) { - // in some cases the next run time can be in the past, for example: - // - the time on the machine was incorrect and has been synced with ntp - // - the machine went to sleep, and woke up some time later - // in those cases, we want to increment to the next run in the future - // and schedule the job for that time. - for next.Before(s.now()) { - next = j.next(next) - } - } - j.nextRun = next - j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() { - // set the actual timer on the job here and listen for - // shut down events so that the job doesn't attempt to - // run if the scheduler has been shutdown. - select { - case <-s.shutdownCtx.Done(): - return - case s.exec.jobsIn <- jobIn{ - id: j.id, - shouldSendOut: true, - }: - } - }) - // update the job with its new next and last run times and timer. + j.lastRun = s.now() s.jobs[id] = j } @@ -346,7 +371,8 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) { close(out.outChan) } -func (s *scheduler) selectNewJob(j internalJob) { +func (s *scheduler) selectNewJob(in newJobIn) { + j := in.job if s.started { next := j.startTime if j.startImmediately { @@ -374,10 +400,11 @@ func (s *scheduler) selectNewJob(j internalJob) { } }) } - j.nextRun = next + j.nextScheduled = next } s.jobs[j.id] = j + in.cancel() } func (s *scheduler) selectRemoveJobsByTags(tags []string) { @@ -424,7 +451,7 @@ func (s *scheduler) selectStart() { } }) } - j.nextRun = next + j.nextScheduled = next s.jobs[id] = j } select { @@ -446,10 +473,11 @@ func (s *scheduler) now() time.Time { func (s *scheduler) jobFromInternalJob(in internalJob) job { return job{ - id: in.id, - name: in.name, - tags: slices.Clone(in.tags), - jobOutRequest: s.jobOutRequestCh, + in.id, + in.name, + slices.Clone(in.tags), + s.jobOutRequestCh, + s.runJobRequestCh, } } @@ -548,9 +576,19 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW return nil, err } + newJobCtx, newJobCancel := context.WithCancel(context.Background()) select { case <-s.shutdownCtx.Done(): - case s.newJobCh <- j: + case s.newJobCh <- newJobIn{ + ctx: newJobCtx, + cancel: newJobCancel, + job: j, + }: + } + + select { + case <-newJobCtx.Done(): + case <-s.shutdownCtx.Done(): } return &job{ @@ -774,3 +812,14 @@ func WithStopTimeout(timeout time.Duration) SchedulerOption { return nil } } + +// WithMonitor sets the metrics provider to be used by the Scheduler. +func WithMonitor(monitor Monitor) SchedulerOption { + return func(s *scheduler) error { + if monitor == nil { + return ErrWithMonitorNil + } + s.exec.monitor = monitor + return nil + } +} diff --git a/scheduler_test.go b/scheduler_test.go index 3df33bb..ec69fae 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -8,7 +8,6 @@ import ( "time" "github.com/google/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ -301,9 +300,7 @@ func TestScheduler_StopTimeout(t *testing.T) { require.NoError(t, err) s.Start() - time.Sleep(time.Millisecond * 200) - err = s.Shutdown() - assert.ErrorIs(t, err, ErrStopJobsTimedOut) + assert.ErrorIs(t, err, s.Shutdown()) cancel() time.Sleep(2 * time.Second) }) @@ -332,15 +329,11 @@ func TestScheduler_Shutdown(t *testing.T) { require.NoError(t, err) s.Start() - time.Sleep(50 * time.Millisecond) require.NoError(t, s.StopJobs()) - time.Sleep(200 * time.Millisecond) s.Start() - time.Sleep(50 * time.Millisecond) require.NoError(t, s.Shutdown()) - time.Sleep(200 * time.Millisecond) }) t.Run("calling Job methods after shutdown errors", func(t *testing.T) { @@ -361,7 +354,6 @@ func TestScheduler_Shutdown(t *testing.T) { require.NoError(t, err) s.Start() - time.Sleep(50 * time.Millisecond) require.NoError(t, s.Shutdown()) _, err = j.LastRun() @@ -465,7 +457,6 @@ func TestScheduler_NewJob(t *testing.T) { s.Start() require.NoError(t, s.Shutdown()) - time.Sleep(50 * time.Millisecond) }) } } @@ -487,6 +478,12 @@ func TestScheduler_NewJobErrors(t *testing.T) { nil, ErrCronJobParse, }, + { + "duration job time interval is zero", + DurationJob(0 * time.Second), + nil, + ErrDurationJobIntervalZero, + }, { "random with bad min/max", DurationRandomJob( @@ -908,6 +905,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) { WithStopTimeout(-1), ErrWithStopTimeoutZeroOrNegative, }, + { + "WithMonitorer nil", + WithMonitor(nil), + ErrWithMonitorNil, + }, } for _, tt := range tests { @@ -1144,6 +1146,7 @@ var _ Elector = (*testElector)(nil) type testElector struct { mu sync.Mutex leaderElected bool + notLeader chan struct{} } func (t *testElector) IsLeader(ctx context.Context) error { @@ -1156,6 +1159,7 @@ func (t *testElector) IsLeader(ctx context.Context) error { t.mu.Lock() defer t.mu.Unlock() if t.leaderElected { + t.notLeader <- struct{}{} return fmt.Errorf("already elected leader") } t.leaderElected = true @@ -1167,12 +1171,14 @@ var _ Locker = (*testLocker)(nil) type testLocker struct { mu sync.Mutex jobLocked bool + notLocked chan struct{} } func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) { t.mu.Lock() defer t.mu.Unlock() if t.jobLocked { + t.notLocked <- struct{}{} return nil, fmt.Errorf("job already locked") } t.jobLocked = true @@ -1188,21 +1194,58 @@ func (t testLock) Unlock(_ context.Context) error { } func TestScheduler_WithDistributed(t *testing.T) { + notLocked := make(chan struct{}, 10) + notLeader := make(chan struct{}, 10) + goleak.VerifyNone(t) tests := []struct { - name string - count int - opt SchedulerOption + name string + count int + opt SchedulerOption + assertions func(*testing.T) }{ { "3 schedulers with elector", 3, - WithDistributedElector(&testElector{}), + WithDistributedElector(&testElector{ + notLeader: notLeader, + }), + func(t *testing.T) { + timeout := time.Now().Add(1 * time.Second) + var notLeaderCount int + for { + if time.Now().After(timeout) { + break + } + select { + case <-notLeader: + notLeaderCount++ + default: + } + } + assert.Equal(t, 2, notLeaderCount) + }, }, { "3 schedulers with locker", 3, - WithDistributedLocker(&testLocker{}), + WithDistributedLocker(&testLocker{ + notLocked: notLocked, + }), + func(t *testing.T) { + timeout := time.Now().Add(1 * time.Second) + var notLockedCount int + for { + if time.Now().After(timeout) { + break + } + select { + case <-notLocked: + notLockedCount++ + default: + } + } + }, }, } @@ -1226,6 +1269,7 @@ func TestScheduler_WithDistributed(t *testing.T) { ), NewTask( func() { + time.Sleep(100 * time.Millisecond) jobsRan <- struct{}{} }, ), @@ -1267,6 +1311,8 @@ func TestScheduler_WithDistributed(t *testing.T) { } assert.Equal(t, 1, runCount) + time.Sleep(time.Second) + tt.assertions(t) }) } } @@ -1303,7 +1349,6 @@ func TestScheduler_RemoveJob(t *testing.T) { id = uuid.New() } - time.Sleep(50 * time.Millisecond) err := s.RemoveJob(id) assert.ErrorIs(t, err, err) require.NoError(t, s.Shutdown()) @@ -1311,6 +1356,71 @@ func TestScheduler_RemoveJob(t *testing.T) { } } +func TestScheduler_RemoveLotsOfJobs(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + numJobs int + }{ + { + "10 successes", + 10, + }, + { + "100 successes", + 100, + }, + { + "1000 successes", + 1000, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t) + + var ids []uuid.UUID + for i := 0; i < tt.numJobs; i++ { + j, err := s.NewJob(DurationJob(time.Second), NewTask(func() { time.Sleep(20 * time.Second) })) + require.NoError(t, err) + ids = append(ids, j.ID()) + } + + for _, id := range ids { + err := s.RemoveJob(id) + require.NoError(t, err) + } + + assert.Len(t, s.Jobs(), 0) + require.NoError(t, s.Shutdown()) + }) + } +} + +func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) { + goleak.VerifyNone(t) + s := newTestScheduler(t) + s.Start() + + _, err := s.NewJob( + DurationJob(100*time.Millisecond), + NewTask(func() {}), + WithEventListeners( + BeforeJobRuns( + func(jobID uuid.UUID, jobName string) { + s.RemoveByTags("tag1") + }, + ), + ), + WithTags("tag1"), + ) + require.NoError(t, err) + + time.Sleep(time.Millisecond * 400) + assert.NoError(t, s.Shutdown()) +} + func TestScheduler_WithEventListeners(t *testing.T) { goleak.VerifyNone(t) @@ -1483,7 +1593,7 @@ func TestScheduler_RunJobNow(t *testing.T) { { "duration job - start immediately", chDurationImmediate, - DurationJob(time.Second * 10), + DurationJob(time.Second * 5), func() { chDurationImmediate <- struct{}{} }, @@ -1493,7 +1603,7 @@ func TestScheduler_RunJobNow(t *testing.T) { ), }, func() time.Duration { - return 10 * time.Second + return 5 * time.Second }, 2, }, @@ -1512,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) { WithSingletonMode(LimitModeReschedule), }, func() time.Duration { - return 10 * time.Second + return 20 * time.Second }, 1, }, @@ -1533,9 +1643,10 @@ func TestScheduler_RunJobNow(t *testing.T) { t.Run(tt.name, func(t *testing.T) { s := newTestScheduler(t) - j, err := s.NewJob(tt.j, NewTask(tt.fun), tt.opts...) + _, err := s.NewJob(tt.j, NewTask(tt.fun), tt.opts...) require.NoError(t, err) + j := s.Jobs()[0] s.Start() var nextRunBefore time.Time @@ -1571,6 +1682,7 @@ func TestScheduler_RunJobNow(t *testing.T) { nextRunAfter, err := j.NextRun() if tt.expectedDiff != nil && tt.expectedDiff() > 0 { for ; nextRunBefore.IsZero() || nextRunAfter.Equal(nextRunBefore); nextRunAfter, err = j.NextRun() { //nolint:revive + time.Sleep(100 * time.Millisecond) } } @@ -1584,6 +1696,67 @@ func TestScheduler_RunJobNow(t *testing.T) { } } +func TestScheduler_LastRunSingleton(t *testing.T) { + goleak.VerifyNone(t) + + tests := []struct { + name string + f func(t *testing.T, j Job) + }{ + { + "simple", + func(t *testing.T, j Job) {}, + }, + { + "with runNow", + func(t *testing.T, j Job) { + runTime := time.Now() + assert.NoError(t, j.RunNow()) + + // because we're using wait mode we need to wait here + // to make sure the job queued with RunNow has finished running + time.Sleep(time.Millisecond * 200) + lastRun, err := j.LastRun() + assert.NoError(t, err) + assert.LessOrEqual(t, lastRun.Sub(runTime), time.Millisecond*125) + assert.GreaterOrEqual(t, lastRun.Sub(runTime), time.Millisecond*75) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t) + j, err := s.NewJob( + DurationJob(time.Millisecond*100), + NewTask(func() { + time.Sleep(time.Millisecond * 200) + }), + WithSingletonMode(LimitModeWait), + ) + require.NoError(t, err) + + startTime := time.Now() + s.Start() + + lastRun, err := j.LastRun() + assert.NoError(t, err) + assert.True(t, lastRun.IsZero()) + + time.Sleep(time.Millisecond * 200) + + lastRun, err = j.LastRun() + assert.NoError(t, err) + assert.LessOrEqual(t, lastRun.Sub(startTime), time.Millisecond*125) + assert.GreaterOrEqual(t, lastRun.Sub(startTime), time.Millisecond*75) + + tt.f(t, j) + + assert.NoError(t, s.Shutdown()) + }) + } +} + func TestScheduler_OneTimeJob(t *testing.T) { tests := []struct { name string @@ -1636,6 +1809,81 @@ func TestScheduler_OneTimeJob(t *testing.T) { } } +func TestScheduler_WithLimitedRuns(t *testing.T) { + goleak.VerifyNone(t) + + tests := []struct { + name string + schedulerOpts []SchedulerOption + job JobDefinition + jobOpts []JobOption + runLimit uint + expectedRuns int + }{ + { + "simple", + nil, + DurationJob(time.Millisecond * 100), + nil, + 1, + 1, + }, + { + "OneTimeJob, WithLimitConcurrentJobs", + []SchedulerOption{ + WithLimitConcurrentJobs(1, LimitModeWait), + }, + OneTimeJob(OneTimeJobStartImmediately()), + nil, + 1, + 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := newTestScheduler(t, tt.schedulerOpts...) + + jobRan := make(chan struct{}, 10) + + jobOpts := []JobOption{ + WithLimitedRuns(tt.runLimit), + } + jobOpts = append(jobOpts, tt.jobOpts...) + + _, err := s.NewJob( + tt.job, + NewTask(func() { + jobRan <- struct{}{} + }), + jobOpts..., + ) + require.NoError(t, err) + + s.Start() + time.Sleep(time.Millisecond * 150) + + assert.NoError(t, s.Shutdown()) + + var runCount int + for runCount < tt.expectedRuns { + select { + case <-jobRan: + runCount++ + case <-time.After(time.Second): + t.Fatal("timed out waiting for job to run") + } + } + select { + case <-jobRan: + t.Fatal("job ran more than expected") + default: + } + assert.Equal(t, tt.expectedRuns, runCount) + }) + } +} + func TestScheduler_Jobs(t *testing.T) { tests := []struct { name string @@ -1661,6 +1909,91 @@ func TestScheduler_Jobs(t *testing.T) { jobsSecond := s.Jobs() assert.Equal(t, jobsFirst, jobsSecond) + assert.NoError(t, s.Shutdown()) + }) + } +} + +type testMonitor struct { + mu sync.Mutex + counter map[string]int + time map[string][]time.Duration +} + +func newTestMonitor() *testMonitor { + return &testMonitor{ + counter: make(map[string]int), + time: make(map[string][]time.Duration), + } +} + +func (t *testMonitor) IncrementJob(_ uuid.UUID, name string, _ []string, _ JobStatus) { + t.mu.Lock() + defer t.mu.Unlock() + _, ok := t.counter[name] + if !ok { + t.counter[name] = 0 + } + t.counter[name]++ +} + +func (t *testMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, name string, _ []string) { + t.mu.Lock() + defer t.mu.Unlock() + _, ok := t.time[name] + if !ok { + t.time[name] = make([]time.Duration, 0) + } + t.time[name] = append(t.time[name], endTime.Sub(startTime)) +} + +func TestScheduler_WithMonitor(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + jd JobDefinition + jobName string + }{ + { + "scheduler with monitor", + DurationJob(time.Millisecond * 50), + "job", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ch := make(chan struct{}, 20) + monitor := newTestMonitor() + s := newTestScheduler(t, WithMonitor(monitor)) + + opt := []JobOption{ + WithName(tt.jobName), + WithStartAt( + WithStartImmediately(), + ), + } + _, err := s.NewJob( + tt.jd, + NewTask(func() { + ch <- struct{}{} + }), + opt..., + ) + require.NoError(t, err) + s.Start() + time.Sleep(150 * time.Millisecond) + require.NoError(t, s.Shutdown()) + close(ch) + expectedCount := 0 + for range ch { + expectedCount++ + } + + got := monitor.counter[tt.jobName] + if got != expectedCount { + t.Fatalf("job %q counter expected %d, got %d", tt.jobName, expectedCount, got) + } }) } } diff --git a/util.go b/util.go index 8bff942..18986b3 100644 --- a/util.go +++ b/util.go @@ -44,18 +44,12 @@ func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob { func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob { resp := make(chan internalJob, 1) - select { - case <-ctx.Done(): - return nil - default: - } - select { case ch <- jobOutRequest{ id: id, outChan: resp, }: - default: + case <-ctx.Done(): return nil } var j internalJob