-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
2048 lines (1804 loc) · 88.4 KB
/
app.py
File metadata and controls
2048 lines (1804 loc) · 88.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# coding: utf-8
import os
import subprocess
import logging
from pathlib import Path
import tyro
import gradio as gr
import os.path as osp
try:
import cv2
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
logging.warning("OpenCV not available. Some features may be limited.")
from src.utils.helper import load_description
from src.gradio_pipeline import GradioPipeline
from src.config.crop_config import CropConfig
from src.config.argument_config import ArgumentConfig
from src.config.inference_config import InferenceConfig
from src.utils.video import has_audio_stream, exec_cmd, VideoEnhancer
try:
from video_enhanced import (
VideoEnhancer as EnhancedVideoEnhancer,
get_video_info,
downscale_video,
extract_frames,
reassemble_video,
find_ffmpeg
)
ENHANCED_VIDEO_AVAILABLE = True
except ImportError:
ENHANCED_VIDEO_AVAILABLE = False
logging.warning("Enhanced video processing not available, using basic version")
find_ffmpeg = lambda: 'ffmpeg'
try:
from face_detection import FaceDetector, detect_faces_in_image
FACE_DETECTION_AVAILABLE = True
except ImportError:
FACE_DETECTION_AVAILABLE = False
logging.warning("Face detection not available")
try:
from aspect_ratio_utils import AspectRatioProcessor
ASPECT_RATIO_AVAILABLE = True
except ImportError:
ASPECT_RATIO_AVAILABLE = False
logging.warning("Aspect ratio processing not available")
try:
from audio_processor import (
extract_audio, sync_audio_to_video, add_background_music,
normalize_audio, process_audio_for_video, check_video_has_audio
)
AUDIO_PROCESSING_AVAILABLE = True
except ImportError:
AUDIO_PROCESSING_AVAILABLE = False
logging.warning("Audio processing not available")
try:
from export_utils import (
export_custom_resolution, export_gif, export_frame_sequence,
get_video_resolution
)
EXPORT_UTILS_AVAILABLE = True
except ImportError:
EXPORT_UTILS_AVAILABLE = False
logging.warning("Export utilities not available")
try:
from retargeting_utils import (
RetargetingParams, ExpressionPresets, apply_expression_intensity,
calculate_blink_pattern, create_retargeting_params_from_preset
)
RETARGETING_UTILS_AVAILABLE = True
except ImportError:
RETARGETING_UTILS_AVAILABLE = False
logging.warning("Retargeting utilities not available")
try:
from performance_utils import (
GPUMemoryManager, ProcessingQueue, ProcessingCheckpoint,
MultiGPUSupport, SystemMonitor, ProcessingState
)
PERFORMANCE_UTILS_AVAILABLE = True
except ImportError:
PERFORMANCE_UTILS_AVAILABLE = False
logging.warning("Performance utilities not available")
try:
from history_manager import HistoryManager, GenerationHistory
HISTORY_AVAILABLE = True
except ImportError:
HISTORY_AVAILABLE = False
logging.warning("History manager not available")
try:
from ui_utils import (
create_comparison_image, extract_video_frame,
create_preview_grid, image_to_base64
)
UI_UTILS_AVAILABLE = True
except ImportError:
UI_UTILS_AVAILABLE = False
logging.warning("UI utilities not available")
# Paths - Use environment variable or default to current directory
project_root = Path(os.getenv('LIVEPORTRAIT_ROOT', Path.cwd()))
live_portrait_output_dir = project_root / 'output'
esrgan_input_dir = live_portrait_output_dir / 'frames'          # raw frames fed to Real-ESRGAN
esrgan_output_dir = live_portrait_output_dir / 'enhanced_frames'  # upscaled frames
esrgan_model_path = project_root / 'pretrained_weights' / 'RealESRGAN_x4plus_anime_6B.pth'
esrgan_script_path = project_root / 'real-esrgan' / 'inference_realesrgan.py'
# Ensure the output directory exists
os.makedirs(live_portrait_output_dir, exist_ok=True)
os.makedirs(esrgan_input_dir, exist_ok=True)
os.makedirs(esrgan_output_dir, exist_ok=True)
# Initialize enhancer (use enhanced version if available)
if ENHANCED_VIDEO_AVAILABLE:
    # NOTE: the enhanced constructor is given the weights *directory*, while the
    # basic one below receives a single model *file* path — different contracts.
    enhancer = EnhancedVideoEnhancer(esrgan_script_path, project_root / 'pretrained_weights', esrgan_output_dir)
else:
    enhancer = VideoEnhancer(esrgan_script_path, esrgan_model_path, esrgan_output_dir)
# Initialize performance utilities
if PERFORMANCE_UTILS_AVAILABLE:
    gpu_memory_manager = GPUMemoryManager(device_id=0, low_memory_mode=False)
    processing_queue = ProcessingQueue(max_size=100)
    checkpoint_manager = ProcessingCheckpoint(live_portrait_output_dir / 'checkpoints')
    multi_gpu = MultiGPUSupport()
    system_monitor = SystemMonitor()
    # Apply low-memory optimizations if needed
    if os.getenv('LOW_MEMORY_MODE', 'false').lower() == 'true':
        gpu_memory_manager.low_memory_mode = True
        gpu_memory_manager.optimize_for_low_memory()
else:
    # Downstream code treats None as "feature unavailable" — keep all five set.
    gpu_memory_manager = None
    processing_queue = None
    checkpoint_manager = None
    multi_gpu = None
    system_monitor = None
# Initialize history manager
if HISTORY_AVAILABLE:
    history_manager = HistoryManager(live_portrait_output_dir / 'history', max_entries=100)
else:
    history_manager = None
def partial_fields(target_class, kwargs):
    """Build a ``target_class`` instance from the subset of ``kwargs`` it knows.

    Only keys that exist as attributes on ``target_class`` (e.g. dataclass
    fields with defaults) are forwarded; everything else is silently dropped,
    which lets one big argument namespace feed several config classes.
    """
    accepted = {}
    for key, value in kwargs.items():
        if hasattr(target_class, key):
            accepted[key] = value
    return target_class(**accepted)
def downscale_video(input_video_path, output_video_path, width=1280, progress_callback=None):
    """Downscale a video to ``width`` pixels wide (height auto-scaled) via ffmpeg.

    NOTE: this module-level definition shadows the ``downscale_video`` imported
    from ``video_enhanced``, so it must accept the ``progress_callback`` keyword
    used by the ENHANCED_VIDEO_AVAILABLE call sites (previously a TypeError).

    Args:
        input_video_path: Source video file.
        output_video_path: Destination file (overwritten if present).
        width: Target width in pixels; height keeps the aspect ratio.
        progress_callback: Optional ``callback(fraction, message)`` hook.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    if progress_callback:
        progress_callback(0.0, "Downscaling video...")
    # scale=W:-2 keeps the aspect ratio while forcing an even height, which
    # libx264 (the default mp4 encoder) requires; -1 could yield odd heights.
    # -y overwrites a stale output instead of stalling on ffmpeg's prompt.
    command = [
        'ffmpeg', '-i', str(input_video_path),
        '-vf', f'scale={width}:-2', '-y', str(output_video_path)
    ]
    subprocess.run(command, check=True)
    if progress_callback:
        progress_callback(1.0, "Downscale complete")
def extract_frames(video_path, output_dir, progress_callback=None):
    """Extract every frame of ``video_path`` into ``output_dir`` as PNGs.

    Frames are written as ``frame_0001.png``, ``frame_0002.png``, ... matching
    the pattern ``reassemble_video`` reads back.

    NOTE: this module-level definition shadows the ``extract_frames`` imported
    from ``video_enhanced``, so it must accept the ``progress_callback`` keyword
    used by the ENHANCED_VIDEO_AVAILABLE call sites (previously a TypeError).

    Args:
        video_path: Source video file.
        output_dir: Destination directory (Path) for the PNG frames.
        progress_callback: Optional ``callback(fraction, message)`` hook.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    if progress_callback:
        progress_callback(0.0, "Extracting frames...")
    command = [
        'ffmpeg', '-i', str(video_path),
        str(output_dir / 'frame_%04d.png')
    ]
    subprocess.run(command, check=True)
    if progress_callback:
        progress_callback(1.0, "Frames extracted")
def reassemble_video(input_dir, output_video_path, fps=30, quality=None,
                     format='mp4', audio_path=None, smoothing_strength=0.0,
                     denoise_strength=0.0, stabilize=False, progress_callback=None):
    """Reassemble ``frame_%04d.png`` frames from ``input_dir`` into a video.

    NOTE: this module-level definition shadows the ``reassemble_video`` imported
    from ``video_enhanced``, so it must accept the keyword arguments used by the
    ENHANCED_VIDEO_AVAILABLE call site (quality/format/audio_path/smoothing/...)
    — previously those calls raised TypeError.

    Args:
        input_dir: Directory (Path) containing the PNG frames.
        output_video_path: Destination video file (overwritten if present).
        fps: Output frame rate.
        quality: Optional 'high'|'medium'|'low' preset mapped to an x264 CRF;
            None keeps ffmpeg's default, matching the old behavior.
        format: Container hint; the extension of ``output_video_path`` wins,
            so this is accepted for signature compatibility but unused.
        audio_path: Optional audio file muxed into the output (AAC, truncated
            to the shorter stream).
        smoothing_strength, denoise_strength, stabilize: Accepted for
            compatibility with the enhanced pipeline; not implemented here.
        progress_callback: Optional ``callback(fraction, message)`` hook.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits non-zero.
    """
    if progress_callback:
        progress_callback(0.0, "Reassembling video...")
    if smoothing_strength or denoise_strength or stabilize:
        logging.warning("smoothing/denoise/stabilize are not supported by the "
                        "basic reassemble_video implementation; ignored.")
    command = [
        'ffmpeg', '-framerate', str(fps), '-i',
        str(input_dir / 'frame_%04d.png')
    ]
    if audio_path:
        command += ['-i', str(audio_path), '-c:a', 'aac', '-shortest']
    command += ['-c:v', 'libx264']
    if quality:
        # Lower CRF = higher quality; 23 is ffmpeg's x264 default.
        command += ['-crf', {'high': '18', 'medium': '23', 'low': '28'}.get(quality, '23')]
    command += ['-pix_fmt', 'yuv420p', '-y', str(output_video_path)]
    subprocess.run(command, check=True)
    if progress_callback:
        progress_callback(1.0, "Video reassembled")
def enhance_video(video_path, model_name='RealESRGAN_x4plus_anime_6B',
                  quality='high', format='mp4', fps=30, smoothing_strength=0.0,
                  denoise_strength=0.0, stabilize=False, progress=gr.Progress()):
    """Enhanced video enhancement with model selection and export options.

    Pipeline: downscale -> extract frames -> Real-ESRGAN per-frame upscale ->
    reassemble (optionally muxing the original audio back in).

    Args:
        video_path: Input video file path (str or Path).
        model_name: Real-ESRGAN model ID forwarded to the enhancer.
        quality: Quality preset forwarded to the enhanced reassembler.
        format: Output container extension ('mp4', ...); shadows the builtin.
        fps: Output frame rate; <=0 falls back to the source fps when the
            enhanced video module is available.
        smoothing_strength, denoise_strength, stabilize: Post filters, only
            honored by the enhanced reassembler.
        progress: Gradio progress tracker.

    Returns:
        str: Path of the enhanced output video.

    Raises:
        gr.Error: Wrapping any failure for display in the UI.
    """
    try:
        if not video_path:
            raise ValueError("No video provided")
        video_path = Path(video_path)
        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")
        downscaled_video_path = live_portrait_output_dir / 'downscaled_video.mp4'
        # Generate output filename with format
        format_ext = format.lower() if format else 'mp4'
        enhanced_video_path = live_portrait_output_dir / f'enhanced_live_portrait.{format_ext}'
        # Get original video info for audio preservation
        audio_path = None
        if ENHANCED_VIDEO_AVAILABLE:
            video_info = get_video_info(video_path)
            original_fps = video_info.get('fps', 30)
            fps = fps if fps > 0 else original_fps
        # Extract audio if available (WAV/PCM so reassembly can re-encode freely)
        if has_audio_stream(video_path):
            audio_path = live_portrait_output_dir / 'extracted_audio.wav'
            try:
                ffmpeg_cmd = find_ffmpeg() if ENHANCED_VIDEO_AVAILABLE else 'ffmpeg'
                cmd = [ffmpeg_cmd, '-i', str(video_path), '-vn', '-acodec', 'pcm_s16le',
                       '-y', str(audio_path)]
                subprocess.run(cmd, check=True, capture_output=True)
            except Exception as e:
                logging.warning(f"Could not extract audio: {e}")
                audio_path = None
        # Progress tracking
        def update_progress(progress_value, message):
            if progress:
                progress(progress_value, desc=message)
        # Downscale the video
        # NOTE(review): the module-level downscale_video/extract_frames/
        # reassemble_video definitions shadow the video_enhanced imports —
        # verify their signatures accept the keyword arguments passed in the
        # ENHANCED branches below.
        if ENHANCED_VIDEO_AVAILABLE:
            downscale_video(video_path, downscaled_video_path, width=1280, progress_callback=update_progress)
        else:
            downscale_video(video_path, downscaled_video_path)
        update_progress(0.3, "Video downscaled")
        # Extract frames
        if ENHANCED_VIDEO_AVAILABLE:
            extract_frames(downscaled_video_path, esrgan_input_dir, progress_callback=update_progress)
        else:
            extract_frames(downscaled_video_path, esrgan_input_dir)
        update_progress(0.5, "Frames extracted")
        # Enhance frames using Real-ESRGAN
        if ENHANCED_VIDEO_AVAILABLE and hasattr(enhancer, 'enhance_frames'):
            enhancer.enhance_frames(esrgan_input_dir, model_name=model_name, progress_callback=update_progress)
        else:
            enhancer.enhance_frames(esrgan_input_dir)
        update_progress(0.8, "Frames enhanced")
        # Reassemble the enhanced frames into a video
        if ENHANCED_VIDEO_AVAILABLE:
            reassemble_video(esrgan_output_dir, enhanced_video_path, fps=fps,
                             quality=quality, format=format_ext, audio_path=audio_path,
                             smoothing_strength=smoothing_strength,
                             denoise_strength=denoise_strength,
                             stabilize=stabilize,
                             progress_callback=update_progress)
        else:
            reassemble_video(esrgan_output_dir, enhanced_video_path, fps=fps)
        update_progress(1.0, "Video complete!")
        return str(enhanced_video_path)
    except Exception as e:
        logging.error(f"Error enhancing video: {str(e)}")
        raise gr.Error(f"Failed to enhance video: {str(e)}")
# set tyro theme
tyro.extras.set_accent_color("bright_cyan")
# Parse CLI flags into the shared ArgumentConfig namespace.
args = tyro.cli(ArgumentConfig)
# specify configs for inference
inference_cfg = partial_fields(InferenceConfig, args.__dict__)  # use attribute of args to initial InferenceConfig
crop_cfg = partial_fields(CropConfig, args.__dict__)  # use attribute of args to initial CropConfig
# Single module-level pipeline instance shared by all Gradio handlers below.
gradio_pipeline = GradioPipeline(
    inference_cfg=inference_cfg,
    crop_cfg=crop_cfg,
    args=args
)
def detect_faces_interface(image_path, progress=gr.Progress()):
    """Interface for face detection.

    Runs the InsightFace-based FaceDetector over ``image_path``, draws the
    detected boxes onto a preview image, and returns the Gradio updates for
    the preview path, the preview container visibility, and the face-selection
    choices.

    Returns:
        Tuple of (preview image path or None, gr.update for visibility,
        gr.update carrying the per-face choices).
    """
    # Guard explicitly on the optional dependency instead of relying on the
    # NameError from an undefined FaceDetector being swallowed by the broad
    # except below.
    if not image_path or not FACE_DETECTION_AVAILABLE:
        return None, gr.update(visible=False), gr.update(choices=[], visible=False)
    try:
        if progress:
            progress(0.1, "Detecting faces...")
        # Initialize detector with pretrained weights path
        model_path = project_root / 'pretrained_weights' / 'insightface' / 'models' / 'buffalo_l'
        detector = FaceDetector(str(model_path) if model_path.exists() else None)
        faces = detector.detect_faces(image_path)
        if progress:
            progress(1.0, f"Detected {len(faces)} face(s)")
        if not faces:
            return None, gr.update(visible=False), gr.update(
                choices=[],
                visible=False,
                value=[]
            )
        # Draw face boxes
        annotated_image = detector.draw_face_boxes(image_path, faces)
        # Save annotated image temporarily
        if CV2_AVAILABLE:
            temp_path = live_portrait_output_dir / 'face_detection_preview.jpg'
            cv2.imwrite(str(temp_path), annotated_image)
        else:
            temp_path = image_path  # Fallback: show the original, unannotated image
        # Create face selection choices; a single face is pre-selected.
        face_choices = [f"Face {i+1} (Confidence: {f['confidence']:.2f})" for i, f in enumerate(faces)]
        return (
            str(temp_path),
            gr.update(visible=True),
            gr.update(choices=face_choices, visible=True, value=face_choices if len(faces) == 1 else [])
        )
    except Exception as e:
        logging.error(f"Face detection error: {str(e)}")
        return None, gr.update(visible=False), gr.update(choices=[], visible=False)
def gpu_wrapped_execute_video(image_path, video_path, relative_motion, do_crop, remap,
                              crop_driving_video, smoothing_strength=0.0, denoise_strength=0.0,
                              stabilize=False, selected_face_indices=None,
                              aspect_ratio='1:1', custom_width=1024, custom_height=1024,
                              crop_mode='center', preserve_bg=True,
                              enable_audio_sync=True, add_background_music=False,
                              background_music_file=None, music_volume=0.3,
                              original_audio_volume=1.0, normalize_audio=True,
                              loop_audio=False, progress=gr.Progress(),
                              enable_checkpoint=False, job_id=None):
    """Wrapper for video execution with smoothing, multi-face, aspect ratio, audio, and performance options.

    Orchestration order: GPU housekeeping/checkpoint -> optional aspect-ratio
    preprocessing of the source image -> single- or multi-face animation via
    gradio_pipeline.execute_video -> optional smoothing filters -> optional
    audio sync / background music.  Each optional stage degrades gracefully
    (logs a warning and falls back) rather than failing the whole run.

    NOTE(review): the parameters ``add_background_music`` and
    ``normalize_audio`` shadow the identically-named functions imported from
    audio_processor; inside this function they are only used as booleans, so
    this is harmless here — but do not call those functions from this scope.

    Returns:
        The pipeline result: either a single output video path or a tuple
        (original, cropped) as produced by execute_video, possibly replaced by
        the audio-processed variant.
    """
    # Performance optimizations
    if PERFORMANCE_UTILS_AVAILABLE:
        # Clear cache before processing
        if gpu_memory_manager:
            gpu_memory_manager.clear_cache()
            if progress:
                mem_info = gpu_memory_manager.get_memory_info_string()
                progress(0.01, f"GPU Memory: {mem_info}")
        # Create job ID if not provided (short random id for checkpoint files)
        if job_id is None:
            import uuid
            job_id = str(uuid.uuid4())[:8]
        # Create checkpoint state
        if enable_checkpoint and checkpoint_manager:
            state = ProcessingState(
                job_id=job_id,
                image_path=str(image_path),
                video_path=str(video_path),
                parameters={
                    'relative_motion': relative_motion,
                    'do_crop': do_crop,
                    'remap': remap,
                    'crop_driving_video': crop_driving_video,
                },
                current_step='initialization',
                progress=0.0
            )
            checkpoint_manager.save_checkpoint(state)
    # Handle aspect ratio preprocessing
    processed_image_path = image_path
    if ASPECT_RATIO_AVAILABLE and aspect_ratio != '1:1':
        try:
            processor = AspectRatioProcessor()
            # Get face bbox if available for face-aware cropping
            face_bbox = None
            if crop_mode == 'face' and FACE_DETECTION_AVAILABLE:
                try:
                    model_path = project_root / 'pretrained_weights' / 'insightface' / 'models' / 'buffalo_l'
                    detector = FaceDetector(str(model_path) if model_path.exists() else None)
                    faces = detector.detect_faces(image_path)
                    if faces:
                        face_bbox = faces[0]['bbox']  # Use first face
                except:  # NOTE(review): bare except — swallows everything incl. KeyboardInterrupt
                    pass
            # Process image to target aspect ratio
            processed_img = processor.resize_to_aspect(
                image_path,
                aspect_ratio,
                int(custom_width) if custom_width else None,
                int(custom_height) if custom_height else None,
                max_dimension=1024,
                preserve_background=preserve_bg,
                crop_mode=crop_mode,
                face_bbox=face_bbox
            )
            # Save processed image
            processed_image_path = live_portrait_output_dir / 'aspect_processed_image.jpg'
            if CV2_AVAILABLE:
                cv2.imwrite(str(processed_image_path), processed_img)
            else:
                processed_image_path = image_path  # Fallback
        except Exception as e:
            logging.warning(f"Aspect ratio processing failed: {e}, using original image")
            processed_image_path = image_path
    # Handle multi-face processing if faces are selected
    if FACE_DETECTION_AVAILABLE and selected_face_indices and len(selected_face_indices) > 0:
        try:
            # Process each selected face
            results = []
            model_path = project_root / 'pretrained_weights' / 'insightface' / 'models' / 'buffalo_l'
            detector = FaceDetector(str(model_path) if model_path.exists() else None)
            faces = detector.detect_faces(image_path)
            # Selection strings look like "Face 3 (Confidence: 0.97)" — the
            # second token is the 1-based face index.
            selected_faces_list = [faces[int(idx.split()[1]) - 1] for idx in selected_face_indices
                                   if idx.startswith("Face")]
            for idx, face in enumerate(selected_faces_list):
                if progress:
                    progress(idx / len(selected_faces_list), f"Processing face {idx+1}/{len(selected_faces_list)}...")
                # Crop face from image
                if CV2_AVAILABLE:
                    cropped_face = detector.crop_face(image_path, face)
                    temp_face_path = live_portrait_output_dir / f'temp_face_{idx}.jpg'
                    cv2.imwrite(str(temp_face_path), cropped_face)
                    # Process this face (use processed image if aspect ratio was applied)
                    face_img_path = str(temp_face_path)
                    result = gradio_pipeline.execute_video(
                        face_img_path, video_path, relative_motion,
                        False, remap, crop_driving_video  # Don't crop already cropped face
                    )
                    if isinstance(result, tuple):
                        results.append(result[1] if len(result) > 1 else result[0])
                    else:
                        results.append(result)
            # For now, return the first result (TODO: composite multiple faces)
            if results:
                result = results[0]
            else:
                # NOTE(review): this fallback uses image_path while the other
                # two call sites use processed_image_path — likely should match.
                result = gradio_pipeline.execute_video(image_path, video_path, relative_motion,
                                                       do_crop, remap, crop_driving_video)
        except Exception as e:
            logging.warning(f"Multi-face processing failed: {e}, falling back to single face")
            result = gradio_pipeline.execute_video(processed_image_path, video_path, relative_motion,
                                                   do_crop, remap, crop_driving_video)
    else:
        # Standard single-face processing (use processed image if aspect ratio was applied)
        result = gradio_pipeline.execute_video(processed_image_path, video_path, relative_motion,
                                               do_crop, remap, crop_driving_video)
    # Handle tuple return (original, cropped)
    if isinstance(result, tuple):
        video_path_output = result[1] if len(result) > 1 else result[0]
    else:
        video_path_output = result
    # Apply smoothing if requested (in-place on the output file)
    if (smoothing_strength > 0 or denoise_strength > 0 or stabilize) and ENHANCED_VIDEO_AVAILABLE:
        try:
            from video_enhanced import apply_smoothing_filters
            if progress:
                progress(0.95, "Applying smoothing filters...")
            apply_smoothing_filters(video_path_output, smoothing_strength,
                                    denoise_strength, stabilize,
                                    lambda p, m: progress(p, desc=m) if progress else None)
        except Exception as e:
            logging.warning(f"Smoothing failed: {str(e)}")
    # Handle audio processing
    if AUDIO_PROCESSING_AVAILABLE:
        try:
            # Determine which video to use for audio processing
            if isinstance(result, tuple):
                video_to_process = result[1] if len(result) > 1 else result[0]
            else:
                video_to_process = result
            video_to_process = str(video_to_process)
            # Process audio if enabled
            if enable_audio_sync or (add_background_music and background_music_file):
                if progress:
                    progress(0.98, "Processing audio...")
                # Get background music path
                music_path = None
                if add_background_music and background_music_file:
                    music_path = background_music_file.name if hasattr(background_music_file, 'name') else str(background_music_file)
                # Process audio
                audio_output = process_audio_for_video(
                    video_to_process,
                    source_video_path=str(video_path) if enable_audio_sync else None,
                    background_music_path=music_path,
                    music_volume=float(music_volume),
                    normalize=normalize_audio,
                    loop_audio=loop_audio
                )
                if audio_output and os.path.exists(audio_output):
                    # Replace result with audio-enhanced version
                    if isinstance(result, tuple):
                        result = (result[0], audio_output) if len(result) > 1 else (audio_output,)
                    else:
                        result = audio_output
                if progress:
                    progress(1.0, "Audio processing complete!")
        except Exception as e:
            logging.warning(f"Audio processing failed: {str(e)}")
    return result
def gpu_wrapped_execute_image(*args, **kwargs):
    """Pass all arguments straight through to GradioPipeline.execute_image.

    NOTE(review): despite the name, no GPU-specific wrapping happens here —
    it is a plain delegation so UI handlers can bind a module-level callable.
    """
    return gradio_pipeline.execute_image(*args, **kwargs)
# assets
example_portrait_dir = "assets/examples/source"
example_video_dir = "assets/examples/driving"
# Rows for the Gradio examples table: (source image, driving video, then four
# boolean flags — presumably relative_motion/do_crop/remap/crop_driving_video
# matching execute_video's positional order; TODO confirm against the UI wiring).
data_examples = [
    [osp.join(example_portrait_dir, "s9.jpg"), osp.join(example_video_dir, "d0.mp4"), True, True, True, False],
    [osp.join(example_portrait_dir, "s6.jpg"), osp.join(example_video_dir, "d0.mp4"), True, True, True, False],
    [osp.join(example_portrait_dir, "s10.jpg"), osp.join(example_video_dir, "d0.mp4"), True, True, True, False],
    [osp.join(example_portrait_dir, "s5.jpg"), osp.join(example_video_dir, "d18.mp4"), True, True, True, False],
    [osp.join(example_portrait_dir, "s7.jpg"), osp.join(example_video_dir, "d19.mp4"), True, True, True, False],
    [osp.join(example_portrait_dir, "s2.jpg"), osp.join(example_video_dir, "d13.mp4"), True, True, True, True],
]
#################### interface logic ####################
def enhance_video_interface(video_path, model_display_name=None,
                            quality='high', format='mp4', fps=30,
                            smoothing_strength=0.0, denoise_strength=0.0, stabilize=False,
                            export_custom_resolution=False, export_width=1920, export_height=1080,
                            export_maintain_aspect=True, export_gif=False, gif_fps=15, gif_width=512,
                            export_frames=False, frame_format='png', frame_step=1,
                            progress=gr.Progress()):
    """Interface wrapper for video enhancement with export options.

    Maps the UI's model display name to a model ID, runs enhance_video, and
    then performs any requested exports (custom resolution, GIF, frame
    sequence).  Export artifacts are written next to the enhanced video; only
    the main enhanced video path is returned to the UI.

    Raises:
        gr.Error: Wrapping any failure for display in the UI.
    """
    try:
        # Map display name to model ID
        if ENHANCED_VIDEO_AVAILABLE and hasattr(enhancer, 'get_available_models'):
            available_models = enhancer.get_available_models()
            model_map = {m['description']: m['id'] for m in available_models}
            model_name = model_map.get(model_display_name, 'RealESRGAN_x4plus_anime_6B')
        else:
            model_name = 'RealESRGAN_x4plus_anime_6B'
        enhanced_video_path = enhance_video(video_path, model_name, quality, format, fps,
                                            smoothing_strength, denoise_strength, stabilize, progress)
        # Handle export options
        export_results = []
        if EXPORT_UTILS_AVAILABLE:
            # BUGFIX: the boolean parameters ``export_custom_resolution`` and
            # ``export_gif`` shadow the functions of the same name imported
            # from export_utils, so calling them here raised
            # "TypeError: 'bool' object is not callable".  Re-import the
            # functions under private aliases.
            from export_utils import export_custom_resolution as _export_resolution
            from export_utils import export_gif as _export_gif
            base_path = Path(enhanced_video_path)
            output_dir = base_path.parent
            # Custom resolution export
            if export_custom_resolution:
                if progress:
                    progress(0.95, "Exporting custom resolution...")
                custom_output = output_dir / f"{base_path.stem}_custom_{int(export_width)}x{int(export_height)}.mp4"
                if _export_resolution(enhanced_video_path, str(custom_output),
                                      int(export_width), int(export_height),
                                      export_maintain_aspect, quality):
                    export_results.append(str(custom_output))
            # GIF export
            if export_gif:
                if progress:
                    progress(0.96, "Exporting GIF...")
                gif_output = output_dir / f"{base_path.stem}.gif"
                if _export_gif(enhanced_video_path, str(gif_output), int(gif_fps), int(gif_width)):
                    export_results.append(str(gif_output))
            # Frame sequence export
            if export_frames:
                if progress:
                    progress(0.97, "Exporting frame sequence...")
                frames_dir = output_dir / f"{base_path.stem}_frames"
                success, frame_count = export_frame_sequence(
                    enhanced_video_path, str(frames_dir), frame_format,
                    'frame', 0, None, int(frame_step)
                )
                if success:
                    export_results.append(str(frames_dir))
        # Return main video (export results are saved to disk)
        return enhanced_video_path
    except Exception as e:
        logging.error(f"Enhancement error: {str(e)}")
        raise gr.Error(f"Enhancement failed: {str(e)}")
def process_batch(images, videos, relative_motion, do_crop, remap,
                  enhance_enabled, enhance_model, enhance_quality, progress=gr.Progress()):
    """Process multiple images and videos in batch with queue system and per-item tracking.

    Runs every image x video combination through gradio_pipeline.execute_video,
    optionally enhances each result with Real-ESRGAN, and packages successful
    outputs into a ZIP archive.

    Args:
        images, videos: Uploaded files (objects with ``.name`` or path strings).
        relative_motion, do_crop, remap: Flags forwarded to execute_video.
        enhance_enabled: Run enhancement on each successful result.
        enhance_model: Display string matched against the enhancer's models.
        enhance_quality: Quality preset forwarded to enhance_video.
        progress: Gradio progress tracker.

    Returns:
        Tuple of (status text, list of result paths or None, gr.update for
        results visibility, gr.update for ZIP download visibility).
    """
    try:
        from batch_processor import BatchProcessor
        BATCH_PROCESSOR_AVAILABLE = True
    except ImportError:
        BATCH_PROCESSOR_AVAILABLE = False
    import zipfile
    from datetime import datetime
    import time
    if not images or not videos:
        return "Error: Please upload at least one image and one video.", None, gr.update(visible=False), gr.update(visible=False)
    batch_output_dir = live_portrait_output_dir / 'batch_output'
    batch_output_dir.mkdir(exist_ok=True)
    # Initialize batch processor if available
    if BATCH_PROCESSOR_AVAILABLE:
        processor = BatchProcessor(batch_output_dir)
        items = processor.create_batch_queue(images, videos)
        total_items = len(items)
    else:
        total_items = len(images) * len(videos)
        items = None
    results = []
    status_updates = []
    # Map model display name to model ID
    model_name = 'RealESRGAN_x4plus_anime_6B'
    if enhance_enabled and enhance_model:
        if ENHANCED_VIDEO_AVAILABLE and hasattr(enhancer, 'get_available_models'):
            try:
                available_models = enhancer.get_available_models()
                for model in available_models:
                    if model['name'] in enhance_model or model['description'] in enhance_model:
                        model_name = model['id']
                        break
            except Exception:  # best-effort lookup; keep the default model
                pass
    try:
        current_item = 0
        for img_idx, image_file in enumerate(images):
            for vid_idx, video_file in enumerate(videos):
                current_item += 1
                progress_value = current_item / total_items
                # Update item status if using batch processor
                if BATCH_PROCESSOR_AVAILABLE and items:
                    item = items[current_item - 1]
                    item.status = 'processing'
                    item.start_time = time.time()
                # Detailed status message
                status_msg = f"[{current_item}/{total_items}] Processing Image {img_idx+1} × Video {vid_idx+1}..."
                status_updates.append(status_msg)
                if progress:
                    progress(progress_value, desc=status_msg)
                # Get status text if using batch processor
                if BATCH_PROCESSOR_AVAILABLE and items:
                    status_text = processor.get_item_status_text()
                    if progress:
                        progress(progress_value, desc=status_text.split('\n')[0] if status_text else status_msg)
                else:
                    status_text = "\n".join(status_updates[-5:])  # Show last 5 updates
                try:
                    # Get file paths
                    image_path = image_file.name if hasattr(image_file, 'name') else str(image_file)
                    video_path_input = video_file.name if hasattr(video_file, 'name') else str(video_file)
                    # Process animation
                    output_video = gradio_pipeline.execute_video(
                        image_path,
                        video_path_input,
                        relative_motion,
                        do_crop,
                        remap,
                        False  # crop_driving_video
                    )
                    # Handle tuple return (original, cropped)
                    if isinstance(output_video, tuple):
                        output_video = output_video[1] if len(output_video) > 1 else output_video[0]
                    if output_video and os.path.exists(str(output_video)):
                        final_path = str(output_video)
                        # Enhance if enabled
                        if enhance_enabled:
                            try:
                                # BUGFIX: progress was previously passed as the
                                # sixth positional argument, landing in
                                # enhance_video's smoothing_strength slot —
                                # pass it by keyword instead.
                                enhanced_path = enhance_video(
                                    final_path,
                                    model_name,
                                    enhance_quality,
                                    'mp4',
                                    30,
                                    progress=progress
                                )
                                if enhanced_path and os.path.exists(str(enhanced_path)):
                                    results.append(str(enhanced_path))
                                    if BATCH_PROCESSOR_AVAILABLE and items:
                                        item.result_path = str(enhanced_path)
                                        item.status = 'completed'
                                        item.end_time = time.time()
                                    status_updates.append(f"✅ [{current_item}/{total_items}] Enhanced successfully")
                                else:
                                    results.append(final_path)
                                    if BATCH_PROCESSOR_AVAILABLE and items:
                                        item.result_path = final_path
                                        item.status = 'completed'
                                        item.end_time = time.time()
                                    status_updates.append(f"✅ [{current_item}/{total_items}] Completed (enhancement skipped)")
                            except Exception as e:
                                # Enhancement is best-effort: keep the raw result.
                                results.append(final_path)
                                if BATCH_PROCESSOR_AVAILABLE and items:
                                    item.result_path = final_path
                                    item.status = 'completed'
                                    item.end_time = time.time()
                                    item.error = f"Enhancement failed: {str(e)}"
                                status_updates.append(f"⚠️ [{current_item}/{total_items}] Completed (enhancement failed)")
                        else:
                            results.append(final_path)
                            if BATCH_PROCESSOR_AVAILABLE and items:
                                item.result_path = final_path
                                item.status = 'completed'
                                item.end_time = time.time()
                            status_updates.append(f"✅ [{current_item}/{total_items}] Completed")
                    else:
                        if BATCH_PROCESSOR_AVAILABLE and items:
                            item.status = 'failed'
                            item.error = "No output generated"
                            item.end_time = time.time()
                        status_updates.append(f"❌ [{current_item}/{total_items}] Failed: No output")
                except Exception as e:
                    # Per-item failure: record it and continue with the batch.
                    error_msg = f"❌ [{current_item}/{total_items}] Error: {str(e)}"
                    if BATCH_PROCESSOR_AVAILABLE and items:
                        item.status = 'failed'
                        item.error = str(e)
                        item.end_time = time.time()
                    status_updates.append(error_msg)
                    logging.error(error_msg, exc_info=True)
        # Update processor results
        if BATCH_PROCESSOR_AVAILABLE and items:
            processor.results = results
            status_text = processor.get_item_status_text()
        else:
            status_text = "\n".join(status_updates)
        # Create ZIP file if we have results
        zip_path = None
        if results:
            if BATCH_PROCESSOR_AVAILABLE and items:
                zip_path = processor.create_zip_archive()
            else:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                zip_path = batch_output_dir / f"batch_results_{timestamp}.zip"
                try:
                    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                        for idx, result_file in enumerate(results):
                            if os.path.exists(result_file):
                                arcname = f"result_{idx+1:03d}_{os.path.basename(result_file)}"
                                zipf.write(result_file, arcname)
                    zip_path = str(zip_path)
                except Exception as e:
                    logging.error(f"ZIP creation failed: {str(e)}")
                    zip_path = None
            if zip_path:
                status_text += f"\n\n✅ Batch processing complete! {len(results)}/{total_items} files processed successfully."
                status_text += f"\n📦 ZIP archive created: {os.path.basename(zip_path)}"
            else:
                status_text += f"\n\n✅ Batch processing complete! {len(results)}/{total_items} files processed."
                status_text += f"\n⚠️ ZIP creation failed, but individual files are available."
        else:
            status_text += "\n\n❌ Batch processing completed with errors. No files generated."
        return (
            status_text,
            results if results else None,
            gr.update(visible=bool(results)),
            gr.update(visible=bool(zip_path))
        )
    except Exception as e:
        error_msg = f"Batch processing failed: {str(e)}"
        logging.error(error_msg, exc_info=True)
        return error_msg, None, gr.update(visible=False), gr.update(visible=False)
# Modern CSS styling - Dark Theme matching Chat-with-Ollama/MoA
# Injected into gr.Blocks(css=custom_css). Defines the dark palette as CSS
# variables plus layout helpers and overrides for Gradio's stock components.
custom_css = """
:root {
--dark-bg: #0d1117;
--dark-surface: #161b22;
--dark-surface-hover: #1c2128;
--glass-bg: rgba(255, 255, 255, 0.03);
--glass-border: rgba(255, 255, 255, 0.08);
--text-primary: #e6edf3;
--text-secondary: rgba(230, 237, 243, 0.6);
--accent: #58a6ff;
--accent-hover: #79c0ff;
--primary-gradient: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
--secondary-gradient: linear-gradient(135deg, #f093fb 0%, #f5576c 100%);
}
.gradio-container {
max-width: 1400px !important;
margin: auto;
font-family: 'Inter', -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: var(--dark-bg) !important;
color: var(--text-primary) !important;
}
.main-header {
text-align: center;
padding: 2rem 0;
background: var(--primary-gradient);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
background-clip: text;
font-weight: 700;
margin-bottom: 0.5rem;
}
.sub-header {
text-align: center;
color: var(--text-secondary);
margin-bottom: 2rem;
font-weight: 400;
}
.section-header {
font-weight: 600;
color: var(--text-primary);
margin-bottom: 1rem;
padding-bottom: 0.5rem;
border-bottom: 2px solid var(--glass-border);
}
.info-text {
background: var(--glass-bg);
backdrop-filter: blur(20px);
padding: 1rem;
border-radius: 12px;
border: 1px solid var(--glass-border);
border-left: 4px solid var(--accent);
margin: 1rem 0;
color: var(--text-secondary);
font-size: 0.95rem;
}
.button-group {
display: flex;
gap: 0.75rem;
flex-wrap: wrap;
justify-content: center;
margin: 1.5rem 0;
}
.output-section {
margin-top: 2rem;
padding-top: 2rem;
border-top: 2px solid var(--glass-border);
}
/* Dark theme overrides for Gradio components */
.gradio-container .gr-form,
.gradio-container .gr-box,
.gradio-container .gr-panel {
background: var(--dark-surface) !important;
border-color: var(--glass-border) !important;
color: var(--text-primary) !important;
}
.gradio-container input,
.gradio-container textarea,
.gradio-container select {
background: var(--glass-bg) !important;
border-color: var(--glass-border) !important;
color: var(--text-primary) !important;
}
.gradio-container .gr-button-primary {
background: var(--primary-gradient) !important;
border: none !important;
}
.gradio-container .gr-button-primary:hover {
opacity: 0.9;
transform: translateY(-2px);
}
.monospace {
font-family: 'Courier New', monospace;
font-size: 0.9rem;
line-height: 1.4;
}
"""
# Create modern dark theme matching Chat-with-Ollama/MoA
# Starts from gr.themes.Soft and overrides colors so Gradio's own widgets
# match the custom_css palette (same hex values) in both light and dark modes.
theme = gr.themes.Soft(
    primary_hue="purple",
    secondary_hue="slate",
    font=("Inter", "ui-sans-serif", "system-ui", "sans-serif"),
).set(
    body_background_fill="#0d1117",
    body_background_fill_dark="#0d1117",
    block_background_fill="#161b22",
    block_background_fill_dark="#161b22",
    block_border_color="rgba(255, 255, 255, 0.08)",
    block_border_color_dark="rgba(255, 255, 255, 0.08)",
    button_primary_background_fill="#667eea",
    button_primary_background_fill_hover="#5568d3",
    button_primary_text_color="#ffffff",
    button_secondary_background_fill="rgba(255, 255, 255, 0.03)",
    button_secondary_background_fill_hover="rgba(255, 255, 255, 0.05)",
    button_secondary_text_color="#e6edf3",
    border_color_accent="#667eea",
    shadow_drop_lg="0 8px 24px rgba(0, 0, 0, 0.5)",
    input_background_fill="rgba(255, 255, 255, 0.03)",
    input_background_fill_dark="rgba(255, 255, 255, 0.03)",
    input_border_color="rgba(255, 255, 255, 0.08)",
    input_border_color_dark="rgba(255, 255, 255, 0.08)",
    input_text_color="#e6edf3",
    input_text_color_dark="#e6edf3",
    body_text_color="#e6edf3",
    body_text_color_dark="#e6edf3",
    block_label_text_color="#e6edf3",
    block_label_text_color_dark="#e6edf3",
)
with gr.Blocks(theme=theme, css=custom_css, title="PresentaPulse - LivePortrait Animation") as demo:
# Header
gr.HTML("""
<div class="main-header">
<h1 style="margin: 0; font-size: 2.5rem;">🎬 PresentaPulse</h1>
</div>
""")
gr.Markdown(
'<p class="sub-header">Efficient Portrait Animation with Smoothing and Retargeting Control</p>',
elem_classes=["sub-header"]
)
# Performance Monitoring (if available)
if PERFORMANCE_UTILS_AVAILABLE:
with gr.Accordion("⚡ Performance & System Info", open=False, elem_id="performance_panel"):
with gr.Row():
with gr.Column():
performance_status = gr.Markdown(
value="**System Status:**\n- GPU Memory: Loading...\n- CPU Usage: Loading...\n- RAM Usage: Loading...",
label="System Status"
)
refresh_performance = gr.Button("🔄 Refresh Status", size="sm")
low_memory_mode_toggle = gr.Checkbox(
value=False,
label="Low Memory Mode",
info="Enable for GPUs with limited VRAM (<6GB)"
)
if multi_gpu and len(multi_gpu.get_available_devices()) > 1:
gpu_selection = gr.Radio(
choices=[f"GPU {i}" for i in multi_gpu.get_available_devices()],
value=f"GPU {multi_gpu.get_available_devices()[0]}",
label="Select GPU",
info="Choose which GPU to use for processing"
)
else:
gpu_selection = gr.Radio(visible=False)
def update_performance_status():
if not PERFORMANCE_UTILS_AVAILABLE:
return "Performance monitoring not available"
info = system_monitor.get_system_info()
status_lines = ["**System Status:**"]
# CPU and RAM
status_lines.append(f"- **CPU Usage:** {info['cpu_usage']:.1f}%")
ram = info['ram']
status_lines.append(f"- **RAM:** {ram['used']:.0f}MB / {ram['total']:.0f}MB ({ram['percent']:.1f}%)")
# GPU Info
if 'gpus' in info:
status_lines.append(f"\n**GPU Information:**")