allow max concurrent runs and singleton mode together (#625)
This commit is contained in:
+155
-99
@@ -36,10 +36,16 @@ type limitModeConfig struct {
|
||||
limit uint
|
||||
rescheduleLimiter chan struct{}
|
||||
in chan uuid.UUID
|
||||
// singletonJobs is used to track singleton jobs that are running
|
||||
// in the limit mode runner. This is used to prevent the same job
|
||||
// from running multiple times across limit mode runners when both
|
||||
// a limit mode and singleton mode are enabled.
|
||||
singletonJobs map[uuid.UUID]struct{}
|
||||
singletonJobsMu sync.Mutex
|
||||
}
|
||||
|
||||
func (e *executor) start() {
|
||||
e.logger.Debug("executor started")
|
||||
e.logger.Debug("gocron: executor started")
|
||||
|
||||
// creating the executor's context here as the executor
|
||||
// is the only goroutine that should access this context
|
||||
@@ -48,20 +54,11 @@ func (e *executor) start() {
|
||||
e.ctx, e.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// the standardJobsWg tracks
|
||||
standardJobsWg := &waitGroupWithMutex{
|
||||
wg: sync.WaitGroup{},
|
||||
mu: sync.Mutex{},
|
||||
}
|
||||
standardJobsWg := &waitGroupWithMutex{}
|
||||
|
||||
singletonJobsWg := &waitGroupWithMutex{
|
||||
wg: sync.WaitGroup{},
|
||||
mu: sync.Mutex{},
|
||||
}
|
||||
singletonJobsWg := &waitGroupWithMutex{}
|
||||
|
||||
limitModeJobsWg := &waitGroupWithMutex{
|
||||
wg: sync.WaitGroup{},
|
||||
mu: sync.Mutex{},
|
||||
}
|
||||
limitModeJobsWg := &waitGroupWithMutex{}
|
||||
|
||||
// create a fresh map for tracking singleton runners
|
||||
e.singletonRunners = make(map[uuid.UUID]singletonRunner)
|
||||
@@ -141,7 +138,6 @@ func (e *executor) start() {
|
||||
j := requestJobCtx(ctx, id, e.jobOutRequest)
|
||||
if j == nil {
|
||||
// safety check as it'd be strange bug if this occurred
|
||||
// TODO add a log line here
|
||||
return
|
||||
}
|
||||
if j.singletonMode {
|
||||
@@ -155,7 +151,7 @@ func (e *executor) start() {
|
||||
}
|
||||
e.singletonRunners[id] = runner
|
||||
singletonJobsWg.Add(1)
|
||||
go e.limitModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
|
||||
go e.singletonModeRunner("singleton-"+id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
|
||||
}
|
||||
|
||||
if j.singletonLimitMode == LimitModeReschedule {
|
||||
@@ -204,95 +200,14 @@ func (e *executor) start() {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
|
||||
e.logger.Debug("stopping executor")
|
||||
// we've been asked to stop. This is either because the scheduler has been told
|
||||
// to stop all jobs or the scheduler has been asked to completely shutdown.
|
||||
//
|
||||
// cancel tells all the functions to stop their work and send in a done response
|
||||
e.cancel()
|
||||
|
||||
// the wait for job channels are used to report back whether we successfully waited
|
||||
// for all jobs to complete or if we hit the configured timeout.
|
||||
waitForJobs := make(chan struct{}, 1)
|
||||
waitForSingletons := make(chan struct{}, 1)
|
||||
waitForLimitMode := make(chan struct{}, 1)
|
||||
|
||||
// the waiter context is used to cancel the functions waiting on jobs.
|
||||
// this is done to avoid goroutine leaks.
|
||||
waiterCtx, waiterCancel := context.WithCancel(context.Background())
|
||||
|
||||
// wait for standard jobs to complete
|
||||
go func() {
|
||||
e.logger.Debug("waiting for standard jobs to complete")
|
||||
go func() {
|
||||
// this is done in a separate goroutine, so we aren't
|
||||
// blocked by the WaitGroup's Wait call in the event
|
||||
// that the waiter context is cancelled.
|
||||
// This particular goroutine could leak in the event that
|
||||
// some long-running standard job doesn't complete.
|
||||
standardJobsWg.Wait()
|
||||
e.logger.Debug("standard jobs completed")
|
||||
waitForJobs <- struct{}{}
|
||||
}()
|
||||
<-waiterCtx.Done()
|
||||
}()
|
||||
|
||||
// wait for per job singleton limit mode runner jobs to complete
|
||||
go func() {
|
||||
e.logger.Debug("waiting for singleton jobs to complete")
|
||||
go func() {
|
||||
singletonJobsWg.Wait()
|
||||
e.logger.Debug("singleton jobs completed")
|
||||
waitForSingletons <- struct{}{}
|
||||
}()
|
||||
<-waiterCtx.Done()
|
||||
}()
|
||||
|
||||
// wait for limit mode runners to complete
|
||||
go func() {
|
||||
e.logger.Debug("waiting for limit mode jobs to complete")
|
||||
go func() {
|
||||
limitModeJobsWg.Wait()
|
||||
e.logger.Debug("limitMode jobs completed")
|
||||
waitForLimitMode <- struct{}{}
|
||||
}()
|
||||
<-waiterCtx.Done()
|
||||
}()
|
||||
|
||||
// now either wait for all the jobs to complete,
|
||||
// or hit the timeout.
|
||||
var count int
|
||||
timeout := time.Now().Add(e.stopTimeout)
|
||||
for time.Now().Before(timeout) && count < 3 {
|
||||
select {
|
||||
case <-waitForJobs:
|
||||
count++
|
||||
case <-waitForSingletons:
|
||||
count++
|
||||
case <-waitForLimitMode:
|
||||
count++
|
||||
default:
|
||||
}
|
||||
}
|
||||
if count < 3 {
|
||||
e.done <- ErrStopJobsTimedOut
|
||||
e.logger.Debug("executor stopped - timed out")
|
||||
} else {
|
||||
e.done <- nil
|
||||
e.logger.Debug("executor stopped")
|
||||
}
|
||||
waiterCancel()
|
||||
}
|
||||
|
||||
func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
|
||||
e.logger.Debug("limitModeRunner starting", "name", name)
|
||||
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
|
||||
for {
|
||||
select {
|
||||
case id := <-in:
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
e.logger.Debug("limitModeRunner shutting down", "name", name)
|
||||
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
|
||||
wg.Done()
|
||||
return
|
||||
default:
|
||||
@@ -300,10 +215,70 @@ func (e *executor) limitModeRunner(name string, in chan uuid.UUID, wg *waitGroup
|
||||
|
||||
ctx, cancel := context.WithCancel(e.ctx)
|
||||
j := requestJobCtx(ctx, id, e.jobOutRequest)
|
||||
cancel()
|
||||
if j != nil {
|
||||
if j.singletonMode {
|
||||
e.limitMode.singletonJobsMu.Lock()
|
||||
_, ok := e.limitMode.singletonJobs[id]
|
||||
if ok {
|
||||
// this job is already running, so don't run it
|
||||
// but instead reschedule it
|
||||
e.limitMode.singletonJobsMu.Unlock()
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
return
|
||||
case <-j.ctx.Done():
|
||||
return
|
||||
case e.jobIDsOut <- j.id:
|
||||
}
|
||||
continue
|
||||
}
|
||||
e.limitMode.singletonJobs[id] = struct{}{}
|
||||
e.limitMode.singletonJobsMu.Unlock()
|
||||
}
|
||||
e.runJob(*j)
|
||||
|
||||
if j.singletonMode {
|
||||
e.limitMode.singletonJobsMu.Lock()
|
||||
delete(e.limitMode.singletonJobs, id)
|
||||
e.limitMode.singletonJobsMu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// remove the limiter block to allow another job to be scheduled
|
||||
if limitMode == LimitModeReschedule {
|
||||
select {
|
||||
case <-rescheduleLimiter:
|
||||
default:
|
||||
}
|
||||
}
|
||||
case <-e.ctx.Done():
|
||||
e.logger.Debug("limitModeRunner shutting down", "name", name)
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *executor) singletonModeRunner(name string, in chan uuid.UUID, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
|
||||
e.logger.Debug("gocron: limitModeRunner starting", "name", name)
|
||||
for {
|
||||
select {
|
||||
case id := <-in:
|
||||
select {
|
||||
case <-e.ctx.Done():
|
||||
e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
|
||||
wg.Done()
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(e.ctx)
|
||||
j := requestJobCtx(ctx, id, e.jobOutRequest)
|
||||
cancel()
|
||||
if j != nil {
|
||||
e.runJob(*j)
|
||||
}
|
||||
cancel()
|
||||
|
||||
// remove the limiter block to allow another job to be scheduled
|
||||
if limitMode == LimitModeReschedule {
|
||||
@@ -360,3 +335,84 @@ func (e *executor) runJob(j internalJob) {
|
||||
_ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
|
||||
e.logger.Debug("gocron: stopping executor")
|
||||
// we've been asked to stop. This is either because the scheduler has been told
|
||||
// to stop all jobs or the scheduler has been asked to completely shutdown.
|
||||
//
|
||||
// cancel tells all the functions to stop their work and send in a done response
|
||||
e.cancel()
|
||||
|
||||
// the wait for job channels are used to report back whether we successfully waited
|
||||
// for all jobs to complete or if we hit the configured timeout.
|
||||
waitForJobs := make(chan struct{}, 1)
|
||||
waitForSingletons := make(chan struct{}, 1)
|
||||
waitForLimitMode := make(chan struct{}, 1)
|
||||
|
||||
// the waiter context is used to cancel the functions waiting on jobs.
|
||||
// this is done to avoid goroutine leaks.
|
||||
waiterCtx, waiterCancel := context.WithCancel(context.Background())
|
||||
|
||||
// wait for standard jobs to complete
|
||||
go func() {
|
||||
e.logger.Debug("gocron: waiting for standard jobs to complete")
|
||||
go func() {
|
||||
// this is done in a separate goroutine, so we aren't
|
||||
// blocked by the WaitGroup's Wait call in the event
|
||||
// that the waiter context is cancelled.
|
||||
// This particular goroutine could leak in the event that
|
||||
// some long-running standard job doesn't complete.
|
||||
standardJobsWg.Wait()
|
||||
e.logger.Debug("gocron: standard jobs completed")
|
||||
waitForJobs <- struct{}{}
|
||||
}()
|
||||
<-waiterCtx.Done()
|
||||
}()
|
||||
|
||||
// wait for per job singleton limit mode runner jobs to complete
|
||||
go func() {
|
||||
e.logger.Debug("gocron: waiting for singleton jobs to complete")
|
||||
go func() {
|
||||
singletonJobsWg.Wait()
|
||||
e.logger.Debug("gocron: singleton jobs completed")
|
||||
waitForSingletons <- struct{}{}
|
||||
}()
|
||||
<-waiterCtx.Done()
|
||||
}()
|
||||
|
||||
// wait for limit mode runners to complete
|
||||
go func() {
|
||||
e.logger.Debug("gocron: waiting for limit mode jobs to complete")
|
||||
go func() {
|
||||
limitModeJobsWg.Wait()
|
||||
e.logger.Debug("gocron: limitMode jobs completed")
|
||||
waitForLimitMode <- struct{}{}
|
||||
}()
|
||||
<-waiterCtx.Done()
|
||||
}()
|
||||
|
||||
// now either wait for all the jobs to complete,
|
||||
// or hit the timeout.
|
||||
var count int
|
||||
timeout := time.Now().Add(e.stopTimeout)
|
||||
for time.Now().Before(timeout) && count < 3 {
|
||||
select {
|
||||
case <-waitForJobs:
|
||||
count++
|
||||
case <-waitForSingletons:
|
||||
count++
|
||||
case <-waitForLimitMode:
|
||||
count++
|
||||
default:
|
||||
}
|
||||
}
|
||||
if count < 3 {
|
||||
e.done <- ErrStopJobsTimedOut
|
||||
e.logger.Debug("gocron: executor stopped - timed out")
|
||||
} else {
|
||||
e.done <- nil
|
||||
e.logger.Debug("gocron: executor stopped")
|
||||
}
|
||||
waiterCancel()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user