Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 89 additions & 44 deletions iotfunctions/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,39 +458,12 @@ def __init__(self, credentials=None, start_session=False, echo=False, tenant_id=
self.native_connection = None
self.native_connection_dbi = None

# cache entity types
self.entity_type_metadata = {}

if entity_metadata is None:

metadata = self.http_request(object_type='allEntityTypes', object_name='', request='GET', payload={},
object_name_2='')
if metadata is not None:
try:
metadata = json.loads(metadata)
if metadata is None:
msg = 'Unable to retrieve entity metadata from the server. Proceeding with limited metadata'
logger.warning(msg)
for m in metadata:
self.entity_type_metadata[m['entityTypeId']] = m
except Exception:
pass
else:
metadata = entity_metadata
if metadata.get('entityTypeId') is not None:
entity_type_id = metadata['entityTypeId']
self.entity_type_metadata[entity_type_id] = metadata

# Figure out entity_type, entity_type_id and schema
self.entity_type_id = None
self.entity_type = None
self.schema = None
if entity_type_id is not None:
self.entity_type_id = entity_type_id
metadata = self.entity_type_metadata.get(entity_type_id)
if metadata is not None:
self.entity_type = metadata.get('name')
self.schema = metadata.get('schemaName')


# Create DBModelStore if it was not handed in
if self.model_store is None and self.db_type in ['db2'] and self.entity_type_id is not None and self.schema is not None:
Expand Down Expand Up @@ -828,36 +801,93 @@ def get_catalog_module(self, function_name):

return (package, module, class_name)

def get_metadata_by_entity_type_name(self, name):
# this function is being used by predict, while migrating API V2 call, created this
entity_type_id = None
try:
# Get the ENTITY_TYPE table
query, table = self.query('ENTITY_TYPE', 'IOTANALYTICS',
['ENTITY_TYPE_ID'], filters={'NAME': name})
df = self.read_sql_query(query.statement)
if not df.empty:
entity_type_id = df.iloc[0]['entity_type_id']
logger.debug(f"Found entity_type_id: {entity_type_id} for name: {name}")
else:
error_msg = f"No entity type found with name: '{name}'"
raise ValueError(error_msg)
except Exception as e:
error_msg = f"Error querying entity type by name '{name}': {e}"
raise RuntimeError(error_msg) from e

return self.get_engine_input(entity_type_id)

def get_engine_input(self, uu_id):
try:
response = self.http_request(object_type='input', object_name=uu_id,
request='GET', raise_error=True)
engine_input = json.loads(response)
logger.debug(f"Retrieved engine input for resource_id: {uu_id}")
return engine_input
except Exception as e:
raise RuntimeError(f"Error getting engine input for resource_id '{uu_id}': {e}")

def get_entity_type_by_name(self, name):
# Warning: entity type name is not unique anymore. This function is used for backwards compatibility taken
# the risk to return the wrong entity type. This function is only used in deprecated function 'GetEntityData'
entity_type_id = None
for id, metadata in self.entity_type_metadata.items():
try:
if metadata['name'] == name:
entity_type_id = id
break
except Exception as e:
print(e)
try:
query, table = self.query('ENTITY_TYPE', 'IOTANALYTICS',
['ENTITY_TYPE_ID'], filters={'NAME': name})
df = self.read_sql_query(query.statement)
if not df.empty:
entity_type_id = df.iloc[0]['entity_type_id']
logger.debug(f"Found entity_type_id: {entity_type_id} for name: {name}")
else:
error_msg = f"No entity type found with name: '{name}'"
raise ValueError(error_msg)
except Exception as e:
error_msg = f"Error querying entity type by name '{name}': {e}"
raise RuntimeError(error_msg) from e
return self.get_entity_type(entity_type_id)

def get_resource_id_by_entity_type_id(self, entity_type_id):
"""
Needed because get_entity_type is used to train supervised models.
Used in train_lightgbm_regressor.py, train_simple_regressor.py
"""
try:
query, table = self.query('ENTITY_TYPE', 'IOTANALYTICS',
['UUID'], filters={'ENTITY_TYPE_ID': [entity_type_id]})
df = self.read_sql_query(query.statement)
if not df.empty:
uuid = df.iloc[0]['uuid']
logger.debug(f"Found uuid: {uuid} for entity id: {entity_type_id}")
return uuid
else:
error_msg = f"No entity type found with entity id: '{entity_type_id}'"
raise ValueError(error_msg)
except Exception as e:
error_msg = f"Error querying entity type by entity id '{entity_type_id}': {e}"
raise RuntimeError(error_msg) from e

def get_entity_type(self, entity_type_id):
"""
Get an EntityType instance by name. Name may be the logical name shown in the UI or the table name.'

"""
metadata = None
resource_id = None
try:
metadata = self.entity_type_metadata[entity_type_id]
except KeyError:
msg = 'No entity type with id %s in the cached metadata.' % entity_type_id
resource_id = self.get_resource_id_by_entity_type_id(entity_type_id)
metadata = self.get_engine_input(resource_id)
except Exception as e:
msg = 'No entity type with id %s could be retrieved. Error: %s' % (resource_id, str(e))
raise ValueError(msg)

try:
timestamp = metadata['metricTimestampColumn']
schema = metadata['schemaName']
dim_table = metadata['dimensionTableName']
except TypeError:
dim_table = metadata['dimensionsTable']
except (TypeError, KeyError):
try:
is_entity_type = metadata.is_entity_type
except AttributeError:
Expand All @@ -869,10 +899,10 @@ def get_entity_type(self, entity_type_id):
msg = 'Entity type %s not found in the database metadata' % entity_type_id
raise KeyError(msg)
else:
entity = md.EntityType(name=metadata['name'], db=self,
entity = md.EntityType(name=metadata['resourceName'], db=self,
**{'auto_create_table': False, '_timestamp': timestamp, '_db_schema': schema,
'_entity_type_id': entity_type_id, '_dimension_table_name': dim_table,
'metric_table_name': metadata['metricTableName'], '_data_items': metadata.get('dataItemDto')})
'metric_table_name': metadata['metricsTableName'], '_data_items': metadata.get('dataItems')})

return entity

Expand Down Expand Up @@ -1026,7 +1056,6 @@ def http_request(self, object_type, object_name, request, payload=None, object_n
# self.url[('dataItem', 'PUT')] = '/'.join(
# [base_meta_url, 'kpi', 'v1', self.tenant_id, 'entityType', object_name, object_type, object_name_2])

self.url[('allEntityTypes', 'GET')] = '/'.join([base_meta_url, 'meta', 'v1', self.tenant_id, 'entityType'])

if sample_entity_type:
self.url[('entityType', 'POST')] = '/'.join(
Expand Down Expand Up @@ -1070,6 +1099,7 @@ def http_request(self, object_type, object_name, request, payload=None, object_n
[core_url, 'v2', 'core', 'deviceTypes', object_name, 'devices', object_type])
self.url['dimensions', 'PUT'] = '/'.join(
[core_url, 'v2', 'core', 'deviceTypes', object_name, 'devices', object_type])
self.url[('deviceTypesSearch', 'POST')] = '/'.join([core_url, 'v2', 'core', 'deviceTypes', 'search'])

encoded_payload = json.dumps(payload).encode('utf-8')
headers = {'Content-Type': "application/json", 'X-api-key': self.credentials['as']['api_key'],
Expand Down Expand Up @@ -2795,6 +2825,21 @@ def unregister_constants(self, constant_names):
msg = 'Constants deletion status: %s' % r
logger.debug(msg)

def search_device_types(self):
"""
Search device types.
"""
payload = {
'search': '',
'filter': {}
}
try:
response = self.http_request(object_type='deviceTypesSearch', object_name=None, request='POST',
payload=payload, raise_error=True)
return json.loads(response)
except Exception as e:
raise RuntimeError(f"Error occurred during search deviceType ... {e}")

def write_frame(self, df, table_name, version_db_writes=False, if_exists='append', timestamp_col=None, schema=None,
chunksize=None, auto_index_name='_auto_index_'):
"""
Expand Down
2 changes: 0 additions & 2 deletions iotfunctions/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ def __init__(self, name, db, *args, **kwargs):
self._functions = []
self._functions.extend(functions)

if name is not None and db is not None and not self.is_local:
db.entity_type_metadata[self.logical_name] = self

logger.debug(('Initialized entity type %s'), str(self))

Expand Down