2
0
mirror of https://github.com/tenrok/event-scheduling.git synced 2026-05-15 11:59:41 +03:00

feat: update scheduler to add cron jobs

This commit is contained in:
Dipesh Dulal
2021-01-23 19:22:48 +05:45
parent 6f1b238ae2
commit ed1896b4d7
+73 -5
View File
@@ -5,12 +5,16 @@ import (
"database/sql"
"log"
"time"
"github.com/robfig/cron/v3"
)
// Scheduler data structure
type Scheduler struct {
db *sql.DB
listeners Listeners
db *sql.DB
listeners Listeners
cron *cron.Cron
cronEntries map[string]cron.EntryID
}
// Listeners has attached event listeners
@@ -24,14 +28,19 @@ type Event struct {
ID uint
Name string
Payload string
Cron string
}
// NewScheduler creates a new scheduler
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
return Scheduler{
db: db,
listeners: listeners,
db: db,
listeners: listeners,
cron: cron.New(),
cronEntries: map[string]cron.EntryID{},
}
}
// AddListener adds the listener function to Listeners
@@ -78,7 +87,7 @@ func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Dura
// checkDueEvents checks and returns due events
func (s Scheduler) checkDueEvents() []Event {
events := []Event{}
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1 AND "cron"='-'`, time.Now())
if err != nil {
log.Print("💀 error: ", err)
return nil
@@ -99,3 +108,62 @@ func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
log.Print("schedule insert error: ", err)
}
}
// ScheduleCron schedules a cron job
func (s Scheduler) ScheduleCron(event string, payload string, cron string) {
log.Print("🚀 Scheduling event ", event, " with cron string ", cron)
entryID, ok := s.cronEntries[event]
if ok {
s.cron.Remove(entryID)
_, err := s.db.Exec(`UPDATE "public"."jobs" SET "cron" = $1 , "payload" = $2 WHERE "name" = $3 AND "cron" != '-'`, cron, payload, event)
if err != nil {
log.Print("schedule cron update error: ", err)
}
} else {
_, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt", "cron") VALUES ($1, $2, $3, $4)`, event, payload, time.Now(), cron)
if err != nil {
log.Print("schedule cron insert error: ", err)
}
}
eventFn, ok := s.listeners[event]
if ok {
entryID, err := s.cron.AddFunc(cron, func() { eventFn(payload) })
s.cronEntries[event] = entryID
if err != nil {
log.Print("💀 error: ", err)
}
}
}
// attachCronJobs attaches cron jobs
func (s Scheduler) attachCronJobs() {
log.Printf("Attaching cron jobs")
rows, err := s.db.Query(`SELECT "id", "name", "payload", "cron" FROM "public"."jobs" WHERE "cron"!='-'`)
if err != nil {
log.Print("💀 error: ", err)
}
for rows.Next() {
evt := Event{}
rows.Scan(&evt.ID, &evt.Name, &evt.Payload, &evt.Cron)
eventFn, ok := s.listeners[evt.Name]
if ok {
entryID, err := s.cron.AddFunc(evt.Cron, func() { eventFn(evt.Payload) })
s.cronEntries[evt.Name] = entryID
if err != nil {
log.Print("💀 error: ", err)
}
}
}
}
// StartCron starts cron job
func (s Scheduler) StartCron() func() {
s.attachCronJobs()
s.cron.Start()
return func() {
s.cron.Stop()
}
}