Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
259 changes: 259 additions & 0 deletions dtable_events/automations/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2346,6 +2346,256 @@ def do_action(self):
return


class ModifyLinkedRecordInOtherAction(BaseAction):

VALID_COLUMN_TYPES = [
ColumnTypes.TEXT,
ColumnTypes.DATE,
ColumnTypes.LONG_TEXT,
ColumnTypes.CHECKBOX,
ColumnTypes.SINGLE_SELECT,
ColumnTypes.MULTIPLE_SELECT,
ColumnTypes.URL,
ColumnTypes.DURATION,
ColumnTypes.NUMBER,
ColumnTypes.COLLABORATOR,
ColumnTypes.EMAIL,
ColumnTypes.RATE,
]

def __init__(self, auto_rule, action_type, data, row, link_column_key):
"""
auto_rule: instance of AutomationRule
data: event data from redis
row: {col_key: value_config, ...} updates to apply on linked rows
link_column_key: key of the link column in the triggering table
"""
super().__init__(auto_rule, action_type, data)
self.row = row or {}
self.link_column_key = link_column_key
self.col_name_dict = {}
self.col_key_dict = {}
self.linked_table_id = None
self.link_column = None
self._init_link_info()

def _init_link_info(self):
"""Resolve link column metadata to find the linked table."""
table_columns = self.get_columns(self.auto_rule.table_id)
for col in table_columns:
if col.get('key') == self.link_column_key and col.get('type') == 'link':
self.link_column = col
break
if not self.link_column:
return
col_data = self.link_column.get('data') or {}
table_id = col_data.get('table_id')
other_table_id = col_data.get('other_table_id')
self.linked_table_id = other_table_id if self.auto_rule.table_id == table_id else table_id

def get_table_name(self, table_id):
dtable_metadata = self.auto_rule.dtable_metadata
for table in dtable_metadata.get('tables', []):
if table.get('_id') == table_id:
return table.get('name')

def get_columns(self, table_id):
dtable_metadata = self.auto_rule.dtable_metadata
for table in dtable_metadata.get('tables', []):
if table.get('_id') == table_id:
return table.get('columns', [])
return []

def fill_msg_blanks_with_sql(self, row, text, blanks):
col_name_dict = self.col_name_dict
db_session = self.auto_rule.db_session
return fill_msg_blanks_with_sql_row(text, blanks, col_name_dict, row, db_session)

def format_time_by_offset(self, offset, format_length):
cur_datetime = datetime.now()
cur_datetime_offset = cur_datetime + timedelta(days=offset)
if format_length == 2:
return cur_datetime_offset.strftime("%Y-%m-%d %H:%M")
if format_length == 1:
return cur_datetime_offset.strftime("%Y-%m-%d")

def get_linked_row_ids(self, sql_row):
"""Get linked row IDs from the triggering row's link column value."""
linked_rows = sql_row.get(self.link_column_key, [])
if not linked_rows or not isinstance(linked_rows, list):
return []
return [row.get('row_id') for row in linked_rows if row.get('row_id')]

def build_update_row(self, sql_row):
"""Build the update dict for linked rows, similar to AddRecordToOtherTableAction."""
dst_columns = self.get_columns(self.linked_table_id)
dst_table_name = self.get_table_name(self.linked_table_id)
filtered_updates = {}

for col in dst_columns:
if col.get('type') not in self.VALID_COLUMN_TYPES:
continue
col_name = col.get('name')
col_type = col.get('type')
col_key = col.get('key')
if col_key not in self.row:
continue

if col_type == ColumnTypes.DATE:
time_format = col.get('data', {}).get('format', '')
format_length = len(time_format.split(" "))
try:
time_dict = self.row.get(col_key)
if not time_dict:
continue
set_type = time_dict.get('set_type')
if set_type == 'specific_value':
filtered_updates[col_name] = time_dict.get('value')
elif set_type == 'relative_date':
offset = time_dict.get('offset')
filtered_updates[col_name] = self.format_time_by_offset(int(offset), format_length)
elif set_type == 'date_column':
date_column_key = time_dict.get('date_column_key')
src_col = self.col_key_dict.get(date_column_key)
if src_col and src_col.get('type') == ColumnTypes.DATE:
filtered_updates[col_name] = sql_row.get(src_col['key'])
except Exception as e:
auto_rule_logger.error('rule: %s modify linked record date error: %s, col_key: %s', self.auto_rule.rule_id, e, col_key)

elif col_type in [ColumnTypes.SINGLE_SELECT, ColumnTypes.MULTIPLE_SELECT]:
try:
data_dict = self.row.get(col_key)
if not data_dict:
continue
if isinstance(data_dict, dict):
set_type = data_dict.get('set_type')
if set_type == 'default':
value = data_dict.get('value')
filtered_updates[col_name] = self.parse_column_value(col, value)
elif set_type == 'column':
src_col_key = data_dict.get('value')
src_col = self.col_key_dict.get(src_col_key)
sql_value = sql_row.get(src_col_key)
src_col_data = src_col.get('data') or {}
src_col_data_options = src_col_data.get('options') or []
if col_type == ColumnTypes.SINGLE_SELECT and isinstance(sql_value, str):
option = next(filter(lambda option: option.get('id') == sql_value, src_col_data_options), None)
if option:
filtered_updates[col_name] = self.add_or_create_options(dst_table_name, col, option['name'])
elif col_type == ColumnTypes.MULTIPLE_SELECT and isinstance(sql_value, list):
option_names = [op.get('name') for op in src_col_data_options if op.get('id') in sql_value]
filtered_updates[col_name] = self.add_or_create_options_for_multiple_select(dst_table_name, col, option_names)
else:
filtered_updates[col_name] = self.parse_column_value(col, data_dict)
except Exception as e:
auto_rule_logger.error(e)
filtered_updates[col_name] = self.row.get(col_key)

elif col_type == ColumnTypes.COLLABORATOR:
try:
data_dict = self.row.get(col_key)
if not data_dict:
continue
if isinstance(data_dict, dict):
set_type = data_dict.get('set_type')
if set_type == 'default':
filtered_updates[col_name] = self.parse_column_value(col, data_dict.get('value'))
elif set_type == 'column':
src_col_key = data_dict.get('value')
src_col = self.col_key_dict.get(src_col_key)
if src_col and src_col.get('type') in [
ColumnTypes.COLLABORATOR,
ColumnTypes.CREATOR,
ColumnTypes.LAST_MODIFIER,
]:
value = sql_row.get(src_col['key'])
if not isinstance(value, list):
value = [value]
filtered_updates[col_name] = value
else:
filtered_updates[col_name] = self.parse_column_value(col, data_dict)
except Exception as e:
auto_rule_logger.error(e)
filtered_updates[col_name] = self.row.get(col_key)

elif col_type in [
ColumnTypes.NUMBER, ColumnTypes.DURATION, ColumnTypes.RATE,
ColumnTypes.TEXT, ColumnTypes.URL, ColumnTypes.EMAIL, ColumnTypes.LONG_TEXT,
]:
try:
data_dict = self.row.get(col_key)
if not data_dict:
continue
if isinstance(data_dict, dict):
set_type = data_dict.get('set_type')
if set_type == 'default':
value = data_dict.get('value')
if isinstance(value, str):
blanks = set(re.findall(r'\{([^{]*?)\}', value))
column_blanks = [blank for blank in blanks if blank in self.col_name_dict]
value = self.fill_msg_blanks_with_sql(sql_row, value, column_blanks)
filtered_updates[col_name] = self.parse_column_value(col, value)
elif set_type == 'column':
src_col_key = data_dict.get('value')
src_col = self.col_key_dict.get(src_col_key)
filtered_updates[col_name] = sql_row.get(src_col['key'])
else:
value = data_dict
if isinstance(value, str):
blanks = set(re.findall(r'\{([^{]*?)\}', value))
column_blanks = [blank for blank in blanks if blank in self.col_name_dict]
value = self.fill_msg_blanks_with_sql(sql_row, value, column_blanks)
filtered_updates[col_name] = self.parse_column_value(col, value)
except Exception as e:
auto_rule_logger.exception(e)
filtered_updates[col_name] = self.row.get(col_key)
else:
filtered_updates[col_name] = self.parse_column_value(col, self.row.get(col_key))

return filtered_updates

def do_action(self):
if not self.link_column or not self.linked_table_id:
raise RuleInvalidException('modify_linked_record link_column not found', 'link_column_not_found')

linked_table_name = self.get_table_name(self.linked_table_id)
if not linked_table_name:
raise RuleInvalidException('modify_linked_record linked_table not found', 'linked_table_not_found')

sql_row = self.auto_rule.get_sql_row()
if not sql_row:
return

# init column dicts from source table
src_columns = self.auto_rule.table_info['columns']
self.col_name_dict = {col.get('name'): col for col in src_columns}
self.col_key_dict = {col.get('key'): col for col in src_columns}

# resolve template blanks in row values
for row_key in self.row:
cell_value = self.row.get(row_key)
if not isinstance(cell_value, str):
continue
blanks = set(re.findall(r'\{([^{]*?)\}', cell_value))
column_blanks = [blank for blank in blanks if blank in self.col_name_dict]
self.row[row_key] = self.fill_msg_blanks_with_sql(sql_row, cell_value, column_blanks)

linked_row_ids = self.get_linked_row_ids(sql_row)
if not linked_row_ids:
return

update_row = self.build_update_row(sql_row)
if not update_row:
return

batch_update_list = [{'row_id': row_id, 'row': update_row} for row_id in linked_row_ids]
try:
self.auto_rule.dtable_server_api.batch_update_rows(linked_table_name, batch_update_list)
except Exception as e:
auto_rule_logger.error('modify linked record in dtable: %s, error: %s', self.auto_rule.dtable_uuid, e)
return


class TriggerWorkflowAction(BaseAction):

VALID_COLUMN_TYPES = [
Expand Down Expand Up @@ -4864,6 +5114,10 @@ def can_condition_trigger_action(self, action):
if run_condition == PER_UPDATE:
return True
return False
elif action_type == 'modify_linked_record_in_other':
if run_condition == PER_UPDATE:
return True
return False
elif action_type == 'trigger_workflow':
if run_condition in CRON_CONDITIONS and trigger_condition == CONDITION_PERIODICALLY:
return True
Expand Down Expand Up @@ -5004,6 +5258,11 @@ def do_actions(self, db_session, with_test=False):
dst_table_id = action_info.get('dst_table_id')
AddRecordToOtherTableAction(self, action_info.get('type'), self.data, row, dst_table_id).do_action()

elif action_info.get('type') == 'modify_linked_record_in_other':
row = action_info.get('row')
link_column_key = action_info.get('link_column_key')
ModifyLinkedRecordInOtherAction(self, action_info.get('type'), self.data, row, link_column_key).do_action()

elif action_info.get('type') == 'trigger_workflow':
token = action_info.get('token')
row = action_info.get('row')
Expand Down
Loading