Skip to content

Commit df1ea0b

Browse files
Merge pull request ClickHouse#50489 from ClickHouse/vdimir/alter_moving_garbage
Cleanup moving parts
2 parents f7638de + ac63861 commit df1ea0b

32 files changed

Lines changed: 313 additions & 34 deletions

File tree

src/Storages/MergeTree/IMergeTreeDataPart.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,13 +492,17 @@ void IMergeTreeDataPart::removeIfNeeded()
492492

493493
if (is_temp)
494494
{
495-
String file_name = fileName(getDataPartStorage().getPartDirectory());
495+
const auto & part_directory = getDataPartStorage().getPartDirectory();
496+
497+
String file_name = fileName(part_directory);
496498

497499
if (file_name.empty())
498500
throw Exception(ErrorCodes::LOGICAL_ERROR, "relative_path {} of part {} is invalid or not set",
499501
getDataPartStorage().getPartDirectory(), name);
500502

501-
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj"))
503+
const auto part_parent_directory = directoryPath(part_directory);
504+
bool is_moving_part = part_parent_directory.ends_with("moving/");
505+
if (!startsWith(file_name, "tmp") && !endsWith(file_name, ".tmp_proj") && !is_moving_part)
502506
{
503507
LOG_ERROR(
504508
storage.log,
@@ -507,6 +511,11 @@ void IMergeTreeDataPart::removeIfNeeded()
507511
path);
508512
return;
509513
}
514+
515+
if (is_moving_part)
516+
{
517+
LOG_TRACE(storage.log, "Removing unneeded moved part from {}", path);
518+
}
510519
}
511520

512521
remove();

src/Storages/MergeTree/MergeTreeData.cpp

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <Common/CurrentMetrics.h>
2020
#include <Common/ThreadFuzzer.h>
2121
#include <Common/getNumberOfPhysicalCPUCores.h>
22+
#include <Common/Config/ConfigHelper.h>
2223
#include <Compression/CompressedReadBuffer.h>
2324
#include <Core/QueryProcessingStage.h>
2425
#include <DataTypes/DataTypeEnum.h>
@@ -2014,6 +2015,21 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa
20142015

20152016

20162017
size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes)
2018+
{
2019+
size_t cleared_count = 0;
2020+
2021+
cleared_count += clearOldTemporaryDirectories(relative_data_path, custom_directories_lifetime_seconds, valid_prefixes);
2022+
2023+
if (allowRemoveStaleMovingParts())
2024+
{
2025+
/// Clear _all_ parts from the `moving` directory
2026+
cleared_count += clearOldTemporaryDirectories(fs::path(relative_data_path) / "moving", custom_directories_lifetime_seconds, {""});
2027+
}
2028+
2029+
return cleared_count;
2030+
}
2031+
2032+
size_t MergeTreeData::clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes)
20172033
{
20182034
/// If the method is already called from another thread, then we don't need to do anything.
20192035
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
@@ -2032,7 +2048,7 @@ size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lif
20322048
if (disk->isBroken())
20332049
continue;
20342050

2035-
for (auto it = disk->iterateDirectory(relative_data_path); it->isValid(); it->next())
2051+
for (auto it = disk->iterateDirectory(root_path); it->isValid(); it->next())
20362052
{
20372053
const std::string & basename = it->name();
20382054
bool start_with_valid_prefix = false;
@@ -7857,7 +7873,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
78577873
for (const auto & moving_part : moving_tagger->parts_to_move)
78587874
{
78597875
Stopwatch stopwatch;
7860-
MutableDataPartPtr cloned_part;
7876+
MergeTreePartsMover::TemporaryClonedPart cloned_part;
78617877
ProfileEventsScope profile_events_scope;
78627878

78637879
auto write_part_log = [&](const ExecutionStatus & execution_status)
@@ -7867,7 +7883,7 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
78677883
execution_status,
78687884
stopwatch.elapsed(),
78697885
moving_part.part->name,
7870-
cloned_part,
7886+
cloned_part.part,
78717887
{moving_part.part},
78727888
nullptr,
78737889
profile_events_scope.getSnapshot());
@@ -7943,9 +7959,6 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
79437959
catch (...)
79447960
{
79457961
write_part_log(ExecutionStatus::fromCurrentException("", true));
7946-
if (cloned_part)
7947-
cloned_part->remove();
7948-
79497962
throw;
79507963
}
79517964
}
@@ -8460,6 +8473,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart(
84608473
return new_data_part;
84618474
}
84628475

8476+
bool MergeTreeData::allowRemoveStaleMovingParts() const
8477+
{
8478+
return ConfigHelper::getBool(getContext()->getConfigRef(), "allow_remove_stale_moving_parts");
8479+
}
8480+
84638481
CurrentlySubmergingEmergingTagger::~CurrentlySubmergingEmergingTagger()
84648482
{
84658483
std::lock_guard lock(storage.currently_submerging_emerging_mutex);

src/Storages/MergeTree/MergeTreeData.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,7 @@ class MergeTreeData : public IStorage, public WithMutableContext
679679
/// Delete all directories which names begin with "tmp"
680680
/// Must be called with locked lockForShare() because it's using relative_data_path.
681681
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
682+
size_t clearOldTemporaryDirectories(const String & root_path, size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes);
682683

683684
size_t clearEmptyParts();
684685

@@ -1064,6 +1065,9 @@ class MergeTreeData : public IStorage, public WithMutableContext
10641065
void waitForOutdatedPartsToBeLoaded() const;
10651066
bool canUsePolymorphicParts() const;
10661067

1068+
/// TODO: make enabled by default in the next release if no problems found.
1069+
bool allowRemoveStaleMovingParts() const;
1070+
10671071
protected:
10681072
friend class IMergeTreeDataPart;
10691073
friend class MergeTreeDataMergerMutator;

src/Storages/MergeTree/MergeTreePartsMover.cpp

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ namespace DB
1111
namespace ErrorCodes
1212
{
1313
extern const int ABORTED;
14+
extern const int DIRECTORY_ALREADY_EXISTS;
1415
}
1516

1617
namespace
@@ -203,7 +204,7 @@ bool MergeTreePartsMover::selectPartsForMove(
203204
return false;
204205
}
205206

206-
MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
207+
MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part) const
207208
{
208209
if (moves_blocker.isCancelled())
209210
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
@@ -212,6 +213,8 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
212213
auto part = moving_part.part;
213214
auto disk = moving_part.reserved_space->getDisk();
214215
LOG_DEBUG(log, "Cloning part {} from '{}' to '{}'", part->name, part->getDataPartStorage().getDiskName(), disk->getName());
216+
TemporaryClonedPart cloned_part;
217+
cloned_part.temporary_directory_lock = data->getTemporaryPartDirectoryHolder(part->name);
215218

216219
MutableDataPartStoragePtr cloned_part_storage;
217220
if (disk->supportZeroCopyReplication() && settings->allow_remote_fs_zero_copy_replication)
@@ -222,8 +225,10 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
222225
String relative_path = part->getDataPartStorage().getPartDirectory();
223226
if (disk->exists(path_to_clone + relative_path))
224227
{
225-
LOG_WARNING(log, "Path {} already exists. Will remove it and clone again.", fullPath(disk, path_to_clone + relative_path));
226-
disk->removeRecursive(fs::path(path_to_clone) / relative_path / "");
228+
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS,
229+
"Cannot clone part {} from '{}' to '{}': path '{}' already exists",
230+
part->name, part->getDataPartStorage().getDiskName(), disk->getName(),
231+
fullPath(disk, path_to_clone + relative_path));
227232
}
228233

229234
disk->createDirectories(path_to_clone);
@@ -242,37 +247,48 @@ MergeTreeMutableDataPartPtr MergeTreePartsMover::clonePart(const MergeTreeMoveEn
242247
}
243248

244249
MergeTreeDataPartBuilder builder(*data, part->name, cloned_part_storage);
245-
auto cloned_part = std::move(builder).withPartFormatFromDisk().build();
246-
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part->getDataPartStorage().getFullPath());
250+
cloned_part.part = std::move(builder).withPartFormatFromDisk().build();
251+
LOG_TRACE(log, "Part {} was cloned to {}", part->name, cloned_part.part->getDataPartStorage().getFullPath());
247252

248-
cloned_part->loadColumnsChecksumsIndexes(true, true);
249-
cloned_part->loadVersionMetadata();
250-
cloned_part->modification_time = cloned_part->getDataPartStorage().getLastModified().epochTime();
253+
cloned_part.part->is_temp = data->allowRemoveStaleMovingParts();
254+
cloned_part.part->loadColumnsChecksumsIndexes(true, true);
255+
cloned_part.part->loadVersionMetadata();
256+
cloned_part.part->modification_time = cloned_part.part->getDataPartStorage().getLastModified().epochTime();
251257
return cloned_part;
252258
}
253259

254260

255-
void MergeTreePartsMover::swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_part) const
261+
void MergeTreePartsMover::swapClonedPart(TemporaryClonedPart & cloned_part) const
256262
{
257263
if (moves_blocker.isCancelled())
258264
throw Exception(ErrorCodes::ABORTED, "Cancelled moving parts.");
259265

260-
auto active_part = data->getActiveContainingPart(cloned_part->name);
266+
auto active_part = data->getActiveContainingPart(cloned_part.part->name);
261267

262268
/// It's ok, because we don't block moving parts for merges or mutations
263-
if (!active_part || active_part->name != cloned_part->name)
269+
if (!active_part || active_part->name != cloned_part.part->name)
264270
{
265-
LOG_INFO(log, "Failed to swap {}. Active part doesn't exist. Possible it was merged or mutated. Will remove copy on path '{}'.", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
271+
LOG_INFO(log,
272+
"Failed to swap {}. Active part doesn't exist (containing part {}). "
273+
"Possible it was merged or mutated. Part on path '{}' {}",
274+
cloned_part.part->name,
275+
active_part ? active_part->name : "doesn't exist",
276+
cloned_part.part->getDataPartStorage().getFullPath(),
277+
data->allowRemoveStaleMovingParts() ? "will be removed" : "will remain intact (set <allow_remove_stale_moving_parts> in config.xml, exercise caution when using)");
266278
return;
267279
}
268280

281+
cloned_part.part->is_temp = false;
282+
269283
/// Don't remove new directory but throw an error because it may contain part which is currently in use.
270-
cloned_part->renameTo(active_part->name, false);
284+
cloned_part.part->renameTo(active_part->name, false);
271285

272286
/// TODO what happen if server goes down here?
273-
data->swapActivePart(cloned_part);
287+
data->swapActivePart(cloned_part.part);
288+
289+
LOG_TRACE(log, "Part {} was moved to {}", cloned_part.part->name, cloned_part.part->getDataPartStorage().getFullPath());
274290

275-
LOG_TRACE(log, "Part {} was moved to {}", cloned_part->name, cloned_part->getDataPartStorage().getFullPath());
291+
cloned_part.temporary_directory_lock = {};
276292
}
277293

278294
}

src/Storages/MergeTree/MergeTreePartsMover.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <functional>
44
#include <optional>
55
#include <vector>
6+
#include <base/scope_guard.h>
67
#include <Disks/StoragePolicy.h>
78
#include <Storages/MergeTree/IMergeTreeDataPart.h>
89
#include <Storages/MergeTree/MovesList.h>
@@ -43,12 +44,19 @@ class MergeTreePartsMover
4344
using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const IMergeTreeDataPart> &, String * reason)>;
4445

4546
public:
47+
4648
explicit MergeTreePartsMover(MergeTreeData * data_)
4749
: data(data_)
4850
, log(&Poco::Logger::get("MergeTreePartsMover"))
4951
{
5052
}
5153

54+
struct TemporaryClonedPart
55+
{
56+
MergeTreeMutableDataPartPtr part;
57+
scope_guard temporary_directory_lock;
58+
};
59+
5260
/// Select parts for background moves according to storage_policy configuration.
5361
/// Returns true if at least one part was selected for move.
5462
bool selectPartsForMove(
@@ -57,14 +65,14 @@ class MergeTreePartsMover
5765
const std::lock_guard<std::mutex> & moving_parts_lock);
5866

5967
/// Copies part to selected reservation in detached folder. Throws exception if part already exists.
60-
MergeTreeMutableDataPartPtr clonePart(const MergeTreeMoveEntry & moving_part) const;
68+
TemporaryClonedPart clonePart(const MergeTreeMoveEntry & moving_part) const;
6169

6270
/// Replaces cloned part from detached directory into active data parts set.
6371
/// Replacing part changes state to DeleteOnDestroy and will be removed from disk after destructor of
6472
/// IMergeTreeDataPart called. If replacing part doesn't exists or not active (committed) than
6573
/// cloned part will be removed and log message will be reported. It may happen in case of concurrent
6674
/// merge or mutation.
67-
void swapClonedPart(const MergeTreeMutableDataPartPtr & cloned_parts) const;
75+
void swapClonedPart(TemporaryClonedPart & cloned_part) const;
6876

6977
/// Can stop background moves and moves from queries
7078
ActionBlocker moves_blocker;

tests/integration/test_alter_moving_garbage/__init__.py

Whitespace-only changes.
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<clickhouse>
2+
<remote_servers>
3+
<test_cluster>
4+
<shard>
5+
<replica>
6+
<host>node1</host>
7+
<port>9000</port>
8+
</replica>
9+
<replica>
10+
<host>node2</host>
11+
<port>9000</port>
12+
</replica>
13+
</shard>
14+
</test_cluster>
15+
</remote_servers>
16+
</clickhouse>
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
<clickhouse>
2+
<storage_configuration>
3+
<disks>
4+
<s3>
5+
<type>s3</type>
6+
<endpoint>http://minio1:9001/root/data/</endpoint>
7+
<access_key_id>minio</access_key_id>
8+
<secret_access_key>minio123</secret_access_key>
9+
</s3>
10+
</disks>
11+
<policies>
12+
<two_disks>
13+
<volumes>
14+
<default>
15+
<disk>default</disk>
16+
</default>
17+
<external>
18+
<disk>s3</disk>
19+
</external>
20+
</volumes>
21+
</two_disks>
22+
</policies>
23+
</storage_configuration>
24+
25+
<allow_remove_stale_moving_parts>true</allow_remove_stale_moving_parts>
26+
</clickhouse>
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<clickhouse>
2+
<tcp_port>9000</tcp_port>
3+
<listen_host>127.0.0.1</listen_host>
4+
<max_concurrent_queries>500</max_concurrent_queries>
5+
<path>./clickhouse/</path>
6+
<users_config>users.xml</users_config>
7+
</clickhouse>

0 commit comments

Comments
 (0)