Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 76 additions & 15 deletions lib/dl_api_lib/dl_api_lib/app/control_api/resources/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@
from dl_core.components.accessor import DatasetComponentAccessor
from dl_core.components.editor import DatasetComponentEditor
from dl_core.constants import DatasetConstraints
from dl_core.enums import (
USEntryBranch,
USEntryMode,
)
from dl_core.us_dataset import Dataset
from dl_core.utils import generate_revision_id
import dl_query_processing.exc
Expand Down Expand Up @@ -106,7 +110,10 @@ def post(self, body: dict) -> dict:
loader = self.create_dataset_api_loader()
loader.populate_dataset_from_body(dataset=dataset, body=body["dataset"], us_manager=us_manager)

us_manager.create(dataset)
us_manager.create(
dataset,
mode=USEntryMode.publish.value,
)

LOGGER.info("New dataset was saved with ID %s", dataset.uuid)

Expand All @@ -130,7 +137,14 @@ def delete(self, dataset_id: str) -> None:
us_manager = self.get_us_manager_based_on_required_resources()
us_manager.set_context("connection", connection_headers)

ds, _ = DatasetResource.get_dataset(dataset_id=dataset_id, body={}, load_dependencies=False)
ds, _ = DatasetResource.get_dataset(
dataset_id=dataset_id,
body={},
load_dependencies=False,
params={
"branch": USEntryBranch.published.value,
},
)
utils.need_permission_on_entry(ds, USPermissionKind.admin)

us_manager.delete(ds)
Expand All @@ -153,7 +167,14 @@ def get(self, dataset_id: str) -> dict:
us_manager = self.get_us_manager_based_on_required_resources()
us_manager.set_context("connection", connection_headers)

ds, _ = DatasetResource.get_dataset(dataset_id=dataset_id, body={}, load_dependencies=False)
ds, _ = DatasetResource.get_dataset(
dataset_id=dataset_id,
body={},
load_dependencies=False,
params={
"branch": USEntryBranch.published.value,
},
)
fields = [
{
"title": f.title,
Expand Down Expand Up @@ -188,7 +209,13 @@ def post(self, dataset_id: str, body: dict) -> dict:
us_manager = self.get_regular_us_manager()
us_manager.set_context("connection", connection_headers)

ds, _ = self.get_dataset(dataset_id=dataset_id, body={})
ds, _ = self.get_dataset(
dataset_id=dataset_id,
body={},
params={
"branch": USEntryBranch.published.value,
},
)
orig_ds_loc = ds.entry_key
copy_ds_loc: PathEntryLocation

Expand All @@ -201,9 +228,10 @@ def post(self, dataset_id: str, body: dict) -> dict:

LOGGER.info("Going to copy dataset %s with new key %r", dataset_id, copy_us_key)
ds_copy = us_manager.copy_entry(ds, key=copy_ds_loc)

us_manager.create(ds_copy)

us_manager.create(
ds_copy,
mode=USEntryMode.publish.value,
)
LOGGER.info("Dataset copy was saved with ID %s", ds_copy.uuid)

return self.make_dataset_response_data(dataset=ds_copy, us_entry_buffer=us_manager.get_entry_buffer())
Expand Down Expand Up @@ -237,14 +265,26 @@ def get(self, dataset_id: str, version: str, query: dict) -> dict:
us_manager.set_context("dataset", {DLHeadersCommon.AUDIT_MODE.value: audit_mode})

if "rev_id" in query:
ds, _ = self.get_dataset(dataset_id=dataset_id, body={}, params={"revId": query["rev_id"]})
ds, _ = self.get_dataset(
dataset_id=dataset_id,
body={},
params={
"revId": query["rev_id"],
},
)
utils.need_permission_on_entry(ds, USPermissionKind.edit)
# raw entry to avoid double deserialization
ds_raw = us_manager.get_migrated_entry(dataset_id)
# latest data revision_id for concurrent edit checks
revision_id = ds_raw["data"].get("revision_id")
else:
ds, _ = self.get_dataset(dataset_id=dataset_id, body={})
ds, _ = self.get_dataset(
dataset_id=dataset_id,
body={},
params={
"branch": USEntryBranch.published.value,
},
)
utils.need_permission_on_entry(ds, USPermissionKind.read)
revision_id = ds.revision_id

Expand Down Expand Up @@ -315,10 +355,10 @@ def put(self, dataset_id: str, version: str, body: dict[str, Any]) -> dict:

ds_editor = DatasetComponentEditor(dataset=ds)
ds_editor.set_revision_id(revision_id=generate_revision_id())

us_manager.update(
entry=ds,
original_entry=original_ds,
# Default in US Client is mode = "publish"
mode=body.get("mode", USEntryMode.publish.value),
)

return self.make_dataset_response_data(dataset=ds, us_entry_buffer=us_manager.get_entry_buffer())
Expand Down Expand Up @@ -361,7 +401,13 @@ def post(self, dataset_id: str, body: dict) -> dict:
assert tenant is not None
us_manager.set_tenant_override(tenant)

ds, _ = self.get_dataset(dataset_id=dataset_id, body={})
ds, _ = self.get_dataset(
dataset_id=dataset_id,
body={},
params={
"branch": USEntryBranch.published.value,
},
)
ds_dict = ds.as_dict()
ds_dict.update(
self.make_dataset_response_data(
Expand Down Expand Up @@ -481,7 +527,10 @@ def post(self, body: dict) -> dict:
loader = self.create_dataset_api_loader()
loader.populate_dataset_from_body(dataset=dataset, body=data["dataset"], us_manager=us_manager)

us_manager.create(dataset)
us_manager.create(
dataset,
mode=USEntryMode.publish.value,
)

LOGGER.info("New dataset was saved with ID %s", dataset.uuid)

Expand Down Expand Up @@ -522,7 +571,13 @@ def post(
us_manager.set_context("connection", connection_headers)

assert body is not None
dataset, _ = self.get_dataset(dataset_id=dataset_id, body=body)
dataset, _ = self.get_dataset(
dataset_id=dataset_id,
body=body,
params={
"branch": USEntryBranch.published.value,
},
)
dataset_validator_factory = self.get_service_registry().get_dataset_validator_factory()
ds_validator = dataset_validator_factory.get_dataset_validator(ds=dataset, us_manager=us_manager)
data = {}
Expand Down Expand Up @@ -589,7 +644,13 @@ def post(
us_manager.set_context("connection", connection_headers)

assert body is not None
dataset, _ = self.get_dataset(dataset_id=dataset_id, body=body)
dataset, _ = self.get_dataset(
dataset_id=dataset_id,
body=body,
params={
"branch": USEntryBranch.published.value,
},
)
dataset_validator_factory = self.get_service_registry().get_dataset_validator_factory()
ds_validator = dataset_validator_factory.get_dataset_validator(ds=dataset, us_manager=us_manager)
formula = body["field"]["formula"]
Expand Down
19 changes: 17 additions & 2 deletions lib/dl_api_lib/dl_api_lib/app/data_api/resources/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
from dl_core.data_source.base import DataSource
from dl_core.data_source.collection import DataSourceCollectionFactory
from dl_core.dataset_capabilities import DatasetCapabilities
from dl_core.enums import USEntryBranch
from dl_core.exc import USObjectNotFoundException
from dl_core.us_dataset import Dataset
from dl_core.us_manager.mutation_cache.engine_factory import CacheInitializationError
Expand Down Expand Up @@ -191,7 +192,14 @@ async def resolve_entities(self) -> None:

params: dict[str, str] | None = None
if self.rev_id is not None:
params = {"revId": self.rev_id}
params = {
"revId": self.rev_id,
}
else:
params = {
# NOTE: by default getEntry endpoint uses "saved" if no branch nor revId given
"branch": USEntryBranch.published.value,
}

if self.dataset_id is None:
if self.STORED_DATASET_REQUIRED:
Expand Down Expand Up @@ -312,7 +320,14 @@ async def _prepare_dataset_from_cache_with_dataset_id(

params: dict[str, str] | None = None
if self.rev_id is not None:
params = {"revId": self.rev_id}
params = {
"revId": self.rev_id,
}
else:
params = {
# NOTE: by default getEntry endpoint uses "saved" if no branch nor revId given
"branch": USEntryBranch.published.value,
}

try:
assert self.dataset_id is not None
Expand Down
3 changes: 2 additions & 1 deletion lib/dl_api_lib/dl_api_lib/schemas/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
DataSourceCreatedVia,
NotificationLevel,
)
from dl_core.enums import USEntryMode
from dl_model_tools.schema.base import BaseSchema
from dl_model_tools.schema.dynamic_enum_field import DynamicEnumField

Expand Down Expand Up @@ -133,7 +134,7 @@ class UpdateDatasetSchema(BaseSchema):


class DatasetUpdateSchema(DatasetContentSchema):
pass
mode = ma_fields.String(load_default=USEntryMode.publish.value)


class DatasetCopyRequestSchema(BaseSchema):
Expand Down
12 changes: 10 additions & 2 deletions lib/dl_core/dl_core/aio/middlewares/us_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from dl_configs.crypto_keys import CryptoKeysConfig
from dl_core import exc
from dl_core.aio.aiohttp_wrappers_data_core import DLRequestDataCore
from dl_core.enums import USApiType
from dl_core.enums import (
USApiType,
USEntryBranch,
)
from dl_core.services_registry.top_level import DummyServiceRegistry
from dl_core.us_manager.factory import USMFactory
from dl_core.us_manager.us_manager_async import AsyncUSManager
Expand Down Expand Up @@ -97,7 +100,12 @@ async def actual_public_usm_workaround_middleware(
try:
try:
# TODO: context_name not passed due to target type unknown
entry = await usm.get_by_id(entry_id)
entry = await usm.get_by_id(
entry_id,
params={
"branch": USEntryBranch.published.value,
},
)
except exc.USObjectNotFoundException as e:
raise web.HTTPNotFound() from e
else:
Expand Down
12 changes: 12 additions & 0 deletions lib/dl_core/dl_core/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,15 @@ class RoleReason(Enum):
schema_mismatch = "schema_mismatch"
# This role is not allowed for feature-managed sources
forbidden_for_features = "forbidden_for_features"


@unique
class USEntryMode(Enum):
publish = "publish"
save = "save"


@unique
class USEntryBranch(Enum):
published = "published"
saved = "saved"
13 changes: 10 additions & 3 deletions lib/dl_core/dl_core/united_storage_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
DLHeadersCommon,
)
from dl_core.base_models import EntryLocation
from dl_core.enums import USApiType
from dl_core.enums import (
USApiType,
USEntryMode,
)
import dl_core.exc as exc
import dl_retrier

Expand Down Expand Up @@ -458,7 +461,7 @@ def _req_data_create_entry(
type_: str | None = None,
hidden: bool | None = None,
links: dict[str, Any] | None = None,
mode: str = "publish",
mode: str = USEntryMode.publish.value,
unversioned_data: dict[str, Any] | None = None,
**kwargs: Any,
) -> RequestData:
Expand Down Expand Up @@ -533,7 +536,7 @@ def _req_data_update_entry(
unversioned_data: dict[str, Any] | None = None,
meta: dict[str, str] | None = None,
annotation: dict[str, Any] | None = None,
mode: str = "publish",
mode: str = USEntryMode.publish.value,
lock: str | None = None,
hidden: bool | None = None,
links: dict[str, Any] | None = None,
Expand Down Expand Up @@ -799,6 +802,7 @@ def create_entry(
type_: str | None = None,
hidden: bool | None = None,
links: dict[str, Any] | None = None,
mode: str = USEntryMode.publish.value,
**kwargs: Any,
) -> dict[str, Any]:
rq_data = self._req_data_create_entry(
Expand All @@ -811,6 +815,7 @@ def create_entry(
type_=type_,
hidden=hidden,
links=links,
mode=mode,
**kwargs,
)
return self._request(
Expand Down Expand Up @@ -856,6 +861,7 @@ def update_entry(
hidden: bool | None = None,
links: dict[str, Any] | None = None,
update_revision: bool | None = None,
mode: str = USEntryMode.publish.value,
) -> dict[str, Any]:
return self._request(
self._req_data_update_entry(
Expand All @@ -868,6 +874,7 @@ def update_entry(
hidden=hidden,
links=links,
update_revision=update_revision,
mode=mode,
),
retry_policy_name="update_entry",
)
Expand Down
5 changes: 5 additions & 0 deletions lib/dl_core/dl_core/united_storage_client_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dl_api_commons.tracing import get_current_tracing_headers
from dl_app_tools.profiling_base import GenericProfiler
from dl_core.base_models import EntryLocation
from dl_core.enums import USEntryMode
from dl_core.exc import (
USLockUnacquiredException,
USReqException,
Expand Down Expand Up @@ -212,6 +213,7 @@ async def create_entry(
type_: Optional[str] = None,
hidden: Optional[bool] = None,
links: Optional[dict[str, Any]] = None,
mode: str = USEntryMode.publish.value,
**kwargs: Any,
) -> dict[str, Any]:
rq_data = self._req_data_create_entry(
Expand All @@ -224,6 +226,7 @@ async def create_entry(
type_=type_,
hidden=hidden,
links=links,
mode=mode,
**kwargs,
)
return await self._request(
Expand All @@ -242,6 +245,7 @@ async def update_entry(
hidden: Optional[bool] = None,
links: Optional[dict[str, Any]] = None,
update_revision: Optional[bool] = None,
mode: str = USEntryMode.publish.value,
) -> dict[str, Any]:
return await self._request(
self._req_data_update_entry(
Expand All @@ -254,6 +258,7 @@ async def update_entry(
hidden=hidden,
links=links,
update_revision=update_revision,
mode=mode,
),
retry_policy_name="update_entry",
)
Expand Down
Loading
Loading