Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions scheduler/tests/test_worker/test_worker_creation.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import socket
import uuid

from scheduler import settings
Expand All @@ -15,7 +15,7 @@ def test_create_worker__two_workers_same_queue(self):
worker1.worker_start()
worker2 = create_worker("default")
worker2.worker_start()
hostname = os.uname()[1]
hostname = socket.gethostname()
self.assertEqual(f"{hostname}-worker.1", worker1.name)
self.assertEqual(f"{hostname}-worker.2", worker2.name)

Expand Down
6 changes: 4 additions & 2 deletions scheduler/types/settings_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Dict, Optional, List, Tuple, Any, Type, ClassVar, Set
import signal
from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty, TimerDeathPenalty

from scheduler.helpers.timeouts import BaseDeathPenalty, UnixSignalDeathPenalty
_DEATH_PENALTY_CLASS = UnixSignalDeathPenalty if hasattr(signal, "SIGALRM") else TimerDeathPenalty

if sys.version_info >= (3, 11):
from typing import Self
Expand Down Expand Up @@ -40,7 +42,7 @@ class SchedulerConfiguration:
DEFAULT_MAINTENANCE_TASK_INTERVAL: int = 10 * 60 # The interval to run maintenance tasks in seconds. 10 minutes.
DEFAULT_JOB_MONITORING_INTERVAL: int = 30 # The interval to monitor jobs in seconds.
SCHEDULER_FALLBACK_PERIOD_SECS: int = 120 # Period (secs) to wait before requiring to reacquire locks
DEATH_PENALTY_CLASS: Type[BaseDeathPenalty] = UnixSignalDeathPenalty
DEATH_PENALTY_CLASS: Type[BaseDeathPenalty] = _DEATH_PENALTY_CLASS


@dataclass(slots=True, frozen=True, kw_only=True)
Expand Down
4 changes: 2 additions & 2 deletions scheduler/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ def execute_job(self, job: JobModel, queue: Queue) -> None:
The worker will wait for the job execution process and make sure it executes within the given timeout bounds, or
will end the job execution process with SIGALRM.
"""
if self.fork_job_execution:
if hasattr(os, "fork") and self.fork_job_execution:
self._model.set_field("state", WorkerStatus.BUSY, connection=self.connection)
self.fork_job_execution_process(job, queue)
self.monitor_job_execution_process(job, queue)
Expand Down Expand Up @@ -839,7 +839,7 @@ def _ensure_list(obj: Any) -> List[Any]:


def _calc_worker_name(existing_worker_names: Collection[str]) -> str:
hostname = os.uname()[1]
hostname = socket.gethostname()
c = 1
worker_name = f"{hostname}-worker.{c}"
Comment thread
cunla marked this conversation as resolved.
while worker_name in existing_worker_names:
Expand Down
Loading
Loading