-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
195 lines (170 loc) · 6.46 KB
/
main.py
File metadata and controls
195 lines (170 loc) · 6.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
from imagededup.methods import PHash, CNN
from PIL import Image
import pillow_avif # enable AVIF support
import os
import shutil
# --- Folders (relative to the current working directory) ---
IMAGES_DIR = "imagenes"    # scanned for candidate images
REVISION_DIR = "revision"  # receives the detected duplicate groups
TEMP_DIR = "tmp_valid"     # scratch copies handed to the encoder

# --- Detection method ---
use_cnn = False        # True selects the CNN encoder, False selects PHash
hash_threshold = 10    # passed as max_distance_threshold when hashing
cnn_threshold = 0.95   # passed as min_similarity_threshold when using the CNN

# --- Output layout ---
# True: one subfolder per group inside 'revision', file names are kept.
# False: no subfolders, every file name is prefixed with its group number.
create_subfolders = True

# Lower-case suffixes accepted by the extension filter.
VALID_EXTENSIONS = (
    ".jpg",
    ".jpeg",
    ".png",
    ".avif",
    ".webp",
    ".bmp",
    ".tiff",
)
def clean_directory(path):
    """Ensure *path* exists and is empty.

    The directory (and any missing parents) is created when absent, then
    every entry inside it is deleted. A failure on one entry is reported on
    stdout and does not stop the cleanup of the remaining entries.

    Args:
        path: Directory to create and/or empty.
    """
    os.makedirs(path, exist_ok=True)
    for name in os.listdir(path):
        item_path = os.path.join(path, name)
        try:
            # Only real directories need a recursive delete. Everything else
            # (regular files, symlinks -- including links to directories --
            # FIFOs, sockets, ...) is unlinked, so no entry type is skipped.
            if os.path.isdir(item_path) and not os.path.islink(item_path):
                shutil.rmtree(item_path)
            else:
                os.remove(item_path)
        except OSError as e:
            print(f"Could not clean '{item_path}': {e}")
def filter_images(path):
    """Return the names of the entries in *path* that carry an image suffix.

    The comparison is case-insensitive and uses the suffixes listed in
    VALID_EXTENSIONS. Only the name is inspected; the entry is not opened.
    """
    image_names = []
    for entry in os.listdir(path):
        if entry.lower().endswith(VALID_EXTENSIONS):
            image_names.append(entry)
    return image_names
def is_image_readable(filepath):
    """Report whether the file at *filepath* can be verified and decoded.

    Returns:
        True when both the verification pass and the RGB conversion finish
        without raising; False when either step raises any exception.
    """
    try:
        # Pass 1: integrity check.
        with Image.open(filepath) as probe:
            probe.verify()
        # Pass 2: decode on a fresh handle (Pillow documents that the file
        # must be reopened after verify()) and force a full RGB conversion.
        with Image.open(filepath) as decoded:
            decoded.convert('RGB')
    except Exception:
        return False
    return True
def _stage_images(candidate_images):
    """Copy the readable candidates from IMAGES_DIR into TEMP_DIR.

    AVIF files are converted to PNG so both encoders can handle them; every
    other file is copied unchanged.

    Args:
        candidate_images: File names (relative to IMAGES_DIR) to stage.

    Returns:
        A tuple ``(map_temp_to_orig, unreadable_count, avif_converted_count)``
        where ``map_temp_to_orig`` maps each staged file name to the original
        file name it was produced from.
    """
    map_temp_to_orig = {}
    unreadable_count = 0
    avif_converted_count = 0
    # Names already spoken for in TEMP_DIR. Lower-cased so the uniqueness
    # check also holds on case-insensitive filesystems.
    taken_names = {name.lower() for name in candidate_images}

    for name in candidate_images:
        src = os.path.join(IMAGES_DIR, name)
        if not is_image_readable(src):
            unreadable_count += 1
            continue

        base, ext = os.path.splitext(name)
        if ext.lower() == '.avif':
            # "<base>.png" may already belong to a real PNG (or to another
            # converted AVIF); pick a free name instead of overwriting it.
            temp_name = f"{base}.png"
            suffix = 1
            while temp_name.lower() in taken_names:
                temp_name = f"{base}_avif{suffix}.png"
                suffix += 1
            taken_names.add(temp_name.lower())
            try:
                with Image.open(src) as im:
                    im.convert('RGB').save(os.path.join(TEMP_DIR, temp_name), format='PNG')
            except Exception as e:
                unreadable_count += 1
                print(f"Could not convert '{name}' from AVIF: {e}")
                continue
            avif_converted_count += 1
        else:
            temp_name = name
            try:
                shutil.copy2(src, os.path.join(TEMP_DIR, temp_name))
            except OSError as e:
                unreadable_count += 1
                print(f"Could not copy '{name}': {e}")
                continue

        map_temp_to_orig[temp_name] = name

    return map_temp_to_orig, unreadable_count, avif_converted_count


def _group_duplicates(duplicates, map_temp_to_orig):
    """Merge per-image duplicate lists into disjoint groups of original names.

    Args:
        duplicates: Mapping of a staged file name to its duplicates; each
            duplicate is either a file name or a tuple whose first item is
            the file name.
        map_temp_to_orig: Staged file name -> original file name. Names
            missing from the map are kept as they are.

    Returns:
        A list of groups (lists of original names), ordered by the first
        appearance of each group in ``duplicates``. Groups that share a
        member are merged, so every name appears in exactly one group.
    """
    group_of = {}  # original name -> key of the group it belongs to
    members = {}   # group key -> ordered list of original names
    next_key = 1

    for img, dups in duplicates.items():
        if not dups:
            continue
        names = [img]
        names.extend(item[0] if isinstance(item, tuple) else item for item in dups)
        originals = [map_temp_to_orig.get(n, n) for n in names]

        known_keys = sorted({group_of[n] for n in originals if n in group_of})
        if known_keys:
            # Reuse the earliest group and fold any other group this image
            # links to into it, so one logical group never ends up split.
            target = known_keys[0]
            for other in known_keys[1:]:
                for n in members.pop(other):
                    group_of[n] = target
                    members[target].append(n)
        else:
            target = next_key
            next_key += 1
            members[target] = []

        for n in originals:
            if n not in group_of:
                group_of[n] = target
                members[target].append(n)

    return [members[key] for key in sorted(members)]


def _move_group(group_id, original_group):
    """Move one group's files from IMAGES_DIR into REVISION_DIR.

    Depending on ``create_subfolders`` the files go into a zero-padded
    subfolder with their names kept, or directly into REVISION_DIR with the
    zero-padded group number as a prefix.

    Returns:
        The number of files actually moved; names that no longer exist in
        IMAGES_DIR are skipped.
    """
    group_name = f"{group_id:05d}"
    if create_subfolders:
        target_dir = os.path.join(REVISION_DIR, group_name)
        os.makedirs(target_dir, exist_ok=True)
        prefix = ""
    else:
        target_dir = REVISION_DIR
        prefix = f"{group_name}_"

    moved_count = 0
    for orig_name in original_group:
        src_path = os.path.join(IMAGES_DIR, orig_name)
        if not os.path.exists(src_path):
            continue
        shutil.move(src_path, os.path.join(target_dir, prefix + orig_name))
        moved_count += 1
    return moved_count


def detect_duplicates():
    """Find duplicate images in IMAGES_DIR and move them into REVISION_DIR.

    Steps:
      1. Refuse to run unless REVISION_DIR is empty.
      2. Stage readable images in TEMP_DIR (AVIF files are converted to PNG).
      3. Encode the staged images with CNN or PHash (per ``use_cnn``) and ask
         the encoder for duplicates.
      4. Merge the results into disjoint groups and move each group's
         original files into REVISION_DIR.

    Progress and problems are reported on stdout; the function returns None.
    TEMP_DIR is emptied again before returning, even if a step raises.
    """
    os.makedirs(REVISION_DIR, exist_ok=True)
    # Ensure 'revision' folder is empty before proceeding, so a new run is
    # never mixed with leftovers from a previous one.
    try:
        if os.listdir(REVISION_DIR):
            print("The 'revision' folder is not empty. Cannot continue. Please empty the folder and run again.")
            return
    except OSError as e:
        print(f"Could not check contents of 'revision' folder: {e}")
        return

    print(f"Using method: {'CNN' if use_cnn else 'PHash'}")

    clean_directory(TEMP_DIR)
    try:
        candidate_images = filter_images(IMAGES_DIR)
        print(f"Processing {len(candidate_images)} images (filtered by extension)")
        if not candidate_images:
            print("No valid images to process. Exiting.")
            return

        map_temp_to_orig, unreadable_count, avif_converted_count = _stage_images(candidate_images)
        if unreadable_count:
            print(f"Skipping {unreadable_count} unreadable/corrupt images.")
        if avif_converted_count:
            print(f"Converted {avif_converted_count} AVIF images to PNG for processing.")
        if not map_temp_to_orig:
            print("No valid images after verification and preparation. Exiting.")
            return

        # Both encoders process the whole staging directory; only the
        # threshold keyword differs between them.
        if use_cnn:
            encoder = CNN()
            threshold_kwargs = {"min_similarity_threshold": cnn_threshold}
        else:
            encoder = PHash()
            threshold_kwargs = {"max_distance_threshold": hash_threshold}
        encodings = encoder.encode_images(image_dir=TEMP_DIR)
        duplicates = encoder.find_duplicates(
            encoding_map=encodings,
            scores=True,
            **threshold_kwargs,
        )

        total_moved = 0
        groups = _group_duplicates(duplicates, map_temp_to_orig)
        for group_id, original_group in enumerate(groups, start=1):
            print(f"- Detected group #{group_id:05d} ({len(original_group)} files): {original_group}")
            total_moved += _move_group(group_id, original_group)
    finally:
        # Clean temporaries on every exit path, including errors.
        clean_directory(TEMP_DIR)

    print(f"\nMoved {total_moved} files to '{REVISION_DIR}/'.")
# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    detect_duplicates()