diff --git a/anms-core/anms/components/schemas/ARIs/__init__.py b/anms-core/anms/components/schemas/ARIs/__init__.py
index f4b4932..d2ceee7 100644
--- a/anms-core/anms/components/schemas/ARIs/__init__.py
+++ b/anms-core/anms/components/schemas/ARIs/__init__.py
@@ -53,4 +53,5 @@
from .registered_agent import RegisteredAgentInDBBase
from .rpt_entry import RptEntry
from .rpt_entry import RptEntryName
+from .rpt_entry import RptEntryFull
from .rpt_entry import RptEntryBaseInDBBase
diff --git a/anms-core/anms/components/schemas/ARIs/rpt_entry.py b/anms-core/anms/components/schemas/ARIs/rpt_entry.py
index 18812e0..c1cee2c 100644
--- a/anms-core/anms/components/schemas/ARIs/rpt_entry.py
+++ b/anms-core/anms/components/schemas/ARIs/rpt_entry.py
@@ -32,12 +32,25 @@ class RptEntryBase(BaseModel):
class Config:
arbitrary_types_allowed = True
- ari_rptset_id: Optional[str] = None
- reference_time: Optional[datetime] = None
- report_list: Optional[str] = None
- agent_id: Optional[int] = None
-
+ ari_rptset_id: Optional[int] = None
+ reference_time: Optional[datetime] = None
+ mgr_time: Optional[datetime] = None
+ nonce_cbor: Optional[str] = None
+ agent_time: Optional[datetime] = None
+ report_source: Optional[str] = None
+ report_items: Optional[list] = None
+
+# Shared properties
+class RptEntryFull(RptEntryBase):
+ class Config:
+ arbitrary_types_allowed = True
+ orm_mode = True
+ agent_id: Optional[int] = None
+ ari_rptlist_id: Optional[int] = None
+
+
+
class RptEntryBaseInDBBase(RptEntryBase):
class Config:
orm_mode = True
diff --git a/anms-core/anms/models/relational/const.py b/anms-core/anms/models/relational/const.py
new file mode 100644
index 0000000..60265cb
--- /dev/null
+++ b/anms-core/anms/models/relational/const.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+#
+# Copyright (c) 2023 The Johns Hopkins University Applied Physics
+# Laboratory LLC.
+#
+# This file is part of the Asynchronous Network Management System (ANMS).
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This work was performed for the Jet Propulsion Laboratory, California
+# Institute of Technology, sponsored by the United States Government under
+# the prime contract 80NM0018D0004 between the Caltech and NASA under
+# subcontract 1658085.
+#
+from typing import Any
+from typing import Dict
+
+from anms.models.relational import Model
+from sqlalchemy import Column
+from sqlalchemy import Integer
+from sqlalchemy import String
+
+# class for vw_ctrl_definition used for build ari
+class Const(Model):
+ __tablename__ = 'vw_const_actual'
+ obj_actual_definition_id = Column(Integer, primary_key=True)
+ data_type = Column(String)
+ data_value = Column(String)
+ use_desc = Column(String)
+ obj_metadata_id = Column(Integer)
+ data_model_name = Column(String)
+ namespace = Column(String)
+ data_type_id = Column(Integer)
+ name = Column(String)
+ data_model_id = Column(Integer)
+ object_enumeration = Column(Integer)
+ status = Column(String)
+ reference = Column(String)
+ description = Column(String)
+
+ def __repr__(self) -> str:
+ return self.as_dict().__repr__()
+
+ def as_dict(self) -> Dict[str, Any]:
+ dict_obj = {
+ c.name: getattr(self, c.name) for c in self.__table__.columns
+ }
+
+ return dict_obj
diff --git a/anms-core/anms/models/relational/execution_set.py b/anms-core/anms/models/relational/execution_set.py
index 4aad369..185134f 100644
--- a/anms-core/anms/models/relational/execution_set.py
+++ b/anms-core/anms/models/relational/execution_set.py
@@ -29,6 +29,7 @@
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import LargeBinary
+from anms.shared.transmogrifier import TRANSMORGIFIER
# class for vw_ctrl_definition used for build ari
@@ -47,7 +48,7 @@ def __repr__(self) -> str:
def as_dict(self) -> Dict[str, Any]:
dict_obj = {
'execution_set_id': getattr(self, 'execution_set_id'),
- 'nonce_cbor': getattr(self, 'nonce_cbor'),
+ 'nonce_cbor': TRANSMORGIFIER._ace_transcode_just_cbor("0x"+getattr(self, 'nonce_cbor').hex()),
'use_desc': getattr(self, 'use_desc'),
'agent_id': getattr(self, 'agent_id'),
'num_entries': getattr(self, 'num_entries'),
diff --git a/anms-core/anms/models/relational/report.py b/anms-core/anms/models/relational/report.py
index b66f3f2..7a3c1c1 100644
--- a/anms-core/anms/models/relational/report.py
+++ b/anms-core/anms/models/relational/report.py
@@ -24,33 +24,48 @@
from typing import Any
from typing import Dict
+from anms.shared.transmogrifier import TRANSMORGIFIER
from anms.models.relational import Model
from sqlalchemy import Column
from sqlalchemy import Integer
-from sqlalchemy import String
+from sqlalchemy import DateTime
+from sqlalchemy import ARRAY
from sqlalchemy import LargeBinary
-
+from sqlalchemy import orm
# class for vw_ctrl_definition used for build ari
class Report(Model):
- __tablename__ = 'ari_rptset'
- ari_rptset_id = Column(Integer, primary_key=True)
- nonce_cbor = Column(LargeBinary)
- reference_time = Column(Integer)
- report_list = Column(String)
- report_list_cbor = Column(LargeBinary)
- agent_id = Column(Integer)
+ __tablename__ = 'vw_ari_rpt_set'
+ ari_rptset_id = Column(Integer, primary_key=True)
+ mgr_time = Column(DateTime)
+ reference_time = Column(DateTime)
+ nonce_cbor = Column(LargeBinary)
+ agent_id = Column(Integer)
+ ari_rptset_cbor = Column(LargeBinary)
+ ari_rptlist_id = Column(Integer)
+ agent_time = Column( DateTime)
+ report_source = Column(LargeBinary)
+ report_items = Column(ARRAY(LargeBinary) )#bytea[] NULL
+
+ # processing the raw cbor into an ari object
+ @orm.reconstructor
+ def init_on_load(self):
+ self.nonce_cbor = TRANSMORGIFIER.transcode("0x"+getattr(self, 'nonce_cbor').hex())['uri']
+ self.report_source = TRANSMORGIFIER.transcode("0x"+getattr(self, 'report_source').hex())['uri']
+ self.report_items = [TRANSMORGIFIER.transcode("0x"+x.hex())['uri'] for x in getattr(self, 'report_items')]
+
def __repr__(self) -> str:
return self.as_dict().__repr__()
def as_dict(self) -> Dict[str, Any]:
dict_obj = {
'ari_rptset_id': getattr(self, 'ari_rptset_id'),
- 'nonce_cbor': getattr(self, 'nonce_cbor'),
'reference_time': getattr(self, 'reference_time'),
- 'report_list': getattr(self, 'report_list'),
- 'report_list_cbor': getattr(self, 'report_list_cbor'),
- 'agent_id': getattr(self, 'agent_id')
+ 'nonce_cbor': getattr(self, 'nonce_cbor'),
+ 'agent_id': getattr(self, 'agent_id'),
+ 'ari_rptlist_id': getattr(self, 'ari_rptlist_id'),
+ 'agent_time': getattr(self, 'agent_time'),
+ 'report_source': getattr(self, 'report_source'),
+ 'report_items': getattr(self, 'report_items')
}
-
return dict_obj
diff --git a/anms-core/anms/routes/ARIs/reports.py b/anms-core/anms/routes/ARIs/reports.py
index 21844ee..64fb50e 100644
--- a/anms-core/anms/routes/ARIs/reports.py
+++ b/anms-core/anms/routes/ARIs/reports.py
@@ -21,32 +21,34 @@
# the prime contract 80NM0018D0004 between the Caltech and NASA under
# subcontract 1658085.
#
-from typing import List
-import ast
+
+# for handling report set and exec set
+import ace
+
from fastapi import APIRouter, Depends
from fastapi import status
from fastapi_pagination import Page, Params
from fastapi_pagination.ext.async_sqlalchemy import paginate
+from anms.shared.transmogrifier import TRANSMORGIFIER
+
from sqlalchemy import select, and_
from sqlalchemy.engine import Result
+from typing import List
+
from urllib.parse import unquote
from anms.components.schemas import ARIs
-from anms.models.relational import get_async_session, get_session
+from anms.models.relational import get_async_session
from anms.models.relational.report import Report
-from anms.models.relational.execution_set import ExecutionSet
+from anms.models.relational.const import Const
from anms.models.relational.registered_agent import RegisteredAgent
from anms.shared.opensearch_logger import OpenSearchLogger
-import io
-import anms.routes.transcoder as transcoder
-
-# for handling report set and exec set
-import ace
+from datetime import datetime
logger = OpenSearchLogger(__name__, log_console=True)
@@ -54,172 +56,296 @@
# routes for ARIs
-@router.get("/all", status_code=status.HTTP_200_OK, response_model=Page[ARIs.RptEntry], tags=["REPORTS"])
-async def paged_report(params: Params = Depends()):
+@router.get(
+ "/page",
+ status_code=status.HTTP_200_OK,
+ response_model=Page[ARIs.RptEntryFull],
+ tags=["REPORTS"],
+)
+async def paged_reports(params: Params = Depends()):
async with get_async_session() as session:
return await paginate(session, select(Report), params)
-@router.get("/name/all", status_code=status.HTTP_200_OK, response_model=List[ARIs.RptEntryBaseInDBBase], tags=["REPORTS"])
-async def all_report_name():
+@router.get(
+ "/all",
+ status_code=status.HTTP_200_OK,
+ response_model=List[ARIs.RptEntryFull],
+ tags=["REPORTS"],
+)
+async def all_reports():
stmt = select(Report)
+ res = []
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
- return result.all()
-
-
-@router.get("/entry/name/{agent_id}", status_code=status.HTTP_200_OK, response_model=list,
- tags=["REPORTS"])
-async def report_def_by_id(agent_id: int):
- # select all reports belonging to the agent
- final_res = []
- agent_id_str = ""
- dec = ace.ari_cbor.Decoder()
- enc = ace.ari_text.Encoder()
- adms = ace.AdmSet()
- adms.load_default_dirs()
- nn_func = ace.nickname.Converter(ace.nickname.Mode.FROM_NN , adms.db_session(), False)
- stmt = select(Report).where(Report.agent_id == agent_id)
- agent_id_stmt = select(RegisteredAgent).where(RegisteredAgent.registered_agents_id == agent_id)
+ res = result.all()
+ return res
+
+
+# report_source is cbor
+async def _report_from_id_source(
+ agent_idx: int, report_source: str, start_time: str = None, end_time: str = None
+):
+ res = []
+ report_dict = []
+
+ if agent_idx:
+ if start_time is None:
+ start_time = datetime.fromisoformat("2010-01-01T00:00:00+00:00")
+ if end_time is None:
+ end_time = datetime.fromisoformat("2100-01-01T00:00:00+00:00")
+ start_time = start_time.replace(tzinfo=None)
+ end_time = end_time.replace(tzinfo=None)
+
+ stmt = (
+ select(Report)
+ .where(Report.agent_id == agent_idx)
+ .where(Report.report_source == bytes.fromhex(report_source))
+ .filter(Report.reference_time >= start_time)
+ .filter(Report.reference_time <= end_time)
+ )
+ async with get_async_session() as session:
+ result: Result = await session.scalars(stmt)
+ res = result.all()
+
+ if res:
+ # translate report_source if its const use its values as the forms for the final report
+ report_source_ari = TRANSMORGIFIER.transcode("0x" + report_source)
+ report_source_columns = [f"col {x}" for x, _ in enumerate(res[0].report_items)]
+ if isinstance(report_source_ari["ari"], ace.ari.ReferenceARI):
+ if report_source_ari["ari"].ident.type_id == ace.ari.StructType.CONST:
+ stmt = (
+ select(Const.data_value)
+ .where(Const.name == str(report_source_ari["ari"].ident.obj_id))
+ .where(
+ Const.data_model_name
+ == str(report_source_ari["ari"].ident.model_id)
+ )
+ .where(
+ Const.namespace == str(report_source_ari["ari"].ident.org_id)
+ )
+ )
+ async with get_async_session() as session:
+ result: Result = await session.scalars(stmt)
+ result = result.all()
+ if result:
+ report_source_columns = []
+ for val in TRANSMORGIFIER.transcode(result[0])["ari"].value:
+ report_source_columns.append(val.ident.obj_id)
+ else:
+ if isinstance(report_source_ari["ari"].ident, ace.ari.LiteralARI):
+ if isinstance(report_source_ari["ari"].ident.value, list):
+ report_source_columns = []
+ for val in report_source_ari["ari"].ident.value:
+ report_source_columns.append(val.ident.obj_id)
+
+ # data_value
+ for row in res:
+ new_item = {
+ "reference_time": row.reference_time,
+ "mgr_time": row.mgr_time,
+ "agent_time": row.agent_time,
+ "rpt_set_nonce": row.nonce_cbor,
+ "report_source": report_source_ari["uri"],
+ }
+
+ for index, value in enumerate(row.report_items):
+ new_item[report_source_columns[index]] = value
+ report_dict.append(new_item)
+
+ return report_dict
+
+
+async def _source_from_id(agent_idx: int):
+ res = []
+ hold = {}
+ if agent_idx:
+ stmt = (
+ select(Report.report_source).distinct().where(Report.agent_id == agent_idx)
+ )
+ async with get_async_session() as session:
+ result: Result = await session.scalars(stmt)
+ for x in result.all():
+ # compiling same URI that have been stored with or without NN
+ curr_uri = TRANSMORGIFIER.transcode("0x" + x.hex())
+ hold.setdefault(curr_uri["uri"], []).append(x.hex())
+
+ for ari,cbor in hold.items():
+ res.append(
+ {
+ "ari": ari,
+ "cbor": cbor,
+ }
+ )
+
+ return res
+
+
+async def _reports_from_id(agent_idx: int):
+ res = []
+ if agent_idx:
+ stmt = select(Report).where(Report.agent_id == agent_idx)
+ async with get_async_session() as session:
+ result: Result = await session.scalars(stmt)
+ res = result.all()
+ return res
+
+
+@router.get(
+ "/all/eid/{agent_eid}",
+ status_code=status.HTTP_200_OK,
+ response_model=List[ARIs.RptEntry],
+ tags=["REPORTS"],
+)
+async def reports_agent_by_name(agent_eid: str):
+ agent_idx = None
+ agent_id_stmt = select(RegisteredAgent).where(
+ RegisteredAgent.agent_endpoint_uri == unquote(agent_eid)
+ )
async with get_async_session() as session:
- result: Result = await session.scalars(stmt)
# Execution set uses URI as agent_id
result_agent: Result = await session.scalars(agent_id_stmt)
- agent_id_str = result_agent.one_or_none()
- agent_id_str = agent_id_str.agent_endpoint_uri
- for res in result.all():
- # select from exec_set
- try:
- nonce_cbor = res.nonce_cbor
- if(nonce_cbor != b'\xf6'): # not a null nonce
- stmt = select(ExecutionSet).where(and_(ExecutionSet.agent_id == agent_id_str, ExecutionSet.nonce_cbor == nonce_cbor) )
- result: Result = await session.scalars(stmt)
- exc_set = result.all()
- for res_exec in exc_set:
- ari_val = ""
- if(res_exec):
- hex_str = res_exec.entries.hex()
- hex_str = "0x"+hex_str.upper()
- ari_val = await transcoder.transcoder_put_cbor_await(hex_str)
- ari_val = ari_val['data']
- addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
- if addition not in final_res:
- final_res.append(addition)
- else: #null nonce use report source
- rpt_set = res.report_list_cbor.hex()
- # Using Ace to translate CBOR into ARI object to process individual parts
- in_text = '0x'+rpt_set
- ari_rpt = None
- try:
- in_bytes = ace.cborutil.from_hexstr(in_text)
- ari_rpt = dec.decode(io.BytesIO(in_bytes))
- except Exception as err:
- logger.error(err)
-
- # running through and translating all parts of rptset
- for rpt in ari_rpt.value.reports:
- try:
- enc = ace.ari_text.Encoder()
- buf = io.StringIO()
- enc.encode(rpt.source, buf)
- out_text = buf.getvalue()
- ari_val = out_text
- # TODO look at better way to handle storing nonce with null
- addition = {'exec_set': ari_val,'nonce_cbor':str(nonce_cbor)}
- if addition not in final_res:
- final_res.append(addition)
- except Exception as err:
- logger.error(err)
-
- except Exception as e:
- logger.error(f"Error {e}, while processing nonce:{nonce_cbor} for agent: {agent_id_str}")
-
- return final_res
-
-# entries tabulated returns header and values in correct order
-# handling if nonce_cbor is null
-@router.get("/entries/table/{agent_id}/{nonce_cbor}", status_code=status.HTTP_200_OK)
-async def report_ac(agent_id: int, nonce_cbor: str) -> dict:
- ari = None
- dec = ace.ari_cbor.Decoder()
- enc = ace.ari_text.Encoder()
- exec_set_dir = {}
- logger.info(nonce_cbor)
- logger.info(type(nonce_cbor))
- try:
- store_nonce = nonce_cbor
- nonce_cbor = ast.literal_eval(nonce_cbor)
- except Exception as e:
- try:
- nonce_cbor = ast.literal_eval(str(bytes.fromhex(nonce_cbor)))
- except Exception as e:
- logger.error(f"{e} while processing nonce")
- return []
-
-
- # process each report in the rpt set and place inside appropiate nonce case or if null use source as key
- # TODO use td off set in report set to update actual time
- #
- ari = None
- stmt = select(Report).where(and_(Report.agent_id == agent_id, Report.nonce_cbor == nonce_cbor) )
+ agent_idx = result_agent.one_or_none()
+ if agent_idx:
+ agent_idx = agent_idx.registered_agents_id
+
+ return await _reports_from_id(agent_idx)
+
+
+@router.get(
+ "/all/idx/{agent_idx}",
+ status_code=status.HTTP_200_OK,
+ response_model=List[ARIs.RptEntry],
+ tags=["REPORTS"],
+)
+async def reports_agent_by_id(agent_idx: int):
+ return await _reports_from_id(agent_idx)
+
+
+@router.get(
+ "/report_source/eid/{agent_eid}",
+ status_code=status.HTTP_200_OK,
+ response_model=list,
+ tags=["REPORTS"],
+)
+async def reports_source_agent_by_name(agent_eid: str):
+ agent_idx = None
+ agent_id_stmt = select(RegisteredAgent).where(
+ RegisteredAgent.agent_endpoint_uri == unquote(agent_eid)
+ )
async with get_async_session() as session:
- result: Result = await session.scalars(stmt)
- for res in result.all():
- # used to hold final report set
- curr_time = res.reference_time
- # addition = {time:}
- rpt_set = res.report_list_cbor.hex()
- # Using Ace to translate CBOR into ARI object to process individual parts
- in_text = '0x'+rpt_set
- try:
- in_bytes = ace.cborutil.from_hexstr(in_text)
- ari = dec.decode(io.BytesIO(in_bytes))
-
- except Exception as err:
- logger.error(err)
-
- # current ARI should be an report set
- if ari:
- if type(ari.value) == ace.ari.ReportSet:
- # for each report in a rptset
- # add to the top level nonce dict or to source dict if nonce is null null
- for rpt in ari.value.reports:
- try:
- # structure for the reports
- # time: source_name:{[values of reprots ]}
- buf = io.StringIO()
- enc.encode(rpt.source, buf)
- rpt_src = buf.getvalue()
- addition = {"time":curr_time, rpt_src:[]}
- rpt_entries = []
- enc = ace.ari_text.Encoder()
- # running through and translating all parts of rptset
- for item in rpt.items:
- # using ace to decode the components
- # item = dec.decode(item)
- if type(item.value) == ace.ari.Table:
- table_vals = []
- for tab_val in item.value:
- table_vals.append([t.value for t in tab_val])
- rpt_entries.append(table_vals)
- else:#handle values as normal
- buf = io.StringIO()
- enc.encode(item, buf)
- out_text = buf.getvalue()
- rpt_entries.append(out_text)
-
- # placing all the values in the sources section
- addition[rpt_src] = rpt_entries
-
- if(nonce_cbor == b'\xf6' ):
- curr_dic = exec_set_dir.get(rpt_src,[])
- curr_dic.append(addition)
- exec_set_dir[rpt_src] = curr_dic
- else:
- curr_dic = exec_set_dir.get(store_nonce,[])
- curr_dic.append(addition)
- exec_set_dir[store_nonce] = curr_dic
- except Exception as err:
- logger.error(err)
- return exec_set_dir
-
+ # Execution set uses URI as agent_id
+ result_agent: Result = await session.scalars(agent_id_stmt)
+ agent_idx = result_agent.one_or_none()
+ if agent_idx:
+ agent_idx = agent_idx.registered_agents_id
+
+ reports = await _source_from_id(agent_idx)
+ return reports
+
+
+@router.get(
+ "/report_source/idx/{agent_idx}/",
+ status_code=status.HTTP_200_OK,
+ response_model=list,
+ tags=["REPORTS"],
+)
+async def reports_source_agent_by_id(agent_idx: int):
+ reports = await _source_from_id(agent_idx)
+ return reports
+
+
+@router.get(
+ "/dictionary/idx/{agent_idx}/{source_cbor}",
+ status_code=status.HTTP_200_OK,
+ response_model=list,
+ tags=["REPORTS"],
+)
+async def reports_dictionary_by_id_and_report_source(agent_idx: int, source_cbor: str):
+ reports = await _report_from_id_source(agent_idx, source_cbor)
+ return reports
+
+
+@router.get(
+ "/dictionary/eid/{agent_eid}/{source_cbor}",
+ status_code=status.HTTP_200_OK,
+ response_model=list,
+ tags=["REPORTS"],
+)
+async def reports_dictionary_by_name_and_report_source(
+ agent_eid: str, source_cbor: str
+):
+ agent_idx = None
+ agent_id_stmt = select(RegisteredAgent).where(
+ RegisteredAgent.agent_endpoint_uri == unquote(agent_eid)
+ )
+ async with get_async_session() as session:
+ # Execution set uses URI as agent_id
+ result_agent: Result = await session.scalars(agent_id_stmt)
+ agent_idx = result_agent.one_or_none()
+ if agent_idx:
+ agent_idx = agent_idx.registered_agents_id
+
+ reports = await _report_from_id_source(agent_idx, source_cbor)
+ return reports
+
+
+# using the known search criteria to filter the reports
+@router.post(
+ "/dictionary/search/idx/",
+ status_code=status.HTTP_200_OK,
+ response_model=list,
+ tags=["REPORTS"],
+)
+async def reports_dictionary_by_search_idx(
+ agent_idxs: list[int],
+ source_cbors: list,
+ start_time: datetime = None,
+ end_time: datetime = None,
+):
+ reports = []
+ for agent_idx in agent_idxs:
+ rpt_cur = []
+ for source_cbor in source_cbors:
+ rpt_cur.append(
+ await _report_from_id_source(
+ agent_idx, source_cbor, start_time, end_time
+ )
+ )
+ reports.append({"agent": agent_idx, "reports": rpt_cur})
+ return reports
+
+
+@router.post(
+ "/dictionary/search/eid/",
+ status_code=status.HTTP_200_OK,
+ response_model=list,
+ tags=["REPORTS"],
+)
+async def reports_dictionary_by_search_eid(
+ agent_eids: list[str],
+ source_cbors: list,
+ start_time: datetime = None,
+ end_time: datetime = None,
+):
+ reports = []
+ for agent_eid in agent_eids:
+ rpt_cur = []
+ agent_idx = None
+ agent_id_stmt = select(RegisteredAgent).where(
+ RegisteredAgent.agent_endpoint_uri == unquote(agent_eid)
+ )
+ async with get_async_session() as session:
+ # Execution set uses URI as agent_id
+ result_agent: Result = await session.scalars(agent_id_stmt)
+ agent_idx = result_agent.one_or_none()
+ if agent_idx:
+ agent_idx = agent_idx.registered_agents_id
+ for source_cbor in source_cbors:
+ rpt_cur.append(
+ await _report_from_id_source(
+ agent_idx, source_cbor, start_time, end_time
+ )
+ )
+ reports.append({"agent": agent_eid, "reports": rpt_cur})
+ return reports
diff --git a/anms-core/anms/routes/transcoder.py b/anms-core/anms/routes/transcoder.py
index 449a96a..cd6ca8f 100644
--- a/anms-core/anms/routes/transcoder.py
+++ b/anms-core/anms/routes/transcoder.py
@@ -143,7 +143,7 @@ async def transcoder_put_await_str(input_ari: str):
def transcoder_incoming_str(input_ari: str):
return _transcoder_put_str(input_ari)
-def transcoder_put_str(input_ari: str):
+def _transcoder_put_str(input_ari: str):
input_ari = input_ari.strip()
transcoder_log_id = None
send_to_transcode = False
diff --git a/anms-core/anms/shared/transmogrifier.py b/anms-core/anms/shared/transmogrifier.py
index 94267bc..ec644ea 100644
--- a/anms-core/anms/shared/transmogrifier.py
+++ b/anms-core/anms/shared/transmogrifier.py
@@ -23,7 +23,6 @@
from camp.generators import (create_sql)
from anms.shared.config import ConfigBuilder
-import asyncio
import anms.shared.mqtt_client
from anms.shared.opensearch_logger import OpenSearchLogger
from anms.models.relational import get_session
@@ -35,7 +34,6 @@
import traceback
import ace
import io
-import io
import json
import sqlalchemy
@@ -59,12 +57,14 @@ def __init__(self, args):
LOGGER.info(f'Connecting to SQL DB at {db_uri}')
self._dbeng = sqlalchemy.create_engine(db_uri)
self.transcode = self._transcode_internal
+ self.transcode_direct = self._transcode_internal
self.reload = self._reload_internal
self._adm_reload(None)
else:
# setting up the MQTT server instead
self.MQTT_CLIENT = anms.shared.mqtt_client.MQTT_CLIENT
self.transcode = self._transcode_mqtt
+ self.transcode_direct = self._transcode_mqtt_direct
self.reload = self._reload_mqtt
async def handle_adm(self, admset: ace.AdmSet, adm_file: ace.models.AdmModule, session, replace=True):
@@ -153,17 +153,30 @@ def _transcode_mqtt(self, input):
LOGGER.info(f'PUBLISH to transcode/CoreFacing/Outgoing, msg = {msg}')
self.MQTT_CLIENT.publish("transcode/CoreFacing/Outgoing", msg)
+ def _transcode_mqtt_direct(self, input):
+ msg = json.dumps({'uri': input})
+ LOGGER.info(f'PUBLISH to transcode/CoreFacing/Outgoing, msg = {msg}')
+ self.MQTT_CLIENT.publish("transcode/CoreFacing/Outgoing", msg)
+ return "pending"
+
def _transcode_internal(self, input):
- self._ace_transcode(input)
-
- # picking up any stray items that didnt get translated
- pending_uris = TranscoderLog.query.filter_by(parsed_as='pending').all()
- for entrys in pending_uris:
- try:
- self._ace_transcode(entrys.input_string)
- except Exception as err:
- LOGGER.error('Failed to process pending entry: %s', err)
+ LOGGER.info(f"translating {input}")
+ ari = self._ace_transcode(input)
+
+ return ari
+ def _ace_transcode_just_cbor(self, input):
+ dec = ace.ari_cbor.Decoder()
+
+ in_text = input.strip()
+ in_bytes = ace.cborutil.from_hexstr(in_text)
+ ari = dec.decode(io.BytesIO(in_bytes))
+ enc = ace.ari_text.Encoder()
+ buf = io.StringIO()
+ enc.encode(ari, buf)
+ out_text = buf.getvalue()
+ return out_text
+
def _ace_transcode(self, input):
# result object to fill in
res_obj = {}
@@ -185,14 +198,15 @@ def _ace_transcode(self, input):
try:
in_bytes = ace.cborutil.from_hexstr(in_text)
dec = ace.ari_cbor.Decoder()
- ari = dec.decode(io.BytesIO(in_bytes))
- LOGGER.debug(f'decoded as ARI {ari}')
- ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, adms.db_session(), False)(ari)
-
+ ari_no_nn = dec.decode(io.BytesIO(in_bytes))
+ LOGGER.debug(f'decoded as ARI {ari_no_nn}')
+ ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, adms.db_session(), False)(ari_no_nn)
except Exception as err:
- raise RuntimeError(f"Error decoding from `{in_text}`: {err}") from err
+ LOGGER.warning(f"Error decoding from `{in_text}`: {err} using no NN")
+ ari = ari_no_nn
+
res_obj['cbor'] = in_text
- res_obj['ari'] = f"{ari}"
+ res_obj['ari'] = ari
try:
enc = ace.ari_text.Encoder()
@@ -204,7 +218,8 @@ def _ace_transcode(self, input):
out_text = 'ari:' + out_text
LOGGER.debug(f'encoded as text {out_text}')
except Exception as err:
- raise RuntimeError(f"Error encoding from {ari}: {err}") from err
+ LOGGER.error(f"Error encoding from {ari}: {err}")
+
res_obj['uri'] = out_text
else:
@@ -213,11 +228,12 @@ def _ace_transcode(self, input):
try:
dec = ace.ari_text.Decoder()
- ari = dec.decode(io.StringIO(in_text))
- LOGGER.debug(f'decoded as ARI {ari}')
- ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, adms.db_session(), False)(ari)
+ ari_no_nn = dec.decode(io.StringIO(in_text))
+ LOGGER.debug(f'decoded as ARI {ari_no_nn}')
+ ari = ace.nickname.Converter(ace.nickname.Mode.FROM_NN, adms.db_session(), False)(ari_no_nn)
except Exception as err:
- raise RuntimeError(f"Error decoding from `{in_text}`: {err}") from err
+ LOGGER.warning(f"Error decoding from `{in_text}`: {err} using no NN")
+ ari = ari_no_nn
# rencoding ari to ensure using non nicknames
try:
@@ -230,10 +246,11 @@ def _ace_transcode(self, input):
out_text = 'ari:' + out_text
LOGGER.debug(f'encoded as text {out_text}')
except Exception as err:
- raise RuntimeError(f"Error encoding from {ari}: {err}") from err
+ LOGGER.error(f"Error encoding from {ari}: {err}")
+
res_obj['uri'] = out_text
- res_obj['ari'] = f"{ari}"
+ res_obj['ari'] = ari
try:
enc = ace.ari_cbor.Encoder()
@@ -243,7 +260,8 @@ def _ace_transcode(self, input):
hex_str = ace.cborutil.to_hexstr(buf.getvalue())
LOGGER.info(f'encoded as binary {hex_str}')
except Exception as err:
- raise RuntimeError(f"Error encoding from {ari}: {err}") from err
+ LOGGER.error(f"Error encoding from {ari}: {err}")
+
res_obj['cbor'] = hex_str
except Exception as err:
res_obj['ari'] = f'Failed to process: {err}'
@@ -255,22 +273,20 @@ def _ace_transcode(self, input):
with get_session() as session:
session.query(TranscoderLog).filter(TranscoderLog.input_string == input).update({
'parsed_as': res_obj['parsedAs'],
- 'ari': json.dumps(res_obj['ari']),
+ 'ari': json.dumps(f"{res_obj['ari']}"),
'cbor': res_obj['cbor'],
'uri': res_obj['uri']
})
session.commit()
LOGGER.info(f'Response {res_obj}')
- # client.publish('transcode/CodexFacing/Outgoing', json.dumps(res_obj))
- # just log it back into the database
+ return res_obj
def _reload_mqtt(self,adm_name=None):
config = ConfigBuilder.get_config()
host = config.get('MQTT_HOST')
- port = config.get('MQTT_PORT')
LOGGER.info('Connecting to MQTT broker %s to notify aricodec' % host)
diff --git a/anms-core/integration_test/openapi.json b/anms-core/integration_test/openapi.json
index bc197dd..9d5934a 100644
--- a/anms-core/integration_test/openapi.json
+++ b/anms-core/integration_test/openapi.json
@@ -2421,14 +2421,14 @@
}
}
},
- "/report/all": {
+ "/report/page": {
"get": {
"tags": [
"REPORTS",
"REPORTS"
],
- "summary": "Paged Report",
- "operationId": "paged_report_report_all_get",
+ "summary": "Paged Reports",
+ "operationId": "paged_reports_report_page_get",
"parameters": [
{
"required": false,
@@ -2460,7 +2460,7 @@
"content": {
"application/json": {
"schema": {
- "$ref": "#/components/schemas/Page_RptEntry_"
+ "$ref": "#/components/schemas/Page_RptEntryFull_"
}
}
}
@@ -2478,24 +2478,24 @@
}
}
},
- "/report/name/all": {
+ "/report/all": {
"get": {
"tags": [
"REPORTS",
"REPORTS"
],
- "summary": "All Report Name",
- "operationId": "all_report_name_report_name_all_get",
+ "summary": "All Reports",
+ "operationId": "all_reports_report_all_get",
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
- "title": "Response All Report Name Report Name All Get",
+ "title": "Response All Reports Report All Get",
"type": "array",
"items": {
- "$ref": "#/components/schemas/RptEntryBaseInDBBase"
+ "$ref": "#/components/schemas/RptEntryFull"
}
}
}
@@ -2504,22 +2504,116 @@
}
}
},
- "/report/entry/name/{agent_id}": {
+ "/report/all/eid/{agent_eid}": {
"get": {
"tags": [
"REPORTS",
"REPORTS"
],
- "summary": "Report Def By Id",
- "operationId": "report_def_by_id_report_entry_name__agent_id__get",
+ "summary": "Reports Agent By Name",
+ "operationId": "reports_agent_by_name_report_all_eid__agent_eid__get",
"parameters": [
{
"required": true,
"schema": {
- "title": "Agent Id",
+ "title": "Agent Eid",
+ "type": "string"
+ },
+ "name": "agent_eid",
+ "in": "path"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "title": "Response Reports Agent By Name Report All Eid Agent Eid Get",
+ "type": "array",
+ "items": {
+ "$ref": "#/components/schemas/RptEntry"
+ }
+ }
+ }
+ }
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/report/all/idx/{agent_idx}": {
+ "get": {
+ "tags": [
+ "REPORTS",
+ "REPORTS"
+ ],
+ "summary": "Reports Agent By Id",
+ "operationId": "reports_agent_by_id_report_all_idx__agent_idx__get",
+ "parameters": [
+ {
+ "required": true,
+ "schema": {
+ "title": "Agent Idx",
"type": "integer"
},
- "name": "agent_id",
+ "name": "agent_idx",
+ "in": "path"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "title": "Response Reports Agent By Id Report All Idx Agent Idx Get",
+ "type": "array",
+ "items": {
+ "$ref": "#/components/schemas/RptEntry"
+ }
+ }
+ }
+ }
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/report/report_source/eid/{agent_eid}": {
+ "get": {
+ "tags": [
+ "REPORTS",
+ "REPORTS"
+ ],
+ "summary": "Reports Source Agent By Name",
+ "operationId": "reports_source_agent_by_name_report_report_source_eid__agent_eid__get",
+ "parameters": [
+ {
+ "required": true,
+ "schema": {
+ "title": "Agent Eid",
+ "type": "string"
+ },
+ "name": "agent_eid",
"in": "path"
}
],
@@ -2529,7 +2623,7 @@
"content": {
"application/json": {
"schema": {
- "title": "Response Report Def By Id Report Entry Name Agent Id Get",
+ "title": "Response Reports Source Agent By Name Report Report Source Eid Agent Eid Get",
"type": "array",
"items": {}
}
@@ -2549,30 +2643,76 @@
}
}
},
- "/report/entries/table/{agent_id}/{nonce_cbor}": {
+ "/report/report_source/idx/{agent_idx}/": {
"get": {
"tags": [
+ "REPORTS",
"REPORTS"
],
- "summary": "Report Ac",
- "operationId": "report_ac_report_entries_table__agent_id___nonce_cbor__get",
+ "summary": "Reports Source Agent By Id",
+ "operationId": "reports_source_agent_by_id_report_report_source_idx__agent_idx___get",
"parameters": [
{
"required": true,
"schema": {
- "title": "Agent Id",
+ "title": "Agent Idx",
"type": "integer"
},
- "name": "agent_id",
+ "name": "agent_idx",
+ "in": "path"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "title": "Response Reports Source Agent By Id Report Report Source Idx Agent Idx Get",
+ "type": "array",
+ "items": {}
+ }
+ }
+ }
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/report/dictionary/idx/{agent_idx}/{source_cbor}": {
+ "get": {
+ "tags": [
+ "REPORTS",
+ "REPORTS"
+ ],
+ "summary": "Reports Dictionary By Id And Report Source",
+ "operationId": "reports_dictionary_by_id_and_report_source_report_dictionary_idx__agent_idx___source_cbor__get",
+ "parameters": [
+ {
+ "required": true,
+ "schema": {
+ "title": "Agent Idx",
+ "type": "integer"
+ },
+ "name": "agent_idx",
"in": "path"
},
{
"required": true,
"schema": {
- "title": "Nonce Cbor",
+ "title": "Source Cbor",
"type": "string"
},
- "name": "nonce_cbor",
+ "name": "source_cbor",
"in": "path"
}
],
@@ -2581,7 +2721,197 @@
"description": "Successful Response",
"content": {
"application/json": {
- "schema": {}
+ "schema": {
+ "title": "Response Reports Dictionary By Id And Report Source Report Dictionary Idx Agent Idx Source Cbor Get",
+ "type": "array",
+ "items": {}
+ }
+ }
+ }
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/report/dictionary/eid/{agent_eid}/{source_cbor}": {
+ "get": {
+ "tags": [
+ "REPORTS",
+ "REPORTS"
+ ],
+ "summary": "Reports Dictionary By Name And Report Source",
+ "operationId": "reports_dictionary_by_name_and_report_source_report_dictionary_eid__agent_eid___source_cbor__get",
+ "parameters": [
+ {
+ "required": true,
+ "schema": {
+ "title": "Agent Eid",
+ "type": "string"
+ },
+ "name": "agent_eid",
+ "in": "path"
+ },
+ {
+ "required": true,
+ "schema": {
+ "title": "Source Cbor",
+ "type": "string"
+ },
+ "name": "source_cbor",
+ "in": "path"
+ }
+ ],
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "title": "Response Reports Dictionary By Name And Report Source Report Dictionary Eid Agent Eid Source Cbor Get",
+ "type": "array",
+ "items": {}
+ }
+ }
+ }
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/report/dictionary/search/idx/": {
+ "post": {
+ "tags": [
+ "REPORTS",
+ "REPORTS"
+ ],
+ "summary": "Reports Dictionary By Search Idx",
+ "operationId": "reports_dictionary_by_search_idx_report_dictionary_search_idx__post",
+ "parameters": [
+ {
+ "required": false,
+ "schema": {
+ "title": "Start Time",
+ "type": "string",
+ "format": "date-time"
+ },
+ "name": "start_time",
+ "in": "query"
+ },
+ {
+ "required": false,
+ "schema": {
+ "title": "End Time",
+ "type": "string",
+ "format": "date-time"
+ },
+ "name": "end_time",
+ "in": "query"
+ }
+ ],
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/Body_reports_dictionary_by_search_idx_report_dictionary_search_idx__post"
+ }
+ }
+ },
+ "required": true
+ },
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "title": "Response Reports Dictionary By Search Idx Report Dictionary Search Idx Post",
+ "type": "array",
+ "items": {}
+ }
+ }
+ }
+ },
+ "422": {
+ "description": "Validation Error",
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/HTTPValidationError"
+ }
+ }
+ }
+ }
+ }
+ }
+ },
+ "/report/dictionary/search/eid/": {
+ "post": {
+ "tags": [
+ "REPORTS",
+ "REPORTS"
+ ],
+ "summary": "Reports Dictionary By Search Eid",
+ "operationId": "reports_dictionary_by_search_eid_report_dictionary_search_eid__post",
+ "parameters": [
+ {
+ "required": false,
+ "schema": {
+ "title": "Start Time",
+ "type": "string",
+ "format": "date-time"
+ },
+ "name": "start_time",
+ "in": "query"
+ },
+ {
+ "required": false,
+ "schema": {
+ "title": "End Time",
+ "type": "string",
+ "format": "date-time"
+ },
+ "name": "end_time",
+ "in": "query"
+ }
+ ],
+ "requestBody": {
+ "content": {
+ "application/json": {
+ "schema": {
+ "$ref": "#/components/schemas/Body_reports_dictionary_by_search_eid_report_dictionary_search_eid__post"
+ }
+ }
+ },
+ "required": true
+ },
+ "responses": {
+ "200": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {
+ "title": "Response Reports Dictionary By Search Eid Report Dictionary Search Eid Post",
+ "type": "array",
+ "items": {}
+ }
}
}
},
@@ -2957,6 +3287,12 @@
}
}
},
+ "500": {
+ "description": "Error response from NM"
+ },
+ "504": {
+ "description": "Manager response timed out"
+ },
"422": {
"description": "Validation Error",
"content": {
@@ -3155,6 +3491,25 @@
}
}
},
+ "/adms/load_default": {
+ "post": {
+ "tags": [
+ "ADM"
+ ],
+ "summary": "Load Default Adm",
+ "operationId": "load_default_adm_adms_load_default_post",
+ "responses": {
+ "201": {
+ "description": "Successful Response",
+ "content": {
+ "application/json": {
+ "schema": {}
+ }
+ }
+ }
+ }
+ }
+ },
"/alerts/incoming": {
"get": {
"tags": [
@@ -3451,6 +3806,50 @@
}
}
},
+ "Body_reports_dictionary_by_search_eid_report_dictionary_search_eid__post": {
+ "title": "Body_reports_dictionary_by_search_eid_report_dictionary_search_eid__post",
+ "required": [
+ "agent_eids",
+ "source_cbors"
+ ],
+ "type": "object",
+ "properties": {
+ "agent_eids": {
+ "title": "Agent Eids",
+ "type": "array",
+ "items": {
+ "type": "string"
+ }
+ },
+ "source_cbors": {
+ "title": "Source Cbors",
+ "type": "array",
+ "items": {}
+ }
+ }
+ },
+ "Body_reports_dictionary_by_search_idx_report_dictionary_search_idx__post": {
+ "title": "Body_reports_dictionary_by_search_idx_report_dictionary_search_idx__post",
+ "required": [
+ "agent_idxs",
+ "source_cbors"
+ ],
+ "type": "object",
+ "properties": {
+ "agent_idxs": {
+ "title": "Agent Idxs",
+ "type": "array",
+ "items": {
+ "type": "integer"
+ }
+ },
+ "source_cbors": {
+ "title": "Source Cbors",
+ "type": "array",
+ "items": {}
+ }
+ }
+ },
"Body_update_adm_adms__post": {
"title": "Body_update_adm_adms__post",
"required": [
@@ -4111,8 +4510,8 @@
}
}
},
- "Page_RptEntry_": {
- "title": "Page[RptEntry]",
+ "Page_RptEntryFull_": {
+ "title": "Page[RptEntryFull]",
"required": [
"items",
"total",
@@ -4125,7 +4524,7 @@
"title": "Items",
"type": "array",
"items": {
- "$ref": "#/components/schemas/RptEntry"
+ "$ref": "#/components/schemas/RptEntryFull"
}
},
"total": {
@@ -4220,43 +4619,81 @@
"properties": {
"ari_rptset_id": {
"title": "Ari Rptset Id",
- "type": "string"
+ "type": "integer"
},
"reference_time": {
"title": "Reference Time",
"type": "string",
"format": "date-time"
},
- "report_list": {
- "title": "Report List",
+ "mgr_time": {
+ "title": "Mgr Time",
+ "type": "string",
+ "format": "date-time"
+ },
+ "nonce_cbor": {
+ "title": "Nonce Cbor",
"type": "string"
},
- "agent_id": {
- "title": "Agent Id",
- "type": "integer"
+ "time_offset": {
+ "title": "Time Offset",
+ "type": "string",
+ "format": "date-time"
+ },
+ "report_source": {
+ "title": "Report Source",
+ "type": "string"
+ },
+ "report_items": {
+ "title": "Report Items",
+ "type": "array",
+ "items": {}
}
}
},
- "RptEntryBaseInDBBase": {
- "title": "RptEntryBaseInDBBase",
+ "RptEntryFull": {
+ "title": "RptEntryFull",
"type": "object",
"properties": {
"ari_rptset_id": {
"title": "Ari Rptset Id",
- "type": "string"
+ "type": "integer"
},
"reference_time": {
"title": "Reference Time",
"type": "string",
"format": "date-time"
},
- "report_list": {
- "title": "Report List",
+ "mgr_time": {
+ "title": "Mgr Time",
+ "type": "string",
+ "format": "date-time"
+ },
+ "nonce_cbor": {
+ "title": "Nonce Cbor",
+ "type": "string"
+ },
+ "time_offset": {
+ "title": "Time Offset",
+ "type": "string",
+ "format": "date-time"
+ },
+ "report_source": {
+ "title": "Report Source",
"type": "string"
},
+ "report_items": {
+ "title": "Report Items",
+ "type": "array",
+ "items": {}
+ },
"agent_id": {
"title": "Agent Id",
"type": "integer"
+ },
+ "ari_rptlist_id": {
+ "title": "Ari Rptlist Id",
+ "type": "integer"
}
}
},
diff --git a/anms-ui/public/app/components/management/agents/reports.vue b/anms-ui/public/app/components/management/agents/reports.vue
index 34f1012..15c51d2 100644
--- a/anms-ui/public/app/components/management/agents/reports.vue
+++ b/anms-ui/public/app/components/management/agents/reports.vue
@@ -8,21 +8,10 @@