@@ -17,18 +17,19 @@ import (
1717
1818// Airflow service
1919type Airflow struct {
20- url string
21- recovered bool
22- lastDownTime string
23- schedulerStatus string
24- metadatabaseStatus string
25- enabled bool
26- verbose bool
27- logger * log.Logger
28- checkInterval int
29- stopChan chan bool
30- message string
31- notificatorConfig config.Config
20+ url string
21+ recovered bool
22+ lastDownTime string
23+ schedulerStatus string
24+ latestSchedulerHeartbeat string
25+ metadatabaseStatus string
26+ enabled bool
27+ verbose bool
28+ logger * log.Logger
29+ checkInterval int
30+ stopChan chan bool
31+ message string
32+ notificatorConfig config.Config
3233}
3334
3435// Airflow's constructor
@@ -72,35 +73,86 @@ func (a *Airflow) checkClusterStatus(resp *http.Response) error {
7273 return err
7374 }
7475
75- schedulerStatus , ok := data ["scheduler" ].(map [string ]interface {})[ "status" ].( string )
76+ schedulerRaw , ok := data ["scheduler" ].(map [string ]interface {})
7677 if ! ok {
7778 if a .verbose {
78- a .logger .Printf ("cannot read scheduler status: %v \n " , err )
79+ a .logger .Println ("cannot read scheduler block (not a map)" )
7980 }
81+ return errors .New ("invalid scheduler block" )
82+ }
8083
81- return err
84+ schedulerStatus , ok := schedulerRaw ["status" ].(string )
85+ if ! ok {
86+ if a .verbose {
87+ a .logger .Println ("cannot read scheduler status (not a string)" )
88+ }
89+
90+ return errors .New ("invalid scheduler status" )
91+ }
92+
93+ latestSchedulerHeartbeat , ok := schedulerRaw ["latest_scheduler_heartbeat" ].(string )
94+ if ! ok {
95+ if a .verbose {
96+ a .logger .Println ("cannot read scheduler latest_scheduler_heartbeat (not a string)" )
97+ }
98+
99+ return errors .New ("invalid latest_scheduler_heartbeat" )
100+ }
101+
102+ utcTime , err := time .Parse (time .RFC3339Nano , latestSchedulerHeartbeat )
103+ if err != nil {
104+ if a .verbose {
105+ a .logger .Printf ("failed to parse time: %v\n " , err )
106+ }
107+
108+ return errors .New ("invalid latest_scheduler_heartbeat" )
82109 }
110+
111+ timezoneJakarta , err := time .LoadLocation ("Asia/Jakarta" )
112+ if err != nil {
113+ if a .verbose {
114+ a .logger .Printf ("failed to load WIB location: %v\n " , err )
115+ }
116+
117+ return errors .New ("invalid timezone" )
118+ }
119+
120+ wibTime := utcTime .In (timezoneJakarta )
121+ a .latestSchedulerHeartbeat = wibTime .String ()
122+
83123 a .schedulerStatus = schedulerStatus
84124
85- metadatabaseStatus , ok := data ["metadatabase" ].(map [string ]interface {})[ "status" ].( string )
125+ metadatabaseRaw , ok := data ["metadatabase" ].(map [string ]interface {})
86126 if ! ok {
87127 if a .verbose {
88- a .logger .Printf ("cannot read metadatabase status: %v \n " , err )
128+ a .logger .Println ("cannot read metadatabase block (not a map)" )
89129 }
130+ return errors .New ("invalid metadatabase block" )
131+ }
90132
91- return err
133+ metadatabaseStatus , ok := metadatabaseRaw ["status" ].(string )
134+ if ! ok {
135+ if a .verbose {
136+ a .logger .Println ("cannot read metadatabase status" )
137+ }
138+
139+ return errors .New ("invalid metadatabase status" )
92140 }
141+
93142 a .metadatabaseStatus = metadatabaseStatus
94143
144+ message := fmt .Sprintf ("Airflow Scheduler Status: %s\n Latest Scheduler Heartbeat: %s\n \n \n Airflow Metadatabase: %s" ,
145+ a .schedulerStatus , a .latestSchedulerHeartbeat , a .metadatabaseStatus )
95146 if a .schedulerStatus != "healthy" || a .metadatabaseStatus != "healthy" {
96- errMsg := fmt .Sprintf ("airflow is unhealthy: scheduler (%s), metadatabase (%s)" , a .schedulerStatus , a .metadatabaseStatus )
97147 if a .verbose {
98- a .logger .Println (errMsg )
148+ a .logger .Println (message )
99149 }
100150
101- return errors .New (errMsg )
151+ return errors .New (message )
102152 }
103153
154+ a .SetMessage (message )
155+
104156 return nil
105157}
106158
0 commit comments