|
| 1 | +package com.commercetools.sync.customers; |
| 2 | + |
| 3 | +import com.commercetools.sync.commons.BaseSync; |
| 4 | +import com.commercetools.sync.commons.exceptions.SyncException; |
| 5 | +import com.commercetools.sync.customers.helpers.CustomerBatchValidator; |
| 6 | +import com.commercetools.sync.customers.helpers.CustomerReferenceResolver; |
| 7 | +import com.commercetools.sync.customers.helpers.CustomerSyncStatistics; |
| 8 | +import com.commercetools.sync.services.CustomerGroupService; |
| 9 | +import com.commercetools.sync.services.CustomerService; |
| 10 | +import com.commercetools.sync.services.TypeService; |
| 11 | +import com.commercetools.sync.services.impl.CustomerGroupServiceImpl; |
| 12 | +import com.commercetools.sync.services.impl.CustomerServiceImpl; |
| 13 | +import com.commercetools.sync.services.impl.TypeServiceImpl; |
| 14 | +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; |
| 15 | +import io.sphere.sdk.commands.UpdateAction; |
| 16 | +import io.sphere.sdk.customers.Customer; |
| 17 | +import io.sphere.sdk.customers.CustomerDraft; |
| 18 | +import org.apache.commons.lang3.tuple.ImmutablePair; |
| 19 | + |
| 20 | +import javax.annotation.Nonnull; |
| 21 | +import java.util.List; |
| 22 | +import java.util.Map; |
| 23 | +import java.util.Optional; |
| 24 | +import java.util.Set; |
| 25 | +import java.util.concurrent.CompletableFuture; |
| 26 | +import java.util.concurrent.CompletionStage; |
| 27 | + |
| 28 | +import static com.commercetools.sync.commons.utils.SyncUtils.batchElements; |
| 29 | +import static com.commercetools.sync.customers.utils.CustomerSyncUtils.buildActions; |
| 30 | +import static java.lang.String.format; |
| 31 | +import static java.util.concurrent.CompletableFuture.completedFuture; |
| 32 | +import static java.util.function.Function.identity; |
| 33 | +import static java.util.stream.Collectors.toMap; |
| 34 | +import static java.util.stream.Collectors.toSet; |
| 35 | + |
| 36 | +/** |
| 37 | + * This class syncs customer drafts with the corresponding customers in the CTP project. |
| 38 | + */ |
| 39 | +public class CustomerSync extends BaseSync<CustomerDraft, CustomerSyncStatistics, CustomerSyncOptions> { |
| 40 | + |
| 41 | + private static final String CTP_CUSTOMER_FETCH_FAILED = "Failed to fetch existing customers with keys: '%s'."; |
| 42 | + private static final String FAILED_TO_PROCESS = "Failed to process the CustomerDraft with key:'%s'. Reason: %s"; |
| 43 | + private static final String CTP_CUSTOMER_UPDATE_FAILED = "Failed to update customer with key: '%s'. Reason: %s"; |
| 44 | + |
| 45 | + private final CustomerService customerService; |
| 46 | + private final CustomerReferenceResolver referenceResolver; |
| 47 | + private final CustomerBatchValidator batchValidator; |
| 48 | + |
| 49 | + /** |
| 50 | + * Takes a {@link CustomerSyncOptions} to instantiate a new {@link CustomerSync} instance that could be used to |
| 51 | + * sync customer drafts in the CTP project specified in the injected {@link CustomerSync} instance. |
| 52 | + * |
| 53 | + * @param customerSyncOptions the container of all the options of the sync process including the CTP project |
| 54 | + * client and/or configuration and other sync-specific options. |
| 55 | + */ |
| 56 | + public CustomerSync(@Nonnull final CustomerSyncOptions customerSyncOptions) { |
| 57 | + this(customerSyncOptions, |
| 58 | + new CustomerServiceImpl(customerSyncOptions), |
| 59 | + new TypeServiceImpl(customerSyncOptions), |
| 60 | + new CustomerGroupServiceImpl(customerSyncOptions)); |
| 61 | + } |
| 62 | + |
| 63 | + /** |
| 64 | + * Takes a {@link CustomerSyncOptions} and service instances to instantiate a new {@link CustomerSync} instance |
| 65 | + * that could be used to sync customer drafts in the CTP project specified in the injected |
| 66 | + * {@link CustomerSyncOptions} instance. |
| 67 | + * |
| 68 | + * <p>NOTE: This constructor is mainly to be used for tests where the services can be mocked and passed to. |
| 69 | + * |
| 70 | + * @param customerSyncOptions the container of all the options of the sync process including the CTP project |
| 71 | + * client and/or configuration and other sync-specific options. |
| 72 | + * @param customerService the customer service which is responsible for fetching/caching the Customers from the |
| 73 | + * CTP project. |
| 74 | + * @param typeService the type service which is responsible for fetching/caching the Types from the CTP project. |
| 75 | + * @param customerGroupService the customer group service which is responsible for fetching/caching the |
| 76 | + * CustomerGroups from the CTP project. |
| 77 | + */ |
| 78 | + protected CustomerSync(@Nonnull final CustomerSyncOptions customerSyncOptions, |
| 79 | + @Nonnull final CustomerService customerService, |
| 80 | + @Nonnull final TypeService typeService, |
| 81 | + @Nonnull final CustomerGroupService customerGroupService) { |
| 82 | + super(new CustomerSyncStatistics(), customerSyncOptions); |
| 83 | + this.customerService = customerService; |
| 84 | + this.referenceResolver = new CustomerReferenceResolver(getSyncOptions(), typeService, customerGroupService); |
| 85 | + this.batchValidator = new CustomerBatchValidator(getSyncOptions(), getStatistics()); |
| 86 | + } |
| 87 | + |
| 88 | + /** |
| 89 | + * Iterates through the whole {@code customerDrafts} list and accumulates its valid drafts to batches. |
| 90 | + * Every batch is then processed by {@link CustomerSync#processBatch(List)}. |
| 91 | + * |
| 92 | + * <p><strong>Inherited doc:</strong> |
| 93 | + * {@inheritDoc} |
| 94 | + * |
| 95 | + * @param customerDrafts {@link List} of {@link CustomerDraft}'s that would be synced into CTP project. |
| 96 | + * @return {@link CompletionStage} with {@link CustomerSyncStatistics} holding statistics of all sync |
| 97 | + * processes performed by this sync instance. |
| 98 | + */ |
| 99 | + @Override |
| 100 | + protected CompletionStage<CustomerSyncStatistics> process(@Nonnull final List<CustomerDraft> customerDrafts) { |
| 101 | + final List<List<CustomerDraft>> batches = batchElements(customerDrafts, syncOptions.getBatchSize()); |
| 102 | + return syncBatches(batches, completedFuture(statistics)); |
| 103 | + |
| 104 | + } |
| 105 | + |
| 106 | + @Override |
| 107 | + protected CompletionStage<CustomerSyncStatistics> processBatch(@Nonnull final List<CustomerDraft> batch) { |
| 108 | + final ImmutablePair<Set<CustomerDraft>, CustomerBatchValidator.ReferencedKeys> result = |
| 109 | + batchValidator.validateAndCollectReferencedKeys(batch); |
| 110 | + |
| 111 | + final Set<CustomerDraft> validCustomerDrafts = result.getLeft(); |
| 112 | + if (validCustomerDrafts.isEmpty()) { |
| 113 | + statistics.incrementProcessed(batch.size()); |
| 114 | + return completedFuture(statistics); |
| 115 | + } |
| 116 | + |
| 117 | + return referenceResolver |
| 118 | + .populateKeyToIdCachesForReferencedKeys(result.getRight()) |
| 119 | + .handle(ImmutablePair::new) |
| 120 | + .thenCompose(cachingResponse -> { |
| 121 | + final Throwable cachingException = cachingResponse.getValue(); |
| 122 | + if (cachingException != null) { |
| 123 | + handleError(new SyncException("Failed to build a cache of keys to ids.", cachingException), |
| 124 | + validCustomerDrafts.size()); |
| 125 | + return CompletableFuture.completedFuture(null); |
| 126 | + } |
| 127 | + |
| 128 | + final Set<String> validCustomerKeys = |
| 129 | + validCustomerDrafts.stream().map(CustomerDraft::getKey).collect(toSet()); |
| 130 | + |
| 131 | + return customerService |
| 132 | + .fetchMatchingCustomersByKeys(validCustomerKeys) |
| 133 | + .handle(ImmutablePair::new) |
| 134 | + .thenCompose(fetchResponse -> { |
| 135 | + final Set<Customer> fetchedCustomers = fetchResponse.getKey(); |
| 136 | + final Throwable exception = fetchResponse.getValue(); |
| 137 | + |
| 138 | + if (exception != null) { |
| 139 | + final String errorMessage = format(CTP_CUSTOMER_FETCH_FAILED, validCustomerKeys); |
| 140 | + handleError(new SyncException(errorMessage, exception), validCustomerKeys.size()); |
| 141 | + return CompletableFuture.completedFuture(null); |
| 142 | + } else { |
| 143 | + return syncBatch(fetchedCustomers, validCustomerDrafts); |
| 144 | + } |
| 145 | + }); |
| 146 | + }) |
| 147 | + .thenApply(ignoredResult -> { |
| 148 | + statistics.incrementProcessed(batch.size()); |
| 149 | + return statistics; |
| 150 | + }); |
| 151 | + } |
| 152 | + |
| 153 | + @Nonnull |
| 154 | + private CompletionStage<Void> syncBatch( |
| 155 | + @Nonnull final Set<Customer> oldCustomers, |
| 156 | + @Nonnull final Set<CustomerDraft> newCustomerDrafts) { |
| 157 | + |
| 158 | + final Map<String, Customer> oldCustomerMap = oldCustomers |
| 159 | + .stream() |
| 160 | + .collect(toMap(Customer::getKey, identity())); |
| 161 | + |
| 162 | + return CompletableFuture.allOf(newCustomerDrafts |
| 163 | + .stream() |
| 164 | + .map(customerDraft -> |
| 165 | + referenceResolver |
| 166 | + .resolveReferences(customerDraft) |
| 167 | + .thenCompose(resolvedCustomerDraft -> syncDraft(oldCustomerMap, resolvedCustomerDraft)) |
| 168 | + .exceptionally(completionException -> { |
| 169 | + final String errorMessage = format(FAILED_TO_PROCESS, customerDraft.getKey(), |
| 170 | + completionException.getMessage()); |
| 171 | + handleError(new SyncException(errorMessage, completionException), 1); |
| 172 | + return null; |
| 173 | + }) |
| 174 | + ) |
| 175 | + .map(CompletionStage::toCompletableFuture) |
| 176 | + .toArray(CompletableFuture[]::new)); |
| 177 | + } |
| 178 | + |
| 179 | + @Nonnull |
| 180 | + private CompletionStage<Void> syncDraft( |
| 181 | + @Nonnull final Map<String, Customer> oldCustomerMap, |
| 182 | + @Nonnull final CustomerDraft newCustomerDraft) { |
| 183 | + |
| 184 | + final Customer oldCustomer = oldCustomerMap.get(newCustomerDraft.getKey()); |
| 185 | + return Optional.ofNullable(oldCustomer) |
| 186 | + .map(customer -> buildActionsAndUpdate(oldCustomer, newCustomerDraft)) |
| 187 | + .orElseGet(() -> applyCallbackAndCreate(newCustomerDraft)); |
| 188 | + } |
| 189 | + |
| 190 | + @SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION") // https://github.com/findbugsproject/findbugs/issues/79 |
| 191 | + @Nonnull |
| 192 | + private CompletionStage<Void> buildActionsAndUpdate( |
| 193 | + @Nonnull final Customer oldCustomer, |
| 194 | + @Nonnull final CustomerDraft newCustomerDraft) { |
| 195 | + |
| 196 | + final List<UpdateAction<Customer>> updateActions = |
| 197 | + buildActions(oldCustomer, newCustomerDraft, syncOptions); |
| 198 | + |
| 199 | + final List<UpdateAction<Customer>> updateActionsAfterCallback |
| 200 | + = syncOptions.applyBeforeUpdateCallback(updateActions, newCustomerDraft, oldCustomer); |
| 201 | + |
| 202 | + if (!updateActionsAfterCallback.isEmpty()) { |
| 203 | + return updateCustomer(oldCustomer, newCustomerDraft, updateActionsAfterCallback); |
| 204 | + } |
| 205 | + |
| 206 | + return completedFuture(null); |
| 207 | + } |
| 208 | + |
| 209 | + @Nonnull |
| 210 | + private CompletionStage<Void> updateCustomer( |
| 211 | + @Nonnull final Customer oldCustomer, |
| 212 | + @Nonnull final CustomerDraft newCustomerDraft, |
| 213 | + @Nonnull final List<UpdateAction<Customer>> updateActionsAfterCallback) { |
| 214 | + |
| 215 | + return customerService |
| 216 | + .updateCustomer(oldCustomer, updateActionsAfterCallback) |
| 217 | + .handle(ImmutablePair::of) |
| 218 | + .thenCompose(updateResponse -> { |
| 219 | + final Throwable exception = updateResponse.getValue(); |
| 220 | + if (exception != null) { |
| 221 | + return executeSupplierIfConcurrentModificationException( |
| 222 | + exception, |
| 223 | + () -> fetchAndUpdate(oldCustomer, newCustomerDraft), |
| 224 | + () -> { |
| 225 | + final String errorMessage = |
| 226 | + format(CTP_CUSTOMER_UPDATE_FAILED, newCustomerDraft.getKey(), exception.getMessage()); |
| 227 | + handleError(new SyncException(errorMessage, exception), 1); |
| 228 | + return CompletableFuture.completedFuture(null); |
| 229 | + }); |
| 230 | + } else { |
| 231 | + statistics.incrementUpdated(); |
| 232 | + return CompletableFuture.completedFuture(null); |
| 233 | + } |
| 234 | + }); |
| 235 | + } |
| 236 | + |
| 237 | + @Nonnull |
| 238 | + private CompletionStage<Void> fetchAndUpdate( |
| 239 | + @Nonnull final Customer oldCustomer, |
| 240 | + @Nonnull final CustomerDraft newCustomerDraft) { |
| 241 | + |
| 242 | + final String customerKey = oldCustomer.getKey(); |
| 243 | + return customerService |
| 244 | + .fetchCustomerByKey(customerKey) |
| 245 | + .handle(ImmutablePair::of) |
| 246 | + .thenCompose(fetchResponse -> { |
| 247 | + final Optional<Customer> fetchedCustomerOptional = fetchResponse.getKey(); |
| 248 | + final Throwable exception = fetchResponse.getValue(); |
| 249 | + |
| 250 | + if (exception != null) { |
| 251 | + final String errorMessage = format(CTP_CUSTOMER_UPDATE_FAILED, customerKey, |
| 252 | + "Failed to fetch from CTP while retrying after concurrency modification."); |
| 253 | + handleError(new SyncException(errorMessage, exception), 1); |
| 254 | + return CompletableFuture.completedFuture(null); |
| 255 | + } |
| 256 | + |
| 257 | + return fetchedCustomerOptional |
| 258 | + .map(fetchedCustomer -> buildActionsAndUpdate(fetchedCustomer, newCustomerDraft)) |
| 259 | + .orElseGet(() -> { |
| 260 | + final String errorMessage = format(CTP_CUSTOMER_UPDATE_FAILED, customerKey, |
| 261 | + "Not found when attempting to fetch while retrying after concurrency modification."); |
| 262 | + handleError(new SyncException(errorMessage, null), 1); |
| 263 | + return CompletableFuture.completedFuture(null); |
| 264 | + }); |
| 265 | + }); |
| 266 | + } |
| 267 | + |
| 268 | + @Nonnull |
| 269 | + private CompletionStage<Void> applyCallbackAndCreate(@Nonnull final CustomerDraft customerDraft) { |
| 270 | + |
| 271 | + return syncOptions |
| 272 | + .applyBeforeCreateCallback(customerDraft) |
| 273 | + .map(draft -> customerService |
| 274 | + .createCustomer(draft) |
| 275 | + .thenAccept(customerOptional -> { |
| 276 | + if (customerOptional.isPresent()) { |
| 277 | + statistics.incrementCreated(); |
| 278 | + } else { |
| 279 | + statistics.incrementFailed(); |
| 280 | + } |
| 281 | + })) |
| 282 | + .orElseGet(() -> CompletableFuture.completedFuture(null)); |
| 283 | + } |
| 284 | + |
| 285 | + private void handleError(@Nonnull final SyncException syncException, |
| 286 | + final int failedTimes) { |
| 287 | + syncOptions.applyErrorCallback(syncException); |
| 288 | + statistics.incrementFailed(failedTimes); |
| 289 | + } |
| 290 | +} |
0 commit comments