fix unsafe map usage in singletonMode (#665)
* fix singletonMode unsafe map * update lint issues --------- Co-authored-by: a3sroot <a3sroot@gmail.com>
This commit is contained in:
+7
-4
@@ -19,7 +19,7 @@ type executor struct {
|
|||||||
jobOutRequest chan jobOutRequest
|
jobOutRequest chan jobOutRequest
|
||||||
stopTimeout time.Duration
|
stopTimeout time.Duration
|
||||||
done chan error
|
done chan error
|
||||||
singletonRunners map[uuid.UUID]singletonRunner
|
singletonRunners *sync.Map // map[uuid.UUID]singletonRunner
|
||||||
limitMode *limitModeConfig
|
limitMode *limitModeConfig
|
||||||
elector Elector
|
elector Elector
|
||||||
locker Locker
|
locker Locker
|
||||||
@@ -67,7 +67,7 @@ func (e *executor) start() {
|
|||||||
limitModeJobsWg := &waitGroupWithMutex{}
|
limitModeJobsWg := &waitGroupWithMutex{}
|
||||||
|
|
||||||
// create a fresh map for tracking singleton runners
|
// create a fresh map for tracking singleton runners
|
||||||
e.singletonRunners = make(map[uuid.UUID]singletonRunner)
|
e.singletonRunners = &sync.Map{}
|
||||||
|
|
||||||
// start the for leap that is the executor
|
// start the for leap that is the executor
|
||||||
// selecting on channels for work to do
|
// selecting on channels for work to do
|
||||||
@@ -151,15 +151,18 @@ func (e *executor) start() {
|
|||||||
if j.singletonMode {
|
if j.singletonMode {
|
||||||
// for singleton mode, get the existing runner for the job
|
// for singleton mode, get the existing runner for the job
|
||||||
// or spin up a new one
|
// or spin up a new one
|
||||||
runner, ok := e.singletonRunners[jIn.id]
|
runner := &singletonRunner{}
|
||||||
|
runnerSrc, ok := e.singletonRunners.Load(jIn.id)
|
||||||
if !ok {
|
if !ok {
|
||||||
runner.in = make(chan jobIn, 1000)
|
runner.in = make(chan jobIn, 1000)
|
||||||
if j.singletonLimitMode == LimitModeReschedule {
|
if j.singletonLimitMode == LimitModeReschedule {
|
||||||
runner.rescheduleLimiter = make(chan struct{}, 1)
|
runner.rescheduleLimiter = make(chan struct{}, 1)
|
||||||
}
|
}
|
||||||
e.singletonRunners[jIn.id] = runner
|
e.singletonRunners.Store(jIn.id, runner)
|
||||||
singletonJobsWg.Add(1)
|
singletonJobsWg.Add(1)
|
||||||
go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
|
go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
|
||||||
|
} else {
|
||||||
|
runner = runnerSrc.(*singletonRunner)
|
||||||
}
|
}
|
||||||
|
|
||||||
if j.singletonLimitMode == LimitModeReschedule {
|
if j.singletonLimitMode == LimitModeReschedule {
|
||||||
|
|||||||
+1
-1
@@ -106,7 +106,7 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
|
|||||||
exec := executor{
|
exec := executor{
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
stopTimeout: time.Second * 10,
|
stopTimeout: time.Second * 10,
|
||||||
singletonRunners: make(map[uuid.UUID]singletonRunner),
|
singletonRunners: nil,
|
||||||
logger: &noOpLogger{},
|
logger: &noOpLogger{},
|
||||||
|
|
||||||
jobsIn: make(chan jobIn),
|
jobsIn: make(chan jobIn),
|
||||||
|
|||||||
Reference in New Issue
Block a user