Skip to content

Commit 19ac4ed

Browse files
committed
Discover Index ID
Discover the Index ID by spawning a pod with the rag-content container image. By inspecting the value stored in the INDEX_NAME env variable we can determine which Index ID value should be used [1]. This is only a temporary solution until the IndexID is removed from the required fields in OLSConfig. [1] https://github.com/openstack-lightspeed/rag-content/blob/5e8339870c3600120bf6080e68a6ffb14a115d61/Containerfile#L53C5-L53C15
1 parent 5269d72 commit 19ac4ed

4 files changed

Lines changed: 187 additions & 16 deletions

File tree

bindata/rbac/rbac.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,12 @@ rules:
113113
- patch
114114
- update
115115
- watch
116+
- apiGroups:
117+
- ""
118+
resources:
119+
- pods/log
120+
verbs:
121+
- get
116122
- apiGroups:
117123
- ""
118124
resources:

config/rbac/role.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ rules:
7070
- patch
7171
- update
7272
- watch
73+
- apiGroups:
74+
- ""
75+
resources:
76+
- pods/log
77+
verbs:
78+
- get
7379
- apiGroups:
7480
- ""
7581
resources:

controllers/lightspeed/openstacklightspeed_controller.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ func (r *OpenStackLightspeedReconciler) GetLogger(ctx context.Context) logr.Logg
5858
// +kubebuilder:rbac:groups=ols.openshift.io,resources=olsconfigs/status,verbs=get;list;watch;create;update;patch;delete
5959
// +kubebuilder:rbac:groups=ols.openshift.io,resources=olsconfigs/finalizers,verbs=get;list;watch;create;update;patch;delete
6060
// +kubebuilder:rbac:groups=operators.coreos.com,resources=clusterserviceversions,verbs=get;list;
61+
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete;
62+
// +kubebuilder:rbac:groups="",resources=pods,verbs=create;delete;get;list;patch;update;watch
63+
// +kubebuilder:rbac:groups="",resources=pods/log,verbs=get
6164

6265
// Reconcile is part of the main kubernetes reconciliation loop which aims to
6366
// move the current state of the cluster closer to the desired state.
@@ -142,6 +145,19 @@ func (r *OpenStackLightspeedReconciler) Reconcile(ctx context.Context, req ctrl.
142145
return ctrl.Result{}, fmt.Errorf("installation of OpenShift LightSpeed not detected: %w", err)
143146
}
144147

148+
indexID, result, err := lightspeed.ResolveIndexID(ctx, helper, instance)
149+
if err != nil {
150+
instance.Status.Conditions.Set(condition.FalseCondition(
151+
lightspeedv1.OpenStackLightspeedReadyCondition,
152+
condition.ErrorReason,
153+
condition.SeverityWarning,
154+
condition.DeploymentReadyErrorMessage,
155+
err.Error()))
156+
return result, err
157+
} else if (result != ctrl.Result{}) {
158+
return result, nil
159+
}
160+
145161
// NOTE: We cannot consume the OLSConfig definition directly from the OLS operator's code due to
146162
// a conflict in Go versions. When this comment was written, the min. required Go version for
147163
// openstack-operator was 1.21 whereas OLS operator required at least Go version 1.23. Once the
@@ -170,7 +186,7 @@ func (r *OpenStackLightspeedReconciler) Reconcile(ctx context.Context, req ctrl.
170186
return fmt.Errorf("OLSConfig is managed by different OpenStackLightspeed instance")
171187
}
172188

173-
err = lightspeed.PatchOLSConfig(&olsConfig, instance, helper)
189+
err = lightspeed.PatchOLSConfig(helper, instance, &olsConfig, indexID)
174190
if err != nil {
175191
return err
176192
}

pkg/lightspeed/funcs.go

Lines changed: 158 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,21 @@ package lightspeed
1818

1919
import (
2020
"context"
21+
"crypto/sha256"
2122
"encoding/json"
2223
"fmt"
24+
"io"
25+
batchv1 "k8s.io/api/batch/v1"
26+
"k8s.io/client-go/kubernetes"
27+
ctrl "sigs.k8s.io/controller-runtime"
28+
"sigs.k8s.io/controller-runtime/pkg/client"
29+
"sigs.k8s.io/controller-runtime/pkg/client/config"
2330
"strings"
31+
"time"
2432

2533
common_helper "github.com/openstack-k8s-operators/lib-common/modules/common/helper"
2634
lightspeedv1beta1 "github.com/openstack-k8s-operators/openstack-operator/apis/lightspeed/v1beta1"
35+
corev1 "k8s.io/api/core/v1"
2736
k8s_errors "k8s.io/apimachinery/pkg/api/errors"
2837
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2938
uns "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -44,6 +53,10 @@ const (
4453
// located
4554
OpenStackLightspeedVectorDBPath = "/rag/vector_db/os_product_docs"
4655

56+
// OpenStackLightspeedJobName - name prefix of the job (and value of its "app" label) that is used to discover
57+
// environment variables inside of the RAG container image
58+
OpenStackLightspeedJobName = "openstack-lightspeed"
59+
4760
// OLSConfigName - OLS forbids other name for OLSConfig instance than OLSConfigName
4861
OLSConfigName = "cluster"
4962
)
@@ -98,9 +111,10 @@ func IsOLSOperatorInstalled(ctx context.Context, helper *common_helper.Helper) (
98111

99112
// PatchOLSConfig patches OLSConfig with information from OpenStackLightspeed instance.
100113
func PatchOLSConfig(
101-
olsConfig *uns.Unstructured,
102-
instance *lightspeedv1beta1.OpenStackLightspeed,
103114
helper *common_helper.Helper,
115+
instance *lightspeedv1beta1.OpenStackLightspeed,
116+
olsConfig *uns.Unstructured,
117+
indexID string,
104118
) error {
105119
// 1. Patch the Providers section
106120
providersPatch := []interface{}{
@@ -124,11 +138,6 @@ func PatchOLSConfig(
124138
return err
125139
}
126140

127-
indexID, err := getIndexID(instance.Spec.RAGImage)
128-
if err != nil {
129-
return err
130-
}
131-
132141
// 2. Patch the RAG section
133142
openstackRAG := []interface{}{
134143
map[string]interface{}{
@@ -143,7 +152,7 @@ func PatchOLSConfig(
143152
}
144153

145154
tlsCaCertBundle := StringPtrValue(instance.Spec.TLSCACertBundle)
146-
err = uns.SetNestedField(olsConfig.Object, tlsCaCertBundle, "spec", "ols", "additionalCAConfigMapRef", "name")
155+
err := uns.SetNestedField(olsConfig.Object, tlsCaCertBundle, "spec", "ols", "additionalCAConfigMapRef", "name")
147156
if err != nil {
148157
return err
149158
}
@@ -216,15 +225,50 @@ func IsOLSConfigReady(ctx context.Context, helper *common_helper.Helper) (bool,
216225
return true, nil
217226
}
218227

219-
// getIndexID - returns index ID for the data stored in the vector DB container image.
220-
// It expects that the index ID is equal to the image tag.
221-
func getIndexID(imageName string) (string, error) {
222-
imageNameSections := strings.Split(imageName, ":")
223-
if len(imageNameSections) != 2 {
224-
return "", fmt.Errorf("failed to discvoer index ID")
228+
// ResolveIndexID - returns index ID for the data stored in the vector DB container image. The discovery of the
229+
// index ID is done through spawning a pod with the rag-content image and looking at the INDEX_NAME env variable value.
230+
func ResolveIndexID(
231+
ctx context.Context,
232+
helper *common_helper.Helper,
233+
instance *lightspeedv1beta1.OpenStackLightspeed,
234+
) (string, ctrl.Result, error) {
235+
result, err := createOLSPod(ctx, helper, instance)
236+
if err != nil {
237+
return "", result, err
238+
}
239+
240+
podList := &corev1.PodList{}
241+
labelSelector := client.MatchingLabels{"app": OpenStackLightspeedJobName}
242+
if err := helper.GetClient().List(ctx, podList, client.InNamespace(instance.Namespace), labelSelector); err != nil {
243+
return "", ctrl.Result{}, err
225244
}
226245

227-
return imageNameSections[1], nil
246+
var OLSPod *corev1.Pod
247+
for _, pod := range podList.Items {
248+
if pod.Spec.Containers[0].Image == instance.Spec.RAGImage {
249+
OLSPod = &pod
250+
break
251+
}
252+
}
253+
if OLSPod == nil {
254+
helper.GetLogger().Info("waiting for OpenStackLightspeed pod")
255+
return "", ctrl.Result{RequeueAfter: 5 * time.Second}, nil
256+
}
257+
258+
switch OLSPod.Status.Phase {
259+
case corev1.PodSucceeded:
260+
indexName, err := extractEnvFromPodLogs(ctx, OLSPod, "INDEX_NAME")
261+
if err != nil && k8s_errors.IsNotFound(err) {
262+
helper.GetLogger().Info("waiting for OpenStackLightspeed pod")
263+
return "", ctrl.Result{RequeueAfter: 5 * time.Second}, nil
264+
}
265+
return indexName, ctrl.Result{}, err
266+
case corev1.PodFailed:
267+
return "", ctrl.Result{}, fmt.Errorf("failed to start OpenStack Lightpseed RAG pod")
268+
default:
269+
helper.GetLogger().Info("waiting for OpenStackLightspeed pod")
270+
return "", ctrl.Result{RequeueAfter: 5 * time.Second}, nil
271+
}
228272
}
229273

230274
// StringPtrValue - dereference safely string pointer
@@ -235,3 +279,102 @@ func StringPtrValue(s *string) string {
235279

236280
return *s
237281
}
282+
283+
// extractEnvFromPodLogs - discovers an environment variable value from the pod logs. The pod must be started using
284+
// createOLSPod.
285+
func extractEnvFromPodLogs(ctx context.Context, pod *corev1.Pod, envVarName string) (string, error) {
286+
cfg, err := config.GetConfig()
287+
if err != nil {
288+
return "", err
289+
}
290+
291+
k8sClient, err := kubernetes.NewForConfig(cfg)
292+
if err != nil {
293+
return "", err
294+
}
295+
296+
req := k8sClient.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
297+
podLogs, err := req.Stream(ctx)
298+
if err != nil {
299+
return "", err
300+
}
301+
defer podLogs.Close()
302+
303+
buf := new(strings.Builder)
304+
_, err = io.Copy(buf, podLogs)
305+
if err != nil {
306+
return "", fmt.Errorf("error in copying logs: %w", err)
307+
}
308+
309+
logs := buf.String()
310+
for _, envLine := range strings.Split(logs, "\n") {
311+
parts := strings.Split(envLine, "=")
312+
if len(parts) != 2 {
313+
continue
314+
}
315+
316+
if parts[0] == envVarName {
317+
return parts[1], nil
318+
}
319+
}
320+
321+
return "", fmt.Errorf("env var not discovered: %s", envVarName)
322+
}
323+
324+
// createOLSPod - starts OLS pod with entrypoint that lists environment variables after the start of the pod. It used
325+
// to discover INDEX_NAME value.
326+
func createOLSPod(
327+
ctx context.Context,
328+
helper *common_helper.Helper,
329+
instance *lightspeedv1beta1.OpenStackLightspeed,
330+
) (ctrl.Result, error) {
331+
imageHash := sha256.Sum256([]byte(instance.Spec.RAGImage))
332+
imageHashStr := fmt.Sprintf("%x", imageHash)
333+
imageHashStr = imageHashStr[len(imageHashStr)-9:]
334+
imageName := fmt.Sprintf("%s-%s", OpenStackLightspeedJobName, imageHashStr)
335+
336+
ttlSecondsAfterFinished := int32(600) // 10 mins
337+
activeDeadlineSeconds := int64(1200) // 20 mins
338+
OLSPod := &batchv1.Job{
339+
ObjectMeta: metav1.ObjectMeta{
340+
Name: imageName,
341+
Namespace: instance.Namespace,
342+
Labels: map[string]string{
343+
"app": OpenStackLightspeedJobName,
344+
},
345+
},
346+
Spec: batchv1.JobSpec{
347+
TTLSecondsAfterFinished: &ttlSecondsAfterFinished,
348+
ActiveDeadlineSeconds: &activeDeadlineSeconds,
349+
Template: corev1.PodTemplateSpec{
350+
ObjectMeta: metav1.ObjectMeta{
351+
Labels: map[string]string{
352+
"app": OpenStackLightspeedJobName,
353+
},
354+
},
355+
Spec: corev1.PodSpec{
356+
Containers: []corev1.Container{
357+
{
358+
Name: "rag-content",
359+
Image: instance.Spec.RAGImage,
360+
Command: []string{"/bin/sh", "-c"},
361+
Args: []string{"env"},
362+
},
363+
},
364+
RestartPolicy: corev1.RestartPolicyNever,
365+
},
366+
},
367+
},
368+
}
369+
370+
if err := controllerutil.SetControllerReference(instance, OLSPod, helper.GetScheme()); err != nil {
371+
return ctrl.Result{}, err
372+
}
373+
374+
err := helper.GetClient().Create(ctx, OLSPod)
375+
if err != nil && !k8s_errors.IsAlreadyExists(err) {
376+
return ctrl.Result{}, err
377+
}
378+
379+
return ctrl.Result{}, nil
380+
}

0 commit comments

Comments
 (0)