Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 101 additions & 49 deletions pipeline_dp/combiners.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,6 @@
ArrayLike = Union[np.ndarray, List[float]]
ExplainComputationReport = Union[Callable, str, List[Union[Callable, str]]]

# This is a workaround which is useful when Quantiles form PyDP are failed to
# be serialized. It was observed only from Google colab. Disabling of the
# proto serialization can work only with LocalBackend.
# Ideally we should find a better solution.
_proto_serialization_disabled = False


def disable_proto_serialization():
global _proto_serialization_disabled
_proto_serialization_disabled = True


class Combiner(abc.ABC):
"""Base class for all combiners.
Expand Down Expand Up @@ -626,6 +615,90 @@ def mechanism_spec(self) -> budget_accounting.MechanismSpec:
return self._params.mechanism_spec


class QuantileAccumulator:
"""Accumulator for QuantileCombiner.

It keeps elements in a list up to 1000 elements. Beyond that, it creates a
QuantileTree and adds elements to it. This avoids expensive serialization
of QuantileTree when there are few elements.
"""

TREE_HEIGHT = 4
BRANCHING_FACTOR = 16
MAX_ELEMENTS_IN_LIST = 1000

def __init__(self, min_value: float, max_value: float):
self.min_value = min_value
self.max_value = max_value
self.elements = []
self.tree = None

def add_entry(self, value: float):
if self.tree is not None:
self.tree.add_entry(value)
else:
self.elements.append(value)
if len(self.elements) >= self.MAX_ELEMENTS_IN_LIST:
self.create_tree()

def create_tree(self):
self.tree = quantile_tree.QuantileTree(
self.min_value,
self.max_value,
self.TREE_HEIGHT,
self.BRANCHING_FACTOR,
)
for v in self.elements:
self.tree.add_entry(v)
self.elements = None

def merge(self, other: 'QuantileAccumulator'):
if self.tree is not None and other.tree is not None:
self.tree.merge(
pydp._pydp.bytes_to_summary(other.tree.serialize().to_bytes()))
elif self.tree is not None and other.elements is not None:
for v in other.elements:
self.tree.add_entry(v)
elif self.elements is not None and other.tree is not None:
self.create_tree()
self.tree.merge(
pydp._pydp.bytes_to_summary(other.tree.serialize().to_bytes()))
else:
self.elements.extend(other.elements)
if len(self.elements) >= self.MAX_ELEMENTS_IN_LIST:
self.create_tree()

def __getstate__(self):
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this functions? Should we document it?

if self.tree is not None:
return {
'tree': self.tree.serialize().to_bytes(),
'min_value': self.min_value,
'max_value': self.max_value,
}
else:
return {
'elements': self.elements,
'min_value': self.min_value,
'max_value': self.max_value,
}

def __setstate__(self, state):
self.min_value = state['min_value']
self.max_value = state['max_value']
if 'tree' in state:
self.tree = quantile_tree.QuantileTree(
self.min_value,
self.max_value,
self.TREE_HEIGHT,
self.BRANCHING_FACTOR,
)
self.tree.merge(pydp._pydp.bytes_to_summary(state['tree']))
self.elements = None
else:
self.elements = state['elements']
self.tree = None


class QuantileCombiner(Combiner):
"""Combiner for computing DP quantiles.

Expand All @@ -637,41 +710,30 @@ class QuantileCombiner(Combiner):
The accumulator is QuantileTree object serialized to string.
"""

AccumulatorType = Union[bytes, List[float]]

def __init__(self, params, percentiles_to_compute: List[float]):
self._params = params
self._percentiles = percentiles_to_compute
self._quantiles_to_compute = [p / 100 for p in percentiles_to_compute]

def create_accumulator(self, values) -> AccumulatorType:
if _proto_serialization_disabled:
return values
tree = self._create_empty_quantile_tree()
def create_accumulator(self, values) -> QuantileAccumulator:
acc = QuantileAccumulator(
self._params.aggregate_params.min_value,
self._params.aggregate_params.max_value,
)
for value in values:
tree.add_entry(value)
return tree.serialize().to_bytes()
acc.add_entry(value)
return acc

def merge_accumulators(self, accumulator1: AccumulatorType,
accumulator2: AccumulatorType) -> AccumulatorType:
if _proto_serialization_disabled:
return accumulator1 + accumulator2 # union of lists

tree = self._create_empty_quantile_tree()
if accumulator1:
tree.merge(pydp._pydp.bytes_to_summary(accumulator1))
if accumulator2:
tree.merge(pydp._pydp.bytes_to_summary(accumulator2))
return tree.serialize().to_bytes()

def compute_metrics(self, accumulator: AccumulatorType) -> AccumulatorType:
if _proto_serialization_disabled:
tree = self._create_empty_quantile_tree()
for value in accumulator:
tree.add_entry(value)
else:
tree = self._create_empty_quantile_tree()
tree.merge(pydp._pydp.bytes_to_summary(accumulator))
def merge_accumulators(
self, accumulator1: QuantileAccumulator,
accumulator2: QuantileAccumulator) -> QuantileAccumulator:
accumulator1.merge(accumulator2)
return accumulator1

def compute_metrics(self, accumulator: QuantileAccumulator) -> dict:
if accumulator.tree is None:
accumulator.create_tree()
tree = accumulator.tree

quantiles = dp_computations.compute_dp_quantiles(
tree,
Expand Down Expand Up @@ -699,16 +761,6 @@ def format_metric_name(p: float):
def explain_computation(self) -> ExplainComputationReport:
return lambda: f"Computed percentiles {self._percentiles} with (eps={self._params.eps} delta={self._params.delta})"

def _create_empty_quantile_tree(self):
# The default tree parameters taken from
# https://github.com/google/differential-privacy/blob/605ec87bcbd4a536995b611132dbf4d341d2e91d/cc/algorithms/quantile-tree.h#L47
DEFAULT_TREE_HEIGHT = 4
DEFAULT_BRANCHING_FACTOR = 16
return quantile_tree.QuantileTree(
self._params.aggregate_params.min_value,
self._params.aggregate_params.max_value, DEFAULT_TREE_HEIGHT,
DEFAULT_BRANCHING_FACTOR)

def mechanism_spec(self) -> budget_accounting.MechanismSpec:
return self._params.mechanism_spec

Expand Down
90 changes: 84 additions & 6 deletions tests/combiners_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,90 @@ def test_compute_metrics_with_noise(self):
0.01) # check that noise is added


class QuantileAccumulatorTest(parameterized.TestCase):

def test_add_entry_keeps_elements(self):
acc = dp_combiners.QuantileAccumulator(min_value=0, max_value=100)
for i in range(10):
acc.add_entry(i)
self.assertIsNone(acc.tree)
self.assertEqual(acc.elements, list(range(10)))

def test_add_entry_creates_tree(self):
acc = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
limit = 1000
for i in range(limit + 1):
acc.add_entry(i)
self.assertIsNotNone(acc.tree)
self.assertIsNone(acc.elements)

def test_merge_elements(self):
acc1 = dp_combiners.QuantileAccumulator(min_value=0, max_value=100)
acc1.add_entry(1)
acc2 = dp_combiners.QuantileAccumulator(min_value=0, max_value=100)
acc2.add_entry(2)

acc1.merge(acc2)
self.assertIsNone(acc1.tree)
self.assertEqual(acc1.elements, [1, 2])

def test_merge_tree_and_elements(self):
acc1 = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
limit = 1000 # MAX_ELEMENTS_IN_QUANTILE_ACCUMULATOR
for i in range(limit):
acc1.add_entry(i)
self.assertIsNotNone(acc1.tree)

acc2 = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
acc2.add_entry(1000)

acc1.merge(acc2)
self.assertIsNotNone(acc1.tree)

def test_merge_elements_and_tree(self):
acc1 = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
acc1.add_entry(1)

acc2 = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
limit = 1000 # MAX_ELEMENTS_IN_QUANTILE_ACCUMULATOR
for i in range(limit):
acc2.add_entry(i)
self.assertIsNotNone(acc2.tree)

acc1.merge(acc2)
self.assertIsNotNone(acc1.tree)

def test_serialization_elements(self):
acc = dp_combiners.QuantileAccumulator(min_value=0, max_value=100)
acc.add_entry(1)
acc.add_entry(2)

state = acc.__getstate__()
self.assertIn('elements', state)
self.assertNotIn('tree', state)

acc2 = dp_combiners.QuantileAccumulator(min_value=0, max_value=100)
acc2.__setstate__(state)
self.assertEqual(acc2.elements, [1, 2])
self.assertIsNone(acc2.tree)

def test_serialization_tree(self):
acc = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
limit = 1000
for i in range(limit):
acc.add_entry(i)
self.assertIsNotNone(acc.tree)

state = acc.__getstate__()
self.assertIn('tree', state)
self.assertNotIn('elements', state)

acc2 = dp_combiners.QuantileAccumulator(min_value=0, max_value=1000)
acc2.__setstate__(state)
self.assertIsNotNone(acc2.tree)
self.assertIsNone(acc2.elements)


class QuantileCombinerTest(parameterized.TestCase):

def _create_combiner(self,
Expand All @@ -790,12 +874,6 @@ def _create_combiner(self,
return dp_combiners.QuantileCombiner(params,
percentiles_to_compute=percentiles)

def test_create_accumulator(self):
combiner = self._create_combiner(no_noise=False)
quantile_tree = combiner._create_empty_quantile_tree()
self.assertEqual(16, quantile_tree.branching_factor) # default value
self.assertEqual(4, quantile_tree.height) # default value

def test_compute_metrics_without_merge(self):
# Arrange.
combiner = self._create_combiner(no_noise=True,
Expand Down
Loading