diff --git a/core/src/klio_core/config/_io.py b/core/src/klio_core/config/_io.py index a964b2b4..f1fac34d 100644 --- a/core/src/klio_core/config/_io.py +++ b/core/src/klio_core/config/_io.py @@ -73,6 +73,7 @@ def wrapper(cls): class KlioIOConfig(object): io_type = attr.attrib(type=KlioIOType) io_direction = attr.attrib(type=KlioIODirection) + name = attr.attrib(type=str) # these must be filled in by subclasses so Klio knows what supports what SUPPORTED_TYPES = [] @@ -116,7 +117,7 @@ def as_dict(self): # since dicts preserve order by default in py3, let's force # type to be first - particularly helpful/useful for dumping # config via `klio job config show` - copy = {"type": self.name} + copy = {"type": self.type_name} copy.update(config_dict) return copy @@ -141,15 +142,19 @@ def from_dict(cls, config_dict, *args, **kwargs): # to be defined without a default after those that have defaults, # we're inserting the default value here if the user doesn't have # it already in their config + copy = config_dict.copy() if "skip_klio_read" not in config_dict: - copy = config_dict.copy() copy["skip_klio_read"] = False + if "name" not in config_dict: + copy["name"] = None + if "skip_klio_read" not in config_dict or "name" not in config_dict: return super().from_dict(copy, *args, **kwargs) return super().from_dict(config_dict, *args, **kwargs) def to_io_kwargs(self): kwargs = super().to_io_kwargs() kwargs.pop("skip_klio_read", None) + kwargs.pop("name", None) return kwargs @@ -163,15 +168,19 @@ def from_dict(cls, config_dict, *args, **kwargs): # to be defined without a default after those that have defaults, # we're inserting the default value here if the user doesn't have # it already in their config + copy = config_dict.copy() if "skip_klio_write" not in config_dict: - copy = config_dict.copy() copy["skip_klio_write"] = False + if "name" not in config_dict: + copy["name"] = None + if "skip_klio_write" not in config_dict or "name" not in config_dict: return 
super().from_dict(copy, *args, **kwargs) return super().from_dict(config_dict, *args, **kwargs) def to_io_kwargs(self): kwargs = super().to_io_kwargs() kwargs.pop("skip_klio_write", None) + kwargs.pop("name", None) return kwargs @@ -185,9 +194,15 @@ def from_dict(cls, config_dict, *args, **kwargs): # to be defined without a default after those that have defaults, # we're inserting the default value here if the user doesn't have # it already in their config + copy = config_dict.copy() if "skip_klio_existence_check" not in config_dict: - copy = config_dict.copy() copy["skip_klio_existence_check"] = False + if "name" not in config_dict: + copy["name"] = None + if ( + "skip_klio_existence_check" not in config_dict + or "name" not in config_dict + ): return super().from_dict(copy, *args, **kwargs) return super().from_dict(config_dict, *args, **kwargs) @@ -199,7 +214,7 @@ def to_io_kwargs(self): @attr.attrs(frozen=True) class KlioPubSubConfig(object): - name = "pubsub" + type_name = "pubsub" topic = attr.attrib(type=str) @staticmethod @@ -254,7 +269,7 @@ def from_dict(cls, config_dict, *args, **kwargs): class KlioFileConfig(object): - name = "file" + type_name = "file" @attr.attrs(frozen=True) @@ -331,7 +346,7 @@ class KlioFileOutputDataConfig(KlioDataIOConfig, KlioFileConfig): class KlioAvroConfig(object): - name = "avro" + type_name = "avro" @attr.attrs(frozen=True) @@ -387,7 +402,7 @@ def _convert_bigquery_input_coder(coder_str): @attr.attrs(frozen=True) class KlioBigQueryConfig(object): - name = "bq" + type_name = "bq" project = attr.attrib(type=str, default=None) dataset = attr.attrib(type=str, default=None) table = attr.attrib(type=str, default=None) @@ -493,7 +508,7 @@ def contains_invalid_fields(field_list): @attr.attrs(frozen=True) class KlioGCSConfig(KlioIOConfig): - name = "gcs" + type_name = "gcs" location = attr.attrib(type=str) @staticmethod diff --git a/core/src/klio_core/config/_preprocessing.py b/core/src/klio_core/config/_preprocessing.py index 
c9d6ce48..567ac428 100644 --- a/core/src/klio_core/config/_preprocessing.py +++ b/core/src/klio_core/config/_preprocessing.py @@ -13,7 +13,6 @@ # limitations under the License. # -import collections import string import glom @@ -46,7 +45,7 @@ def _transform_io_list(io_subsection_list): """Transform lists of dicts into a nested dict of dicts, where the keys for the top-level dict come from the `name` field in the nested dict. If `name` is not present, a name is auto-generated based on the index - of the I/O and it's type. + of the I/O. example: @@ -77,20 +76,12 @@ def _transform_io_list(io_subsection_list): } """ - type_counters = collections.defaultdict(int) io_dict = {} - for conf in io_subsection_list: + for count, conf in enumerate(io_subsection_list): if "name" in conf: name = conf["name"] - # TODO: right now "name" isn't supported in IOConfig (conflicts - # with existing "name" attribute), once that's fixed we - # shouldn't drop name here - conf.pop("name") else: - type_name = conf.get("type", "unknown") - type_id = type_counters[type_name] - type_counters[type_name] += 1 - name = "{}{}".format(type_name, type_id) + name = count io_dict[name] = conf diff --git a/core/src/klio_core/config/core.py b/core/src/klio_core/config/core.py index 793e5755..cc087e78 100644 --- a/core/src/klio_core/config/core.py +++ b/core/src/klio_core/config/core.py @@ -268,7 +268,7 @@ def traverse(cls): def _create_config_objects(self, configs, io_type, io_direction): options = dict( [ - (x.name, x) + (x.type_name, x) for x in self._get_all_config_subclasses() if x.supports_type(io_type) and x.supports_direction(io_direction) diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index 886c66b8..db387aee 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -36,7 +36,11 @@ def job_config_dict(): }, }, "outputs": { - "pubsub0": {"type": "pubsub", "topic": "test-job-out"} + "pubsub0": { + "type": "pubsub", + "topic": 
"test-job-out", + "name": "test-event-output0", + } }, }, "data": { @@ -44,6 +48,7 @@ def job_config_dict(): "gcs0": { "type": "GCS", "location": "gs://sigint-output/test-parent-job-out", + "name": "test-data-input0", } }, "outputs": { @@ -63,21 +68,25 @@ def job_config_dict(): @pytest.fixture def final_job_config_dict(): return { + "allow_non_klio_messages": False, "metrics": {"logger": {}}, + "blocking": False, "events": { "inputs": [ { "type": "pubsub", + "name": None, + "skip_klio_read": False, "topic": "test-parent-job-out", "subscription": "test-parent-job-out-sub", - "skip_klio_read": False, - }, + } ], "outputs": [ { "type": "pubsub", - "topic": "test-job-out", + "name": "test-event-output0", "skip_klio_write": False, + "topic": "test-job-out", } ], }, @@ -85,8 +94,9 @@ def final_job_config_dict(): "inputs": [ { "type": "gcs", - "location": "gs://sigint-output/test-parent-job-out", + "name": "test-data-input0", "skip_klio_existence_check": False, + "location": "gs://sigint-output/test-parent-job-out", "file_suffix": "", "ping": False, } @@ -94,18 +104,17 @@ def final_job_config_dict(): "outputs": [ { "type": "gcs", + "name": None, + "skip_klio_existence_check": False, + "location": "gs://sigint-output/test-job-out", "file_suffix": "", "force": False, - "location": "gs://sigint-output/test-job-out", - "skip_klio_existence_check": False, } ], }, "more": "config", "that": {"the": "user"}, "might": ["include"], - "blocking": False, - "allow_non_klio_messages": False, } @@ -427,7 +436,7 @@ def test_klio_read_file_config(): config_dict, io.KlioIOType.DATA, io.KlioIODirection.INPUT ) - assert "file" == klio_read_file_config.name + assert "file" == klio_read_file_config.type_name assert config_dict["location"] == klio_read_file_config.file_pattern @@ -440,7 +449,7 @@ def test_klio_write_file_config(): config_dict, io.KlioIOType.DATA, io.KlioIODirection.OUTPUT ) - assert "file" == klio_write_file_config.name + assert "file" == klio_write_file_config.type_name assert 
config_dict["location"] == klio_write_file_config.file_path_prefix @@ -461,7 +470,7 @@ def test_klio_write_bigquery_config(): config_dict, io.KlioIOType.EVENT, io.KlioIODirection.OUTPUT ) - assert "bq" == klio_write_bq_cfg.name + assert "bq" == klio_write_bq_cfg.type_name assert config_dict["schema"] == klio_write_bq_cfg.schema assert ( config_dict["create_disposition"] diff --git a/core/tests/config/test_preprocessing.py b/core/tests/config/test_preprocessing.py index a2d41fec..533671d4 100644 --- a/core/tests/config/test_preprocessing.py +++ b/core/tests/config/test_preprocessing.py @@ -223,53 +223,68 @@ def test_apply_templates( assert expected == json.loads(new_dict) -def test_transform_io(kcp): - config = { - "job_config": { - "events": { - "inputs": [ - {"type": "bq", "key": "value"}, - {"type": "bq", "key": "value2"}, - {"type": "gcs", "gcskey": "gcsvalue"}, - ], - "outputs": [ - {"type": "bq", "key": "value", "name": "mybq"}, - {"type": "bq", "key": "value2"}, - ], - }, - "data": { - "inputs": [], - "outputs": [ - {"type": "bq", "key": "value", "name": "mybq"}, - {"type": "bq", "key": "value2"}, - ], - }, - } - } - expected = { - "job_config": { - "events": { - "inputs": { - "bq0": {"type": "bq", "key": "value"}, - "bq1": {"type": "bq", "key": "value2"}, - "gcs0": {"type": "gcs", "gcskey": "gcsvalue"}, - }, - "outputs": { - "mybq": {"type": "bq", "key": "value"}, - "bq0": {"type": "bq", "key": "value2"}, - }, +@pytest.mark.parametrize( + "config,expected", + ( + # I/O lists become dicts keyed by "name", or by list index if unnamed + ( + { + "job_config": { + "events": { + "inputs": [ + {"type": "bq", "key": "value"}, + {"type": "bq", "key": "value2"}, + {"type": "gcs", "gcskey": "gcsvalue"}, + ], + "outputs": [ + {"type": "bq", "key": "value", "name": "mybq"}, + {"type": "bq", "key": "value2"}, + ], + }, + "data": { + "inputs": [], + "outputs": [ + {"type": "bq", "key": "value", "name": "mybq"}, + {"type": "bq", "key": "value2"}, + ], + }, + } }, - "data": { - "inputs": {}, - 
"outputs": { - "mybq": {"type": "bq", "key": "value"}, - "bq0": {"type": "bq", "key": "value2"}, - }, + { + "job_config": { + "events": { + "inputs": { + 0: {"type": "bq", "key": "value"}, + 1: {"type": "bq", "key": "value2"}, + 2: {"type": "gcs", "gcskey": "gcsvalue"}, + }, + "outputs": { + "mybq": { + "type": "bq", + "key": "value", + "name": "mybq", + }, + 1: {"type": "bq", "key": "value2"}, + }, + }, + "data": { + "inputs": {}, + "outputs": { + "mybq": { + "type": "bq", + "key": "value", + "name": "mybq", + }, + 1: {"type": "bq", "key": "value2"}, + }, + }, + } }, - } - } + ), + ), +) +def test_transform_io(kcp, config, expected): actual = kcp._transform_io_sections(config) - assert actual == expected diff --git a/exec/src/klio_exec/commands/run.py b/exec/src/klio_exec/commands/run.py index a83cf967..c5dbdcd7 100644 --- a/exec/src/klio_exec/commands/run.py +++ b/exec/src/klio_exec/commands/run.py @@ -459,28 +459,32 @@ def lbl(label): # TODO this can prob go away if/when we make event_inputs a # dictionary rather than a list of dicts (@lynn) - def _generate_input_conf_names(self): + def _validate_input_conf_names(self): ev_inputs = self.config.job_config.events.inputs - input_dict = {} - for index, ev in enumerate(ev_inputs): - name = "{}{}".format(ev.name, index) - input_dict[name] = ev - return input_dict + ev_names = [input.name for input in ev_inputs] + # Enforce no duplicate event input names + # We could beef up the error message by returning the duplicates + # but this will go away when we make event_inputs a dict. + return len(ev_names) == len(set(ev_names)) def _generate_pcoll_per_input(self, pipeline): - inputs = self._generate_input_conf_names() + if not self._validate_input_conf_names(): + msg = "Duplicate event input names found." 
+ logging.error(msg) + raise SystemExit(1) + inputs = self.config.job_config.events.inputs MultiInputPCollTuple = collections.namedtuple( - "MultiInputPCollTuple", list(inputs.keys()) + "MultiInputPCollTuple", [input.name for input in inputs] ) input_name_to_input_pcolls = {} multi_to_pass_thru = [] - for input_name, input_conf in inputs.items(): + for input_conf in inputs: input_to_process, input_to_pass_thru = self._generate_pcoll( - pipeline, input_conf, label_prefix=input_name + pipeline, input_conf, label_prefix=input_conf.name ) if input_to_pass_thru: multi_to_pass_thru.append(input_to_pass_thru) - input_name_to_input_pcolls[input_name] = input_to_process + input_name_to_input_pcolls[input_conf.name] = input_to_process to_process = MultiInputPCollTuple(**input_name_to_input_pcolls) to_pass_thru = ( @@ -500,7 +504,7 @@ def _generate_pcoll(self, pipeline, input_config, label_prefix=None): if label_prefix: label = "[{}] {}".format(label_prefix, label) - transform_cls_in = self._io_mapper.input[input_config.name] + transform_cls_in = self._io_mapper.input[input_config.type_name] in_pcol = pipeline | label >> transform_cls_in( **input_config.to_io_kwargs() ) @@ -534,7 +538,9 @@ def _setup_pipeline(self, pipeline): if self._has_event_outputs: output_config = self.config.job_config.events.outputs[0] if not output_config.skip_klio_write: - transform_cls_out = self._io_mapper.output[output_config.name] + transform_cls_out = self._io_mapper.output[ + output_config.type_name + ] to_output = out_pcol if to_pass_thru: to_output_tuple = (out_pcol, to_pass_thru) diff --git a/integration/multi-event-input-batch/klio-job.yaml b/integration/multi-event-input-batch/klio-job.yaml index b9e9af7e..71e3b212 100644 --- a/integration/multi-event-input-batch/klio-job.yaml +++ b/integration/multi-event-input-batch/klio-job.yaml @@ -14,8 +14,10 @@ job_config: events: inputs: - type: file + name: "first_input" location: ./batch_track_ids_1.txt - type: file + name: "second_input" 
location: ./batch_track_ids_2.txt outputs: - type: file diff --git a/integration/multi-event-input-batch/run.py b/integration/multi-event-input-batch/run.py index 841b1fe3..7d031045 100644 --- a/integration/multi-event-input-batch/run.py +++ b/integration/multi-event-input-batch/run.py @@ -58,8 +58,8 @@ def combined_func(ctx, item): def run(pcolls, config): - first = pcolls.file0 | "process first" >> beam.Map(first_func) - second = pcolls.file1 | "process second" >> beam.Map(second_func) + first = pcolls.first_input | "process first" >> beam.Map(first_func) + second = pcolls.second_input | "process second" >> beam.Map(second_func) combined = (first, second) | beam.Flatten() return combined | "process combined" >> beam.Map(combined_func)