diff --git a/scheduler.go b/scheduler.go index cf9cc63..873346a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -1,6 +1,7 @@ package main import ( + "context" "database/sql" "log" "time" @@ -38,30 +39,46 @@ func (s Scheduler) AddListener(event string, listenFunc ListenFunc) { s.listeners[event] = listenFunc } -// CallListeners calls the event listener of provided event -func (s Scheduler) CallListeners(event Event) { +// callListeners calls the event listener of provided event +func (s Scheduler) callListeners(event Event) { eventFn, ok := s.listeners[event.Name] if ok { go eventFn(event.Payload) - s.ClearEvent(event) + _, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID) + if err != nil { + log.Print("💀 error: ", err) + } } else { log.Print("💀 error: couldn't find event listeners attached to ", event.Name) } } -// ClearEvent clears job from database -func (s Scheduler) ClearEvent(event Event) { - _, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID) - if err != nil { - log.Print("💀 error: ", err) - } +// CheckEventsInInterval checks the event in given interval +func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) { + ticker := time.NewTicker(duration) + go func() { + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + log.Println("⏰ Ticks Received...") + events := s.checkDueEvents() + for _, e := range events { + s.callListeners(e) + } + } + + } + }() } -// CheckDueEvents checks and returns due events -func (s Scheduler) CheckDueEvents() []Event { +// checkDueEvents checks and returns due events +func (s Scheduler) checkDueEvents() []Event { events := []Event{} - rows, err := s.db.Query(`SELECT id, name, payload FROM jobs WHERE runAt < $1`, time.Now()) + rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now()) if err != nil { log.Print("💀 error: ", err) return nil