Skip to content

Commit 5bda2d4

Browse files
authored
Ap 681 : Create Airflow connection for Langfuse (#64)
1 parent 9303280 commit 5bda2d4

5 files changed

Lines changed: 128 additions & 20 deletions

File tree

README.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ Important environment variables for our build/environment:
8686
| `AIRFLOW__API__BASE_URL` | Where Airflow's webserver is reachable. | `AIRFLOW__API__BASE_URL="http://localhost:8080"` |
8787
| `AIRFLOW__CORE__FERNET_KEY` | [Fernet](https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html) encryption key used to encrypt Airflow secrets | `AIRFLOW__CORE__FERNET_KEY="somebase64value="` |
8888
| `AIRFLOW__API_AUTH__JWT_SECRET` | Secret key used to sign JWT tokens for Airflow's API authentication. The default value used in development and testing should be replaced in production. | `AIRFLOW__API_AUTH__JWT_SECRET="some32bytesecret"` |
89+
| `AIRFLOW_CONN_LANGFUSE_DEFAULT` | Airflow connection string for Langfuse access.<br>Note: This will create a `langfuse_default` conn_id, will not store the connection in the Airflow metastore, and will override any other connection settings. | `AIRFLOW_CONN_LANGFUSE_DEFAULT='{"conn_type":"langfuse","host":"us.cloud.langfuse.com","schema":"https","login":"pk-lf-blah-blah-blah","password":"ssk-lf-blah-blah-blah"}'`|
8990
| `AIRFLOW_CONN_TIND_DEFAULT` | (Optional override) Airflow connection json string for TIND access.<br>Note: This will create a `tind_default` conn_id, will not store the connection in the Airflow metastore, and will override any other connection settings. | `AIRFLOW_CONN_TIND_DEFAULT='{"conn_type": "http","password": "your-tind-key-here","host": "https://digicoll.lib.berkeley.edu/api/v1","schema": "https"}'` |
9091
| `OIDC_CLIENT_SECRET` | Client secret for OIDC authentication. Used by the Airflow webserver to authenticate OIDC token requests. In development, also used by `keycloak-config-cli` to configure the client secret. This should match Keycloak configuration in development and testing, and CalNet in production. | `OIDC_CLIENT_SECRET="some32charactersecret"` |
9192
| `OIDC_NAME` | Name appended to the OIDC login button | `OIDC_NAME="keycloak"` |
@@ -98,9 +99,6 @@ Important environment variables for our build/environment:
9899
| `TIND_API_URL` | URL for TIND access | `TIND_API_URL="https://digicoll.lib.berkeley.edu/api/v1"` |
99100
| `TIND_IIIF_MANIFEST_URL_PATTERN` | URL pattern for TIND IIIF manifests | `TIND_IIIF_MANIFEST_URL_PATTERN="https://digicoll.lib.berkeley.edu/record/{tind_id}/export/iiif_manifest"` |
100101
| `MOKELUMNE_TIND_DOWNLOAD_DIR` | Path for downloaded image cache | `MOKELUMNE_TIND_DOWNLOAD_DIR="/some/path/to/download/to"` |
101-
|`LANGFUSE_HOST`|Host for Langfuse|`LANGFUSE_HOST="https://us.cloud.langfuse.com"`|
102-
|`LANGFUSE_SECRET_KEY`|sets langfuse secret key|`LANGFUSE_SECRET_KEY="sk-lf-blah-blah-blah"`|
103-
|`LANGFUSE_PUBLIC_KEY`|sets langfuse public key|`LANGFUSE_PUBLIC_KEY="pk-lf-blah-blah-blah"`|
104102
|`AWS_ENDPOINT_URL`|AWS endpoint (don't forget the `https://`!)|`AWS_ENDPOINT_URL="https://bedrock-runtime.us-west-1.amazonaws.com"`|
105103
|`AWS_DEFAULT_REGION`|The AWS region to use; you probably want `us-west-1`.|`AWS_DEFAULT_REGION=us-west-1`|
106104
|`AWS_BEARER_TOKEN_BEDROCK`|The IAM credential to use to access AWS. Use a short-term API key.<br>The key will expire after AWS console logout or 12 hours (whichever comes first).<br>Make sure that your region for the key matches the region above.|`AWS_BEARER_TOKEN_BEDROCK="bedrock-api-key-blah-blah-blah"`|

example.env

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ AIRFLOW__API_AUTH__JWT_SECRET=
55
# @see https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/fernet.html#generating-fernet-key
66
AIRFLOW__CORE__FERNET_KEY=
77

8+
AIRFLOW_CONN_LANGFUSE_DEFAULT='{"conn_type":"langfuse","host":"us.cloud.langfuse.com","schema":"https","login":"pk-lf-blah-blah-blah","password":"ssk-lf-blah-blah-blah"}'
9+
810
# @see README.md for how to use the optional AIRFLOW_CONN override ariable.
911
# AIRFLOW_CONN_TIND_DEFAULT='{"conn_type": "http","password": "your-tind-key-here","host": "https://digicoll.lib.berkeley.edu/api/v1","schema": "https"}'
1012

@@ -29,10 +31,6 @@ TIND_API_KEY=your-tind-key-here
2931
TIND_API_URL=https://digicoll.lib.berkeley.edu/api/v1
3032
TIND_IIIF_MANIFEST_URL_PATTERN=https://digicoll.lib.berkeley.edu/record/{tind_id}/export/iiif_manifest
3133

32-
LANGFUSE_HOST=https://us.cloud.langfuse.com
33-
LANGFUSE_SECRET_KEY=sk-lf-blah-blah-blah
34-
LANGFUSE_PUBLIC_KEY=pk-lf-blah-blah-blah
35-
3634
AWS_ENDPOINT_URL=https://bedrock-runtime.us-west-1.amazonaws.com
3735
AWS_DEFAULT_REGION=us-west-1
3836
AWS_BEARER_TOKEN_BEDROCK=bedrock-api-key-blah-blah-blah

mokelumne/dags/gen_llm_image_descriptions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ def get_prompt() -> dict[str, str]:
262262
context = get_current_context()
263263
prompt = langfuse.get_prompt(
264264
name=context["params"]["langfuse_prompt_name"],
265-
version_or_label=context["params"]["langfuse_prompt_version_or_label"],
265+
version_or_label=context["params"]["langfuse_prompt_version_or_label"]
266266
)
267267

268268
return {"prompt": prompt.prompt, "version": prompt.version}

mokelumne/util/langfuse.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
from collections import namedtuple
77
from os import environ as ENV
88

9+
from airflow.sdk import BaseHook
910
from langfuse import Langfuse
1011

11-
1212
Prompt = namedtuple('Prompt', ['prompt', 'version'])
1313
Prompt.__doc__ += ': Contains a LLM prompt for generating image descriptions.'
1414
Prompt.prompt.__doc__ = 'The LLM prompt.'
@@ -17,10 +17,34 @@
1717
logger = logging.getLogger(__name__)
1818

1919

20-
def get_prompt(name: str, version_or_label: int | str) -> Prompt:
20+
def _get_langfuse_connection_settings(conn_id: str) -> tuple[str, str, str]:
21+
"""Return host/public/secret key tuple from the Langfuse Airflow connection."""
22+
conn = BaseHook.get_connection(conn_id)
23+
base_url = f'{conn.schema}://{conn.host}'
24+
public_key = conn.login
25+
secret_key = conn.password
26+
27+
if not public_key or not secret_key:
28+
raise ValueError(
29+
f'Missing Langfuse credentials in Airflow connection {conn_id}. '
30+
'Set login/password on the connection.'
31+
)
32+
return base_url, public_key, secret_key
33+
34+
def get_langfuse_client(conn_id: str) -> Langfuse:
35+
"""Return a Langfuse client configured from the ``langfuse_default`` Airflow connection."""
36+
base_url, public_key, secret_key = _get_langfuse_connection_settings(conn_id)
37+
return Langfuse(
38+
base_url=base_url,
39+
public_key=public_key,
40+
secret_key=secret_key,
41+
release=importlib.metadata.version('mokelumne'),
42+
environment=ENV.get('DEPLOYMENT_ID', 'default'),
43+
)
44+
45+
def get_prompt(name: str, version_or_label: int | str, conn_id: str = 'langfuse_default') -> Prompt:
2146
"""Return the current prompt to use."""
22-
langfuse = Langfuse(release=importlib.metadata.version('mokelumne'),
23-
environment=ENV.get('DEPLOYMENT_ID', 'default'))
47+
langfuse = get_langfuse_client(conn_id)
2448
if isinstance(version_or_label, int):
2549
logger.debug(
2650
f"Getting Langfuse prompt {name}, version {version_or_label}"

test/unit/test_langfuse.py

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,19 @@
22

33
from unittest.mock import Mock
44
import pytest
5-
65
from mokelumne.util import langfuse
76

8-
97
DEFAULT_TEST_PROMPT: str = "A test prompt."
108
"""The prompt used if none is specified to the mocked Langfuse object."""
119

12-
1310
DEFAULT_TEST_VERSION: int = 67
1411
"""The version used if none is specified to the mocked Langfuse object."""
1512

13+
FAKE_BASE_URL: str = "https://langfuse.example.com"
14+
FAKE_PUBLIC_KEY: str = "pk-test-123"
15+
FAKE_SECRET_KEY: str = "sk-test-456"
16+
FAKE_CONN_SETTINGS: tuple[str, str, str] = (FAKE_BASE_URL, FAKE_PUBLIC_KEY, FAKE_SECRET_KEY)
17+
1618

1719
class MockResult:
1820
"""An object for return by our mock Langfuse."""
@@ -37,14 +39,100 @@ def __init__(self, **kwargs):
3739
self.get_prompt = Mock(return_value=mocked_result)
3840

3941

40-
class TestLangfuse:
41-
"""Tests for the Mokelumne Langfuse integration utility class."""
42+
@pytest.fixture
43+
def mock_conn_settings(monkeypatch):
44+
"""Patch _get_langfuse_connection_settings to avoid hitting Airflow."""
45+
monkeypatch.setattr(
46+
langfuse, '_get_langfuse_connection_settings', lambda conn_id: FAKE_CONN_SETTINGS
47+
)
48+
49+
50+
class MockConnection:
51+
"""Mock Airflow connection object."""
52+
def __init__(self, host, schema=None, login=None, password=None):
53+
self.host = host
54+
self.schema = schema
55+
self.login = login
56+
self.password = password
57+
58+
59+
class TestGetLangfuseConnectionSettings:
60+
"""Tests for _get_langfuse_connection_settings."""
61+
62+
def test_extracts_credentials_from_airflow_connection(self, monkeypatch):
63+
"""Ensure Langfuse credentials are extracted from Airflow connection fields."""
64+
mock_conn = MockConnection(
65+
host="langfuse.example.com",
66+
schema="https",
67+
login="pk-123",
68+
password="sk-456"
69+
)
70+
monkeypatch.setattr(
71+
langfuse.BaseHook, 'get_connection', Mock(return_value=mock_conn)
72+
)
73+
74+
host, public_key, secret_key = langfuse._get_langfuse_connection_settings('langfuse_default')
75+
76+
assert host == "https://langfuse.example.com"
77+
assert public_key == "pk-123"
78+
assert secret_key == "sk-456"
79+
80+
def test_raises_on_missing_login_credentials(self, monkeypatch):
81+
"""Ensure ValueError is raised when login (public key) is missing."""
82+
mock_conn = MockConnection(
83+
host="langfuse.example.com",
84+
schema="https",
85+
login=None,
86+
password="sk-456"
87+
)
88+
monkeypatch.setattr(
89+
langfuse.BaseHook, 'get_connection', Mock(return_value=mock_conn)
90+
)
91+
92+
with pytest.raises(ValueError, match="Missing Langfuse credentials"):
93+
langfuse._get_langfuse_connection_settings('langfuse_default')
94+
95+
96+
@pytest.mark.usefixtures("mock_conn_settings")
97+
class TestGetLangfuseClient:
98+
"""Tests for get_langfuse_client."""
99+
100+
def test_returns_langfuse_instance(self, monkeypatch):
101+
"""Ensure get_langfuse_client constructs a Langfuse with the connection settings."""
102+
mock_langfuse_cls = Mock(return_value=Mock())
103+
monkeypatch.setattr(langfuse, 'Langfuse', mock_langfuse_cls)
104+
105+
client = langfuse.get_langfuse_client('langfuse_default')
106+
107+
mock_langfuse_cls.assert_called_once()
108+
call_kwargs = mock_langfuse_cls.call_args.kwargs
109+
assert call_kwargs['base_url'] == FAKE_BASE_URL
110+
assert call_kwargs['public_key'] == FAKE_PUBLIC_KEY
111+
assert call_kwargs['secret_key'] == FAKE_SECRET_KEY
112+
assert client is mock_langfuse_cls.return_value
113+
114+
def test_passes_release_and_environment(self, monkeypatch):
115+
"""Ensure release and environment are forwarded to the Langfuse constructor."""
116+
mock_langfuse_cls = Mock(return_value=Mock())
117+
monkeypatch.setattr(langfuse, 'Langfuse', mock_langfuse_cls)
118+
monkeypatch.setenv('DEPLOYMENT_ID', 'staging')
119+
120+
langfuse.get_langfuse_client('langfuse_default')
121+
122+
call_kwargs = mock_langfuse_cls.call_args.kwargs
123+
assert call_kwargs['environment'] == 'staging'
124+
assert 'release' in call_kwargs
125+
126+
127+
@pytest.mark.usefixtures("mock_conn_settings")
128+
class TestGetPrompt:
129+
"""Tests for get_prompt."""
42130

43131
def test_simple_prompt(self, monkeypatch):
44132
"""Test a simple call of the `get_prompt` method."""
45133
with monkeypatch.context() as m:
46134
m.setattr(langfuse, 'Langfuse', MockLangfuse)
47-
result = langfuse.get_prompt('test', 'production')
135+
result = langfuse.get_prompt('test', 'production', 'langfuse_default')
48136
assert result.prompt == DEFAULT_TEST_PROMPT
49137
assert result.version == DEFAULT_TEST_VERSION
50138

@@ -58,7 +146,7 @@ def test_label_is_used(self, monkeypatch):
58146

59147
with monkeypatch.context() as m:
60148
m.setattr(langfuse, 'Langfuse', mock)
61-
langfuse.get_prompt(name, label)
149+
langfuse.get_prompt(name, label, 'langfuse_default')
62150

63151
lf_object.get_prompt.assert_called_with(name, label=label)
64152

@@ -72,6 +160,6 @@ def test_version_is_used(self, monkeypatch):
72160

73161
with monkeypatch.context() as m:
74162
m.setattr(langfuse, 'Langfuse', mock)
75-
langfuse.get_prompt(name, version)
163+
langfuse.get_prompt(name, version, 'langfuse_default')
76164

77165
lf_object.get_prompt.assert_called_with(name, version=version)

0 commit comments

Comments
 (0)