diff --git a/README.md b/README.md index 19095ec..2e473bf 100644 --- a/README.md +++ b/README.md @@ -111,10 +111,12 @@ Multiple instances of gocron can be run. - [**Elector**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedElector): An elector can be used to elect a single instance of gocron to run as the primary with the other instances checking to see if a new leader needs to be elected. -- Implementations: [go-co-op electors](https://github.com/go-co-op?q=-elector&type=all&language=&sort=) + - Implementations: [go-co-op electors](https://github.com/go-co-op?q=-elector&type=all&language=&sort=) + (don't see what you need? request on slack to get a repo created to contribute it!) - [**Locker**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#WithDistributedLocker): A locker can be used to lock each run of a job to a single instance of gocron. -- Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=) + - Implementations: [go-co-op lockers](https://github.com/go-co-op?q=-lock&type=all&language=&sort=) + (don't see what you need? request on slack to get a repo created to contribute it!) ### Events Job events can trigger actions. @@ -140,6 +142,13 @@ Logs can be enabled. The Logger interface can be implemented with your desired logging library. The provided NewLogger uses the standard library's log package. +### Metrics +Metrics may be collected from the execution of each job. +- [**Monitor**](https://pkg.go.dev/github.com/go-co-op/gocron/v2#Monitor): +A monitor can be used to collect metrics for each job from a scheduler. + - Implementations: [go-co-op monitors](https://github.com/go-co-op?q=-monitor&type=all&language=&sort=) + (don't see what you need? request on slack to get a repo created to contribute it!) + ### Testing The gocron library is set up to enable testing. - Mocks are provided in [the mock package](mocks) using [gomock](https://github.com/uber-go/mock). diff --git a/errors.go b/errors.go index 2c9f7c2..12d85eb 100644 --- a/errors.go +++ b/errors.go @@ -38,6 +38,7 @@ var ( ErrWithLimitConcurrentJobsZero = fmt.Errorf("gocron: WithLimitConcurrentJobs: limit must be greater than 0") ErrWithLocationNil = fmt.Errorf("gocron: WithLocation: location must not be nil") ErrWithLoggerNil = fmt.Errorf("gocron: WithLogger: logger must not be nil") + ErrWithMonitorNil = fmt.Errorf("gocron: WithMonitor: monitor must not be nil") ErrWithNameEmpty = fmt.Errorf("gocron: WithName: name must not be empty") ErrWithStartDateTimePast = fmt.Errorf("gocron: WithStartDateTime: start must not be in the past") ErrWithStopTimeoutZeroOrNegative = fmt.Errorf("gocron: WithStopTimeout: timeout must be greater than 0") diff --git a/example_test.go b/example_test.go index ed77a33..e877965 100644 --- a/example_test.go +++ b/example_test.go @@ -684,6 +684,69 @@ func ExampleWithLogger() { ) } +func ExampleWithMonitor() { + //type exampleMonitor struct { + // mu sync.Mutex + // counter map[string]int + // time map[string][]time.Duration + //} + // + //func newExampleMonitor() *exampleMonitor { + // return &exampleMonitor{ + // counter: make(map[string]int), + // time: make(map[string][]time.Duration), + //} + //} + // + //func (t *exampleMonitor) IncrementJob(_ uuid.UUID, name string, _ []string, _ JobStatus) { + // t.mu.Lock() + // defer t.mu.Unlock() + // _, ok := t.counter[name] + // if !ok { + // t.counter[name] = 0 + // } + // t.counter[name]++ + //} + // + //func (t *exampleMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, name string, _ []string) { + // t.mu.Lock() + // defer t.mu.Unlock() + // _, ok := t.time[name] + // if !ok { + // t.time[name] = make([]time.Duration, 0) + // } + // t.time[name] = append(t.time[name], endTime.Sub(startTime)) + //} + // + //monitor := newExampleMonitor() + //s, _ := NewScheduler( + // WithMonitor(monitor), + //) + //name := "example" + //_, _ = s.NewJob( + // DurationJob( + // time.Second, + // ), + // NewTask( + // func() { + // time.Sleep(1 * time.Second) + // }, + // ), + // WithName(name), + // WithStartAt( + // WithStartImmediately(), + // ), + //) + //s.Start() + //time.Sleep(5 * time.Second) + //_ = s.Shutdown() + // + //fmt.Printf("Job %q total execute count: %d\n", name, monitor.counter[name]) + //for i, val := range monitor.time[name] { + // fmt.Printf("Job %q execute #%d elapsed %.4f seconds\n", name, i+1, val.Seconds()) + //} +} + func ExampleWithName() { s, _ := NewScheduler() defer func() { _ = s.Shutdown() }() diff --git a/executor.go b/executor.go index 1f013f5..b8c7e34 100644 --- a/executor.go +++ b/executor.go @@ -23,6 +23,7 @@ type executor struct { limitMode *limitModeConfig elector Elector locker Locker + monitor Monitor } type jobIn struct { @@ -350,11 +351,21 @@ func (e *executor) runJob(j internalJob, shouldSendOut bool) { } } + startTime := time.Now() err := callJobFuncWithParams(j.function, j.parameters...) + if e.monitor != nil { + e.monitor.RecordJobTiming(startTime, time.Now(), j.id, j.name, j.tags) + } if err != nil { _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err) + if e.monitor != nil { + e.monitor.IncrementJob(j.id, j.name, j.tags, Fail) + } } else { _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name) + if e.monitor != nil { + e.monitor.IncrementJob(j.id, j.name, j.tags, Success) + } } } diff --git a/monitor.go b/monitor.go new file mode 100644 index 0000000..5491371 --- /dev/null +++ b/monitor.go @@ -0,0 +1,26 @@ +package gocron + +import ( + "time" + + "github.com/google/uuid" +) + +// JobStatus is the status of job run that should be collected with the metric. +type JobStatus string + +// The different statuses of job that can be used. +const ( + Fail JobStatus = "fail" + Success JobStatus = "success" +) + +// Monitor represents the interface to collect jobs metrics. +type Monitor interface { + // IncrementJob will provide details about the job and expects the underlying implementation + // to handle instantiating and incrementing a value + IncrementJob(id uuid.UUID, name string, tags []string, status JobStatus) + // RecordJobTiming will provide details about the job and the timing and expects the underlying implementation + // to handle instantiating and recording the value + RecordJobTiming(startTime, endTime time.Time, id uuid.UUID, name string, tags []string) +} diff --git a/scheduler.go b/scheduler.go index 7e73404..503a0d8 100644 --- a/scheduler.go +++ b/scheduler.go @@ -792,3 +792,14 @@ func WithStopTimeout(timeout time.Duration) SchedulerOption { return nil } } + +// WithMonitor sets the metrics provider to be used by the Scheduler. +func WithMonitor(monitor Monitor) SchedulerOption { + return func(s *scheduler) error { + if monitor == nil { + return ErrWithMonitorNil + } + s.exec.monitor = monitor + return nil + } +} diff --git a/scheduler_test.go b/scheduler_test.go index 48ef1d1..565939b 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -900,6 +900,11 @@ func TestScheduler_WithOptionsErrors(t *testing.T) { WithStopTimeout(-1), ErrWithStopTimeoutZeroOrNegative, }, + { + "WithMonitorer nil", + WithMonitor(nil), + ErrWithMonitorNil, + }, } for _, tt := range tests { @@ -1652,6 +1657,91 @@ func TestScheduler_Jobs(t *testing.T) { jobsSecond := s.Jobs() assert.Equal(t, jobsFirst, jobsSecond) + assert.NoError(t, s.Shutdown()) + }) + } +} + +type testMonitor struct { + mu sync.Mutex + counter map[string]int + time map[string][]time.Duration +} + +func newTestMonitor() *testMonitor { + return &testMonitor{ + counter: make(map[string]int), + time: make(map[string][]time.Duration), + } +} + +func (t *testMonitor) IncrementJob(_ uuid.UUID, name string, _ []string, _ JobStatus) { + t.mu.Lock() + defer t.mu.Unlock() + _, ok := t.counter[name] + if !ok { + t.counter[name] = 0 + } + t.counter[name]++ +} + +func (t *testMonitor) RecordJobTiming(startTime, endTime time.Time, _ uuid.UUID, name string, _ []string) { + t.mu.Lock() + defer t.mu.Unlock() + _, ok := t.time[name] + if !ok { + t.time[name] = make([]time.Duration, 0) + } + t.time[name] = append(t.time[name], endTime.Sub(startTime)) +} + +func TestScheduler_WithMonitor(t *testing.T) { + goleak.VerifyNone(t) + tests := []struct { + name string + jd JobDefinition + jobName string + }{ + { + "scheduler with monitorer", + DurationJob(time.Millisecond * 50), + "job", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ch := make(chan struct{}, 20) + monitor := newTestMonitor() + s := newTestScheduler(t, WithMonitor(monitor)) + + opt := []JobOption{ + WithName(tt.jobName), + WithStartAt( + WithStartImmediately(), + ), + } + _, err := s.NewJob( + tt.jd, + NewTask(func() { + ch <- struct{}{} + }), + opt..., + ) + require.NoError(t, err) + s.Start() + time.Sleep(150 * time.Millisecond) + require.NoError(t, s.Shutdown()) + close(ch) + expectedCount := 0 + for range ch { + expectedCount++ + } + + got := monitor.counter[tt.jobName] + if got != expectedCount { + t.Fatalf("job %q counter expected %d, got %d", tt.jobName, expectedCount, got) + } }) } }