Merge remote-tracking branch 'upstream/v2' into v2

This commit is contained in:
2024-03-28 13:27:49 +03:00
16 changed files with 802 additions and 194 deletions
+2 -6
View File
@@ -24,12 +24,8 @@ jobs:
with: with:
go-version: ${{ matrix.go-version }} go-version: ${{ matrix.go-version }}
- name: golangci-lint - name: golangci-lint
uses: golangci/golangci-lint-action@v3.7.0 uses: golangci/golangci-lint-action@v4.0.0
with: with:
version: v1.55.2 version: v1.55.2
- name: test - name: test
run: make test_coverage run: make test
- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
+23 -2
View File
@@ -53,6 +53,11 @@ func main() {
// start the scheduler // start the scheduler
s.Start() s.Start()
// block until you are ready to shut down
select {
case <-time.After(time.Minute):
}
// when you're done, shut it down // when you're done, shut it down
err = s.Shutdown() err = s.Shutdown()
if err != nil { if err != nil {
@@ -61,6 +66,11 @@ func main() {
} }
``` ```
## Examples
- [Go doc examples](https://pkg.go.dev/github.com/andoma-go/gocron/v2#pkg-examples)
- [Examples directory](examples)
## Concepts ## Concepts
- **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then - **Job**: The job encapsulates a "task", which is made up of a go function and any function parameters. The Job then
@@ -111,10 +121,12 @@ Multiple instances of gocron can be run.
- [**Elector**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#WithDistributedElector): - [**Elector**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#WithDistributedElector):
An elector can be used to elect a single instance of gocron to run as the primary with the An elector can be used to elect a single instance of gocron to run as the primary with the
other instances checking to see if a new leader needs to be elected. other instances checking to see if a new leader needs to be elected.
- Implementations: [andoma-go electors](https://github.com/andoma-go?q=-elector&type=all&language=&sort=) - Implementations: [andoma-go electors](https://github.com/andoma-go?q=-elector&type=all&language=&sort=)
(don't see what you need? request on slack to get a repo created to contribute it!)
- [**Locker**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#WithDistributedLocker): - [**Locker**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#WithDistributedLocker):
A locker can be used to lock each run of a job to a single instance of gocron. A locker can be used to lock each run of a job to a single instance of gocron.
- Implementations: [andoma-go lockers](https://github.com/andoma-go?q=-lock&type=all&language=&sort=) - Implementations: [andoma-go lockers](https://github.com/andoma-go?q=-lock&type=all&language=&sort=)
(don't see what you need? request on slack to get a repo created to contribute it!)
### Events ### Events
@@ -146,6 +158,15 @@ Logs can be enabled.
The Logger interface can be implemented with your desired logging library. The Logger interface can be implemented with your desired logging library.
The provided NewLogger uses the standard library's log package. The provided NewLogger uses the standard library's log package.
### Metrics
Metrics may be collected from the execution of each job.
- [**Monitor**](https://pkg.go.dev/github.com/andoma-go/gocron/v2#Monitor):
A monitor can be used to collect metrics for each job from a scheduler.
- Implementations: [andoma-go monitors](https://github.com/andoma-go?q=-monitor&type=all&language=&sort=)
(don't see what you need? request on slack to get a repo created to contribute it!)
### Testing ### Testing
The gocron library is set up to enable testing. The gocron library is set up to enable testing.
+3 -2
View File
@@ -2,11 +2,12 @@
## Supported Versions ## Supported Versions
The current plan is to maintain version 1 as long as possible incorporating any necessary security patches. The current plan is to maintain version 2 as long as possible incorporating any necessary security patches. Version 1 is deprecated and will no longer be patched.
| Version | Supported | | Version | Supported |
| ------- | ------------------ | | ------- | ------------------ |
| 1.x.x | :white_check_mark: | | 1.x.x | :heavy_multiplication_x: |
| 2.x.x | :white_check_mark: |
## Reporting a Vulnerability ## Reporting a Vulnerability
-5
View File
@@ -1,5 +0,0 @@
coverage:
status:
project:
default:
if_ci_failed: success
+2
View File
@@ -9,6 +9,7 @@ var (
ErrDailyJobAtTimesNil = fmt.Errorf("gocron: DailyJob: atTimes must not be nil") ErrDailyJobAtTimesNil = fmt.Errorf("gocron: DailyJob: atTimes must not be nil")
ErrDailyJobHours = fmt.Errorf("gocron: DailyJob: atTimes hours must be between 0 and 23 inclusive") ErrDailyJobHours = fmt.Errorf("gocron: DailyJob: atTimes hours must be between 0 and 23 inclusive")
ErrDailyJobMinutesSeconds = fmt.Errorf("gocron: DailyJob: atTimes minutes and seconds must be between 0 and 59 inclusive") ErrDailyJobMinutesSeconds = fmt.Errorf("gocron: DailyJob: atTimes minutes and seconds must be between 0 and 59 inclusive")
ErrDurationJobIntervalZero = fmt.Errorf("gocron: DurationJob: time interval is 0")
ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration") ErrDurationRandomJobMinMax = fmt.Errorf("gocron: DurationRandomJob: minimum duration must be less than maximum duration")
ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil") ErrEventListenerFuncNil = fmt.Errorf("gocron: eventListenerFunc must not be nil")
ErrJobNotFound = fmt.Errorf("gocron: job not found") ErrJobNotFound = fmt.Errorf("gocron: job not found")
@@ -38,6 +39,7 @@ var (
ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0")
ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil")
ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil")
ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil")
ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty")
ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past")
ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0") ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0")
+72 -13
View File
@@ -367,8 +367,6 @@ func ExampleScheduler_removeByTags() {
) )
fmt.Println(len(s.Jobs())) fmt.Println(len(s.Jobs()))
time.Sleep(20 * time.Millisecond)
s.RemoveByTags("tag1", "tag2") s.RemoveByTags("tag1", "tag2")
fmt.Println(len(s.Jobs())) fmt.Println(len(s.Jobs()))
@@ -391,7 +389,6 @@ func ExampleScheduler_removeJob() {
) )
fmt.Println(len(s.Jobs())) fmt.Println(len(s.Jobs()))
time.Sleep(20 * time.Millisecond)
_ = s.RemoveJob(j.ID()) _ = s.RemoveJob(j.ID())
@@ -519,7 +516,7 @@ func ExampleWithClock() {
} }
func ExampleWithDistributedElector() { func ExampleWithDistributedElector() {
//var _ Elector = (*myElector)(nil) //var _ gocron.Elector = (*myElector)(nil)
// //
//type myElector struct{} //type myElector struct{}
// //
@@ -527,15 +524,15 @@ func ExampleWithDistributedElector() {
// return nil // return nil
//} //}
// //
//elector := myElector{} //elector := &myElector{}
// //
//_, _ = NewScheduler( //_, _ = gocron.NewScheduler(
// WithDistributedElector(elector), // gocron.WithDistributedElector(elector),
//) //)
} }
func ExampleWithDistributedLocker() { func ExampleWithDistributedLocker() {
//var _ Locker = (*myLocker)(nil) //var _ gocron.Locker = (*myLocker)(nil)
// //
//type myLocker struct{} //type myLocker struct{}
// //
@@ -552,10 +549,10 @@ func ExampleWithDistributedLocker() {
// return nil // return nil
//} //}
// //
//locker := myLocker{} //locker := &myLocker{}
// //
//_, _ = NewScheduler( //_, _ = gocron.NewScheduler(
// WithDistributedLocker(locker), // gocron.WithDistributedLocker(locker),
//) //)
} }
@@ -664,8 +661,8 @@ func ExampleWithLimitedRuns() {
s.Start() s.Start()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
fmt.Printf("no jobs in scheduler: %v\n", s.Jobs())
_ = s.StopJobs() _ = s.StopJobs()
fmt.Printf("no jobs in scheduler: %v\n", s.Jobs())
// Output: // Output:
// one, 2 // one, 2
// no jobs in scheduler: [] // no jobs in scheduler: []
@@ -687,6 +684,69 @@ func ExampleWithLogger() {
) )
} }
func ExampleWithMonitor() {
//type exampleMonitor struct {
// mu sync.Mutex
// counter map[string]int
// time map[string][]time.Duration
//}
//
//func newExampleMonitor() *exampleMonitor {
// return &exampleMonitor{
// counter: make(map[string]int),
// time: make(map[string][]time.Duration),
//}
//}
//
//func (t *exampleMonitor) IncrementJob(_ uuid.UUID, name string, _ []string, _ JobStatus) {
// t.mu.Lock()
// defer t.mu.Unlock()
// _, ok := t.counter[name]
// if !ok {
// t.counter[name] = 0
// }
// t.counter[name]++
//}
//
//func (t *exampleMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, name string, _ []string) {
// t.mu.Lock()
// defer t.mu.Unlock()
// _, ok := t.time[name]
// if !ok {
// t.time[name] = make([]time.Duration, 0)
// }
// t.time[name] = append(t.time[name], endTime.Sub(startTime))
//}
//
//monitor := newExampleMonitor()
//s, _ := NewScheduler(
// WithMonitor(monitor),
//)
//name := "example"
//_, _ = s.NewJob(
// DurationJob(
// time.Second,
// ),
// NewTask(
// func() {
// time.Sleep(1 * time.Second)
// },
// ),
// WithName(name),
// WithStartAt(
// WithStartImmediately(),
// ),
//)
//s.Start()
//time.Sleep(5 * time.Second)
//_ = s.Shutdown()
//
//fmt.Printf("Job %q total execute count: %d\n", name, monitor.counter[name])
//for i, val := range monitor.time[name] {
// fmt.Printf("Job %q execute #%d elapsed %.4f seconds\n", name, i+1, val.Seconds())
//}
}
func ExampleWithName() { func ExampleWithName() {
s, _ := NewScheduler() s, _ := NewScheduler()
defer func() { _ = s.Shutdown() }() defer func() { _ = s.Shutdown() }()
@@ -748,7 +808,6 @@ func ExampleWithStartAt() {
), ),
) )
s.Start() s.Start()
time.Sleep(20 * time.Millisecond)
next, _ := j.NextRun() next, _ := j.NextRun()
fmt.Println(next) fmt.Println(next)
+73
View File
@@ -0,0 +1,73 @@
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/andoma-go/gocron/v2"
)
var _ gocron.Elector = (*myElector)(nil)
type myElector struct {
num int
leader bool
}
func (m myElector) IsLeader(_ context.Context) error {
if m.leader {
log.Printf("node %d is leader", m.num)
return nil
}
log.Printf("node %d is not leader", m.num)
return fmt.Errorf("not leader")
}
func main() {
log.SetFlags(log.LstdFlags | log.Lmicroseconds)
for i := 0; i < 3; i++ {
go func(i int) {
elector := &myElector{
num: i,
}
if i == 0 {
elector.leader = true
}
scheduler, err := gocron.NewScheduler(
gocron.WithDistributedElector(elector),
)
if err != nil {
log.Println(err)
return
}
_, err = scheduler.NewJob(
gocron.DurationJob(time.Second),
gocron.NewTask(func() {
log.Println("run job")
}),
)
if err != nil {
log.Println(err)
return
}
scheduler.Start()
if i == 0 {
time.Sleep(5 * time.Second)
elector.leader = false
}
if i == 1 {
time.Sleep(5 * time.Second)
elector.leader = true
}
}(i)
}
select {} // wait forever
}
+64 -53
View File
@@ -10,19 +10,21 @@ import (
) )
type executor struct { type executor struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
logger Logger logger Logger
stopCh chan struct{} stopCh chan struct{}
jobsIn chan jobIn jobsIn chan jobIn
jobIDsOut chan uuid.UUID jobsOutForRescheduling chan uuid.UUID
jobOutRequest chan jobOutRequest jobsOutCompleted chan uuid.UUID
stopTimeout time.Duration jobOutRequest chan jobOutRequest
done chan error stopTimeout time.Duration
singletonRunners map[uuid.UUID]singletonRunner done chan error
limitMode *limitModeConfig singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
elector Elector limitMode *limitModeConfig
locker Locker elector Elector
locker Locker
monitor Monitor
} }
type jobIn struct { type jobIn struct {
@@ -66,7 +68,7 @@ func (e *executor) start() {
limitModeJobsWg := &waitGroupWithMutex{} limitModeJobsWg := &waitGroupWithMutex{}
// create a fresh map for tracking singleton runners // create a fresh map for tracking singleton runners
e.singletonRunners = make(map[uuid.UUID]singletonRunner) e.singletonRunners = &sync.Map{}
// start the for leap that is the executor // start the for leap that is the executor
// selecting on channels for work to do // selecting on channels for work to do
@@ -121,12 +123,7 @@ func (e *executor) start() {
// all runners are busy, reschedule the work for later // all runners are busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric // TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut { e.sendOutForRescheduling(&jIn)
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
} }
} else { } else {
// since we're not using LimitModeReschedule, but instead using LimitModeWait // since we're not using LimitModeReschedule, but instead using LimitModeWait
@@ -135,6 +132,7 @@ func (e *executor) start() {
// at which point this call would block. // at which point this call would block.
// TODO when metrics are added, this should increment a wait metric // TODO when metrics are added, this should increment a wait metric
e.limitMode.in <- jIn e.limitMode.in <- jIn
e.sendOutForRescheduling(&jIn)
} }
} else { } else {
// no limit mode, so we're either running a regular job or // no limit mode, so we're either running a regular job or
@@ -150,15 +148,18 @@ func (e *executor) start() {
if j.singletonMode { if j.singletonMode {
// for singleton mode, get the existing runner for the job // for singleton mode, get the existing runner for the job
// or spin up a new one // or spin up a new one
runner, ok := e.singletonRunners[jIn.id] runner := &singletonRunner{}
runnerSrc, ok := e.singletonRunners.Load(jIn.id)
if !ok { if !ok {
runner.in = make(chan jobIn, 1000) runner.in = make(chan jobIn, 1000)
if j.singletonLimitMode == LimitModeReschedule { if j.singletonLimitMode == LimitModeReschedule {
runner.rescheduleLimiter = make(chan struct{}, 1) runner.rescheduleLimiter = make(chan struct{}, 1)
} }
e.singletonRunners[jIn.id] = runner e.singletonRunners.Store(jIn.id, runner)
singletonJobsWg.Add(1) singletonJobsWg.Add(1)
go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter) go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
} else {
runner = runnerSrc.(*singletonRunner)
} }
if j.singletonLimitMode == LimitModeReschedule { if j.singletonLimitMode == LimitModeReschedule {
@@ -167,20 +168,17 @@ func (e *executor) start() {
select { select {
case runner.rescheduleLimiter <- struct{}{}: case runner.rescheduleLimiter <- struct{}{}:
runner.in <- jIn runner.in <- jIn
e.sendOutForRescheduling(&jIn)
default: default:
// runner is busy, reschedule the work for later // runner is busy, reschedule the work for later
// which means we just skip it here and do nothing // which means we just skip it here and do nothing
// TODO when metrics are added, this should increment a rescheduled metric // TODO when metrics are added, this should increment a rescheduled metric
if jIn.shouldSendOut { e.sendOutForRescheduling(&jIn)
select {
case e.jobIDsOut <- jIn.id:
default:
}
}
} }
} else { } else {
// wait mode, fill up that queue (buffered channel, so it's ok) // wait mode, fill up that queue (buffered channel, so it's ok)
runner.in <- jIn runner.in <- jIn
e.sendOutForRescheduling(&jIn)
} }
} else { } else {
select { select {
@@ -196,7 +194,7 @@ func (e *executor) start() {
// complete. // complete.
standardJobsWg.Add(1) standardJobsWg.Add(1)
go func(j internalJob) { go func(j internalJob) {
e.runJob(j, jIn.shouldSendOut) e.runJob(j, jIn)
standardJobsWg.Done() standardJobsWg.Done()
}(*j) }(*j)
} }
@@ -209,6 +207,20 @@ func (e *executor) start() {
} }
} }
func (e *executor) sendOutForRescheduling(jIn *jobIn) {
if jIn.shouldSendOut {
select {
case e.jobsOutForRescheduling <- jIn.id:
case <-e.ctx.Done():
return
}
}
// we need to set this to false now, because to handle
// non-limit jobs, we send out from the e.runJob function
// and in this case we don't want to send out twice.
jIn.shouldSendOut = false
}
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) { func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
e.logger.Debug("gocron: limitModeRunner starting", "name", name) e.logger.Debug("gocron: limitModeRunner starting", "name", name)
for { for {
@@ -239,24 +251,21 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
return return
case <-j.ctx.Done(): case <-j.ctx.Done():
return return
case e.jobIDsOut <- j.id: case e.jobsOutForRescheduling <- j.id:
} }
} }
// remove the limiter block, as this particular job // remove the limiter block, as this particular job
// was a singleton already running, and we want to // was a singleton already running, and we want to
// allow another job to be scheduled // allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { <-rescheduleLimiter
case <-rescheduleLimiter:
default:
}
} }
continue continue
} }
e.limitMode.singletonJobs[jIn.id] = struct{}{} e.limitMode.singletonJobs[jIn.id] = struct{}{}
e.limitMode.singletonJobsMu.Unlock() e.limitMode.singletonJobsMu.Unlock()
} }
e.runJob(*j, jIn.shouldSendOut) e.runJob(*j, jIn)
if j.singletonMode { if j.singletonMode {
e.limitMode.singletonJobsMu.Lock() e.limitMode.singletonJobsMu.Lock()
@@ -267,10 +276,7 @@ func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWith
// remove the limiter block to allow another job to be scheduled // remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { <-rescheduleLimiter
case <-rescheduleLimiter:
default:
}
} }
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("limitModeRunner shutting down", "name", name) e.logger.Debug("limitModeRunner shutting down", "name", name)
@@ -297,15 +303,12 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
j := requestJobCtx(ctx, jIn.id, e.jobOutRequest) j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
cancel() cancel()
if j != nil { if j != nil {
e.runJob(*j, jIn.shouldSendOut) e.runJob(*j, jIn)
} }
// remove the limiter block to allow another job to be scheduled // remove the limiter block to allow another job to be scheduled
if limitMode == LimitModeReschedule { if limitMode == LimitModeReschedule {
select { <-rescheduleLimiter
case <-rescheduleLimiter:
default:
}
} }
case <-e.ctx.Done(): case <-e.ctx.Done():
e.logger.Debug("singletonModeRunner shutting down", "name", name) e.logger.Debug("singletonModeRunner shutting down", "name", name)
@@ -315,7 +318,7 @@ func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroup
} }
} }
func (e *executor) runJob(j internalJob, shouldSendOut bool) { func (e *executor) runJob(j internalJob, jIn jobIn) {
if j.ctx == nil { if j.ctx == nil {
return return
} }
@@ -329,32 +332,40 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) {
if e.elector != nil { if e.elector != nil {
if err := e.elector.IsLeader(j.ctx); err != nil { if err := e.elector.IsLeader(j.ctx); err != nil {
e.sendOutForRescheduling(&jIn)
return return
} }
} else if e.locker != nil { } else if e.locker != nil {
lock, err := e.locker.Lock(j.ctx, j.name) lock, err := e.locker.Lock(j.ctx, j.name)
if err != nil { if err != nil {
e.sendOutForRescheduling(&jIn)
return return
} }
defer func() { _ = lock.Unlock(j.ctx) }() defer func() { _ = lock.Unlock(j.ctx) }()
} }
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name) _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
if shouldSendOut { e.sendOutForRescheduling(&jIn)
select { select {
case <-e.ctx.Done(): case e.jobsOutCompleted <- j.id:
return case <-e.ctx.Done():
case <-j.ctx.Done():
return
case e.jobIDsOut <- j.id:
}
} }
startTime := time.Now()
err := callJobFuncWithParams(j.function, j.parameters...) err := callJobFuncWithParams(j.function, j.parameters...)
if e.monitor != nil {
e.monitor.RecordJobTiming(startTime, time.Now(), j.id, j.name, j.tags)
}
if err != nil { if err != nil {
_ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
if e.monitor != nil {
e.monitor.IncrementJob(j.id, j.name, j.tags, Fail)
}
} else { } else {
_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
if e.monitor != nil {
e.monitor.IncrementJob(j.id, j.name, j.tags, Success)
}
} }
} }
+2 -2
View File
@@ -3,10 +3,10 @@ module github.com/andoma-go/gocron/v2
go 1.20 go 1.20
require ( require (
github.com/google/uuid v1.5.0 github.com/google/uuid v1.6.0
github.com/jonboulle/clockwork v0.4.0 github.com/jonboulle/clockwork v0.4.0
github.com/robfig/cron/v3 v3.0.1 github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.8.4 github.com/stretchr/testify v1.9.0
go.uber.org/goleak v1.3.0 go.uber.org/goleak v1.3.0
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 golang.org/x/exp v0.0.0-20231219180239-dc181d75b848
) )
+4 -4
View File
@@ -1,8 +1,8 @@
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4=
github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc=
github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI=
@@ -15,8 +15,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 h1:+iq7lrkxmFNBM7xx+Rae2W6uyPfhPeDWD+n+JgppptE= golang.org/x/exp v0.0.0-20231219180239-dc181d75b848 h1:+iq7lrkxmFNBM7xx+Rae2W6uyPfhPeDWD+n+JgppptE=
+39 -27
View File
@@ -24,7 +24,9 @@ type internalJob struct {
name string name string
tags []string tags []string
jobSchedule jobSchedule
lastRun, nextRun time.Time lastScheduledRun time.Time
nextScheduled time.Time
lastRun time.Time
function any function any
parameters []any parameters []any
timer clockwork.Timer timer clockwork.Timer
@@ -148,6 +150,9 @@ type durationJobDefinition struct {
} }
func (d durationJobDefinition) setup(j *internalJob, _ *time.Location) error { func (d durationJobDefinition) setup(j *internalJob, _ *time.Location) error {
if d.duration == 0 {
return ErrDurationJobIntervalZero
}
j.jobSchedule = &durationJob{duration: d.duration} j.jobSchedule = &durationJob{duration: d.duration}
return nil return nil
} }
@@ -579,8 +584,8 @@ func WithTags(tags ...string) JobOption {
// listeners that can be used to listen for job events. // listeners that can be used to listen for job events.
type EventListener func(*internalJob) error type EventListener func(*internalJob) error
// AfterJobRuns is used to listen for when a job has run regardless // AfterJobRuns is used to listen for when a job has run
// of any returned error value, and run the provided function. // without an error, and then run the provided function.
func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) EventListener { func AfterJobRuns(eventListenerFunc func(jobID uuid.UUID, jobName string)) EventListener {
return func(j *internalJob) error { return func(j *internalJob) error {
if eventListenerFunc == nil { if eventListenerFunc == nil {
@@ -678,18 +683,18 @@ func (d dailyJob) next(lastRun time.Time) time.Time {
func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time { func (d dailyJob) nextDay(lastRun time.Time, firstPass bool) time.Time {
for _, at := range d.atTimes { for _, at := range d.atTimes {
// sub the at time hour/min/sec onto the lastRun's values // sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time // to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day(), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())
if firstPass && atDate.After(lastRun) { if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than, // checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time // and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again // will be in the loop, and we don't want to select it again
return atDate return atDate
} else if !firstPass && !atDate.Before(lastRun) { } else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next day, it's ok to consider // now that we're looking at the next day, it's ok to consider
// the same at time that was last run (as lastRun has been incremented) // the same at time that was last run (as lastScheduledRun has been incremented)
return atDate return atDate
} }
} }
@@ -724,18 +729,18 @@ func (w weeklyJob) nextWeekDayAtTime(lastRun time.Time, firstPass bool) time.Tim
// weekDayDiff is used to add the correct amount to the atDate day below // weekDayDiff is used to add the correct amount to the atDate day below
weekDayDiff := wd - lastRun.Weekday() weekDayDiff := wd - lastRun.Weekday()
for _, at := range w.atTimes { for _, at := range w.atTimes {
// sub the at time hour/min/sec onto the lastRun's values // sub the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time // to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) atDate := time.Date(lastRun.Year(), lastRun.Month(), lastRun.Day()+int(weekDayDiff), at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())
if firstPass && atDate.After(lastRun) { if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than, // checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time // and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again // will be in the loop, and we don't want to select it again
return atDate return atDate
} else if !firstPass && !atDate.Before(lastRun) { } else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next week, it's ok to consider // now that we're looking at the next week, it's ok to consider
// the same at time that was last run (as lastRun has been incremented) // the same at time that was last run (as lastScheduledRun has been incremented)
return atDate return atDate
} }
} }
@@ -756,38 +761,43 @@ type monthlyJob struct {
func (m monthlyJob) next(lastRun time.Time) time.Time { func (m monthlyJob) next(lastRun time.Time) time.Time {
daysList := make([]int, len(m.days)) daysList := make([]int, len(m.days))
copy(daysList, m.days) copy(daysList, m.days)
firstDayNextMonth := time.Date(lastRun.Year(), lastRun.Month()+1, 1, 0, 0, 0, 0, lastRun.Location())
for _, daySub := range m.daysFromEnd {
// getting a combined list of all the daysList and the negative daysList
// which count backwards from the first day of the next month
// -1 == the last day of the month
day := firstDayNextMonth.AddDate(0, 0, daySub).Day()
daysList = append(daysList, day)
}
slices.Sort(daysList)
firstPass := true daysFromEnd := m.handleNegativeDays(lastRun, daysList, m.daysFromEnd)
next := m.nextMonthDayAtTime(lastRun, daysList, firstPass) next := m.nextMonthDayAtTime(lastRun, daysFromEnd, true)
if !next.IsZero() { if !next.IsZero() {
return next return next
} }
firstPass = false
from := time.Date(lastRun.Year(), lastRun.Month()+time.Month(m.interval), 1, 0, 0, 0, 0, lastRun.Location()) from := time.Date(lastRun.Year(), lastRun.Month()+time.Month(m.interval), 1, 0, 0, 0, 0, lastRun.Location())
for next.IsZero() { for next.IsZero() {
next = m.nextMonthDayAtTime(from, daysList, firstPass) daysFromEnd = m.handleNegativeDays(from, daysList, m.daysFromEnd)
next = m.nextMonthDayAtTime(from, daysFromEnd, false)
from = from.AddDate(0, int(m.interval), 0) from = from.AddDate(0, int(m.interval), 0)
} }
return next return next
} }
func (m monthlyJob) handleNegativeDays(from time.Time, days, negativeDays []int) []int {
var out []int
// getting a list of the days from the end of the following month
// -1 == the last day of the month
firstDayNextMonth := time.Date(from.Year(), from.Month()+1, 1, 0, 0, 0, 0, from.Location())
for _, daySub := range negativeDays {
day := firstDayNextMonth.AddDate(0, 0, daySub).Day()
out = append(out, day)
}
out = append(out, days...)
slices.Sort(out)
return out
}
func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass bool) time.Time { func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass bool) time.Time {
// find the next day in the month that should run and then check for an at time // find the next day in the month that should run and then check for an at time
for _, day := range days { for _, day := range days {
if day >= lastRun.Day() { if day >= lastRun.Day() {
for _, at := range m.atTimes { for _, at := range m.atTimes {
// sub the day, and the at time hour/min/sec onto the lastRun's values // sub the day, and the at time hour/min/sec onto the lastScheduledRun's values
// to use in checks to see if we've got our next run time // to use in checks to see if we've got our next run time
atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location()) atDate := time.Date(lastRun.Year(), lastRun.Month(), day, at.Hour(), at.Minute(), at.Second(), lastRun.Nanosecond(), lastRun.Location())
@@ -799,12 +809,12 @@ func (m monthlyJob) nextMonthDayAtTime(lastRun time.Time, days []int, firstPass
if firstPass && atDate.After(lastRun) { if firstPass && atDate.After(lastRun) {
// checking to see if it is after i.e. greater than, // checking to see if it is after i.e. greater than,
// and not greater or equal as our lastRun day/time // and not greater or equal as our lastScheduledRun day/time
// will be in the loop, and we don't want to select it again // will be in the loop, and we don't want to select it again
return atDate return atDate
} else if !firstPass && !atDate.Before(lastRun) { } else if !firstPass && !atDate.Before(lastRun) {
// now that we're looking at the next month, it's ok to consider // now that we're looking at the next month, it's ok to consider
// the same at time that was lastRun (as lastRun has been incremented) // the same at time that was lastScheduledRun (as lastScheduledRun has been incremented)
return atDate return atDate
} }
} }
@@ -841,7 +851,9 @@ type Job interface {
NextRun() (time.Time, error) NextRun() (time.Time, error)
// RunNow runs the job once, now. This does not alter // RunNow runs the job once, now. This does not alter
// the existing run schedule, and will respect all job // the existing run schedule, and will respect all job
// and scheduler limits. // and scheduler limits. This means that running a job now may
// cause the job's regular interval to be rescheduled due to
// the instance being run by RunNow blocking your run limit.
RunNow() error RunNow() error
// Tags returns the job's string tags. // Tags returns the job's string tags.
Tags() []string Tags() []string
@@ -882,7 +894,7 @@ func (j job) NextRun() (time.Time, error) {
if ij == nil || ij.id == uuid.Nil { if ij == nil || ij.id == uuid.Nil {
return time.Time{}, ErrJobNotFound return time.Time{}, ErrJobNotFound
} }
return ij.nextRun, nil return ij.nextScheduled, nil
} }
func (j job) Tags() []string { func (j job) Tags() []string {
+36
View File
@@ -257,6 +257,42 @@ func TestMonthlyJob_next(t *testing.T) {
time.Date(2000, 8, 31, 5, 30, 0, 0, time.UTC), time.Date(2000, 8, 31, 5, 30, 0, 0, time.UTC),
244 * 24 * time.Hour, 244 * 24 * time.Hour,
}, },
{
"handle -1 with differing month's day count",
1,
nil,
[]int{-1},
[]time.Time{
time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC),
},
time.Date(2024, 1, 31, 5, 30, 0, 0, time.UTC),
time.Date(2024, 2, 29, 5, 30, 0, 0, time.UTC),
29 * 24 * time.Hour,
},
{
"handle -1 with another differing month's day count",
1,
nil,
[]int{-1},
[]time.Time{
time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC),
},
time.Date(2024, 2, 29, 5, 30, 0, 0, time.UTC),
time.Date(2024, 3, 31, 5, 30, 0, 0, time.UTC),
31 * 24 * time.Hour,
},
{
"handle -1 every 3 months next run in February",
3,
nil,
[]int{-1},
[]time.Time{
time.Date(0, 0, 0, 5, 30, 0, 0, time.UTC),
},
time.Date(2023, 11, 30, 5, 30, 0, 0, time.UTC),
time.Date(2024, 2, 29, 5, 30, 0, 0, time.UTC),
91 * 24 * time.Hour,
},
} }
for _, tt := range tests { for _, tt := range tests {
+26
View File
@@ -0,0 +1,26 @@
package gocron
import (
"time"
"github.com/google/uuid"
)
// JobStatus is the status of job run that should be collected with the metric.
type JobStatus string
// The different statuses of job that can be used.
const (
Fail JobStatus = "fail"
Success JobStatus = "success"
)
// Monitor represents the interface to collect jobs metrics.
type Monitor interface {
// IncrementJob will provide details about the job and expects the underlying implementation
// to handle instantiating and incrementing a value
IncrementJob(id uuid.UUID, name string, tags []string, status JobStatus)
// RecordJobTiming will provide details about the job and the timing and expects the underlying implementation
// to handle instantiating and recording the value
RecordJobTiming(startTime, endTime time.Time, id uuid.UUID, name string, tags []string)
}
+102 -53
View File
@@ -70,11 +70,17 @@ type scheduler struct {
allJobsOutRequest chan allJobsOutRequest allJobsOutRequest chan allJobsOutRequest
jobOutRequestCh chan jobOutRequest jobOutRequestCh chan jobOutRequest
runJobRequestCh chan runJobRequest runJobRequestCh chan runJobRequest
newJobCh chan internalJob newJobCh chan newJobIn
removeJobCh chan uuid.UUID removeJobCh chan uuid.UUID
removeJobsByTagsCh chan []string removeJobsByTagsCh chan []string
} }
type newJobIn struct {
ctx context.Context
cancel context.CancelFunc
job internalJob
}
type jobOutRequest struct { type jobOutRequest struct {
id uuid.UUID id uuid.UUID
outChan chan internalJob outChan chan internalJob
@@ -100,13 +106,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
exec := executor{ exec := executor{
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
stopTimeout: time.Second * 10, stopTimeout: time.Second * 10,
singletonRunners: make(map[uuid.UUID]singletonRunner), singletonRunners: nil,
logger: &noOpLogger{}, logger: &noOpLogger{},
jobsIn: make(chan jobIn), jobsIn: make(chan jobIn),
jobIDsOut: make(chan uuid.UUID), jobsOutForRescheduling: make(chan uuid.UUID),
jobOutRequest: make(chan jobOutRequest, 1000), jobsOutCompleted: make(chan uuid.UUID),
done: make(chan error), jobOutRequest: make(chan jobOutRequest, 1000),
done: make(chan error),
} }
s := &scheduler{ s := &scheduler{
@@ -118,7 +125,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
clock: clockwork.NewRealClock(), clock: clockwork.NewRealClock(),
logger: &noOpLogger{}, logger: &noOpLogger{},
newJobCh: make(chan internalJob), newJobCh: make(chan newJobIn),
removeJobCh: make(chan uuid.UUID), removeJobCh: make(chan uuid.UUID),
removeJobsByTagsCh: make(chan []string), removeJobsByTagsCh: make(chan []string),
startCh: make(chan struct{}), startCh: make(chan struct{}),
@@ -141,11 +148,14 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
s.logger.Info("gocron: new scheduler created") s.logger.Info("gocron: new scheduler created")
for { for {
select { select {
case id := <-s.exec.jobIDsOut: case id := <-s.exec.jobsOutForRescheduling:
s.selectExecJobIDsOut(id) s.selectExecJobsOutForRescheduling(id)
case j := <-s.newJobCh: case id := <-s.exec.jobsOutCompleted:
s.selectNewJob(j) s.selectExecJobsOutCompleted(id)
case in := <-s.newJobCh:
s.selectNewJob(in)
case id := <-s.removeJobCh: case id := <-s.removeJobCh:
s.selectRemoveJob(id) s.selectRemoveJob(id)
@@ -281,9 +291,54 @@ func (s *scheduler) selectRemoveJob(id uuid.UUID) {
// Jobs coming back from the executor to the scheduler that // Jobs coming back from the executor to the scheduler that
// need to evaluated for rescheduling. // need to evaluated for rescheduling.
func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) { func (s *scheduler) selectExecJobsOutForRescheduling(id uuid.UUID) {
j := s.jobs[id] j, ok := s.jobs[id]
j.lastRun = j.nextRun if !ok {
// the job was removed while it was running, and
// so we don't need to reschedule it.
return
}
j.lastScheduledRun = j.nextScheduled
next := j.next(j.lastScheduledRun)
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
return
}
if next.Before(s.now()) {
// in some cases the next run time can be in the past, for example:
// - the time on the machine was incorrect and has been synced with ntp
// - the machine went to sleep, and woke up some time later
// in those cases, we want to increment to the next run in the future
// and schedule the job for that time.
for next.Before(s.now()) {
next = j.next(next)
}
}
j.nextScheduled = next
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
// run if the scheduler has been shutdown.
select {
case <-s.shutdownCtx.Done():
return
case s.exec.jobsIn <- jobIn{
id: j.id,
shouldSendOut: true,
}:
}
})
// update the job with its new next and last run times and timer.
s.jobs[id] = j
}
func (s *scheduler) selectExecJobsOutCompleted(id uuid.UUID) {
j, ok := s.jobs[id]
if !ok {
return
}
// if the job has a limited number of runs set, we need to // if the job has a limited number of runs set, we need to
// check how many runs have occurred and stop running this // check how many runs have occurred and stop running this
@@ -302,37 +357,7 @@ func (s *scheduler) selectExecJobIDsOut(id uuid.UUID) {
} }
} }
next := j.next(j.lastRun) j.lastRun = s.now()
if next.IsZero() {
// the job's next function will return zero for OneTime jobs.
// since they are one time only, they do not need rescheduling.
return
}
if next.Before(s.now()) {
// in some cases the next run time can be in the past, for example:
// - the time on the machine was incorrect and has been synced with ntp
// - the machine went to sleep, and woke up some time later
// in those cases, we want to increment to the next run in the future
// and schedule the job for that time.
for next.Before(s.now()) {
next = j.next(next)
}
}
j.nextRun = next
j.timer = s.clock.AfterFunc(next.Sub(s.now()), func() {
// set the actual timer on the job here and listen for
// shut down events so that the job doesn't attempt to
// run if the scheduler has been shutdown.
select {
case <-s.shutdownCtx.Done():
return
case s.exec.jobsIn <- jobIn{
id: j.id,
shouldSendOut: true,
}:
}
})
// update the job with its new next and last run times and timer.
s.jobs[id] = j s.jobs[id] = j
} }
@@ -346,7 +371,8 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
close(out.outChan) close(out.outChan)
} }
func (s *scheduler) selectNewJob(j internalJob) { func (s *scheduler) selectNewJob(in newJobIn) {
j := in.job
if s.started { if s.started {
next := j.startTime next := j.startTime
if j.startImmediately { if j.startImmediately {
@@ -374,10 +400,11 @@ func (s *scheduler) selectNewJob(j internalJob) {
} }
}) })
} }
j.nextRun = next j.nextScheduled = next
} }
s.jobs[j.id] = j s.jobs[j.id] = j
in.cancel()
} }
func (s *scheduler) selectRemoveJobsByTags(tags []string) { func (s *scheduler) selectRemoveJobsByTags(tags []string) {
@@ -424,7 +451,7 @@ func (s *scheduler) selectStart() {
} }
}) })
} }
j.nextRun = next j.nextScheduled = next
s.jobs[id] = j s.jobs[id] = j
} }
select { select {
@@ -446,10 +473,11 @@ func (s *scheduler) now() time.Time {
func (s *scheduler) jobFromInternalJob(in internalJob) job { func (s *scheduler) jobFromInternalJob(in internalJob) job {
return job{ return job{
id: in.id, in.id,
name: in.name, in.name,
tags: slices.Clone(in.tags), slices.Clone(in.tags),
jobOutRequest: s.jobOutRequestCh, s.jobOutRequestCh,
s.runJobRequestCh,
} }
} }
@@ -548,9 +576,19 @@ func (s *scheduler) addOrUpdateJob(id uuid.UUID, definition JobDefinition, taskW
return nil, err return nil, err
} }
newJobCtx, newJobCancel := context.WithCancel(context.Background())
select { select {
case <-s.shutdownCtx.Done(): case <-s.shutdownCtx.Done():
case s.newJobCh <- j: case s.newJobCh <- newJobIn{
ctx: newJobCtx,
cancel: newJobCancel,
job: j,
}:
}
select {
case <-newJobCtx.Done():
case <-s.shutdownCtx.Done():
} }
return &job{ return &job{
@@ -774,3 +812,14 @@ func WithStopTimeout(timeout time.Duration) SchedulerOption {
return nil return nil
} }
} }
// WithMonitor sets the metrics provider to be used by the Scheduler.
func WithMonitor(monitor Monitor) SchedulerOption {
return func(s *scheduler) error {
if monitor == nil {
return ErrWithMonitorNil
}
s.exec.monitor = monitor
return nil
}
}
+353 -20
View File
@@ -8,7 +8,6 @@ import (
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/goleak" "go.uber.org/goleak"
@@ -301,9 +300,7 @@ func TestScheduler_StopTimeout(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
s.Start() s.Start()
time.Sleep(time.Millisecond * 200) assert.ErrorIs(t, err, s.Shutdown())
err = s.Shutdown()
assert.ErrorIs(t, err, ErrStopJobsTimedOut)
cancel() cancel()
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
}) })
@@ -332,15 +329,11 @@ func TestScheduler_Shutdown(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
s.Start() s.Start()
time.Sleep(50 * time.Millisecond)
require.NoError(t, s.StopJobs()) require.NoError(t, s.StopJobs())
time.Sleep(200 * time.Millisecond)
s.Start() s.Start()
time.Sleep(50 * time.Millisecond)
require.NoError(t, s.Shutdown()) require.NoError(t, s.Shutdown())
time.Sleep(200 * time.Millisecond)
}) })
t.Run("calling Job methods after shutdown errors", func(t *testing.T) { t.Run("calling Job methods after shutdown errors", func(t *testing.T) {
@@ -361,7 +354,6 @@ func TestScheduler_Shutdown(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
s.Start() s.Start()
time.Sleep(50 * time.Millisecond)
require.NoError(t, s.Shutdown()) require.NoError(t, s.Shutdown())
_, err = j.LastRun() _, err = j.LastRun()
@@ -465,7 +457,6 @@ func TestScheduler_NewJob(t *testing.T) {
s.Start() s.Start()
require.NoError(t, s.Shutdown()) require.NoError(t, s.Shutdown())
time.Sleep(50 * time.Millisecond)
}) })
} }
} }
@@ -487,6 +478,12 @@ func TestScheduler_NewJobErrors(t *testing.T) {
nil, nil,
ErrCronJobParse, ErrCronJobParse,
}, },
{
"duration job time interval is zero",
DurationJob(0 * time.Second),
nil,
ErrDurationJobIntervalZero,
},
{ {
"random with bad min/max", "random with bad min/max",
DurationRandomJob( DurationRandomJob(
@@ -908,6 +905,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) {
WithStopTimeout(-1), WithStopTimeout(-1),
ErrWithStopTimeoutZeroOrNegative, ErrWithStopTimeoutZeroOrNegative,
}, },
{
"WithMonitorer nil",
WithMonitor(nil),
ErrWithMonitorNil,
},
} }
for _, tt := range tests { for _, tt := range tests {
@@ -1144,6 +1146,7 @@ var _ Elector = (*testElector)(nil)
type testElector struct { type testElector struct {
mu sync.Mutex mu sync.Mutex
leaderElected bool leaderElected bool
notLeader chan struct{}
} }
func (t *testElector) IsLeader(ctx context.Context) error { func (t *testElector) IsLeader(ctx context.Context) error {
@@ -1156,6 +1159,7 @@ func (t *testElector) IsLeader(ctx context.Context) error {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.leaderElected { if t.leaderElected {
t.notLeader <- struct{}{}
return fmt.Errorf("already elected leader") return fmt.Errorf("already elected leader")
} }
t.leaderElected = true t.leaderElected = true
@@ -1167,12 +1171,14 @@ var _ Locker = (*testLocker)(nil)
type testLocker struct { type testLocker struct {
mu sync.Mutex mu sync.Mutex
jobLocked bool jobLocked bool
notLocked chan struct{}
} }
func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) { func (t *testLocker) Lock(_ context.Context, _ string) (Lock, error) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
if t.jobLocked { if t.jobLocked {
t.notLocked <- struct{}{}
return nil, fmt.Errorf("job already locked") return nil, fmt.Errorf("job already locked")
} }
t.jobLocked = true t.jobLocked = true
@@ -1188,21 +1194,58 @@ func (t testLock) Unlock(_ context.Context) error {
} }
func TestScheduler_WithDistributed(t *testing.T) { func TestScheduler_WithDistributed(t *testing.T) {
notLocked := make(chan struct{}, 10)
notLeader := make(chan struct{}, 10)
goleak.VerifyNone(t) goleak.VerifyNone(t)
tests := []struct { tests := []struct {
name string name string
count int count int
opt SchedulerOption opt SchedulerOption
assertions func(*testing.T)
}{ }{
{ {
"3 schedulers with elector", "3 schedulers with elector",
3, 3,
WithDistributedElector(&testElector{}), WithDistributedElector(&testElector{
notLeader: notLeader,
}),
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLeaderCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLeader:
notLeaderCount++
default:
}
}
assert.Equal(t, 2, notLeaderCount)
},
}, },
{ {
"3 schedulers with locker", "3 schedulers with locker",
3, 3,
WithDistributedLocker(&testLocker{}), WithDistributedLocker(&testLocker{
notLocked: notLocked,
}),
func(t *testing.T) {
timeout := time.Now().Add(1 * time.Second)
var notLockedCount int
for {
if time.Now().After(timeout) {
break
}
select {
case <-notLocked:
notLockedCount++
default:
}
}
},
}, },
} }
@@ -1226,6 +1269,7 @@ func TestScheduler_WithDistributed(t *testing.T) {
), ),
NewTask( NewTask(
func() { func() {
time.Sleep(100 * time.Millisecond)
jobsRan <- struct{}{} jobsRan <- struct{}{}
}, },
), ),
@@ -1267,6 +1311,8 @@ func TestScheduler_WithDistributed(t *testing.T) {
} }
assert.Equal(t, 1, runCount) assert.Equal(t, 1, runCount)
time.Sleep(time.Second)
tt.assertions(t)
}) })
} }
} }
@@ -1303,7 +1349,6 @@ func TestScheduler_RemoveJob(t *testing.T) {
id = uuid.New() id = uuid.New()
} }
time.Sleep(50 * time.Millisecond)
err := s.RemoveJob(id) err := s.RemoveJob(id)
assert.ErrorIs(t, err, err) assert.ErrorIs(t, err, err)
require.NoError(t, s.Shutdown()) require.NoError(t, s.Shutdown())
@@ -1311,6 +1356,71 @@ func TestScheduler_RemoveJob(t *testing.T) {
} }
} }
func TestScheduler_RemoveLotsOfJobs(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
numJobs int
}{
{
"10 successes",
10,
},
{
"100 successes",
100,
},
{
"1000 successes",
1000,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t)
var ids []uuid.UUID
for i := 0; i < tt.numJobs; i++ {
j, err := s.NewJob(DurationJob(time.Second), NewTask(func() { time.Sleep(20 * time.Second) }))
require.NoError(t, err)
ids = append(ids, j.ID())
}
for _, id := range ids {
err := s.RemoveJob(id)
require.NoError(t, err)
}
assert.Len(t, s.Jobs(), 0)
require.NoError(t, s.Shutdown())
})
}
}
func TestScheduler_RemoveJob_RemoveSelf(t *testing.T) {
goleak.VerifyNone(t)
s := newTestScheduler(t)
s.Start()
_, err := s.NewJob(
DurationJob(100*time.Millisecond),
NewTask(func() {}),
WithEventListeners(
BeforeJobRuns(
func(jobID uuid.UUID, jobName string) {
s.RemoveByTags("tag1")
},
),
),
WithTags("tag1"),
)
require.NoError(t, err)
time.Sleep(time.Millisecond * 400)
assert.NoError(t, s.Shutdown())
}
func TestScheduler_WithEventListeners(t *testing.T) { func TestScheduler_WithEventListeners(t *testing.T) {
goleak.VerifyNone(t) goleak.VerifyNone(t)
@@ -1483,7 +1593,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
{ {
"duration job - start immediately", "duration job - start immediately",
chDurationImmediate, chDurationImmediate,
DurationJob(time.Second * 10), DurationJob(time.Second * 5),
func() { func() {
chDurationImmediate <- struct{}{} chDurationImmediate <- struct{}{}
}, },
@@ -1493,7 +1603,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
), ),
}, },
func() time.Duration { func() time.Duration {
return 10 * time.Second return 5 * time.Second
}, },
2, 2,
}, },
@@ -1512,7 +1622,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
WithSingletonMode(LimitModeReschedule), WithSingletonMode(LimitModeReschedule),
}, },
func() time.Duration { func() time.Duration {
return 10 * time.Second return 20 * time.Second
}, },
1, 1,
}, },
@@ -1533,9 +1643,10 @@ func TestScheduler_RunJobNow(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t) s := newTestScheduler(t)
j, err := s.NewJob(tt.j, NewTask(tt.fun), tt.opts...) _, err := s.NewJob(tt.j, NewTask(tt.fun), tt.opts...)
require.NoError(t, err) require.NoError(t, err)
j := s.Jobs()[0]
s.Start() s.Start()
var nextRunBefore time.Time var nextRunBefore time.Time
@@ -1571,6 +1682,7 @@ func TestScheduler_RunJobNow(t *testing.T) {
nextRunAfter, err := j.NextRun() nextRunAfter, err := j.NextRun()
if tt.expectedDiff != nil && tt.expectedDiff() > 0 { if tt.expectedDiff != nil && tt.expectedDiff() > 0 {
for ; nextRunBefore.IsZero() || nextRunAfter.Equal(nextRunBefore); nextRunAfter, err = j.NextRun() { //nolint:revive for ; nextRunBefore.IsZero() || nextRunAfter.Equal(nextRunBefore); nextRunAfter, err = j.NextRun() { //nolint:revive
time.Sleep(100 * time.Millisecond)
} }
} }
@@ -1584,6 +1696,67 @@ func TestScheduler_RunJobNow(t *testing.T) {
} }
} }
func TestScheduler_LastRunSingleton(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
f func(t *testing.T, j Job)
}{
{
"simple",
func(t *testing.T, j Job) {},
},
{
"with runNow",
func(t *testing.T, j Job) {
runTime := time.Now()
assert.NoError(t, j.RunNow())
// because we're using wait mode we need to wait here
// to make sure the job queued with RunNow has finished running
time.Sleep(time.Millisecond * 200)
lastRun, err := j.LastRun()
assert.NoError(t, err)
assert.LessOrEqual(t, lastRun.Sub(runTime), time.Millisecond*125)
assert.GreaterOrEqual(t, lastRun.Sub(runTime), time.Millisecond*75)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t)
j, err := s.NewJob(
DurationJob(time.Millisecond*100),
NewTask(func() {
time.Sleep(time.Millisecond * 200)
}),
WithSingletonMode(LimitModeWait),
)
require.NoError(t, err)
startTime := time.Now()
s.Start()
lastRun, err := j.LastRun()
assert.NoError(t, err)
assert.True(t, lastRun.IsZero())
time.Sleep(time.Millisecond * 200)
lastRun, err = j.LastRun()
assert.NoError(t, err)
assert.LessOrEqual(t, lastRun.Sub(startTime), time.Millisecond*125)
assert.GreaterOrEqual(t, lastRun.Sub(startTime), time.Millisecond*75)
tt.f(t, j)
assert.NoError(t, s.Shutdown())
})
}
}
func TestScheduler_OneTimeJob(t *testing.T) { func TestScheduler_OneTimeJob(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@@ -1636,6 +1809,81 @@ func TestScheduler_OneTimeJob(t *testing.T) {
} }
} }
func TestScheduler_WithLimitedRuns(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
schedulerOpts []SchedulerOption
job JobDefinition
jobOpts []JobOption
runLimit uint
expectedRuns int
}{
{
"simple",
nil,
DurationJob(time.Millisecond * 100),
nil,
1,
1,
},
{
"OneTimeJob, WithLimitConcurrentJobs",
[]SchedulerOption{
WithLimitConcurrentJobs(1, LimitModeWait),
},
OneTimeJob(OneTimeJobStartImmediately()),
nil,
1,
1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
s := newTestScheduler(t, tt.schedulerOpts...)
jobRan := make(chan struct{}, 10)
jobOpts := []JobOption{
WithLimitedRuns(tt.runLimit),
}
jobOpts = append(jobOpts, tt.jobOpts...)
_, err := s.NewJob(
tt.job,
NewTask(func() {
jobRan <- struct{}{}
}),
jobOpts...,
)
require.NoError(t, err)
s.Start()
time.Sleep(time.Millisecond * 150)
assert.NoError(t, s.Shutdown())
var runCount int
for runCount < tt.expectedRuns {
select {
case <-jobRan:
runCount++
case <-time.After(time.Second):
t.Fatal("timed out waiting for job to run")
}
}
select {
case <-jobRan:
t.Fatal("job ran more than expected")
default:
}
assert.Equal(t, tt.expectedRuns, runCount)
})
}
}
func TestScheduler_Jobs(t *testing.T) { func TestScheduler_Jobs(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
@@ -1661,6 +1909,91 @@ func TestScheduler_Jobs(t *testing.T) {
jobsSecond := s.Jobs() jobsSecond := s.Jobs()
assert.Equal(t, jobsFirst, jobsSecond) assert.Equal(t, jobsFirst, jobsSecond)
assert.NoError(t, s.Shutdown())
})
}
}
type testMonitor struct {
mu sync.Mutex
counter map[string]int
time map[string][]time.Duration
}
func newTestMonitor() *testMonitor {
return &testMonitor{
counter: make(map[string]int),
time: make(map[string][]time.Duration),
}
}
func (t *testMonitor) IncrementJob(_ uuid.UUID, name string, _ []string, _ JobStatus) {
t.mu.Lock()
defer t.mu.Unlock()
_, ok := t.counter[name]
if !ok {
t.counter[name] = 0
}
t.counter[name]++
}
func (t *testMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, name string, _ []string) {
t.mu.Lock()
defer t.mu.Unlock()
_, ok := t.time[name]
if !ok {
t.time[name] = make([]time.Duration, 0)
}
t.time[name] = append(t.time[name], endTime.Sub(startTime))
}
func TestScheduler_WithMonitor(t *testing.T) {
goleak.VerifyNone(t)
tests := []struct {
name string
jd JobDefinition
jobName string
}{
{
"scheduler with monitor",
DurationJob(time.Millisecond * 50),
"job",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ch := make(chan struct{}, 20)
monitor := newTestMonitor()
s := newTestScheduler(t, WithMonitor(monitor))
opt := []JobOption{
WithName(tt.jobName),
WithStartAt(
WithStartImmediately(),
),
}
_, err := s.NewJob(
tt.jd,
NewTask(func() {
ch <- struct{}{}
}),
opt...,
)
require.NoError(t, err)
s.Start()
time.Sleep(150 * time.Millisecond)
require.NoError(t, s.Shutdown())
close(ch)
expectedCount := 0
for range ch {
expectedCount++
}
got := monitor.counter[tt.jobName]
if got != expectedCount {
t.Fatalf("job %q counter expected %d, got %d", tt.jobName, expectedCount, got)
}
}) })
} }
} }
+1 -7
View File
@@ -44,18 +44,12 @@ func requestJob(id uuid.UUID, ch chan jobOutRequest) *internalJob {
func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob { func requestJobCtx(ctx context.Context, id uuid.UUID, ch chan jobOutRequest) *internalJob {
resp := make(chan internalJob, 1) resp := make(chan internalJob, 1)
select {
case <-ctx.Done():
return nil
default:
}
select { select {
case ch <- jobOutRequest{ case ch <- jobOutRequest{
id: id, id: id,
outChan: resp, outChan: resp,
}: }:
default: case <-ctx.Done():
return nil return nil
} }
var j internalJob var j internalJob