Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions examples/cardinal_project_library.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ Reducers:
name: cardinal
Reduce: CardinalReducer
Module: rail.projects.reducer
rotation_angle: [60, 0, 0]
flip_dec: true


# These describe the various data analysis pipelines
Expand Down
2 changes: 2 additions & 0 deletions examples/flagship_project_library.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ Reducers:
name: flagship
Reduce: FlagshipReducer
Module: rail.projects.reducer
rotation_angle: [-180, 0, 0]
flip_dec: true


# These describe the various data analysis pipelines
Expand Down
12 changes: 12 additions & 0 deletions src/rail/projects/algorithm_holder.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,18 @@ class RailReducerAlgorithmHolder(RailAlgorithmHolder):
required=True,
msg="Data Reducer Class",
),
rotation_angle=StageParameter(
list,
[0.0, 0.0, 0.0],
fmt="%s",
msg="Three rotation angles, applied to RA, DEC, and around the line of sight axis",
),
flip_dec=StageParameter(
float,
False,
fmt="%f",
msg="Shortcut to flip sign of Dec. If True, multiply Dec by -1. Excecuted BEFORE the rotator",
),
)
yaml_tag = "Reducer"

Expand Down
9 changes: 8 additions & 1 deletion src/rail/projects/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,14 @@ def reduce_data(
assert issubclass(reducer_class, RailReducer)

reducer_args = library.get_selection(selection)
reducer = reducer_class(**reducer_args.config.to_dict())

# also fetch the reducer configs:
reducer_config = reducer_args.config.to_dict()
reducer_holder = library.get_algorithm("Reducer", reducer_class_name)
reducer_config["rotation_angle"] = reducer_holder.config.rotation_angle
reducer_config["flip_dec"] = reducer_holder.config.flip_dec
reducer = reducer_class(**reducer_config)
#reducer = reducer_class(**reducer_args.config.to_dict())

if not dry_run: # pragma: no cover
for source_, sink_ in zip(sources, sinks):
Expand Down
137 changes: 123 additions & 14 deletions src/rail/projects/reducer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import Any

import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet as pq
Expand All @@ -16,6 +17,59 @@
from .arrow_utils import parse_item
from .dynamic_class import DynamicClass


def rotate_gal_pyarrow(ra, dec, rot_ra, rot_dec, rot_x_ang=0):
"""
Rotate ra, dec in degrees to new coordinate
given three rotation angles
"""

# --- Degrees to radians ---
k = math.pi / 180.0
ra_r = pc.multiply(ra, k)
dec_r = pc.multiply(dec, k)
phi_r = rot_ra * k # scalar
theta_r = rot_dec * k # scalar

# --- Step 1: (RA, Dec) → Cartesian ---
x = pc.multiply(pc.cos(dec_r), pc.cos(ra_r))
y = pc.multiply(pc.cos(dec_r), pc.sin(ra_r))
z = pc.sin(dec_r)

# --- Step 2a: rot_z(-phi) ---
cp, sp = math.cos(-phi_r), math.sin(-phi_r)
x1 = pc.subtract(pc.multiply(pc.scalar(cp), x), pc.multiply(pc.scalar(sp), y))
y1 = pc.add(pc.multiply(pc.scalar(sp), x), pc.multiply(pc.scalar(cp), y))
z1 = z

# --- Step 2b: rot_y(theta) ---
ct, st = math.cos(theta_r), math.sin(theta_r)
x2 = pc.add(pc.multiply(pc.scalar(ct), x1), pc.multiply(pc.scalar(st), z1))
y2 = y1
z2 = pc.subtract(pc.multiply(pc.scalar(ct), z1), pc.multiply(pc.scalar(st), x1))

# --- Step 2c: Optional rot_x ---
if rot_x_ang != 0:
cx = math.cos(rot_x_ang * k)
sx = math.sin(rot_x_ang * k)
x3 = x2
y3 = pc.subtract(pc.multiply(pc.scalar(cx), y2), pc.multiply(pc.scalar(sx), z2))
z3 = pc.add(pc.multiply(pc.scalar(sx), y2), pc.multiply(pc.scalar(cx), z2))
else:
x3, y3, z3 = x2, y2, z2

# --- Step 3: Cartesian → (RA, Dec) ---
ra_deg = pc.multiply(pc.atan2(y3, x3), 180 / math.pi)
new_ra = pc.if_else(
pc.less(ra_deg, 0),
pc.add(ra_deg, 360),
ra_deg
)
new_dec = pc.multiply(pc.asin(pc.min_element_wise(pc.max_element_wise(z3, -1.0), 1.0)), 180/math.pi)

return new_ra, new_dec


COLUMNS = [
"galaxy_id",
"ra",
Expand Down Expand Up @@ -140,8 +194,8 @@
PROJECTIONS_CARDINAL = [
{
# "Roman_K213": pc.field("k213"),
"shift_ra": pc.add(pc.field("ra"), -60.),
"shift_dec": pc.multiply(pc.field("dec"), -1.),
#"shift_ra": pc.add(pc.field("ra"), -60.),
#"shift_dec": pc.multiply(pc.field("dec"), -1.),
"Ellipticity1": pc.field("Ellipticity_1"),
"Ellipticity2": pc.field("Ellipticity_2"),
"mag_y_euclid_nisp": pc.field("Euclid_Y"),
Expand Down Expand Up @@ -214,12 +268,12 @@

PROJECTIONS_FLAGSHIP = [
{
"ra": pc.if_else(
pc.greater(pc.add(pc.field("ra_mag_gal"), pc.scalar(180)), pc.scalar(360)),
pc.subtract(pc.field("ra_mag_gal"), pc.scalar(180)),
pc.add(pc.field("ra_mag_gal"), pc.scalar(180))
),
"dec": pc.multiply(pc.scalar(-1), pc.field("dec_mag_gal")),
#"ra": pc.if_else(
# pc.greater(pc.add(pc.field("ra_mag_gal"), pc.scalar(180)), pc.scalar(360)),
# pc.subtract(pc.field("ra_mag_gal"), pc.scalar(180)),
# pc.add(pc.field("ra_mag_gal"), pc.scalar(180))
# ),
#"dec": pc.multiply(pc.scalar(-1), pc.field("dec_mag_gal")),
"redshift": pc.field("observed_redshift_gal"),
"mag_u_lsst": pc.subtract(
pc.multiply(pc.scalar(-2.5), pc.log10(pc.field("lsst_u_el_model3_ext"))), pc.scalar(48.6)
Expand Down Expand Up @@ -307,7 +361,16 @@ class RailReducer(Configurable, DynamicClass):
output catalog
"""

config_options: dict[str, StageParameter] = {}
config_options: dict[str, StageParameter] = dict(
rotation_angle=StageParameter(
list, [0.0, 0.0, 0.0], fmt="%s",
msg="Three rotation angles, applied to RA, DEC, and around the line of sight axis",
),
flip_dec=StageParameter(
float, False, fmt="%f",
msg="Shortcut to flip sign of Dec. If True, multiply Dec by -1. Excecuted BEFORE the rotator",
),
)
sub_classes: dict[str, type[DynamicClass]] = {}

def __init__(self, **kwargs: Any):
Expand Down Expand Up @@ -343,7 +406,12 @@ def run(
class RomanRubinReducer(RailReducer):
"""Class to reduce the 'roman_rubin' simulation input files for pz analysis"""

config_options: dict[str, StageParameter] = dict(
#config_options: dict[str, StageParameter] = dict(
# name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
# cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
#)
config_options = RailReducer.config_options.copy()
config_options.update(
name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
)
Expand Down Expand Up @@ -430,7 +498,12 @@ class CardinalReducer(RailReducer):
preprocessing stage was performed to put them into pyarrow parquet
"""

config_options: dict[str, StageParameter] = dict(
#config_options: dict[str, StageParameter] = dict(
# name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
# cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
#)
config_options = RailReducer.config_options.copy()
config_options.update(
name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
)
Expand Down Expand Up @@ -479,6 +552,16 @@ def run(
),
)

rot_ra, rot_dec, rot_x = self.config.rotation_angle
ra = pc.field("ra")
if self.config.flip_dec == True:
dec = pc.multiply(pc.scalar(-1), pc.field("dec"))
else:
dec = pc.field("dec")
new_ra, new_dec = rotate_gal_pyarrow(ra, dec, float(rot_ra), float(rot_dec), rot_x_ang=float(rot_x))
PROJECTIONS_CARDINAL[0]['shift_ra'] = new_ra
PROJECTIONS_CARDINAL[0]['shift_dec'] = new_dec

column_projection = {k: pc.field(k) for k in COLUMNS_CARDINAL}
projection = column_projection
project_nodes = []
Expand Down Expand Up @@ -514,7 +597,12 @@ def run(
class FlagshipReducer(RailReducer):
"""Class to reduce the 'flagship' simulation input files for pz analysis"""

config_options: dict[str, StageParameter] = dict(
#config_options: dict[str, StageParameter] = dict(
# name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
# cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
#)
config_options = RailReducer.config_options.copy()
config_options.update(
name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
)
Expand Down Expand Up @@ -559,6 +647,17 @@ def run(
),
)

# add ra, dec projections with rotation
rot_ra, rot_dec, rot_x = self.config.rotation_angle
ra = pc.field("ra_mag_gal")
if self.config.flip_dec == True:
dec = pc.multiply(pc.scalar(-1), pc.field("dec_mag_gal"))
else:
dec = pc.field("dec_mag_gal")
new_ra, new_dec = rotate_gal_pyarrow(ra, dec, float(rot_ra), float(rot_dec), rot_x_ang=float(rot_x))
PROJECTIONS_FLAGSHIP[0]['ra'] = new_ra
PROJECTIONS_FLAGSHIP[0]['dec'] = new_dec

column_projection = {k: pc.field(k) for k in COLUMNS_FLAGSHIP}
projection = column_projection
project_nodes = []
Expand Down Expand Up @@ -594,7 +693,12 @@ def run(
class ComCamReducer(RailReducer):
"""Class to reduce the 'com_cam' input files for pz analysis"""

config_options: dict[str, StageParameter] = dict(
#config_options: dict[str, StageParameter] = dict(
# name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
# cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
#)
config_options = RailReducer.config_options.copy()
config_options.update(
name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
)
Expand Down Expand Up @@ -679,7 +783,12 @@ def run(
class DP1Reducer(RailReducer):
"""Class to reduce the 'DP1' input files for pz analysis"""

config_options: dict[str, StageParameter] = dict(
#config_options: dict[str, StageParameter] = dict(
# name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
# cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
#)
config_options = RailReducer.config_options.copy()
config_options.update(
name=StageParameter(str, None, fmt="%s", required=True, msg="Reducer Name"),
cuts=StageParameter(dict, {}, fmt="%s", msg="Selections"),
)
Expand Down
Loading