End-to-end pipeline that ingests GitHub public events via Databricks Zerobus, processes them through a Spark Declarative Pipelines (SDP) medallion architecture, and normalizes to OCSF v1.7.0 gold tables (API Activity, Entity Management, File System Activity).
GitHub Events API → Zerobus → SDP Pipeline [ bronze → silver → api_activity (6003) / entity_management (3004) / file_system_activity (1001) ]
| Gold Table | OCSF Class | class_uid | GitHub Events |
|---|---|---|---|
| api_activity | API Activity | 6003 | All events |
| entity_management | Entity Management | 3004 | CreateEvent, DeleteEvent, ForkEvent, ReleaseEvent, PublicEvent |
| file_system_activity | File System Activity | 1001 | PushEvent |
- Databricks workspace with Zerobus access enabled
- Service principal with OAuth credentials (client_id / client_secret)
- Databricks CLI installed (`pip install databricks-cli`)
- Python 3.9+
├── pipeline/ ← SDP source (point pipeline here)
│ ├── utils_pipeline.py
│ ├── ingest_bronze.py
│ ├── flatten_silver.py
│ └── normalize_gold.py
├── setup/ ← run manually before the pipeline
│ ├── utils_setup.py
│ ├── ddl_bronze.py
│ ├── ddl_ocsf.py
│ └── push_zerobus.py
├── requirements.txt
└── README.md
| File | Location | Purpose |
|---|---|---|
| utils_pipeline.py | pipeline/ | SDP config — UC, TABLES, FQN, TABLE_PROPERTIES, OCSF dicts |
| ingest_bronze.py | pipeline/ | SDP temporary view — passthrough over the Zerobus-populated bronze table |
| flatten_silver.py | pipeline/ | SDP streaming table — extracts typed columns from variant data |
| normalize_gold.py | pipeline/ | SDP append flows — maps silver to the 3 OCSF Delta sinks |
| utils_setup.py | setup/ | Setup config — UC, TABLES, FQN, GITHUB, ZEROBUS dicts |
| ddl_bronze.py | setup/ | Creates the catalog, databases, and bronze table |
| ddl_ocsf.py | setup/ | Creates the 3 OCSF gold Delta tables |
| push_zerobus.py | setup/ | Polls the GitHub Events API and pushes events to bronze via Zerobus |
Both config files use dictionaries to organize variables:
pipeline/utils_pipeline.py — used by SDP pipeline scripts:
UC = { "catalog": "cyber_lakehouse", "bronze_database": "github", ... }
TABLES = { "bronze": "github_events_bronze", "silver": "github_events_silver", ... }
FQN = { "bronze": "cyber_lakehouse.github.github_events_bronze", ... }
TABLE_PROPERTIES = { "delta.minWriterVersion": "7", "delta.enableDeletionVectors": "true", ... }
OCSF = { "version": "1.7.0", "category": { ... }, "class": { ... } }

setup/utils_setup.py — used by DDL and Zerobus scripts:
UC = { "catalog": "cyber_lakehouse", "bronze_database": "github", "gold_database": "ocsf" }
TABLES = { "bronze": "github_events_bronze", "api_activity": "api_activity", "entity_management": ..., "file_system_activity": ... }
FQN = { "bronze": "cyber_lakehouse.github.github_events_bronze", ... }
GITHUB = { "events_url": "https://api.github.com/events", "source": "github", ... }
ZEROBUS = { "workspace_url": "<your-workspace-url>", "workspace_id": "<your-workspace-id>", ... }

Setup involves the following steps:
- Create a service principal
- Store credentials in a Databricks secret scope
- Configure workspace settings
- Create catalog, databases, and bronze table
- Grant service principal permissions
- Create OCSF gold Delta tables
- Start the Zerobus producer
- Deploy the SDP pipeline
- Verify
- Schedule the pipeline
- In your Databricks workspace, go to Settings → Identity and access → Service principals
- Click Add service principal → Add new
- Name it (e.g. `zerobus-ingest`) and click Add
- Click the new service principal → Secrets tab → Generate secret
- Copy the Client ID and Secret immediately (the secret is only shown once)
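If you prefer to script this step, the service principal itself can also be created with the Databricks Python SDK. A sketch (the display name is just an example; the OAuth secret still has to be generated in the UI as described above):

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
# Create the service principal; the returned object includes its Application ID (UUID),
# which the GRANT statements later in this guide expect.
sp = w.service_principals.create(display_name="zerobus-ingest")
print(sp.application_id)
```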
Create a secret scope and store the credentials using the Databricks CLI:
# authenticate the CLI
databricks auth login <workspace-url> --profile=<profile-name>
# create the scope
databricks secrets create-scope zerobus --profile=<profile-name>
# store the credentials (use the exact values from Step 1)
databricks secrets put-secret zerobus client-id --string-value "<client-id>" --profile=<profile-name>
databricks secrets put-secret zerobus client-secret --string-value "<client-secret>" --profile=<profile-name>

Alternatively, from a Databricks notebook using %sh:
%sh
databricks secrets create-scope zerobus
databricks secrets put-secret zerobus client-id --string-value "<client-id>"
databricks secrets put-secret zerobus client-secret --string-value "<client-secret>"

Alternatively, use the Databricks Python SDK from a notebook cell:
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
w.secrets.create_scope(scope="zerobus")
w.secrets.put_secret(scope="zerobus", key="client-id", string_value="<client-id>")
w.secrets.put_secret(scope="zerobus", key="client-secret", string_value="<client-secret>")The pipeline reads credentials at runtime via dbutils.secrets.get().
Edit setup/utils_setup.py and fill in the ZEROBUS dictionary:
ZEROBUS = {
"workspace_url": "https://<your-workspace>.cloud.databricks.com",
"workspace_id": "<your-workspace-id>", # from workspace URL /o=<id>
"region": "us-east-1", # adjust to your region
"secrets_scope": "zerobus",
}

Run setup/ddl_bronze.py as a Databricks notebook or execute the SQL manually.
Important: Zerobus does not support tables in default metastore storage. The catalog or schema must have an explicit managed storage location. If your catalog uses default storage, set a managed location on the schema:
CREATE DATABASE IF NOT EXISTS <catalog>.<database> MANAGED LOCATION 's3://<bucket>/<path>';
CREATE CATALOG IF NOT EXISTS cyber_lakehouse;
CREATE DATABASE IF NOT EXISTS cyber_lakehouse.github;
CREATE DATABASE IF NOT EXISTS cyber_lakehouse.ocsf;
CREATE TABLE IF NOT EXISTS cyber_lakehouse.github.github_events_bronze (
data VARIANT,
event_time TIMESTAMP,
event_date DATE,
source STRING,
source_type STRING,
source_path STRING,
processed_time TIMESTAMP
)
USING DELTA
TBLPROPERTIES ('delta.feature.variantType-preview' = 'supported');

The `data` column uses the VARIANT type, which stores semi-structured JSON natively in Delta.
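Once events land in bronze, the variant column can be queried directly with path expressions. A minimal sketch (field names such as type and repo.name follow the GitHub Events payload; adjust as needed):

```python
# Sketch: extract typed fields from the VARIANT column using Databricks SQL path syntax.
df = spark.sql("""
    SELECT
        data:id::STRING        AS event_id,
        data:type::STRING      AS event_type,
        data:repo.name::STRING AS repo_name,
        event_time
    FROM cyber_lakehouse.github.github_events_bronze
    LIMIT 10
""")
df.show(truncate=False)
```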
Run in a Databricks SQL editor (replace <service-principal-application-id> with the SP's Application ID UUID):
-- catalog access
GRANT USE CATALOG ON CATALOG cyber_lakehouse TO `<service-principal-application-id>`;
-- database access
GRANT USE DATABASE ON DATABASE cyber_lakehouse.github TO `<service-principal-application-id>`;
-- read/write table access
GRANT SELECT, MODIFY ON TABLE cyber_lakehouse.github.github_events_bronze TO `<service-principal-application-id>`;

Note: Use the service principal's Application ID (UUID), not the display name. Find it under Settings → Identity and access → Service principals → click your SP.
Run setup/ddl_ocsf.py as a Databricks notebook or execute the SQL manually. This creates the three Delta sink tables in cyber_lakehouse.ocsf.
From a Databricks notebook:
%pip install databricks-zerobus-ingest-sdk requests

Then run setup/push_zerobus.py directly (Databricks treats .py files as notebooks) or use %run ./push_zerobus followed by main().
From local terminal:
pip install databricks-zerobus-ingest-sdk requests
cd setup
python push_zerobus.py

The script auto-installs dependencies on each run. It polls the GitHub Events API every 60 seconds and pushes structured bronze records via Zerobus. By default it stops after 5 minutes (RUN_DURATION_SEC = 300); set it to None to run indefinitely.
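For orientation, the polling half of the producer looks roughly like the sketch below. The Zerobus push itself is done with the databricks-zerobus-ingest-sdk and is elided here (the hypothetical push_to_zerobus helper stands in for it); take the real call signatures from setup/push_zerobus.py and the SDK docs.

```python
import time
import requests

EVENTS_URL = "https://api.github.com/events"   # from GITHUB["events_url"]
POLL_INTERVAL_SEC = 60
RUN_DURATION_SEC = 300                          # None = run indefinitely

def poll_github_events():
    start = time.time()
    while RUN_DURATION_SEC is None or time.time() - start < RUN_DURATION_SEC:
        resp = requests.get(EVENTS_URL, timeout=30)
        resp.raise_for_status()
        events = resp.json()                    # list of public event dicts
        # push_to_zerobus(events)  <- hypothetical helper wrapping the Zerobus SDK
        print(f"fetched {len(events)} events")
        time.sleep(POLL_INTERVAL_SEC)
```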
Upload the pipeline/ folder to your Databricks workspace. Then create a new Lakeflow Declarative Pipeline:
- Go to Jobs & Pipelines → Create → ETL Pipeline
- Set the source to the uploaded `pipeline/` folder
- Click Start
The SDP pipeline will:
- Wrap the bronze table as a temporary view (`ingest_bronze.py`) so it appears in the pipeline graph
- Stream from bronze → silver, flattening variant fields (`flatten_silver.py`)
- Stream from silver → the 3 OCSF gold Delta sinks (`normalize_gold.py`)
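For reference, the bronze view and silver streaming table follow the usual declarative-pipelines Python pattern. A minimal sketch, assuming the standard dlt decorators; the actual column extraction and names live in the repo's pipeline scripts:

```python
import dlt
from pyspark.sql import functions as F

BRONZE = "cyber_lakehouse.github.github_events_bronze"  # FQN["bronze"]

@dlt.view(name="github_events_bronze_view")
def bronze_view():
    # Passthrough over the Zerobus-populated bronze table (what ingest_bronze.py does)
    return spark.readStream.table(BRONZE)

@dlt.table(name="github_events_silver")
def silver():
    # Flatten a few typed columns out of the VARIANT payload (what flatten_silver.py does)
    return (
        dlt.read_stream("github_events_bronze_view")
        .select(
            F.expr("data:id::string").alias("event_id"),
            F.expr("data:type::string").alias("event_type"),
            F.expr("data:repo.name::string").alias("repo_name"),
            F.col("event_time"),
        )
    )
```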
Query the gold tables:
SELECT * FROM cyber_lakehouse.ocsf.api_activity LIMIT 10;
SELECT * FROM cyber_lakehouse.ocsf.entity_management LIMIT 10;
SELECT * FROM cyber_lakehouse.ocsf.file_system_activity LIMIT 10;

To run the pipeline on a recurring schedule:
- Go to Jobs & Pipelines and select your pipeline
- Click Schedule → Add schedule
- Set a cron expression or interval (e.g. every 15 minutes, hourly)
- Click Save
Alternatively, add the pipeline as a task in a Databricks Job for more control over triggers, retries, and notifications. See Pipeline task for jobs for details.
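If you script job creation, a pipeline task with a cron schedule might look roughly like this with the Databricks Python SDK (a sketch; the job name and cron expression are placeholders):

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()

job = w.jobs.create(
    name="github-ocsf-pipeline-refresh",          # example name
    tasks=[
        jobs.Task(
            task_key="run_pipeline",
            pipeline_task=jobs.PipelineTask(pipeline_id="<your-pipeline-id>"),
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0/15 * * * ?",   # every 15 minutes
        timezone_id="UTC",
    ),
)
print(job.job_id)
```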
Changes to the gold-table OCSF logic or DDL may require reprocessing data. Resetting the streaming flow checkpoints via the REST API avoids a full refresh of the bronze and silver streaming tables. Use the fully qualified flow name in catalog.schema.table form (e.g. cyber_lakehouse.ocsf.api_activity):
curl -X POST \
-H "Authorization: Bearer <your-token>" \
-H "Content-Type: application/json" \
-d '{
"reset_checkpoint_selection": ["<catalog.schema.table>"]
}' \
https://<your-workspace>.cloud.databricks.com/api/2.0/pipelines/<your-pipeline-id>/updates
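The same call from Python, if you'd rather not shell out to curl (a sketch; the token, workspace URL, and pipeline ID are placeholders):

```python
import requests

WORKSPACE = "https://<your-workspace>.cloud.databricks.com"
PIPELINE_ID = "<your-pipeline-id>"
TOKEN = "<your-token>"

# Trigger a pipeline update that resets the checkpoint for the selected flow.
resp = requests.post(
    f"{WORKSPACE}/api/2.0/pipelines/{PIPELINE_ID}/updates",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={"reset_checkpoint_selection": ["cyber_lakehouse.ocsf.api_activity"]},
)
resp.raise_for_status()
print(resp.json())  # details of the triggered update
```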
References:
- Databricks Zerobus
- Spark Declarative Pipelines (SDP)
- OCSF (Open Cybersecurity Schema Framework)
- OCSF Schema Browser (v1.7.0)
- IAM Category — Entity Management (3004)
- System Activity Category — File System Activity (1001)
- Application Activity Category — API Activity (6003)
- OCSF GitHub
