Skip to content

Commit f0f4485

Browse files
author
Amanda
committed
feat: pipeline for waste argentina
1 parent 6cf4ea8 commit f0f4485

10 files changed

Lines changed: 791 additions & 0 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
WITH activity_data AS (
2+
SELECT DISTINCT
3+
MD5(CONCAT_WS(
4+
'-',
5+
activity_name,
6+
activity_subcategory_type::TEXT,
7+
method_id::TEXT
8+
))::UUID AS activity_id,
9+
activity_name,
10+
activity_units,
11+
method_id,
12+
activity_subcategory_type
13+
FROM raw_data.arg_solid_waste_indec
14+
)
15+
INSERT INTO modelled.activity_subcategory (
16+
activity_id,
17+
activity_name,
18+
activity_units,
19+
gpcmethod_id,
20+
activity_subcategory_type
21+
)
22+
SELECT activity_id,
23+
activity_name,
24+
activity_units,
25+
method_id as gpcmethod_id,
26+
activity_subcategory_type::json
27+
FROM activity_data
28+
ON CONFLICT (activity_id) DO UPDATE SET
29+
activity_name = EXCLUDED.activity_name,
30+
activity_units = EXCLUDED.activity_units,
31+
gpcmethod_id = EXCLUDED.gpcmethod_id,
32+
activity_subcategory_type = EXCLUDED.activity_subcategory_type;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
INSERT INTO modelled.emissions_factor (
2+
emissionfactor_id,
3+
gas_name,
4+
emissionfactor_value,
5+
unit_denominator,
6+
activity_id,
7+
datasource_name,
8+
active_from,
9+
active_to,
10+
actor_id,
11+
publisher_id,
12+
dataset_id
13+
)
14+
SELECT emissionfactor_id,
15+
gas_name,
16+
avg(emissionfactor_value) as emissionfactor_value, -- have to take the average because ef changes each month
17+
emissions_factor_units AS unit_denominator,
18+
activity_id,
19+
datasource_name,
20+
DATE(emissions_year || '-01-01') AS active_from,
21+
DATE(emissions_year || '-12-31') AS active_to,
22+
actor_id,
23+
publisher_id,
24+
dataset_id
25+
FROM raw_data.ct_staging_v2025
26+
GROUP BY emissionfactor_id, gas_name, emissions_factor_units, activity_id, datasource_name, emissions_year, actor_id, publisher_id, dataset_id
27+
ON CONFLICT (emissionfactor_id)
28+
DO UPDATE SET
29+
gas_name = EXCLUDED.gas_name,
30+
emissionfactor_value = EXCLUDED.emissionfactor_value,
31+
unit_denominator = EXCLUDED.unit_denominator,
32+
activity_id = EXCLUDED.activity_id,
33+
datasource_name = COALESCE(EXCLUDED.datasource_name, emissions_factor.datasource_name),
34+
active_from = COALESCE(EXCLUDED.active_from, emissions_factor.active_from),
35+
active_to = COALESCE(EXCLUDED.active_to, emissions_factor.active_to),
36+
actor_id = EXCLUDED.actor_id,
37+
publisher_id = EXCLUDED.publisher_id,
38+
dataset_id = EXCLUDED.dataset_id;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
INSERT INTO modelled.emissions (
2+
emissions_id,
3+
datasource_name,
4+
actor_id,
5+
city_id,
6+
gpc_reference_number,
7+
emissions_value,
8+
emissions_year,
9+
emissions_units,
10+
gpcmethod_id,
11+
gas_name,
12+
emissionfactor_id,
13+
activity_id,
14+
activity_value,
15+
spatial_granularity,
16+
geometry_type,
17+
geometry,
18+
geometry_id
19+
)
20+
SELECT
21+
emissions_id,
22+
datasource_name,
23+
actor_id,
24+
city_id,
25+
gpc_reference_number,
26+
emissions_value::numeric as emissions_value,
27+
emissions_year,
28+
'kg' as emissions_units,
29+
method_id as gpcmethod_id,
30+
upper(gas_name) as gas_name,
31+
emissionfactor_id,
32+
activity_id,
33+
activity_value,
34+
'city' as spatial_granularity,
35+
geometry_type,
36+
geometry,
37+
geometry_id
38+
FROM raw_data.arg_solid_waste_indec
39+
ON CONFLICT (emissions_id) DO UPDATE SET
40+
datasource_name = EXCLUDED.datasource_name,
41+
actor_id = EXCLUDED.actor_id,
42+
city_id = EXCLUDED.city_id,
43+
gpc_reference_number = EXCLUDED.gpc_reference_number,
44+
emissions_value = EXCLUDED.emissions_value,
45+
emissions_year = EXCLUDED.emissions_year,
46+
emissions_units = EXCLUDED.emissions_units,
47+
gpcmethod_id = EXCLUDED.gpcmethod_id,
48+
gas_name = EXCLUDED.gas_name,
49+
emissionfactor_id = EXCLUDED.emissionfactor_id,
50+
activity_id = EXCLUDED.activity_id,
51+
activity_value = EXCLUDED.activity_value,
52+
spatial_granularity = EXCLUDED.spatial_granularity,
53+
geometry_type = EXCLUDED.geometry_type,
54+
geometry = EXCLUDED.geometry,
55+
geometry_id = EXCLUDED.geometry_id;
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
WITH methodology AS (
2+
SELECT DISTINCT
3+
method_id,
4+
methodology_name,
5+
null as methodology_description,
6+
gpc_reference_number
7+
FROM raw_data.arg_solid_waste_indec
8+
)
9+
INSERT INTO modelled.ghgi_methodology
10+
(method_id, methodology_name, methodology_description, gpc_reference_number)
11+
SELECT
12+
method_id,
13+
methodology_name,
14+
methodology_description,
15+
gpc_reference_number
16+
FROM methodology
17+
ON CONFLICT (method_id) DO UPDATE SET
18+
methodology_name = EXCLUDED.methodology_name,
19+
methodology_description = EXCLUDED.methodology_description,
20+
gpc_reference_number = EXCLUDED.gpc_reference_number;
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from mage_ai.settings.repo import get_repo_path
2+
from mage_ai.io.config import ConfigFileLoader
3+
from mage_ai.io.postgres import Postgres
4+
from pandas import DataFrame
5+
from os import path
6+
7+
if 'data_exporter' not in globals():
8+
from mage_ai.data_preparation.decorators import data_exporter
9+
10+
11+
@data_exporter
12+
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
13+
"""
14+
Template for exporting data to a PostgreSQL database.
15+
Specify your configuration settings in 'io_config.yaml'.
16+
17+
Docs: https://docs.mage.ai/design/data-loading#postgresql
18+
"""
19+
schema_name = 'raw_data' # Specify the name of the schema to export data to
20+
table_name = 'arg_solid_waste_staging' # Specify the name of the table to export data to
21+
config_path = path.join(get_repo_path(), 'io_config.yaml')
22+
config_profile = 'default'
23+
24+
with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
25+
loader.export(
26+
df,
27+
schema_name,
28+
table_name,
29+
index=False, # Specifies whether to include index in exported table
30+
if_exists='replace', # Specify resolution policy if table name already exists
31+
)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from mage_ai.settings.repo import get_repo_path
2+
from mage_ai.io.config import ConfigFileLoader
3+
from mage_ai.io.s3 import S3
4+
from os import path
5+
if 'data_loader' not in globals():
6+
from mage_ai.data_preparation.decorators import data_loader
7+
if 'test' not in globals():
8+
from mage_ai.data_preparation.decorators import test
9+
10+
11+
@data_loader
12+
def load_from_s3_bucket(*args, **kwargs):
13+
"""
14+
Template for loading data from a S3 bucket.
15+
Specify your configuration settings in 'io_config.yaml'.
16+
17+
Docs: https://docs.mage.ai/design/data-loading#s3
18+
"""
19+
config_path = path.join(get_repo_path(), 'io_config.yaml')
20+
config_profile = 'default'
21+
22+
bucket_name = kwargs['bucket_name']
23+
object_key = 'files/local/argentina/indec/arg_indec.csv'
24+
25+
return S3.with_config(ConfigFileLoader(config_path, config_profile)).load(
26+
bucket_name,
27+
object_key,
28+
)
29+
30+
31+
@test
32+
def test_output(output, *args) -> None:
33+
"""
34+
Template code for testing the output of the block.
35+
"""
36+
assert output is not None, 'The output is undefined'

0 commit comments

Comments
 (0)