---
order: 4
---

CLI

The core library comes with a CLI program called taskiq, which is used to run different subcommands.

By default, taskiq ships with only two commands: worker and scheduler. You can search PyPI for more taskiq plugins; some plugins add new commands to taskiq.

Worker

To run a worker process, you have to specify the broker you want to use and the modules with defined tasks. Like this:

```bash
taskiq worker mybroker:broker_var my_project.module1 my_project.module2
```

Sync function

Taskiq can run synchronous functions. However, since it operates asynchronously, it executes them in a separate thread or process. By default, ThreadPoolExecutor is used. But if you're planning to use Taskiq for heavy computations, such as neural network model training or other CPU-intensive tasks, you may want to use ProcessPoolExecutor instead.

For more details on the differences between these two options, refer to the Python docs on executors.

As a rule of thumb:

  • If you're using sync functions for IO then use ThreadPoolExecutor;
  • If you're using sync functions for CPU bound workloads then use ProcessPoolExecutor.
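The difference between the two executors can be illustrated with plain asyncio and concurrent.futures, independent of taskiq itself (a stdlib sketch of what happens under the hood; taskiq wires this up for you):

```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def sync_task(x: int) -> int:
    # A stand-in for a blocking synchronous task.
    time.sleep(0.1)
    return x * 2


async def main() -> None:
    loop = asyncio.get_running_loop()

    # IO-bound sync functions: threads are cheap and the GIL is
    # released while they wait on IO.
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, sync_task, i) for i in range(4))
        )
    print(results)  # [0, 2, 4, 6]

    # CPU-bound sync functions: separate processes sidestep the GIL.
    with ProcessPoolExecutor(max_workers=2) as pool:
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, sync_task, i) for i in range(4))
        )
    print(results)  # [0, 2, 4, 6]


if __name__ == "__main__":
    asyncio.run(main())
```

Either way, the event loop stays responsive while the sync function runs in the pool.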

By default, taskiq uses a thread pool. Here are some worker CLI options that adjust this behavior:

  • --use-process-pool to switch to ProcessPoolExecutor;
  • --max-process-pool-processes to manually specify the number of processes in the pool;
  • --max-threadpool-threads to configure the maximum number of threads for ThreadPoolExecutor if it's the one being used.

Auto importing

Sometimes enumerating all modules with tasks is impractical. That's why taskiq can auto-discover tasks in the current directory recursively.

We have two options for this:

  • --tasks-pattern or -tp. It's a glob pattern of files to import. By default it is **/tasks.py which searches for all tasks.py files. May be specified multiple times.
  • --fs-discover or -fsd. This option enables search of task files in current directory recursively, using the given pattern.
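The default **/tasks.py pattern is a recursive glob. A quick stdlib sketch of which files such a pattern matches (the directory layout here is made up):

```python
import pathlib
import tempfile

# Build a throwaway project tree to demonstrate recursive matching.
root = pathlib.Path(tempfile.mkdtemp())
(root / "app" / "orders").mkdir(parents=True)
(root / "tasks.py").write_text("# top-level tasks\n")
(root / "app" / "orders" / "tasks.py").write_text("# nested tasks\n")
(root / "app" / "helpers.py").write_text("# not matched\n")

# "**/tasks.py" matches tasks.py at any depth, like taskiq's default pattern.
matched = sorted(p.relative_to(root).as_posix() for p in root.glob("**/tasks.py"))
print(matched)  # ['app/orders/tasks.py', 'tasks.py']
```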

Acknowledgements

Taskiq supports three types of acknowledgements:

  • when_received - task is acknowledged when it is received by the worker.
  • when_executed - task is acknowledged right after it is executed by the worker.
  • when_saved - task is acknowledged when the result of execution is saved in the result backend.

This can be configured using --ack-type parameter. For example:

```bash
taskiq worker --ack-type when_executed mybroker:broker
```

Type casts

One of the features taskiq has is automatic type casts. For example, suppose you have a type-hinted task like this:

```python
async def task(val: int) -> int:
    return val + 1
```

If you call task.kiq("2"), you'll get 3 as the returned value, because taskiq parses task signatures and casts incoming parameters to the target types. If a type cast fails, no error is thrown; the value is simply left as is. This functionality allows you to use pydantic models or dataclasses as input parameters.
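Conceptually, the cast behaves roughly like this (a simplified stdlib sketch of the idea, not taskiq's actual implementation):

```python
import inspect
from typing import Any, Callable


def cast_args(func: Callable[..., Any], *args: Any) -> tuple[Any, ...]:
    """Cast positional args to annotated parameter types, best effort."""
    params = list(inspect.signature(func).parameters.values())
    casted = []
    for param, value in zip(params, args):
        target = param.annotation
        if target is inspect.Parameter.empty:
            casted.append(value)
            continue
        try:
            casted.append(target(value))
        except (TypeError, ValueError):
            # A failed cast is not an error: the value is left as is.
            casted.append(value)
    return tuple(casted)


async def task(val: int) -> int:
    return val + 1


print(cast_args(task, "2"))     # "2" is casted to the int 2
print(cast_args(task, "oops"))  # cast fails, value kept as is
```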

To disable this, pass the --no-parse option to taskiq.

Hot reload

It's annoying to restart workers every time you modify tasks. That's why taskiq supports hot reload. This feature is unavailable by default; to enable it, install taskiq with the reload extra.

::: tabs

@tab pip

```bash
pip install "taskiq[reload]"
```

@tab poetry

```bash
poetry add taskiq -E reload
```

@tab uv

```bash
uv add "taskiq[reload]"
```

:::

To enable this option, simply pass --reload or -r to the taskiq worker CLI.

You can set --reload-dir to specify a directory to watch for changes. It can be specified multiple times if you need to watch several directories.

This option also respects .gitignore files. If you have such a file in your directory, the worker won't reload when you modify ignored files. To disable this behavior, pass the --do-not-use-gitignore option.

Graceful reload (available only on Unix systems)

To perform graceful reload, send SIGHUP signal to the main worker process. This action will reload all workers with new code. It's useful for deployment that requires zero downtime, but without using heavy orchestration tools like Kubernetes.

```bash
taskiq worker my_module:broker
kill -HUP <main pid>
```

Graceful and force shutdowns

If you send SIGINT or SIGTERM to the main process, by pressing Ctrl+C or using the kill command, it will initiate the shutdown process. By default, the worker stops fetching new messages immediately after receiving the signal but waits for all currently executing tasks to complete.

If you don't want to wait too long for tasks to complete each time you shut down the worker, you can either send termination signals three times to the main process to perform a hard kill or configure the --wait-tasks-timeout to set a hard time limit for shutting down.

::: tip Cool tip
The number of signals before a hard kill can be configured with the --hardkill-count CLI argument.
:::

Other parameters

  • --app-dir - Path to application directory. This path will be used to import tasks modules. If not specified, current working directory will be used.
  • --workers - Number of worker child processes (default: 2).
  • --no-configure-logging - disables default logging configuration for workers.
  • --log-level is used to set a log level (default INFO).
  • --log-format is used to set a log format (default [%(asctime)s][%(name)s][%(levelname)-7s][%(processName)s] %(message)s).
  • --max-async-tasks - maximum number of simultaneously running async tasks.
  • --max-async-tasks-jitter - randomly varies the max async task limit between --max-async-tasks and a jittered value, helping prevent simultaneous worker restarts.
  • --max-prefetch - number of tasks to be prefetched before execution. (Useful for systems with high message rates, but brokers should support acknowledgements).
  • --max-threadpool-threads - number of threads for sync function execution.
  • --no-propagate-errors - if this parameter is enabled, exceptions won't be thrown in generator dependencies.
  • --receiver - python path to custom receiver class.
  • --receiver_arg - custom args for receiver.
  • --ack-type - Type of acknowledgement. This parameter is used to set when to acknowledge the task. Possible values are when_received, when_executed, when_saved. Default is when_saved.
  • --max-tasks-per-child - maximum number of tasks to be executed by a single worker process before restart.
  • --max-fails - Maximum number of child process exits.
  • --shutdown-timeout - maximum amount of time for graceful broker's shutdown in seconds (default 5).
  • --wait-tasks-timeout - if cannot read new messages from the broker or maximum number of tasks is reached, worker will wait for all current tasks to finish. This parameter sets the maximum amount of time to wait until shutdown.
  • --hardkill-count - Number of termination signals to the main process before performing a hardkill.
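Putting several of these options together, a worker invocation might look like this (the module and broker names are illustrative):

```shell
taskiq worker mybroker:broker_var \
    --workers 4 \
    --max-async-tasks 100 \
    --ack-type when_executed \
    --log-level DEBUG \
    --fs-discover --tasks-pattern "**/tasks.py"
```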

Scheduler

The scheduler is used to schedule tasks as described in the Scheduling tasks section.

To run it, simply run:

```bash
taskiq scheduler <path to scheduler> [optional module to import]...
```

For example:

```bash
taskiq scheduler my_project.broker:scheduler my_project.module1 my_project.module2
```

Parameters

Path to scheduler is the only required argument.

  • --app-dir - Path to application directory. This path will be used to import tasks modules. If not specified, current working directory will be used.
  • --tasks-pattern or -tp. It's a glob pattern of files to import. By default it is **/tasks.py which searches for all tasks.py files. May be specified multiple times.
  • --fs-discover or -fsd. This option enables search of task files in current directory recursively, using the given pattern.
  • --no-configure-logging - use this parameter if your application configures custom logging.
  • --log-level is used to set a log level (default INFO).
  • --skip-first-run - skip the first run of the scheduler, i.e. don't run tasks immediately after the scheduler starts.
  • --update-interval - interval in seconds to check for new tasks. By default, the scheduler checks for new scheduled tasks at the first second of every minute.