From 308fb9f4c2525b2a2ebabc3573e7bf7925264d39 Mon Sep 17 00:00:00 2001 From: Shireen Kheradpey Date: Mon, 29 Mar 2021 17:05:25 -0400 Subject: [PATCH 1/3] Rename name to type_name for type attribute --- core/src/klio_core/config/_io.py | 12 ++++++------ core/src/klio_core/config/core.py | 2 +- core/tests/config/test_config.py | 6 +++--- exec/src/klio_exec/commands/run.py | 6 ++++-- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/klio_core/config/_io.py b/core/src/klio_core/config/_io.py index a964b2b4..2c1eb89c 100644 --- a/core/src/klio_core/config/_io.py +++ b/core/src/klio_core/config/_io.py @@ -116,7 +116,7 @@ def as_dict(self): # since dicts preserve order by default in py3, let's force # type to be first - particularly helpful/useful for dumping # config via `klio job config show` - copy = {"type": self.name} + copy = {"type": self.type_name} copy.update(config_dict) return copy @@ -199,7 +199,7 @@ def to_io_kwargs(self): @attr.attrs(frozen=True) class KlioPubSubConfig(object): - name = "pubsub" + type_name = "pubsub" topic = attr.attrib(type=str) @staticmethod @@ -254,7 +254,7 @@ def from_dict(cls, config_dict, *args, **kwargs): class KlioFileConfig(object): - name = "file" + type_name = "file" @attr.attrs(frozen=True) @@ -331,7 +331,7 @@ class KlioFileOutputDataConfig(KlioDataIOConfig, KlioFileConfig): class KlioAvroConfig(object): - name = "avro" + type_name = "avro" @attr.attrs(frozen=True) @@ -387,7 +387,7 @@ def _convert_bigquery_input_coder(coder_str): @attr.attrs(frozen=True) class KlioBigQueryConfig(object): - name = "bq" + type_name = "bq" project = attr.attrib(type=str, default=None) dataset = attr.attrib(type=str, default=None) table = attr.attrib(type=str, default=None) @@ -493,7 +493,7 @@ def contains_invalid_fields(field_list): @attr.attrs(frozen=True) class KlioGCSConfig(KlioIOConfig): - name = "gcs" + type_name = "gcs" location = attr.attrib(type=str) @staticmethod diff --git a/core/src/klio_core/config/core.py b/core/src/klio_core/config/core.py index 793e5755..cc087e78 100644 --- a/core/src/klio_core/config/core.py +++ b/core/src/klio_core/config/core.py @@ -268,7 +268,7 @@ def traverse(cls): def _create_config_objects(self, configs, io_type, io_direction): options = dict( [ - (x.name, x) + (x.type_name, x) for x in self._get_all_config_subclasses() if x.supports_type(io_type) and x.supports_direction(io_direction) diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index 886c66b8..e0dc6781 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -427,7 +427,7 @@ def test_klio_read_file_config(): config_dict, io.KlioIOType.DATA, io.KlioIODirection.INPUT ) - assert "file" == klio_read_file_config.name + assert "file" == klio_read_file_config.type_name assert config_dict["location"] == klio_read_file_config.file_pattern @@ -440,7 +440,7 @@ def test_klio_write_file_config(): config_dict, io.KlioIOType.DATA, io.KlioIODirection.OUTPUT ) - assert "file" == klio_write_file_config.name + assert "file" == klio_write_file_config.type_name assert config_dict["location"] == klio_write_file_config.file_path_prefix @@ -461,7 +461,7 @@ def test_klio_write_bigquery_config(): config_dict, io.KlioIOType.EVENT, io.KlioIODirection.OUTPUT ) - assert "bq" == klio_write_bq_cfg.name + assert "bq" == klio_write_bq_cfg.type_name assert config_dict["schema"] == klio_write_bq_cfg.schema assert ( config_dict["create_disposition"] diff --git a/exec/src/klio_exec/commands/run.py b/exec/src/klio_exec/commands/run.py index a83cf967..f0903091 100644 --- a/exec/src/klio_exec/commands/run.py +++ b/exec/src/klio_exec/commands/run.py @@ -500,7 +500,7 @@ def _generate_pcoll(self, pipeline, input_config, label_prefix=None): if label_prefix: label = "[{}] {}".format(label_prefix, label) - transform_cls_in = self._io_mapper.input[input_config.name] + transform_cls_in = self._io_mapper.input[input_config.type_name] in_pcol = pipeline | label >> transform_cls_in( **input_config.to_io_kwargs() ) @@ -534,7 +534,9 @@ def _setup_pipeline(self, pipeline): if self._has_event_outputs: output_config = self.config.job_config.events.outputs[0] if not output_config.skip_klio_write: - transform_cls_out = self._io_mapper.output[output_config.name] + transform_cls_out = self._io_mapper.output[ + output_config.type_name + ] to_output = out_pcol if to_pass_thru: to_output_tuple = (out_pcol, to_pass_thru) From dcb64c3501e350266b25006b18eee4c285dbc54a Mon Sep 17 00:00:00 2001 From: Shireen Kheradpey Date: Mon, 29 Mar 2021 20:48:42 -0400 Subject: [PATCH 2/3] [core] Expose name in config dict --- core/src/klio_core/config/_io.py | 21 ++++- core/src/klio_core/config/_preprocessing.py | 4 - core/tests/config/test_config.py | 43 +++++---- core/tests/config/test_preprocessing.py | 99 +++++++++++---------- 4 files changed, 98 insertions(+), 69 deletions(-) diff --git a/core/src/klio_core/config/_io.py b/core/src/klio_core/config/_io.py index 2c1eb89c..f1fac34d 100644 --- a/core/src/klio_core/config/_io.py +++ b/core/src/klio_core/config/_io.py @@ -73,6 +73,7 @@ def wrapper(cls): class KlioIOConfig(object): io_type = attr.attrib(type=KlioIOType) io_direction = attr.attrib(type=KlioIODirection) + name = attr.attrib(type=str) # these must be filled in by subclasses so Klio knows what supports what SUPPORTED_TYPES = [] @@ -141,15 +142,19 @@ def from_dict(cls, config_dict, *args, **kwargs): # to be defined without a default after those that have defaults, # we're inserting the default value here if the user doesn't have # it already in their config + copy = config_dict.copy() if "skip_klio_read" not in config_dict: - copy = config_dict.copy() copy["skip_klio_read"] = False + if "name" not in config_dict: + copy["name"] = None + if "skip_klio_read" not in config_dict or "name" not in config_dict: return super().from_dict(copy, *args, **kwargs) return super().from_dict(config_dict, *args, **kwargs) def to_io_kwargs(self): kwargs = super().to_io_kwargs() kwargs.pop("skip_klio_read", None) + kwargs.pop("name", None) return kwargs @@ -163,15 +168,19 @@ def from_dict(cls, config_dict, *args, **kwargs): # to be defined without a default after those that have defaults, # we're inserting the default value here if the user doesn't have # it already in their config + copy = config_dict.copy() if "skip_klio_write" not in config_dict: - copy = config_dict.copy() copy["skip_klio_write"] = False + if "name" not in config_dict: + copy["name"] = None + if "skip_klio_write" not in config_dict or "name" not in config_dict: return super().from_dict(copy, *args, **kwargs) return super().from_dict(config_dict, *args, **kwargs) def to_io_kwargs(self): kwargs = super().to_io_kwargs() kwargs.pop("skip_klio_write", None) + kwargs.pop("name", None) return kwargs @@ -185,9 +194,15 @@ def from_dict(cls, config_dict, *args, **kwargs): # to be defined without a default after those that have defaults, # we're inserting the default value here if the user doesn't have # it already in their config + copy = config_dict.copy() if "skip_klio_existence_check" not in config_dict: - copy = config_dict.copy() copy["skip_klio_existence_check"] = False + if "name" not in config_dict: + copy["name"] = None + if ( + "skip_klio_existence_check" not in config_dict + or "name" not in config_dict + ): return super().from_dict(copy, *args, **kwargs) return super().from_dict(config_dict, *args, **kwargs) diff --git a/core/src/klio_core/config/_preprocessing.py b/core/src/klio_core/config/_preprocessing.py index c9d6ce48..caf2b583 100644 --- a/core/src/klio_core/config/_preprocessing.py +++ b/core/src/klio_core/config/_preprocessing.py @@ -82,10 +82,6 @@ def _transform_io_list(io_subsection_list): for conf in io_subsection_list: if "name" in conf: name = conf["name"] - # TODO: right now "name" isn't supported in IOConfig (conflicts - # with existing "name" attribute), once that's fixed we - # shouldn't drop name here - conf.pop("name") else: type_name = conf.get("type", "unknown") type_id = type_counters[type_name] diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index e0dc6781..cb672da0 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -36,7 +36,11 @@ def job_config_dict(): }, }, "outputs": { - "pubsub0": {"type": "pubsub", "topic": "test-job-out"} + "pubsub0": { + "type": "pubsub", + "topic": "test-job-out", + "name": "test-event-output0", + } }, }, "data": { @@ -44,6 +48,7 @@ def job_config_dict(): "gcs0": { "type": "GCS", "location": "gs://sigint-output/test-parent-job-out", + "name": "test-data-input0", } }, "outputs": { @@ -63,49 +68,55 @@ def job_config_dict(): @pytest.fixture def final_job_config_dict(): return { - "metrics": {"logger": {}}, + "allow_non_klio_messages": False, + "metrics": { + "logger": {} + }, + "blocking": False, "events": { "inputs": [ { "type": "pubsub", - "topic": "test-parent-job-out", - "subscription": "test-parent-job-out-sub", + "name": None, "skip_klio_read": False, - }, + "topic": "test-parent-job-out", + "subscription": "test-parent-job-out-sub" + } ], "outputs": [ { "type": "pubsub", - "topic": "test-job-out", + "name": "test-event-output0", "skip_klio_write": False, + "topic": "test-job-out" } - ], + ] }, "data": { "inputs": [ { "type": "gcs", - "location": "gs://sigint-output/test-parent-job-out", + "name": "test-data-input0", "skip_klio_existence_check": False, + "location": "gs://sigint-output/test-parent-job-out", "file_suffix": "", - "ping": False, + "ping": False } ], - "outputs": [ + "outputs":[ { "type": "gcs", + "name": None, + "skip_klio_existence_check": False, + "location": "gs://sigint-output/test-job-out", "file_suffix": "", "force": False, - "location": "gs://sigint-output/test-job-out", - "skip_klio_existence_check": False, } - ], + ] }, "more": "config", "that": {"the": "user"}, - "might": ["include"], - "blocking": False, - "allow_non_klio_messages": False, + "might": ["include"] } diff --git a/core/tests/config/test_preprocessing.py b/core/tests/config/test_preprocessing.py index a2d41fec..8a393fc2 100644 --- a/core/tests/config/test_preprocessing.py +++ b/core/tests/config/test_preprocessing.py @@ -223,53 +223,60 @@ def test_apply_templates( assert expected == json.loads(new_dict) -def test_transform_io(kcp): - config = { - "job_config": { - "events": { - "inputs": [ - {"type": "bq", "key": "value"}, - {"type": "bq", "key": "value2"}, - {"type": "gcs", "gcskey": "gcsvalue"}, - ], - "outputs": [ - {"type": "bq", "key": "value", "name": "mybq"}, - {"type": "bq", "key": "value2"}, - ], - }, - "data": { - "inputs": [], - "outputs": [ - {"type": "bq", "key": "value", "name": "mybq"}, - {"type": "bq", "key": "value2"}, - ], - }, - } - } - expected = { - "job_config": { - "events": { - "inputs": { - "bq0": {"type": "bq", "key": "value"}, - "bq1": {"type": "bq", "key": "value2"}, - "gcs0": {"type": "gcs", "gcskey": "gcsvalue"}, - }, - "outputs": { - "mybq": {"type": "bq", "key": "value"}, - "bq0": {"type": "bq", "key": "value2"}, - }, - }, - "data": { - "inputs": {}, - "outputs": { - "mybq": {"type": "bq", "key": "value"}, - "bq0": {"type": "bq", "key": "value2"}, - }, - }, - } - } +@pytest.mark.parametrize( + "config,expected", + ( + # No overrides given - no changes in returned dict + ( + { + "job_config": { + "events": { + "inputs": [ + {"type": "bq", "key": "value"}, + {"type": "bq", "key": "value2"}, + {"type": "gcs", "gcskey": "gcsvalue"}, + ], + "outputs": [ + {"type": "bq", "key": "value", "name": "mybq"}, + {"type": "bq", "key": "value2"}, + ], + }, + "data": { + "inputs": [], + "outputs": [ + {"type": "bq", "key": "value", "name": "mybq"}, + {"type": "bq", "key": "value2"}, + ], + }, + } + }, + { + "job_config": { + "events": { + "inputs": { + "bq0": {"type": "bq", "key": "value"}, + "bq1": {"type": "bq", "key": "value2"}, + "gcs0": {"type": "gcs", "gcskey": "gcsvalue"}, + }, + "outputs": { + "mybq": {"type": "bq", "key": "value", "name": "mybq"}, + "bq0": {"type": "bq", "key": "value2"}, + }, + }, + "data": { + "inputs": {}, + "outputs": { + "mybq": {"type": "bq", "key": "value", "name": "mybq"}, + "bq0": {"type": "bq", "key": "value2"}, + }, + }, + } + } + ), + ), +) +def test_transform_io(kcp, config, expected): actual = kcp._transform_io_sections(config) - assert actual == expected From 6d309d5ad78a3e160d81402448a8f7782973929d Mon Sep 17 00:00:00 2001 From: Shireen Kheradpey Date: Tue, 30 Mar 2021 11:14:06 -0400 Subject: [PATCH 3/3] [core] Remove multi-naming schema --- core/src/klio_core/config/_preprocessing.py | 11 +-- core/tests/config/test_config.py | 18 ++-- core/tests/config/test_preprocessing.py | 96 ++++++++++--------- exec/src/klio_exec/commands/run.py | 26 ++--- .../multi-event-input-batch/klio-job.yaml | 2 + integration/multi-event-input-batch/run.py | 4 +- 6 files changed, 82 insertions(+), 75 deletions(-) diff --git a/core/src/klio_core/config/_preprocessing.py b/core/src/klio_core/config/_preprocessing.py index caf2b583..567ac428 100644 --- a/core/src/klio_core/config/_preprocessing.py +++ b/core/src/klio_core/config/_preprocessing.py @@ -13,7 +13,6 @@ # limitations under the License. # -import collections import string import glom @@ -46,7 +45,7 @@ def _transform_io_list(io_subsection_list): """Transform lists of dicts into a nested dict of dicts, where the keys for the top-level dict come from the `name` field in the nested dict. If `name` is not present, a name is auto-generated based on the index - of the I/O and it's type. + of the I/O. example: @@ -77,16 +76,12 @@ def _transform_io_list(io_subsection_list): } """ - type_counters = collections.defaultdict(int) io_dict = {} - for conf in io_subsection_list: + for count, conf in enumerate(io_subsection_list): if "name" in conf: name = conf["name"] else: - type_name = conf.get("type", "unknown") - type_id = type_counters[type_name] - type_counters[type_name] += 1 - name = "{}{}".format(type_name, type_id) + name = count io_dict[name] = conf diff --git a/core/tests/config/test_config.py b/core/tests/config/test_config.py index cb672da0..db387aee 100644 --- a/core/tests/config/test_config.py +++ b/core/tests/config/test_config.py @@ -69,9 +69,7 @@ def job_config_dict(): def final_job_config_dict(): return { "allow_non_klio_messages": False, - "metrics": { - "logger": {} - }, + "metrics": {"logger": {}}, "blocking": False, "events": { "inputs": [ @@ -80,7 +78,7 @@ def final_job_config_dict(): "name": None, "skip_klio_read": False, "topic": "test-parent-job-out", - "subscription": "test-parent-job-out-sub" + "subscription": "test-parent-job-out-sub", } ], "outputs": [ @@ -88,9 +86,9 @@ def final_job_config_dict(): "type": "pubsub", "name": "test-event-output0", "skip_klio_write": False, - "topic": "test-job-out" + "topic": "test-job-out", } - ] + ], }, "data": { "inputs": [ @@ -100,10 +98,10 @@ def final_job_config_dict(): "skip_klio_existence_check": False, "location": "gs://sigint-output/test-parent-job-out", "file_suffix": "", - "ping": False + "ping": False, } ], - "outputs":[ + "outputs": [ { "type": "gcs", "name": None, @@ -112,11 +110,11 @@ def final_job_config_dict(): "file_suffix": "", "force": False, } - ] + ], }, "more": "config", "that": {"the": "user"}, - "might": ["include"] + "might": ["include"], } diff --git a/core/tests/config/test_preprocessing.py b/core/tests/config/test_preprocessing.py index 8a393fc2..533671d4 100644 --- a/core/tests/config/test_preprocessing.py +++ b/core/tests/config/test_preprocessing.py @@ -226,53 +226,61 @@ def test_apply_templates( @pytest.mark.parametrize( "config,expected", ( - # No overrides given - no changes in returned dict - ( - { - "job_config": { - "events": { - "inputs": [ - {"type": "bq", "key": "value"}, - {"type": "bq", "key": "value2"}, - {"type": "gcs", "gcskey": "gcsvalue"}, - ], - "outputs": [ - {"type": "bq", "key": "value", "name": "mybq"}, - {"type": "bq", "key": "value2"}, - ], - }, - "data": { - "inputs": [], - "outputs": [ - {"type": "bq", "key": "value", "name": "mybq"}, - {"type": "bq", "key": "value2"}, - ], - }, - } + # No overrides given - no changes in returned dict + ( + { + "job_config": { + "events": { + "inputs": [ + {"type": "bq", "key": "value"}, + {"type": "bq", "key": "value2"}, + {"type": "gcs", "gcskey": "gcsvalue"}, + ], + "outputs": [ + {"type": "bq", "key": "value", "name": "mybq"}, + {"type": "bq", "key": "value2"}, + ], }, - { - "job_config": { - "events": { - "inputs": { - "bq0": {"type": "bq", "key": "value"}, - "bq1": {"type": "bq", "key": "value2"}, - "gcs0": {"type": "gcs", "gcskey": "gcsvalue"}, - }, - "outputs": { - "mybq": {"type": "bq", "key": "value", "name": "mybq"}, - "bq0": {"type": "bq", "key": "value2"}, - }, + "data": { + "inputs": [], + "outputs": [ + {"type": "bq", "key": "value", "name": "mybq"}, + {"type": "bq", "key": "value2"}, + ], + }, + } + }, + { + "job_config": { + "events": { + "inputs": { + 0: {"type": "bq", "key": "value"}, + 1: {"type": "bq", "key": "value2"}, + 2: {"type": "gcs", "gcskey": "gcsvalue"}, + }, + "outputs": { + "mybq": { + "type": "bq", + "key": "value", + "name": "mybq", }, - "data": { - "inputs": {}, - "outputs": { - "mybq": {"type": "bq", "key": "value", "name": "mybq"}, - "bq0": {"type": "bq", "key": "value2"}, - }, + 1: {"type": "bq", "key": "value2"}, + }, + }, + "data": { + "inputs": {}, + "outputs": { + "mybq": { + "type": "bq", + "key": "value", + "name": "mybq", }, - } - } - ), + 1: {"type": "bq", "key": "value2"}, + }, + }, + } + }, + ), ), ) def test_transform_io(kcp, config, expected): diff --git a/exec/src/klio_exec/commands/run.py b/exec/src/klio_exec/commands/run.py index f0903091..c5dbdcd7 100644 --- a/exec/src/klio_exec/commands/run.py +++ b/exec/src/klio_exec/commands/run.py @@ -459,28 +459,32 @@ def lbl(label): # TODO this can prob go away if/when we make event_inputs a # dictionary rather than a list of dicts (@lynn) - def _generate_input_conf_names(self): + def _validate_input_conf_names(self): ev_inputs = self.config.job_config.events.inputs - input_dict = {} - for index, ev in enumerate(ev_inputs): - name = "{}{}".format(ev.name, index) - input_dict[name] = ev - return input_dict + ev_names = [input.name for input in ev_inputs] + # Enforce no duplicate event input names + # We could beef up the error message by returning the duplicates + # but this will go away when we make event_inputs a dict. + return len(ev_names) == len(set(ev_names)) def _generate_pcoll_per_input(self, pipeline): - inputs = self._generate_input_conf_names() + if not self._validate_input_conf_names(): + msg = "Duplicate event input names found." + logging.error(msg) + raise SystemExit(1) + inputs = self.config.job_config.events.inputs MultiInputPCollTuple = collections.namedtuple( - "MultiInputPCollTuple", list(inputs.keys()) + "MultiInputPCollTuple", [input.name for input in inputs] ) input_name_to_input_pcolls = {} multi_to_pass_thru = [] - for input_name, input_conf in inputs.items(): + for input_conf in inputs: input_to_process, input_to_pass_thru = self._generate_pcoll( - pipeline, input_conf, label_prefix=input_name + pipeline, input_conf, label_prefix=input_conf.name ) if input_to_pass_thru: multi_to_pass_thru.append(input_to_pass_thru) - input_name_to_input_pcolls[input_name] = input_to_process + input_name_to_input_pcolls[input_conf.name] = input_to_process to_process = MultiInputPCollTuple(**input_name_to_input_pcolls) to_pass_thru = ( diff --git a/integration/multi-event-input-batch/klio-job.yaml b/integration/multi-event-input-batch/klio-job.yaml index b9e9af7e..71e3b212 100644 --- a/integration/multi-event-input-batch/klio-job.yaml +++ b/integration/multi-event-input-batch/klio-job.yaml @@ -14,8 +14,10 @@ job_config: events: inputs: - type: file + name: "first_input" location: ./batch_track_ids_1.txt - type: file + name: "second_input" location: ./batch_track_ids_2.txt outputs: - type: file diff --git a/integration/multi-event-input-batch/run.py b/integration/multi-event-input-batch/run.py index 841b1fe3..7d031045 100644 --- a/integration/multi-event-input-batch/run.py +++ b/integration/multi-event-input-batch/run.py @@ -58,8 +58,8 @@ def combined_func(ctx, item): def run(pcolls, config): - first = pcolls.file0 | "process first" >> beam.Map(first_func) - second = pcolls.file1 | "process second" >> beam.Map(second_func) + first = pcolls.first_input | "process first" >> beam.Map(first_func) + second = pcolls.second_input | "process second" >> beam.Map(second_func) combined = (first, second) | beam.Flatten() return combined | "process combined" >> beam.Map(combined_func)