| name | pyqt-threading | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| description | PyQt/PySide6 threading and concurrency - QThread, QThreadPool, QTimer, thread safety, concurrent patterns | ||||||||||||||
| metadata |
|
Comprehensive guide to threading in PyQt applications.
CRITICAL: Qt/PyQt is NOT thread-safe for UI operations. You MUST follow these rules:
- Never access widgets from worker threads - Only the main thread can modify UI
- Use signals for cross-thread communication - Emit signals from worker, connect to slots in main thread
- Use Qt.QueuedConnection for thread-safe signal delivery - AutoConnection handles this automatically
- Never block the main thread - Long operations will freeze the UI
# ❌ WRONG: Direct UI access from thread
class BadWorker(QThread):
def run(self):
# This will crash or cause undefined behavior!
self.label.setText("Done")
# ✅ CORRECT: Use signals
class GoodWorker(QThread):
finished = Signal(str)
def run(self):
result = self.process_data()
self.finished.emit(result) # Signal emitted, UI updated in main threadThe most flexible pattern separates the worker logic from thread lifecycle:
from PySide6.QtCore import QThread, Signal, QObject, Slot
class Worker(QObject):
"""Worker object that does the actual work."""
finished = Signal(object)
progress = Signal(int)
error = Signal(str)
def __init__(self, data):
super().__init__()
self.data = data
self._is_cancelled = False
@Slot()
def process(self):
"""Main processing method called from thread."""
try:
for i, item in enumerate(self.data):
if self._is_cancelled:
return
# Simulate heavy work
result = self.process_item(item)
self.progress.emit(int((i + 1) / len(self.data) * 100))
self.finished.emit({"status": "success", "count": len(self.data)})
except Exception as e:
self.error.emit(str(e))
def cancel(self):
self._is_cancelled = True
def process_item(self, item):
import time
time.sleep(0.1) # Simulate work
return item * 2
class ThreadController(QObject):
"""Manages worker thread lifecycle."""
def __init__(self):
super().__init__()
self.thread = None
self.worker = None
def start_work(self, data):
# Create thread and worker
self.thread = QThread()
self.worker = Worker(data)
# Move worker to thread
self.worker.moveToThread(self.thread)
# Connect signals
self.worker.finished.connect(self.on_finished)
self.worker.progress.connect(self.on_progress)
self.worker.error.connect(self.on_error)
# Thread lifecycle
self.thread.started.connect(self.worker.process)
self.thread.finished.connect(self.thread.deleteLater)
# Start thread
self.thread.start()
def cancel_work(self):
if self.worker:
self.worker.cancel()
if self.thread:
self.thread.quit()
self.thread.wait()
@Slot()
def on_finished(self, result):
print(f"Work completed: {result}")
self.cleanup()
@Slot()
def on_progress(self, percent):
print(f"Progress: {percent}%")
@Slot()
def on_error(self, error):
print(f"Error: {error}")
self.cleanup()
def cleanup(self):
self.thread = None
self.worker = NoneFor simpler cases, subclass QThread directly:
from PySide6.QtCore import QThread, Signal
class DataProcessor(QThread):
"""Thread that processes data and emits progress."""
progress = Signal(int)
result_ready = Signal(list)
error_occurred = Signal(str)
finished = Signal()
def __init__(self, input_data, parent=None):
super().__init__(parent)
self.input_data = input_data
self._cancelled = False
def run(self):
"""Thread entry point - called by start()."""
try:
results = []
total = len(self.input_data)
for i, item in enumerate(self.input_data):
if self._cancelled:
self.error_occurred.emit("Cancelled")
return
# Process item (heavy work here)
processed = self.process_item(item)
results.append(processed)
# Emit progress
progress_percent = int((i + 1) / total * 100)
self.progress.emit(progress_percent)
self.result_ready.emit(results)
except Exception as e:
self.error_occurred.emit(str(e))
finally:
self.finished.emit()
def process_item(self, item):
import time
time.sleep(0.05) # Simulate work
return str(item).upper()
def cancel(self):
self._cancelled = True
# Usage
class MainWindow(QMainWindow):
def __init__(self):
super().__init__()
self.processor = None
self.progress = QProgressBar()
self.start_btn = QPushButton("Start")
self.cancel_btn = QPushButton("Cancel")
self.start_btn.clicked.connect(self.start_processing)
self.cancel_btn.clicked.connect(self.cancel_processing)
def start_processing(self):
data = ["item1", "item2", "item3", "item4", "item5"]
self.processor = DataProcessor(data)
self.processor.progress.connect(self.progress.setValue)
self.processor.result_ready.connect(self.on_results)
self.processor.error_occurred.connect(self.on_error)
self.processor.finished.connect(self.on_finished)
self.processor.start()
self.start_btn.setEnabled(False)
def cancel_processing(self):
if self.processor:
self.processor.cancel()
def on_results(self, results):
print(f"Got {len(results)} results")
def on_error(self, error):
QMessageBox.warning(self, "Error", error)
def on_finished(self):
self.start_btn.setEnabled(True)
self.progress.setValue(0)
self.processor = NoneFor parallel execution of independent tasks:
from PySide6.QtCore import QThreadPool, QRunnable, Signal, QObject, QThread
import time
class TaskSignals(QObject):
"""Signals for QRunnable (QRunnable cannot have signals directly)."""
finished = Signal(object)
error = Signal(str)
progress = Signal(int)
class ParallelTask(QRunnable):
"""Runnable task for thread pool."""
def __init__(self, task_id, data):
super().__init__()
self.task_id = task_id
self.data = data
self.signals = TaskSignals()
self._cancelled = False
def run(self):
"""Executed by thread pool."""
try:
time.sleep(0.5) # Simulate work
if self._cancelled:
return
result = {
"id": self.task_id,
"processed": str(self.data).upper(),
"thread": int(QThread.currentThreadId())
}
self.signals.finished.emit(result)
except Exception as e:
self.signals.error.emit(str(e))
def cancel(self):
self._cancelled = True
class ThreadPoolManager(QObject):
"""Manages parallel task execution."""
all_finished = Signal(int)
def __init__(self, max_threads=4):
super().__init__()
self.pool = QThreadPool()
self.pool.setMaxThreadCount(max_threads)
self.active_tasks = {}
self.completed_count = 0
self.total_tasks = 0
def run_parallel(self, tasks):
"""Run multiple tasks in parallel."""
self.completed_count = 0
self.total_tasks = len(tasks)
self.active_tasks.clear()
for task_id, data in enumerate(tasks):
task = ParallelTask(task_id, data)
task.signals.finished.connect(
lambda result, tid=task_id: self.on_task_finished(result)
)
task.signals.error.connect(self.on_task_error)
self.active_tasks[task_id] = task
self.pool.start(task)
def on_task_finished(self, result):
self.completed_count += 1
task_id = result["id"]
del self.active_tasks[task_id]
if self.completed_count >= self.total_tasks:
self.all_finished.emit(self.completed_count)
def on_task_error(self, error):
print(f"Task error: {error}")
def cancel_all(self):
for task in self.active_tasks.values():
task.cancel()
self.active_tasks.clear()from PySide6.QtCore import QTimer, Slot
class PollingWidget(QWidget):
def __init__(self):
super().__init__()
# Create timer
self.timer = QTimer(self)
self.timer.timeout.connect(self.on_timeout)
# UI
self.status_label = QLabel("Last update: Never")
self.poll_btn = QPushButton("Start Polling")
self.poll_btn.setCheckable(True)
layout = QVBoxLayout(self)
layout.addWidget(self.status_label)
layout.addWidget(self.poll_btn)
self.poll_btn.toggled.connect(self.toggle_polling)
@Slot()
def toggle_polling(self, checked):
if checked:
self.timer.start(1000) # Poll every second
self.poll_btn.setText("Stop Polling")
else:
self.timer.stop()
self.poll_btn.setText("Start Polling")
@Slot()
def on_timeout(self):
from datetime import datetime
self.status_label.setText(f"Last update: {datetime.now().strftime('%H:%M:%S')}")from PySide6.QtCore import QMutex, QMutexLocker, QReadWriteLock
class SharedData:
"""Thread-safe data container."""
def __init__(self):
self._data = {}
self._mutex = QMutex()
def set_value(self, key, value):
"""Thread-safe write."""
locker = QMutexLocker(self._mutex)
self._data[key] = value
def get_value(self, key, default=None):
"""Thread-safe read."""
locker = QMutexLocker(self._mutex)
return self._data.get(key, default)
def get_all(self):
"""Thread-safe copy of all data."""
locker = QMutexLocker(self._mutex)
return dict(self._data)
class ReadWriteData:
"""Read-write lock for read-heavy workloads."""
def __init__(self):
self._data = {}
self._lock = QReadWriteLock()
def read_value(self, key):
"""Multiple readers can hold the lock."""
self._lock.lockForRead()
try:
return self._data.get(key)
finally:
self._lock.unlock()
def write_value(self, key, value):
"""Only one writer at a time."""
self._lock.lockForWrite()
try:
self._data[key] = value
finally:
self._lock.unlock()- Always use signals for cross-thread communication
- Keep worker objects thread-affinity aware - Don't assume they're in main thread
- Clean up threads properly - Use deleteLater() and quit() + wait()
- Handle cancellation - Check flags periodically in long operations
- Use QThreadPool for parallel independent tasks
- Use QThread.moveToThread() for single long operations
- Never use time.sleep() in main thread - Use timers or workers instead
| Issue | Cause | Solution |
|---|---|---|
| UI freezes | Blocking operation in main thread | Move to worker thread |
| Crashes on widget access | Accessing UI from worker thread | Use signals instead |
| Memory leaks | Thread not cleaned up | Use deleteLater() and proper lifecycle |
| Deadlocks | Multiple mutexes acquired in different order | Always acquire in same order, use timeout |
| Race conditions | Shared data without locks | Use QMutex or atomic operations |