Skip to content

Commit bc1470d

Browse files
committed
Add tqdm progress bar to ltfs_ordered_copy
1 parent b2bb482 commit bc1470d

1 file changed

Lines changed: 89 additions & 21 deletions

File tree

src/utils/ltfs_ordered_copy

Lines changed: 89 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ import threading
4444
from logging import getLogger, basicConfig, NOTSET, CRITICAL, ERROR, WARNING, INFO, DEBUG
4545
from collections import deque
4646

47+
try:
48+
from tqdm import tqdm
49+
USE_TQDM = True
50+
except ImportError:
51+
USE_TQDM = False
52+
4753
class CopyItem:
4854
""""""
4955
def __init__(self, src, dst, vea_pre, cp_attr, cp_xattr, logger): #initialization
@@ -77,15 +83,20 @@ class CopyItem:
7783

7884
return (self.vuuid, self.part, self.start)
7985

80-
def run(self):
86+
def _run_copy(self, progress):
87+
with open(self.src, 'rb') as srcf, open(self.dst, 'wb') as dstf:
88+
copyfileobj(srcf, dstf, progress.update)
89+
90+
def run(self, progress):
8191
try:
8292
if len(self.vuuid):
8393
logger.debug('"{0}" ({2}) -> "{1}"'.format(self.src, self.dst, str(self.start)))
8494
else:
8595
logger.debug('"{0}" -> "{1}"'.format(self.src, self.dst))
86-
96+
progress.update_file(self.src)
8797
if self.cp_attr: #Copy data and metadata
88-
shutil.copy2(self.src, self.dst)
98+
self._run_copy(progress)
99+
shutil.copystat(self.src, self.dst)
89100
if self.cp_xattr:
90101
# Capture EAs of the source file
91102
src_attributes = {}
@@ -96,7 +107,8 @@ class CopyItem:
96107
for key in src_attributes:
97108
xattr.set(self.dst, key, src_attributes[key])
98109
else: #Only copy data
99-
shutil.copy(self.src, self.dst)
110+
self._run_copy(progress)
111+
shutil.copymode(self.src, self.dst)
100112
except Exception as e:
101113
self.logger.error('Failed to copy "{0}" to "{1}": {2}'.format(self.src, self.dst, str(str(e))))
102114
return False
@@ -116,9 +128,11 @@ class CopyQueue:
116128
self.items = 0
117129
self.logger = logger
118130
self.sort_files = sort_files
131+
self.total_bytes = 0
119132

120133
def add_copy_item(self, c):
121134
(u, p, s) = c.eval()
135+
self.total_bytes += os.path.getsize(c.src)
122136
if u == '':
123137
# Source is not on LTFS
124138
self.direct.append(c)
@@ -206,24 +220,81 @@ class CopyQueue:
206220
def get_size(self):
207221
return self.items
208222

223+
RESULT_LOCK = threading.Lock()
224+
225+
# Based on shutil's code
226+
def copyfileobj(fsrc, fdst, callback, length=shutil.COPY_BUFSIZE):
227+
try:
228+
# check for optimisation opportunity
229+
if "b" in fsrc.mode and "b" in fdst.mode and fsrc.readinto:
230+
return _copyfileobj_readinto(fsrc, fdst, callback, length)
231+
except AttributeError:
232+
# one or both file objects do not support a .mode or .readinto attribute
233+
pass
234+
235+
fsrc_read = fsrc.read
236+
fdst_write = fdst.write
237+
238+
while True:
239+
buf = fsrc_read(length)
240+
if not buf:
241+
break
242+
fdst_write(buf)
243+
callback(len(buf))
244+
245+
def _copyfileobj_readinto(fsrc, fdst, callback, length=shutil.COPY_BUFSIZE):
246+
"""readinto()/memoryview() based variant of copyfileobj().
247+
*fsrc* must support readinto() method and both files must be
248+
open in binary mode.
249+
"""
250+
# Localize variable access to minimize overhead.
251+
fsrc_readinto = fsrc.readinto
252+
fdst_write = fdst.write
253+
with memoryview(bytearray(length)) as mv:
254+
while True:
255+
n = fsrc_readinto(mv)
256+
if not n:
257+
break
258+
elif n < length:
259+
with mv[:n] as smv:
260+
fdst.write(smv)
261+
else:
262+
fdst_write(mv)
263+
callback(n)
264+
209265
class Progress:
210-
def __init__(self, logger, title, num): #initialization
266+
def __init__(self, logger, title, num_f, num_b): #initialization
211267
self.logger = logger
212268
self.title = title
213-
self.num = num
214-
self.cur = 0
269+
self.num_f = num_f
270+
self.num_b = num_b
271+
self.cur_f = 0
272+
self.tqdm = None
273+
274+
def update_file(self, name):
275+
# Delay the initialization of tqdm to prevent console spam
276+
if self.logger.getEffectiveLevel() == INFO and USE_TQDM and self.tqdm is None:
277+
self.tqdm = tqdm(total=self.num_b, unit='B', unit_scale=True, unit_divisor=1024)
215278

216-
def update(self, step = 1):
279+
self.cur_f += 1
217280
if self.logger.getEffectiveLevel() == INFO:
218-
self.cur = self.cur + 1
219-
sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur, self.num))
220-
sys.stderr.flush()
281+
if self.tqdm is not None:
282+
self.tqdm.set_description(f'{name} [{self.cur_f} / {self.num_f}]')
283+
else:
284+
sys.stderr.write('\r{}: {}/{}'.format(self.title, self.cur_f, self.num_f))
285+
sys.stderr.flush()
286+
287+
def update(self, bytes_add):
288+
if self.tqdm:
289+
self.tqdm.update(bytes_add)
221290

222291
def finish(self):
223-
if self.logger.getEffectiveLevel() == INFO:
224-
logger.info("")
292+
if self.tqdm is not None:
293+
self.tqdm.close()
294+
else:
295+
if self.logger.getEffectiveLevel() == INFO:
296+
logger.info("")
225297

226-
RESULT_LOCK = threading.Lock()
227298

228299
def writer(logger, prog, q, r):
229300
while True:
@@ -236,9 +307,7 @@ def writer(logger, prog, q, r):
236307
logger.error('writer thread error: ' + str(e))
237308
exit(1)
238309

239-
prog.update()
240-
241-
result = ci.run()
310+
result = ci.run(prog)
242311

243312
with RESULT_LOCK:
244313
if result:
@@ -425,7 +494,7 @@ success = 0
425494
fail = 0
426495

427496
direct = copyq.pop_direct()
428-
prog_disk = Progress(logger, 'File copy from disk is on going', len(direct))
497+
prog_disk = Progress(logger, 'File copy from disk is on going', len(direct), copyq.total_bytes)
429498
if len(direct):
430499
logger.info("Copying on {} disk files with {} threads".format(len(direct), direct_write_threads))
431500
writers = []
@@ -444,7 +513,7 @@ if len(direct):
444513
prog_disk.finish()
445514

446515
# Copy files on LTFS
447-
prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size())
516+
prog_tape = Progress(logger, 'File copy from tape is on going', copyq.get_size(), copyq.total_bytes)
448517
(tape_key, tape) = copyq.pop_tape()
449518
while tape != None:
450519
logger.log(NOTSET + 1, "Processing {}".format(len(tape)))
@@ -461,8 +530,7 @@ while tape != None:
461530
for start_block_key in start_block_list:
462531
file_ind = partition[start_block_key]
463532
for cp in file_ind:
464-
prog_tape.update()
465-
result = cp.run()
533+
result = cp.run(prog_tape)
466534
if result:
467535
success = success + 1
468536
else:

0 commit comments

Comments
 (0)