Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion TM1py/Services/ElementService.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def escape_single_quote(text):
[
f"IF(ElementIsParent('{dimension_name}','{hierarchy_name}','{parent}','{child}')=1);",
f"HierarchyElementComponentDelete('{dimension_name}','{hierarchy_name}','{parent}','{child}');",
f"ENDIF;",
"ENDIF;",
]
)

Expand Down
12 changes: 8 additions & 4 deletions Tests/ElementService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

from mdxpy import MdxBuilder

from TM1py.Exceptions import TM1pyException, TM1pyRestException, TM1pyWritePartialFailureException
from TM1py.Objects import Dimension, Element, ElementAttribute, Hierarchy
from TM1py.Services import TM1Service
from Tests.Utils import (
generate_test_uuid,
skip_if_no_pandas,
skip_if_version_lower_than,
)
from TM1py.Exceptions import (
TM1pyException,
TM1pyRestException,
TM1pyWritePartialFailureException,
)
from TM1py.Objects import Dimension, Element, ElementAttribute, Hierarchy
from TM1py.Services import TM1Service


class TestElementService(unittest.TestCase):
Expand Down Expand Up @@ -1349,7 +1353,7 @@ def test_delete_edges_use_blob_skip_invalid_edges_false(self):
)

@skip_if_version_lower_than(version="11.4")
def test_delete_edges_use_ti_skip_invalid_edges_true(self):
def test_delete_edges_use_ti_and_skip_invalid_edges_true(self):
self.tm1.elements.delete_edges(
dimension_name=self.dimension_name,
hierarchy_name=self.hierarchy_name,
Expand Down
110 changes: 85 additions & 25 deletions Tests/RestService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,71 @@ def test_is_connected(self):
self.assertTrue(self.tm1._tm1_rest.is_connected())

def test_wait_time_generator_with_float_timeout(self):
# With default params (0.1s initial, 1.0s max, 2x factor): 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
expected = [0.1, 0.2, 0.4, 0.8, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
self.assertEqual(expected, list(self.tm1._tm1_rest.wait_time_generator(10.0)))
self.assertEqual(10.5, sum(self.tm1._tm1_rest.wait_time_generator(10.0)))
# Use fixed known values to test the generator logic deterministically
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
original_max = self.tm1._tm1_rest._async_polling_max_delay
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.1
self.tm1._tm1_rest._async_polling_max_delay = 1.0
self.tm1._tm1_rest._async_polling_backoff_factor = 2.0
# With 0.1s initial, 1.0s max, 2x factor: 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
expected = [0.1, 0.2, 0.4, 0.8, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
self.assertEqual(expected, list(self.tm1._tm1_rest.wait_time_generator(10.0)))
self.assertEqual(10.5, sum(self.tm1._tm1_rest.wait_time_generator(10.0)))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial
self.tm1._tm1_rest._async_polling_max_delay = original_max
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_wait_time_generator_with_timeout(self):
# With default params (0.1s initial, 1.0s max, 2x factor): 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
expected = [0.1, 0.2, 0.4, 0.8, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
self.assertEqual(expected, list(self.tm1._tm1_rest.wait_time_generator(10)))
self.assertEqual(10.5, sum(self.tm1._tm1_rest.wait_time_generator(10)))
# Use fixed known values to test the generator logic deterministically
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
original_max = self.tm1._tm1_rest._async_polling_max_delay
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.1
self.tm1._tm1_rest._async_polling_max_delay = 1.0
self.tm1._tm1_rest._async_polling_backoff_factor = 2.0
# With 0.1s initial, 1.0s max, 2x factor: 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
expected = [0.1, 0.2, 0.4, 0.8, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
self.assertEqual(expected, list(self.tm1._tm1_rest.wait_time_generator(10)))
self.assertEqual(10.5, sum(self.tm1._tm1_rest.wait_time_generator(10)))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial
self.tm1._tm1_rest._async_polling_max_delay = original_max
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_wait_time_generator_without_timeout(self):
# With default params (0.1s initial, 1.0s max, 2x factor): 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.0 -> 1.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.1, next(generator))
self.assertEqual(0.2, next(generator))
self.assertEqual(0.4, next(generator))
self.assertEqual(0.8, next(generator))
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))
# Use fixed known values to test the generator logic deterministically
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
original_max = self.tm1._tm1_rest._async_polling_max_delay
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.1
self.tm1._tm1_rest._async_polling_max_delay = 1.0
self.tm1._tm1_rest._async_polling_backoff_factor = 2.0
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.1, next(generator))
self.assertEqual(0.2, next(generator))
self.assertEqual(0.4, next(generator))
self.assertEqual(0.8, next(generator))
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial
self.tm1._tm1_rest._async_polling_max_delay = original_max
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_wait_time_generator_custom_max_delay(self):
# Test with custom max_delay for long-running operations
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
original_max_delay = self.tm1._tm1_rest._async_polling_max_delay
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.1
self.tm1._tm1_rest._async_polling_max_delay = 30.0
self.tm1._tm1_rest._async_polling_backoff_factor = 2.0
# With 0.1s initial, 30s max, 2x factor: 0.1 -> 0.2 -> 0.4 -> 0.8 -> 1.6 -> 3.2 -> 6.4 -> 12.8 -> 25.6 -> 30.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.1, next(generator))
Expand All @@ -64,12 +103,18 @@ def test_wait_time_generator_custom_max_delay(self):
self.assertEqual(30.0, next(generator))
self.assertEqual(30.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial
self.tm1._tm1_rest._async_polling_max_delay = original_max_delay
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_wait_time_generator_custom_backoff_factor(self):
# Test with custom backoff factor (3x instead of 2x)
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
original_max = self.tm1._tm1_rest._async_polling_max_delay
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.1
self.tm1._tm1_rest._async_polling_max_delay = 1.0
self.tm1._tm1_rest._async_polling_backoff_factor = 3.0
# With 0.1s initial, 1.0s max, 3x factor: 0.1 -> 0.3 -> 0.9 -> 1.0 -> 1.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
Expand All @@ -79,33 +124,48 @@ def test_wait_time_generator_custom_backoff_factor(self):
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial
self.tm1._tm1_rest._async_polling_max_delay = original_max
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_wait_time_generator_custom_initial_delay(self):
# Test with custom initial delay
original_initial = self.tm1._tm1_rest._async_polling_initial_delay
original_max = self.tm1._tm1_rest._async_polling_max_delay
original_factor = self.tm1._tm1_rest._async_polling_backoff_factor
try:
self.tm1._tm1_rest._async_polling_initial_delay = 0.5
self.tm1._tm1_rest._async_polling_max_delay = 1.0
self.tm1._tm1_rest._async_polling_backoff_factor = 2.0
# With 0.5s initial, 1.0s max, 2x factor: 0.5 -> 1.0 -> 1.0...
generator = self.tm1._tm1_rest.wait_time_generator(None)
self.assertEqual(0.5, next(generator))
self.assertEqual(1.0, next(generator))
self.assertEqual(1.0, next(generator))
finally:
self.tm1._tm1_rest._async_polling_initial_delay = original_initial
self.tm1._tm1_rest._async_polling_max_delay = original_max
self.tm1._tm1_rest._async_polling_backoff_factor = original_factor

def test_default_remote_disconnect_parameters(self):
# Verify default values for remote disconnect retry parameters
self.assertEqual(5, self.tm1._tm1_rest._remote_disconnect_max_retries)
self.assertEqual(1.0, self.tm1._tm1_rest._remote_disconnect_retry_delay)
self.assertEqual(30.0, self.tm1._tm1_rest._remote_disconnect_max_delay)
self.assertEqual(2.0, self.tm1._tm1_rest._remote_disconnect_backoff_factor)
# Verify values for remote disconnect retry parameters match config or defaults
expected_max_retries = int(self.config["tm1srv01"].get("remote_disconnect_max_retries", 5))
expected_retry_delay = float(self.config["tm1srv01"].get("remote_disconnect_retry_delay", 1.0))
expected_max_delay = float(self.config["tm1srv01"].get("remote_disconnect_max_delay", 30.0))
expected_backoff_factor = float(self.config["tm1srv01"].get("remote_disconnect_backoff_factor", 2.0))
self.assertEqual(expected_max_retries, self.tm1._tm1_rest._remote_disconnect_max_retries)
self.assertEqual(expected_retry_delay, self.tm1._tm1_rest._remote_disconnect_retry_delay)
self.assertEqual(expected_max_delay, self.tm1._tm1_rest._remote_disconnect_max_delay)
self.assertEqual(expected_backoff_factor, self.tm1._tm1_rest._remote_disconnect_backoff_factor)

def test_default_async_polling_parameters(self):
# Verify default values for async polling parameters
self.assertEqual(0.1, self.tm1._tm1_rest._async_polling_initial_delay)
self.assertEqual(1.0, self.tm1._tm1_rest._async_polling_max_delay)
self.assertEqual(2.0, self.tm1._tm1_rest._async_polling_backoff_factor)
# Verify values for async polling parameters match config or defaults
expected_initial_delay = float(self.config["tm1srv01"].get("async_polling_initial_delay", 0.1))
expected_max_delay = float(self.config["tm1srv01"].get("async_polling_max_delay", 1.0))
expected_backoff_factor = float(self.config["tm1srv01"].get("async_polling_backoff_factor", 2.0))
self.assertEqual(expected_initial_delay, self.tm1._tm1_rest._async_polling_initial_delay)
self.assertEqual(expected_max_delay, self.tm1._tm1_rest._async_polling_max_delay)
self.assertEqual(expected_backoff_factor, self.tm1._tm1_rest._async_polling_backoff_factor)

def test_build_response_from_async_response_ok(self):
response_content = (
Expand Down
Loading