|
8 | 8 | """ |
9 | 9 |
|
10 | 10 | import copy |
| 11 | +import logging |
11 | 12 | import random |
12 | | - |
13 | | -import torch |
| 13 | +import numpy as np |
14 | 14 |
|
15 | 15 | from nebula.addons.attacks.dataset.datasetattack import DatasetAttack |
16 | 16 |
|
@@ -87,30 +87,41 @@ def labelFlipping( |
87 | 87 | - In targeted mode, labels that match `target_label` are directly changed to `target_changed_label`. |
88 | 88 | """ |
89 | 89 | new_dataset = copy.deepcopy(dataset) |
| 90 | + if not isinstance(new_dataset.targets, np.ndarray): |
| 91 | + new_dataset.targets = np.array(new_dataset.targets) |
| 92 | + else: |
| 93 | + new_dataset.targets = new_dataset.targets.copy() |
90 | 94 |
|
91 | | - targets = torch.tensor(new_dataset.targets) if isinstance(new_dataset.targets, list) else new_dataset.targets |
| 95 | + # logging.info(f"[{self.__class__.__name__}] First 20 labels before flipping: {new_dataset.targets[:20]}") |
| 96 | + # logging.info(f"[{self.__class__.__name__}] First 20 indices before flipping: {indices[:20]}") |
92 | 97 |
|
93 | | - num_indices = len(indices) |
94 | | - class_list = list(set(targets.tolist())) |
95 | 98 | if not targeted: |
| 99 | + num_indices = len(indices) |
96 | 100 | num_flipped = int(poisoned_percent * num_indices) |
97 | | - if num_indices == 0: |
98 | | - return new_dataset |
99 | | - if num_flipped > num_indices: |
100 | | - return new_dataset |
101 | | - flipped_indice = random.sample(indices, num_flipped) |
102 | | - |
103 | | - for i in flipped_indice: |
104 | | - t = targets[i] |
105 | | - flipped = torch.tensor(random.sample(class_list, 1)[0]) |
106 | | - while t == flipped: |
107 | | - flipped = torch.tensor(random.sample(class_list, 1)[0]) |
108 | | - targets[i] = flipped |
| 101 | + if num_indices == 0 or num_flipped > num_indices: |
| 102 | + return |
| 103 | + flipped_indices = random.sample(indices, num_flipped) |
| 104 | + class_list = list(set(new_dataset.targets.tolist())) |
| 105 | + for i in flipped_indices: |
| 106 | + current_label = new_dataset.targets[i] |
| 107 | + new_label = random.choice(class_list) |
| 108 | + while new_label == current_label: |
| 109 | + new_label = random.choice(class_list) |
| 110 | + new_dataset.targets[i] = new_label |
109 | 111 | else: |
110 | 112 | for i in indices: |
111 | | - if int(targets[i]) == int(target_label): |
112 | | - targets[i] = torch.tensor(target_changed_label) |
113 | | - new_dataset.targets = targets |
| 113 | + if int(new_dataset.targets[i]) == target_label: |
| 114 | + new_dataset.targets[i] = target_changed_label |
| 115 | + |
| 116 | + if target_label in new_dataset.targets: |
| 117 | + logging.info(f"[{self.__class__.__name__}] Target label {target_label} still present after flipping.") |
| 118 | + else: |
| 119 | + logging.info( |
| 120 | + f"[{self.__class__.__name__}] Target label {target_label} successfully flipped to {target_changed_label}." |
| 121 | + ) |
| 122 | + |
| 123 | + # logging.info(f"[{self.__class__.__name__}] First 20 labels after flipping: {new_dataset.targets[:20]}") |
| 124 | + |
114 | 125 | return new_dataset |
115 | 126 |
|
116 | 127 | def get_malicious_dataset(self): |
|
0 commit comments