Skip to content

Commit 097798c

Browse files
committed
Merge branch 'main' into HEA-764/Add-an-AssetDownloadView-to-Django
2 parents 54ec182 + 5d2010a commit 097798c

11 files changed

Lines changed: 230 additions & 22 deletions

File tree

.github/workflows/03-mirror-to-fdw.yml renamed to .github/workflows/03-mirror-to-fewsnet-org.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Mirror to FDW
1+
name: Mirror to FEWS-NET/HEA-Database-Development
22

33
permissions:
44
contents: read
@@ -35,7 +35,7 @@ jobs:
3535
- uses: yesolutions/mirror-action@v0.7.0
3636
with:
3737
REMOTE: "ssh://git@github.com/FEWS-NET/HEA-Database-Development.git"
38-
GIT_SSH_PRIVATE_KEY: ${{ secrets.FDW_GIT_SSH_PRIVATE_KEY }}
38+
GIT_SSH_PRIVATE_KEY: ${{ secrets.GIT_SSH_PRIVATE_KEY }}
3939
# @TODO: change this to secrets.GIT_SSH_KNOWN_HOSTS
4040
GIT_SSH_NO_VERIFY_HOST: "true"
4141
PUSH_ALL_REFS: "false"

apps/common/consumers.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
import asyncio
2+
import logging
3+
import os
4+
5+
import websockets
6+
from channels.exceptions import DenyConnection
7+
from channels.generic.websocket import AsyncWebsocketConsumer
8+
from django.contrib.auth.models import AnonymousUser
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class DagsterWebSocketProxyConsumer(AsyncWebsocketConsumer):
14+
15+
async def connect(self):
16+
logger.info(f"WebSocket connection attempt: {self.scope['path']}")
17+
18+
# Authentication check
19+
if isinstance(self.scope["user"], AnonymousUser):
20+
logger.error("Authentication required")
21+
raise DenyConnection("Authentication required")
22+
23+
perm = "common.access_dagster_ui"
24+
if not self.scope["user"].has_perm(perm):
25+
logger.error(
26+
f"User {self.scope['user'].username} lacks permission {perm} for accessing {self.scope['path']}"
27+
)
28+
raise DenyConnection("Permission denied")
29+
30+
logger.info(f"User {self.scope['user'].username} authenticated")
31+
32+
# Build upstream URL
33+
dagster_url = os.environ.get("DAGSTER_WEBSERVER_URL", "http://localhost:3000")
34+
dagster_prefix = os.environ.get("DAGSTER_WEBSERVER_PREFIX", "pipelines")
35+
36+
path = self.scope["path"]
37+
if path.startswith(f"/{dagster_prefix}/"):
38+
path = path[len(f"/{dagster_prefix}/") :]
39+
40+
# Convert http to ws
41+
if dagster_url.startswith("https://"):
42+
ws_url = dagster_url.replace("https://", "wss://", 1)
43+
else:
44+
ws_url = dagster_url.replace("http://", "ws://", 1)
45+
46+
# Build target URL
47+
if dagster_prefix:
48+
target_url = f"{ws_url}/{dagster_prefix}/{path}"
49+
else:
50+
target_url = f"{ws_url}/{path}"
51+
# Add query string
52+
if self.scope.get("query_string"):
53+
target_url += f"?{self.scope['query_string'].decode()}"
54+
55+
logger.info(f"Connecting to upstream: {target_url}")
56+
57+
# Get subprotocols from client
58+
subprotocols = self.scope.get("subprotocols", [])
59+
60+
try:
61+
self.websocket = await websockets.connect(
62+
target_url,
63+
max_size=52428800, # Increase to 50 MB
64+
ping_interval=20,
65+
open_timeout=30, # Default is 10 sec
66+
subprotocols=subprotocols if subprotocols else None,
67+
)
68+
logger.info("Connected to upstream")
69+
except Exception as e:
70+
logger.error(f"Failed to connect: {e}")
71+
raise DenyConnection(f"Connection to upstream failed: {e}")
72+
73+
await self.accept(self.websocket.subprotocol)
74+
logger.info(f"Client accepted with subprotocol: {self.websocket.subprotocol}")
75+
76+
self.consumer_task = asyncio.create_task(self.consume_from_upstream())
77+
78+
async def disconnect(self, close_code):
79+
logger.info(f"Disconnecting with code {close_code}")
80+
if hasattr(self, "consumer_task"):
81+
self.consumer_task.cancel()
82+
try:
83+
await self.consumer_task
84+
except asyncio.CancelledError:
85+
pass
86+
if hasattr(self, "websocket"):
87+
await self.websocket.close()
88+
89+
async def receive(self, text_data=None, bytes_data=None):
90+
try:
91+
await self.websocket.send(bytes_data or text_data)
92+
except Exception as e:
93+
logger.error(f"Error forwarding to upstream: {e}")
94+
await self.close()
95+
96+
async def consume_from_upstream(self):
97+
try:
98+
async for message in self.websocket:
99+
if isinstance(message, bytes):
100+
await self.send(bytes_data=message)
101+
else:
102+
await self.send(text_data=message)
103+
except asyncio.CancelledError:
104+
pass
105+
except Exception as e:
106+
logger.error(f"Error consuming from upstream: {e}")
107+
await self.close()

apps/common/logging_filters.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import logging
2+
3+
4+
class SuppressWebSocketPings(logging.Filter):
    """
    Logging filter that drops WebSocket keepalive ping/pong messages and the
    ASGI lifespan "appears unsupported" message.
    """

    def filter(self, record):
        """
        Return False (discard the record) if its formatted message contains any
        of the suppressed phrases, otherwise True.
        """
        noisy_fragments = (
            "sending keepalive ping",
            "received keepalive pong",
            "> PING",
            "< PONG",
            "% sending keepalive",
            "% received keepalive",
            "ASGI 'lifespan' protocol appears unsupported.",
        )
        rendered = record.getMessage()
        return not any(fragment in rendered for fragment in noisy_fragments)
24+
25+
26+
class SuppressRevProxyNoise(logging.Filter):
    """
    Logging filter that drops routine per-request messages emitted by RevProxy.
    """

    def filter(self, record):
        """
        Return False (discard the record) if its formatted message contains any
        of the suppressed RevProxy phrases, otherwise True.
        """
        noisy_fragments = (
            "ProxyView created",
            "Normalizing response headers",
            "Checking for invalid cookies",
            "Starting streaming HTTP Response",
        )
        rendered = record.getMessage()
        return not any(fragment in rendered for fragment in noisy_fragments)

apps/common/routing.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
from django.urls import re_path
2+
3+
from common.consumers import DagsterWebSocketProxyConsumer
4+
5+
# URL patterns used by the Channels router for incoming WebSocket connections.
websocket_urlpatterns = [
    # Route WebSocket connections for Dagster proxy
    # NOTE(review): the "pipelines" prefix is hard-coded here, whereas
    # DagsterWebSocketProxyConsumer reads DAGSTER_WEBSERVER_PREFIX (default "pipelines")
    # from the environment - confirm that the two are kept in sync.
    re_path(r"^pipelines/(?P<path>.*)$", DagsterWebSocketProxyConsumer.as_asgi()),
]

docker/app/run_django.sh

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ echo Starting Gunicorn with DJANGO_SETTINGS_MODULE=${DJANGO_SETTINGS_MODULE}
4343
if [ x"$LAUNCHER" != x"" ]; then
4444
echo using ${LAUNCHER}
4545
fi
46-
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.wsgi:application \
46+
gosu django ${LAUNCHER} /usr/local/bin/gunicorn ${APP}.asgi:application \
4747
--name ${APP}${ENV} \
48+
--worker-class uvicorn.workers.UvicornWorker \
4849
--config $(dirname $(readlink -f "$0"))/gunicorn_config.py \
49-
$* 2>&1
50+
$* 2>&1

hea/asgi.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,22 @@
99

1010
import os
1111

12+
from channels.auth import AuthMiddlewareStack
13+
from channels.routing import ProtocolTypeRouter, URLRouter
14+
from channels.security.websocket import AllowedHostsOriginValidator
1215
from django.core.asgi import get_asgi_application
1316

1417
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "hea.settings")
1518

16-
application = get_asgi_application()
19+
django_asgi_app = get_asgi_application()
20+
21+
# Import routing after Django setup
22+
from common.routing import websocket_urlpatterns # noqa: E402
23+
24+
application = ProtocolTypeRouter(
25+
{
26+
"http": django_asgi_app,
27+
# WebSocket requests handled by Channels consumers
28+
"websocket": AllowedHostsOriginValidator(AuthMiddlewareStack(URLRouter(websocket_urlpatterns))),
29+
}
30+
)

hea/settings/base.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@
109109
"rest_framework_gis",
110110
"revproxy",
111111
"corsheaders",
112+
"channels",
112113
]
113114
PROJECT_APPS = ["common", "metadata", "baseline"]
114115
INSTALLED_APPS = EXTERNAL_APPS + PROJECT_APPS
@@ -155,6 +156,8 @@
155156
"SEARCH_PARAM": "search",
156157
}
157158

159+
ASGI_APPLICATION = "hea.asgi.application"
160+
CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
158161

159162
########## CORS CONFIGURATION
160163
# See: https://github.com/ottoyiu/django-cors-headers
@@ -250,6 +253,12 @@
250253
},
251254
"filters": {
252255
"require_debug_false": {"()": "django.utils.log.RequireDebugFalse"},
256+
"suppress_ws_pings": {
257+
"()": "common.logging_filters.SuppressWebSocketPings",
258+
},
259+
"suppress_revproxy_noise": {
260+
"()": "common.logging_filters.SuppressRevProxyNoise",
261+
},
253262
},
254263
"handlers": {
255264
"logfile": {
@@ -266,6 +275,7 @@
266275
"stream": sys.stdout,
267276
"class": "logging.StreamHandler",
268277
"formatter": env.str("LOG_FORMATTER", "standard"),
278+
"filters": ["suppress_ws_pings", "suppress_revproxy_noise"],
269279
},
270280
"mail_admins": {
271281
"level": "ERROR",
@@ -280,11 +290,43 @@
280290
"django.security": {"handlers": ["console", "logfile"], "level": "ERROR", "propagate": False},
281291
"factory": {"handlers": ["console", "logfile"], "level": "INFO"},
282292
"faker": {"handlers": ["console", "logfile"], "level": "INFO"},
283-
"luigi": {"level": "INFO"},
284-
"luigi-interface": {"level": "INFO"},
285293
"urllib3": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
286294
"common.models": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
287295
"common.signals": {"handlers": ["console", "logfile"], "level": "INFO", "propagate": False},
296+
"uvicorn": {
297+
"handlers": ["console"],
298+
"level": "INFO",
299+
"propagate": False,
300+
},
301+
"uvicorn.error": {
302+
"handlers": ["console"],
303+
"level": "DEBUG",
304+
"propagate": False,
305+
"filters": ["suppress_ws_pings"],
306+
},
307+
"uvicorn.access": {
308+
"handlers": ["console"],
309+
"level": "INFO",
310+
"propagate": False,
311+
},
312+
"revproxy": {
313+
"handlers": ["console"],
314+
"level": "INFO",
315+
"propagate": False,
316+
"filters": ["suppress_revproxy_noise"],
317+
},
318+
"revproxy.view": {
319+
"handlers": ["console"],
320+
"level": "INFO",
321+
"propagate": False,
322+
"filters": ["suppress_revproxy_noise"],
323+
},
324+
"revproxy.response": {
325+
"handlers": ["console"],
326+
"level": "INFO",
327+
"propagate": False,
328+
"filters": ["suppress_revproxy_noise"],
329+
},
288330
},
289331
# Keep root at DEBUG and use the `level` on the handler to control logging output,
290332
# so that additional handlers can be used to get additional detail, e.g. `common.resources.LoggingResourceMixin`

pipelines/assets/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,7 +314,7 @@ def get_bss_dataframe(
314314
end_strings: Optional[list[str]] = None,
315315
header_rows: list[int] = [3, 4, 5], # List of row indexes that contain the Wealth Group and other headers
316316
num_summary_cols: Optional[int] = None,
317-
) -> pd.DataFrame:
317+
) -> Output[pd.DataFrame]:
318318
"""
319319
Retrieve a worksheet from a BSS and return it as a DataFrame.
320320

pipelines/assets/fixtures.py

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
def validate_instances(
3232
context: AssetExecutionContext, config: BSSMetadataConfig, instances: dict[str, list[dict]], partition_key: str
33-
) -> tuple[dict[str, list[dict]], dict]:
33+
) -> Output[dict[str, list[dict]]]:
3434
"""
3535
Validate the instances for a set of related models, prior to loading them as a fixture.
3636
@@ -322,7 +322,7 @@ def get_fixture_from_instances(instance_dict: dict[str, list[dict]]) -> tuple[li
322322
return Output(fixture, metadata=metadata)
323323

324324

325-
def import_fixture(fixture: list[dict]) -> dict:
325+
def import_fixture(fixture: list[dict]) -> Output[None]:
326326
"""
327327
Import a Django fixture and return a metadata dictionary.
328328
"""
@@ -436,9 +436,4 @@ def imported_baseline(
436436
"""
437437
Imported Django fixture for a BSS, added to the Django database.
438438
"""
439-
metadata = import_fixture(consolidated_fixture)
440-
441-
return Output(
442-
None,
443-
metadata=metadata,
444-
)
439+
return import_fixture(consolidated_fixture)

pipelines/assets/wealth_characteristic.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -554,8 +554,4 @@ def imported_wealth_characteristics(
554554
"""
555555
Imported Django fixtures for a BSS, added to the Django database.
556556
"""
557-
metadata = import_fixture(wealth_characteristic_fixture)
558-
return Output(
559-
None,
560-
metadata=metadata,
561-
)
557+
return import_fixture(wealth_characteristic_fixture)

0 commit comments

Comments
 (0)