mirror of
https://github.com/tenrok/event-scheduling.git
synced 2026-06-23 20:40:39 +03:00
Merge pull request #2 from dipeshdulal/feat/cron-jobs
Add cron jobs scheduling support
This commit is contained in:
@@ -26,8 +26,14 @@ func seedDB(db *sql.DB) error {
|
|||||||
"id" SERIAL PRIMARY KEY,
|
"id" SERIAL PRIMARY KEY,
|
||||||
"name" varchar(50) NOT NULL,
|
"name" varchar(50) NOT NULL,
|
||||||
"payload" text,
|
"payload" text,
|
||||||
"runAt" TIMESTAMP NOT NULL
|
"runAt" TIMESTAMP NOT NULL,
|
||||||
|
"cron" varchar(50) DEFAULT '-'
|
||||||
)
|
)
|
||||||
`)
|
`)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Panic("query error: ", err)
|
||||||
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,4 +5,5 @@ go 1.15
|
|||||||
require (
|
require (
|
||||||
github.com/joho/godotenv v1.3.0
|
github.com/joho/godotenv v1.3.0
|
||||||
github.com/lib/pq v1.9.0
|
github.com/lib/pq v1.9.0
|
||||||
|
github.com/robfig/cron/v3 v3.0.0
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -223,6 +223,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
|
|||||||
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
|
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
|
||||||
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||||
|
github.com/robfig/cron/v3 v3.0.0 h1:kQ6Cb7aHOHTSzNVNEhmp8EcWKLb4CbiMW9h9VyIhO4E=
|
||||||
|
github.com/robfig/cron/v3 v3.0.0/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
|
||||||
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
|
||||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||||
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
|
||||||
|
|||||||
@@ -32,11 +32,16 @@ func main() {
|
|||||||
|
|
||||||
scheduler := NewScheduler(db, eventListeners)
|
scheduler := NewScheduler(db, eventListeners)
|
||||||
|
|
||||||
|
stopCron := scheduler.StartCron()
|
||||||
|
defer stopCron()
|
||||||
|
|
||||||
scheduler.CheckEventsInInterval(ctx, time.Minute)
|
scheduler.CheckEventsInInterval(ctx, time.Minute)
|
||||||
|
|
||||||
scheduler.Schedule("SendEmail", "mail: nilkantha.dipesh@gmail.com", time.Now().Add(1*time.Minute))
|
scheduler.Schedule("SendEmail", "mail: nilkantha.dipesh@gmail.com", time.Now().Add(1*time.Minute))
|
||||||
scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))
|
scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))
|
||||||
|
|
||||||
|
scheduler.ScheduleCron("SendEmail", "mail: dipesh.dulal+new@wesionary.team", "* * * * *")
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for range interrupt {
|
for range interrupt {
|
||||||
log.Println("\n❌ Interrupt received closing...")
|
log.Println("\n❌ Interrupt received closing...")
|
||||||
|
|||||||
+73
-5
@@ -5,12 +5,16 @@ import (
|
|||||||
"database/sql"
|
"database/sql"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Scheduler data structure
|
// Scheduler data structure
|
||||||
type Scheduler struct {
|
type Scheduler struct {
|
||||||
db *sql.DB
|
db *sql.DB
|
||||||
listeners Listeners
|
listeners Listeners
|
||||||
|
cron *cron.Cron
|
||||||
|
cronEntries map[string]cron.EntryID
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listeners has attached event listeners
|
// Listeners has attached event listeners
|
||||||
@@ -24,14 +28,19 @@ type Event struct {
|
|||||||
ID uint
|
ID uint
|
||||||
Name string
|
Name string
|
||||||
Payload string
|
Payload string
|
||||||
|
Cron string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewScheduler creates a new scheduler
|
// NewScheduler creates a new scheduler
|
||||||
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
|
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
|
||||||
|
|
||||||
return Scheduler{
|
return Scheduler{
|
||||||
db: db,
|
db: db,
|
||||||
listeners: listeners,
|
listeners: listeners,
|
||||||
|
cron: cron.New(),
|
||||||
|
cronEntries: map[string]cron.EntryID{},
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddListener adds the listener function to Listeners
|
// AddListener adds the listener function to Listeners
|
||||||
@@ -78,7 +87,7 @@ func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Dura
|
|||||||
// checkDueEvents checks and returns due events
|
// checkDueEvents checks and returns due events
|
||||||
func (s Scheduler) checkDueEvents() []Event {
|
func (s Scheduler) checkDueEvents() []Event {
|
||||||
events := []Event{}
|
events := []Event{}
|
||||||
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
|
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1 AND "cron"='-'`, time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Print("💀 error: ", err)
|
log.Print("💀 error: ", err)
|
||||||
return nil
|
return nil
|
||||||
@@ -99,3 +108,62 @@ func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
|
|||||||
log.Print("schedule insert error: ", err)
|
log.Print("schedule insert error: ", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ScheduleCron schedules a cron job
|
||||||
|
func (s Scheduler) ScheduleCron(event string, payload string, cron string) {
|
||||||
|
log.Print("🚀 Scheduling event ", event, " with cron string ", cron)
|
||||||
|
entryID, ok := s.cronEntries[event]
|
||||||
|
if ok {
|
||||||
|
s.cron.Remove(entryID)
|
||||||
|
_, err := s.db.Exec(`UPDATE "public"."jobs" SET "cron" = $1 , "payload" = $2 WHERE "name" = $3 AND "cron" != '-'`, cron, payload, event)
|
||||||
|
if err != nil {
|
||||||
|
log.Print("schedule cron update error: ", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
_, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt", "cron") VALUES ($1, $2, $3, $4)`, event, payload, time.Now(), cron)
|
||||||
|
if err != nil {
|
||||||
|
log.Print("schedule cron insert error: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
eventFn, ok := s.listeners[event]
|
||||||
|
if ok {
|
||||||
|
entryID, err := s.cron.AddFunc(cron, func() { eventFn(payload) })
|
||||||
|
s.cronEntries[event] = entryID
|
||||||
|
if err != nil {
|
||||||
|
log.Print("💀 error: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// attachCronJobs attaches cron jobs
|
||||||
|
func (s Scheduler) attachCronJobs() {
|
||||||
|
log.Printf("Attaching cron jobs")
|
||||||
|
rows, err := s.db.Query(`SELECT "id", "name", "payload", "cron" FROM "public"."jobs" WHERE "cron"!='-'`)
|
||||||
|
if err != nil {
|
||||||
|
log.Print("💀 error: ", err)
|
||||||
|
}
|
||||||
|
for rows.Next() {
|
||||||
|
evt := Event{}
|
||||||
|
rows.Scan(&evt.ID, &evt.Name, &evt.Payload, &evt.Cron)
|
||||||
|
eventFn, ok := s.listeners[evt.Name]
|
||||||
|
if ok {
|
||||||
|
entryID, err := s.cron.AddFunc(evt.Cron, func() { eventFn(evt.Payload) })
|
||||||
|
s.cronEntries[evt.Name] = entryID
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.Print("💀 error: ", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartCron starts cron job
|
||||||
|
func (s Scheduler) StartCron() func() {
|
||||||
|
s.attachCronJobs()
|
||||||
|
s.cron.Start()
|
||||||
|
|
||||||
|
return func() {
|
||||||
|
s.cron.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user