Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions cc-mage/data_exporters/load_ct_cityid_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    """
    Write the incoming dataframe to the PostgreSQL table raw_data.ct_cityid.

    Connection settings are read from the 'default' profile of the
    'io_config.yaml' file located under the path returned by get_repo_path().
    A table that already exists under the same name is replaced, and the
    dataframe index is not exported.

    Docs: https://docs.mage.ai/design/data-loading#postgresql
    """
    target_schema, target_table = 'raw_data', 'ct_cityid'
    io_config = ConfigFileLoader(
        path.join(get_repo_path(), 'io_config.yaml'),
        'default',
    )

    with Postgres.with_config(io_config) as pg:
        pg.export(
            df,
            target_schema,
            target_table,
            index=False,  # keep the dataframe index out of the exported table
            if_exists='replace',  # overwrite the table when it already exists
        )
1 change: 1 addition & 0 deletions cc-mage/data_exporters/load_ct_emissions.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ INSERT INTO modelled.emissions (
geometry_id
FROM raw_data.ct_staging_v2025
GROUP BY emissions_id, datasource_name, actor_id, city_id, gpc_reference_number, emissions_year, gpcmethod_id, gas_name, emissionfactor_id, activity_id, geometry_type, geometry, geometry_id
HAVING SUM(emissions_value::numeric*1000) > 1
ON CONFLICT (emissions_id) DO UPDATE SET
datasource_name = EXCLUDED.datasource_name,
actor_id = EXCLUDED.actor_id,
Expand Down
68 changes: 49 additions & 19 deletions cc-mage/data_loaders/load_ct_city_emissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,64 @@


@data_loader
def load_data_from_api(data, *args, **kwargs):
    """
    Load yearly emissions from the Climate TRACE API for all cities in `data`.

    One request is sent per distinct (ct_cityid, locode) pair and per gas
    (co2, ch4, n2o). The monthly time series returned for each request is
    summed up to one row per city, year, sector, subsector and gas.

    Args:
        data: DataFrame holding at least the columns 'ct_cityid' and 'locode'.
        **kwargs: must contain 'year', which is forwarded as the `year`
            query parameter.

    Returns:
        DataFrame with the columns city_name, city_id, locode, year, sector,
        subsector, gas and emissions_quantity.

    Raises:
        KeyError: if 'year' is missing from kwargs.
        requests.HTTPError: if the API answers with an error status.
        ValueError: if `data` contains no city, because there is nothing to
            concatenate.
    """
    gases = ['co2', 'ch4', 'n2o']
    # API field name -> output column name
    column_names = {
        'location.name': 'city_name',
        'location.cityId': 'city_id',
        'timeseries_year': 'year',
        'timeseries_month': 'month',
        'timeseries_sector': 'sector',
        'timeseries_subsector': 'subsector',
        'timeseries_gas': 'gas',
        'timeseries_emissionsQuantity': 'emissions_quantity',
    }
    # 'month' is deliberately absent so the monthly values are summed per year.
    group_keys = ['city_name', 'city_id', 'locode', 'year', 'sector', 'subsector', 'gas']
    year = kwargs['year']
    dfs = []

    city_records = data[['ct_cityid', 'locode']].drop_duplicates()

    for city_id, locode in city_records.itertuples(index=False, name=None):
        for gas in gases:
            # Use the id of the city being iterated; the former `cityId`
            # pipeline variable no longer exists in this function.
            url = f'https://api.climatetrace.org/v7/sources/emissions?year={year}&gas={gas}&cityId={city_id}'
            # A timeout keeps the pipeline from hanging on a stalled connection.
            response = requests.get(url, timeout=120)
            # Fail loudly instead of normalising an error payload.
            response.raise_for_status()

            df = (
                json_normalize(response.json())
                .explode('subsectors.timeseries')
                .reset_index(drop=True)
            )
            # Expand each time series entry into its own `timeseries_*` columns.
            df = pd.concat(
                [
                    df.drop(columns=['subsectors.timeseries']),
                    json_normalize(df['subsectors.timeseries']).add_prefix('timeseries_'),
                ],
                axis=1,
            )
            df = (
                df[list(column_names)]
                .rename(columns=column_names)
                .assign(locode=locode)
            )
            df = df.groupby(group_keys, as_index=False)['emissions_quantity'].sum()
            dfs.append(df)

    return pd.concat(dfs, ignore_index=True)


@test
def test_output(output, *args) -> None:
    """
    Check that the block produced an output object at all.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
29 changes: 29 additions & 0 deletions cc-mage/data_loaders/load_ct_cityid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pandas import json_normalize
import io
import pandas as pd
import requests
if 'data_loader' not in globals():
from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(*args, **kwargs):
    """
    Load the Climate TRACE city list for one country.

    Args:
        **kwargs: must contain 'country_code3', which is forwarded as the
            `country` query parameter of the cities endpoint.

    Returns:
        DataFrame built by flattening the JSON response with json_normalize.

    Raises:
        KeyError: if 'country_code3' is missing from kwargs.
        requests.HTTPError: if the API answers with an error status.
    """
    country_code3 = kwargs['country_code3']
    url = f'https://api.climatetrace.org/v7/cities?name=&country={country_code3}'
    # A timeout keeps the pipeline from hanging on a stalled connection.
    response = requests.get(url, timeout=120)
    # Fail loudly so an error payload is never normalised and passed on to
    # downstream blocks as if it were city data.
    response.raise_for_status()

    return json_normalize(response.json())


@test
def test_output(output, *args) -> None:
    """
    Check that the block produced an output object at all.
    """
    output_is_defined = output is not None
    assert output_is_defined, 'The output is undefined'
88 changes: 76 additions & 12 deletions cc-mage/pipelines/ghgi_climatetrace_city_all_sectors/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ blocks:
language: python
name: load_ct_city_emissions
retry_config: null
status: executed
status: updated
timeout: null
type: data_loader
upstream_blocks: []
upstream_blocks:
- transform_city_locode
uuid: load_ct_city_emissions
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
color: null
configuration: {}
downstream_blocks:
Expand Down Expand Up @@ -46,7 +47,8 @@ blocks:
status: executed
timeout: null
type: data_loader
upstream_blocks: []
upstream_blocks:
- transform_city_locode
uuid: load_ct_city_mappings
- all_upstream_blocks_executed: true
color: null
Expand All @@ -65,7 +67,7 @@ blocks:
upstream_blocks:
- load_ct_city_mappings
uuid: load_ct_city_mapping_db
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: postgres
Expand All @@ -83,14 +85,14 @@ blocks:
language: sql
name: staging_ct_city
retry_config: null
status: executed
status: updated
timeout: null
type: transformer
upstream_blocks:
- load_ct_city_mapping_db
- load_ct_city_emissions_db
uuid: staging_ct_city
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: postgres
Expand All @@ -116,7 +118,7 @@ blocks:
upstream_blocks:
- staging_ct_city
uuid: ct_city_staging_v2025
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: postgres
Expand All @@ -141,7 +143,7 @@ blocks:
upstream_blocks:
- ct_city_staging_v2025
uuid: load_ct_emissions
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: postgres
Expand All @@ -166,7 +168,7 @@ blocks:
upstream_blocks:
- ct_city_staging_v2025
uuid: load_ct_ef
- all_upstream_blocks_executed: true
- all_upstream_blocks_executed: false
color: null
configuration:
data_provider: postgres
Expand All @@ -191,6 +193,67 @@ blocks:
upstream_blocks:
- ct_city_staging_v2025
uuid: load_ct_method
- all_upstream_blocks_executed: true
color: null
configuration:
data_provider: postgres
data_provider_profile: default
export_write_policy: append
downstream_blocks:
- load_ct_cityid_db
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: load_ct_cityid
retry_config: null
status: executed
timeout: null
type: data_loader
upstream_blocks: []
uuid: load_ct_cityid
- all_upstream_blocks_executed: true
color: null
configuration: {}
downstream_blocks:
- transform_city_locode
executor_config: null
executor_type: local_python
has_callback: false
language: python
name: load_ct_cityid_db
retry_config: null
status: executed
timeout: null
type: data_exporter
upstream_blocks:
- load_ct_cityid
uuid: load_ct_cityid_db
- all_upstream_blocks_executed: true
color: null
configuration:
data_provider: postgres
data_provider_profile: default
dbt: {}
disable_query_preprocessing: false
export_write_policy: append
limit: 1000
use_raw_sql: true
downstream_blocks:
- load_ct_city_emissions
- load_ct_city_mappings
executor_config: null
executor_type: local_python
has_callback: false
language: sql
name: transform_city_locode
retry_config: null
status: updated
timeout: null
type: transformer
upstream_blocks:
- load_ct_cityid_db
uuid: transform_city_locode
cache_block_output_in_memory: false
callbacks: []
concurrency_config: {}
Expand All @@ -216,7 +279,8 @@ type: python
uuid: ghgi_climatetrace_city_all_sectors
variables:
bucket_name: test-global-api
cityId: ghs-fua_4249
locode: AR PSS
country_code2: DE
country_code3: DEU
year: 2024
variables_dir: /home/src/mage_data/cc-mage
widgets: []
12 changes: 6 additions & 6 deletions cc-mage/transformers/staging_ct_city.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ SELECT a.sector,
a.subsector,
_year as emissions_year,
upper(gas) as gas_name,
-- lat,
-- lon,
locode,
emissions_quantity,
-- source_name,
null::numeric as activity,
null as activity_units,
null::numeric as emissions_factor,
Expand Down Expand Up @@ -59,12 +57,14 @@ SELECT (MD5(CONCAT_WS('-', methodology_name, gpc_reference_number))::UUID) AS
emissions_quantity as emissions_value,
(MD5(CONCAT_WS('-', methodology_name, gpc_reference_number))::UUID) AS gpcmethod_id,
activity::numeric as activity_value,
'{{locode}}' as actor_id,
(select city_id from modelled.city_polygon where locode = '{{locode}}') as city_id,
a.locode as actor_id,
b.city_id,
null as geometry_type,
null AS geometry_id,
null::geometry as geometry
FROM gpc_data a
FROM gpc_data a
LEFT JOIN modelled.city_polygon b
ON a.locode = b.locode
)
SELECT method_id,
methodology_name,
Expand Down
20 changes: 20 additions & 0 deletions cc-mage/transformers/transform_city_locode.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
WITH ct_cities AS (
    -- Clean the raw name: drop the ', <country_code3>' suffix and the
    -- 'Urban Area' label, then trim the surrounding whitespace.
    SELECT
        id AS ct_cityid,
        trim(replace(replace(_name, ', {{country_code3}}', ''), 'Urban Area', '')) AS city_name
    FROM raw_data.ct_cityid
)
-- Attach the locode of every city polygon whose trimmed name matches a
-- cleaned name, restricted to the requested country.
SELECT
    cp.city_name,
    cp.locode,
    ct.ct_cityid
FROM modelled.city_polygon AS cp
INNER JOIN ct_cities AS ct
    ON ct.city_name = trim(cp.city_name)
WHERE cp.country_code = '{{country_code2}}';