diff --git a/db.go b/db.go index 01bf955..91c625f 100644 --- a/db.go +++ b/db.go @@ -26,8 +26,14 @@ func seedDB(db *sql.DB) error { "id" SERIAL PRIMARY KEY, "name" varchar(50) NOT NULL, "payload" text, - "runAt" TIMESTAMP NOT NULL + "runAt" TIMESTAMP NOT NULL, + "cron" varchar(50) DEFAULT '-' ) `) + + if err != nil { + log.Panic("query error: ", err) + } + return err } diff --git a/go.mod b/go.mod index 5e241fd..477c68a 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.15 require ( github.com/joho/godotenv v1.3.0 github.com/lib/pq v1.9.0 + github.com/robfig/cron/v3 v3.0.0 ) diff --git a/go.sum b/go.sum index 5772932..ecbb2fc 100644 --- a/go.sum +++ b/go.sum @@ -223,6 +223,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E= +github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= diff --git a/main.go b/main.go index cee2957..2ec53b0 100644 --- a/main.go +++ b/main.go @@ -32,11 +32,16 @@ func main() { scheduler := NewScheduler(db, eventListeners) + stopCron := scheduler.StartCron() + defer stopCron() + scheduler.CheckEventsInInterval(ctx, time.Minute) scheduler.Schedule("SendEmail", "mail: nilkantha.dipesh@gmail.com", time.Now().Add(1*time.Minute)) scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute)) + scheduler.ScheduleCron("SendEmail", "mail: dipesh.dulal+new@wesionary.team", "* * * * *") + go func() { for range interrupt { log.Println("\nāŒ Interrupt received closing...") diff --git a/scheduler.go b/scheduler.go index be761a9..913c46f 100644 --- a/scheduler.go +++ b/scheduler.go @@ -5,12 +5,16 @@ import ( "database/sql" "log" "time" + + "github.com/robfig/cron/v3" ) // Scheduler data structure type Scheduler struct { - db *sql.DB - listeners Listeners + db *sql.DB + listeners Listeners + cron *cron.Cron + cronEntries map[string]cron.EntryID } // Listeners has attached event listeners @@ -24,14 +28,19 @@ type Event struct { ID uint Name string Payload string + Cron string } // NewScheduler creates a new scheduler func NewScheduler(db *sql.DB, listeners Listeners) Scheduler { + return Scheduler{ - db: db, - listeners: listeners, + db: db, + listeners: listeners, + cron: cron.New(), + cronEntries: map[string]cron.EntryID{}, } + } // AddListener adds the listener function to Listeners @@ -78,7 +87,7 @@ func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Dura // checkDueEvents checks and returns due events func (s Scheduler) checkDueEvents() []Event { events := []Event{} - rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now()) + rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1 AND "cron"='-'`, time.Now()) if err != nil { log.Print("šŸ’€ error: ", err) return nil @@ -99,3 +108,62 @@ func (s Scheduler) Schedule(event string, payload string, runAt time.Time) { log.Print("schedule insert error: ", err) } } + +// ScheduleCron schedules a cron job +func (s Scheduler) ScheduleCron(event string, payload string, cron string) { + log.Print("šŸš€ Scheduling event ", event, " with cron string ", cron) + entryID, ok := s.cronEntries[event] + if ok { + s.cron.Remove(entryID) + _, err := s.db.Exec(`UPDATE "public"."jobs" SET "cron" = $1 , "payload" = $2 WHERE "name" = $3 AND "cron" != '-'`, cron, payload, event) + if err != nil { + log.Print("schedule cron update error: ", err) + } + } else { + _, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt", "cron") VALUES ($1, $2, $3, $4)`, event, payload, time.Now(), cron) + if err != nil { + log.Print("schedule cron insert error: ", err) + } + } + + eventFn, ok := s.listeners[event] + if ok { + entryID, err := s.cron.AddFunc(cron, func() { eventFn(payload) }) + s.cronEntries[event] = entryID + if err != nil { + log.Print("šŸ’€ error: ", err) + } + } +} + +// attachCronJobs attaches cron jobs +func (s Scheduler) attachCronJobs() { + log.Printf("Attaching cron jobs") + rows, err := s.db.Query(`SELECT "id", "name", "payload", "cron" FROM "public"."jobs" WHERE "cron"!='-'`) + if err != nil { + log.Print("šŸ’€ error: ", err) + } + for rows.Next() { + evt := Event{} + rows.Scan(&evt.ID, &evt.Name, &evt.Payload, &evt.Cron) + eventFn, ok := s.listeners[evt.Name] + if ok { + entryID, err := s.cron.AddFunc(evt.Cron, func() { eventFn(evt.Payload) }) + s.cronEntries[evt.Name] = entryID + + if err != nil { + log.Print("šŸ’€ error: ", err) + } + } + } +} + +// StartCron starts cron job +func (s Scheduler) StartCron() func() { + s.attachCronJobs() + s.cron.Start() + + return func() { + s.cron.Stop() + } +}