mirror of
https://github.com/tenrok/event-scheduling.git
synced 2026-06-20 20:00:38 +03:00
feat: run scheduler event in given interval
This commit is contained in:
+29
-12
@@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
@@ -38,30 +39,46 @@ func (s Scheduler) AddListener(event string, listenFunc ListenFunc) {
|
|||||||
s.listeners[event] = listenFunc
|
s.listeners[event] = listenFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallListeners calls the event listener of provided event
|
// callListeners calls the event listener of provided event
|
||||||
func (s Scheduler) CallListeners(event Event) {
|
func (s Scheduler) callListeners(event Event) {
|
||||||
eventFn, ok := s.listeners[event.Name]
|
eventFn, ok := s.listeners[event.Name]
|
||||||
if ok {
|
if ok {
|
||||||
go eventFn(event.Payload)
|
go eventFn(event.Payload)
|
||||||
s.ClearEvent(event)
|
_, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
|
||||||
|
if err != nil {
|
||||||
|
log.Print("💀 error: ", err)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
|
log.Print("💀 error: couldn't find event listeners attached to ", event.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearEvent clears job from database
|
// CheckEventsInInterval checks the event in given interval
|
||||||
func (s Scheduler) ClearEvent(event Event) {
|
func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) {
|
||||||
_, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
|
ticker := time.NewTicker(duration)
|
||||||
if err != nil {
|
go func() {
|
||||||
log.Print("💀 error: ", err)
|
for {
|
||||||
}
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
ticker.Stop()
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
log.Println("⏰ Ticks Received...")
|
||||||
|
events := s.checkDueEvents()
|
||||||
|
for _, e := range events {
|
||||||
|
s.callListeners(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckDueEvents checks and returns due events
|
// checkDueEvents checks and returns due events
|
||||||
func (s Scheduler) CheckDueEvents() []Event {
|
func (s Scheduler) checkDueEvents() []Event {
|
||||||
events := []Event{}
|
events := []Event{}
|
||||||
rows, err := s.db.Query(`SELECT id, name, payload FROM jobs WHERE runAt < $1`, time.Now())
|
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("💀 error: ", err)
|
log.Print("💀 error: ", err)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Reference in New Issue
Block a user