@@ -42,21 +42,20 @@ const (
4242)
4343
4444const (
45- // TODO(ddelnano): Clickhouse configuration should come from plugin config.
46- schemaCreationScript = `
45+ schemaCreationScriptTmpl = `
4746import px
4847px.display(px.CreateClickHouseSchemas(
49- host="hyperdx-hdx-oss-v2-clickhouse.click.svc.cluster.local ",
50- port=9000 ,
51- username="otelcollector ",
52- password="otelcollectorpass ",
53- database="default "
48+ host="%s ",
49+ port=%s ,
50+ username="%s ",
51+ password="%s ",
52+ database="%s "
5453))
5554`
56- detectionScript = `
55+ detectionScriptTmpl = `
5756import px
5857
59- df = px.DataFrame('kubescape_logs ', clickhouse_dsn='otelcollector:otelcollectorpass@hyperdx-hdx-oss-v2-clickhouse.click.svc.cluster.local:9000/default ', start_time='-%ds')
58+ df = px.DataFrame('%s ', clickhouse_dsn='%s ', start_time='-%ds')
6059df.alert = df.message
6160df.namespace = px.pluck(df.RuntimeK8sDetails, "podNamespace")
6261df.podName = px.pluck(df.RuntimeK8sDetails, "podName")
@@ -66,6 +65,15 @@ px.display(df)
6665`
6766)
6867
68+ func renderSchemaScript (cfg config.ClickHouse ) string {
69+ return fmt .Sprintf (schemaCreationScriptTmpl ,
70+ cfg .Host (), cfg .Port (), cfg .User (), cfg .Password (), cfg .Database ())
71+ }
72+
73+ func renderDetectionScript (cfg config.ClickHouse , lookback int64 ) string {
74+ return fmt .Sprintf (detectionScriptTmpl , cfg .Table (), cfg .DSN (), lookback )
75+ }
76+
6977func main () {
7078 ctx , cancel := context .WithCancel (context .Background ())
7179 defer cancel ()
@@ -95,9 +103,9 @@ func main() {
95103 }
96104
97105 // Start schema creation background task
98- go runSchemaCreationTask (ctx , pxClient , clusterId )
106+ go runSchemaCreationTask (ctx , pxClient , clusterId , cfg . ClickHouse () )
99107
100- // Start detection script that monitors for when to enable persistence
108+ // Start detection + reconcile loop that turns the retention plugin on/off
101109 go runDetectionTask (ctx , pxClient , pluginClient , cfg , clusterId , clusterName )
102110
103111 // Wait for signal to shutdown
@@ -110,85 +118,148 @@ func main() {
110118 time .Sleep (1 * time .Second )
111119}
112120
113- func runSchemaCreationTask (ctx context.Context , client * pxapi.Client , clusterID string ) {
121+ func runSchemaCreationTask (ctx context.Context , client * pxapi.Client , clusterID string , chCfg config. ClickHouse ) {
114122 ticker := time .NewTicker (schemaCreationInterval )
115123 defer ticker .Stop ()
116124
117- // Run immediately on startup
118- log .Info ("Running schema creation script" )
119- execCtx , cancel := context .WithTimeout (ctx , scriptExecutionTimeout )
120- if _ , err := pxl .ExecuteScript (execCtx , client , clusterID , schemaCreationScript ); err != nil {
121- log .WithError (err ).Error ("failed to execute schema creation script" )
122- } else {
125+ runOnce := func () {
126+ log .Info ("Running schema creation script" )
127+ execCtx , cancel := context .WithTimeout (ctx , scriptExecutionTimeout )
128+ defer cancel ()
129+ if _ , err := pxl .ExecuteScript (execCtx , client , clusterID , renderSchemaScript (chCfg )); err != nil {
130+ log .WithError (err ).Error ("failed to execute schema creation script" )
131+ return
132+ }
123133 log .Info ("Schema creation script completed successfully" )
124134 }
125- cancel ()
126135
136+ runOnce ()
127137 for {
128138 select {
129139 case <- ctx .Done ():
130140 log .Info ("Schema creation task shutting down" )
131141 return
132142 case <- ticker .C :
133- log .Info ("Running schema creation script" )
134- execCtx , cancel := context .WithTimeout (ctx , scriptExecutionTimeout )
135- if _ , err := pxl .ExecuteScript (execCtx , client , clusterID , schemaCreationScript ); err != nil {
136- log .WithError (err ).Error ("failed to execute schema creation script" )
137- } else {
138- log .Info ("Schema creation script completed successfully" )
139- }
140- cancel ()
143+ runOnce ()
141144 }
142145 }
143146}
144147
145148func runDetectionTask (ctx context.Context , pxClient * pxapi.Client , pluginClient * pixie.Client , cfg config.Config , clusterID string , clusterName string ) {
146149 detectionInterval := time .Duration (cfg .Worker ().DetectionInterval ()) * time .Second
147150 detectionLookback := cfg .Worker ().DetectionLookback ()
151+ quietTicks := cfg .Worker ().ExportQuietTicks ()
152+ mode := cfg .Worker ().ExportMode ()
148153
149154 ticker := time .NewTicker (detectionInterval )
150155 defer ticker .Stop ()
151156
152- pluginEnabled := false
157+ // pluginEnabled tracks our last-known retention-plugin state. A nil value means
158+ // we haven't reconciled yet; we always query on the first tick.
159+ var pluginEnabled * bool
160+ quietStreak := int64 (0 )
161+
162+ reconcile := func (want bool ) {
163+ if pluginEnabled != nil && * pluginEnabled == want {
164+ log .Debugf ("export already in desired state (enabled=%v), no action taken" , want )
165+ return
166+ }
167+ pluginCtx , pluginCancel := context .WithTimeout (ctx , 2 * time .Minute )
168+ defer pluginCancel ()
169+ if want {
170+ log .Info ("Enabling forensic export" )
171+ if err := enableClickHousePlugin (pluginCtx , pluginClient , cfg , clusterID , clusterName ); err != nil {
172+ log .WithError (err ).Error ("failed to enable forensic export" )
173+ return
174+ }
175+ v := true
176+ pluginEnabled = & v
177+ log .Info ("Forensic export enabled successfully" )
178+ } else {
179+ log .Info ("Disabling forensic export" )
180+ if err := disableClickHousePlugin (pluginCtx , pluginClient , cfg , clusterID , clusterName ); err != nil {
181+ log .WithError (err ).Error ("failed to disable forensic export" )
182+ return
183+ }
184+ v := false
185+ pluginEnabled = & v
186+ quietStreak = 0
187+ log .Info ("Forensic export disabled successfully" )
188+ }
189+ }
190+
191+ log .Infof ("Detection task starting (mode=%s, quietTicks=%d)" , mode , quietTicks )
153192
154193 for {
155194 select {
156195 case <- ctx .Done ():
157196 log .Info ("Detection task shutting down" )
158197 return
159198 case <- ticker .C :
160- log .Info ("Running detection script" )
161- // Run detection script with lookback period
162- detectionPxl := fmt .Sprintf (detectionScript , detectionLookback )
199+ switch mode {
200+ case config .ExportModeAlways :
201+ reconcile (true )
202+ continue
203+ case config .ExportModeNever :
204+ reconcile (false )
205+ continue
206+ }
207+
208+ // auto mode: detection drives the state.
209+ log .Debug ("Running detection script" )
163210 execCtx , cancel := context .WithTimeout (ctx , scriptExecutionTimeout )
164- recordCount , err := pxl .ExecuteScript (execCtx , pxClient , clusterID , detectionPxl )
211+ recordCount , err := pxl .ExecuteScript (execCtx , pxClient , clusterID , renderDetectionScript ( cfg . ClickHouse (), detectionLookback ) )
165212 cancel ()
166-
167213 if err != nil {
168214 log .WithError (err ).Error ("failed to execute detection script" )
169215 continue
170216 }
171-
172217 log .Debugf ("Detection script returned %d records" , recordCount )
173218
174- // If we have records and plugin is not enabled, enable it
175- if recordCount > 0 && ! pluginEnabled {
176- log .Info ("Detection script returned records - enabling forensic export" )
177- pluginCtx , pluginCancel := context .WithTimeout (ctx , 2 * time .Minute )
178- if err := enableClickHousePlugin (pluginCtx , pluginClient , cfg , clusterID , clusterName ); err != nil {
179- log .WithError (err ).Error ("failed to enable forensic export" )
180- } else {
181- pluginEnabled = true
182- log .Info ("Forensic export enabled successfully" )
219+ if recordCount > 0 {
220+ quietStreak = 0
221+ reconcile (true )
222+ } else {
223+ quietStreak ++
224+ if quietStreak >= quietTicks {
225+ reconcile (false )
183226 }
184- pluginCancel ()
185- } else if recordCount > 0 && pluginEnabled {
186- log .Info ("Detection script returned records but forensic export already enabled, no action taken" )
187227 }
188228 }
189229 }
190230}
191231
232+ func disableClickHousePlugin (ctx context.Context , client * pixie.Client , cfg config.Config , clusterID string , clusterName string ) error {
233+ plugin , err := client .GetClickHousePlugin ()
234+ if err != nil {
235+ return fmt .Errorf ("getting data retention plugins failed: %w" , err )
236+ }
237+ if ! plugin .RetentionEnabled {
238+ log .Info ("ClickHouse plugin already disabled; removing any lingering ch-* scripts" )
239+ } else {
240+ if err := client .DisableClickHousePlugin (plugin .LatestVersion ); err != nil {
241+ return fmt .Errorf ("failed to disable ClickHouse plugin: %w" , err )
242+ }
243+ }
244+
245+ // Tear down the per-cluster ch-* retention scripts so the demo can be re-run cleanly.
246+ current , err := client .GetClusterScripts (clusterID , clusterName )
247+ if err != nil {
248+ return fmt .Errorf ("failed to list retention scripts: %w" , err )
249+ }
250+ var errs []error
251+ for _ , s := range current {
252+ log .Infof ("Deleting retention script %s" , s .Name )
253+ if err := client .DeleteDataRetentionScript (s .ScriptId ); err != nil {
254+ errs = append (errs , err )
255+ }
256+ }
257+ if len (errs ) > 0 {
258+ return fmt .Errorf ("errors while deleting retention scripts: %v" , errs )
259+ }
260+ return nil
261+ }
262+
192263func enableClickHousePlugin (ctx context.Context , client * pixie.Client , cfg config.Config , clusterID string , clusterName string ) error {
193264 log .Info ("Checking the current ClickHouse plugin configuration" )
194265 plugin , err := client .GetClickHousePlugin ()
0 commit comments