Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ Within the activated Python environment, use the following command to install go
pip install gokart
```

If you use S3 or GCS as a data store, install the corresponding extras:

```bash
pip install gokart[s3] # S3 support
pip install gokart[gcs] # GCS support
pip install gokart[all] # both S3 and GCS
```


# Quickstart

Expand Down
7 changes: 7 additions & 0 deletions docs/intro_to_gokart.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ Within the activated Python environment, use the following command to install go

pip install gokart

If you use S3 or GCS as a data store, install the corresponding extras:

.. code:: sh

pip install gokart[s3] # S3 support
pip install gokart[gcs] # GCS support
pip install gokart[all] # both S3 and GCS


Quickstart
Expand Down
10 changes: 8 additions & 2 deletions docs/task_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,15 @@ It is recommended to use the config file since it does not change much.
gokart.add_config('base.ini')


To use the S3 or GCS repository, please set the bucket path as ``s3://{YOUR_REPOSITORY_NAME}`` or ``gs://{YOUR_REPOSITORY_NAME}`` respectively.
To use the S3 or GCS repository, please install the corresponding extras and set the bucket path as ``s3://{YOUR_REPOSITORY_NAME}`` or ``gs://{YOUR_REPOSITORY_NAME}`` respectively.

If use S3 or GCS, please set credential information to Environment Variables.
.. code:: sh

pip install gokart[s3] # for S3 support
pip install gokart[gcs] # for GCS support
pip install gokart[all] # for both S3 and GCS

Also, please set the credential information in environment variables.

.. code:: sh

Expand Down
19 changes: 16 additions & 3 deletions gokart/gcs_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

import json
import os
from typing import cast
from typing import TYPE_CHECKING, cast

import luigi
import luigi.contrib.gcs
from google.oauth2.service_account import Credentials

if TYPE_CHECKING:
import luigi.contrib.gcs
from google.oauth2.service_account import Credentials


class GCSConfig(luigi.Config):
Expand All @@ -19,9 +21,20 @@ def get_gcs_client(self) -> luigi.contrib.gcs.GCSClient:
return self._client

def _get_gcs_client(self) -> luigi.contrib.gcs.GCSClient:
try:
import googleapiclient # noqa: F401
except ImportError:
raise ImportError('GCS support requires additional dependencies. Install them with: pip install gokart[gcs]') from None
import luigi.contrib.gcs

return luigi.contrib.gcs.GCSClient(oauth_credentials=self._load_oauth_credentials())

def _load_oauth_credentials(self) -> Credentials | None:
try:
from google.oauth2.service_account import Credentials
except ImportError:
raise ImportError('GCS support requires additional dependencies. Install them with: pip install gokart[gcs]') from None

json_str = os.environ.get(self.gcs_credential_name)
if not json_str:
return None
Expand Down
12 changes: 9 additions & 3 deletions gokart/gcs_obj_metadata_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
from typing import Any, Final
from urllib.parse import urlsplit

from googleapiclient.model import makepatch

from gokart.gcs_config import GCSConfig
from gokart.required_task_output import RequiredTaskOutput
from gokart.utils import FlattenableItems
Expand Down Expand Up @@ -39,6 +37,14 @@ def _path_to_bucket_and_key(path: str) -> tuple[str, str]:
path_without_initial_slash = path[1:]
return netloc, path_without_initial_slash

@staticmethod
def _makepatch(original: dict[str, Any], modified: dict[str, Any]) -> dict[str, Any]:
try:
from googleapiclient.model import makepatch
except ImportError:
raise ImportError('GCS support requires additional dependencies. Install them with: pip install gokart[gcs]') from None
return dict(makepatch(original, modified))

@staticmethod
def add_task_state_labels(
path: str,
Expand Down Expand Up @@ -78,7 +84,7 @@ def add_task_state_labels(
.patch(
bucket=bucket,
object=obj,
body=makepatch({'metadata': original_metadata}, {'metadata': patched_metadata}),
body=GCSObjectMetadataClient._makepatch({'metadata': original_metadata}, {'metadata': patched_metadata}),
)
.execute()
)
Expand Down
3 changes: 2 additions & 1 deletion gokart/gcs_zip_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import shutil
from typing import cast

from gokart.gcs_config import GCSConfig
from gokart.zip_client import ZipClient, _unzip_file


class GCSZipClient(ZipClient):
def __init__(self, file_path: str, temporary_directory: str) -> None:
from gokart.gcs_config import GCSConfig

self._file_path = file_path
self._temporary_directory = temporary_directory
self._client = GCSConfig().get_gcs_client()
Expand Down
42 changes: 36 additions & 6 deletions gokart/object_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,8 @@
from typing import cast

import luigi
import luigi.contrib.gcs
import luigi.contrib.s3
from luigi.format import Format

from gokart.gcs_config import GCSConfig
from gokart.gcs_zip_client import GCSZipClient
from gokart.s3_config import S3Config
from gokart.s3_zip_client import S3ZipClient
from gokart.zip_client import ZipClient

object_storage_path_prefix = ['s3://', 'gs://']
Expand All @@ -28,26 +22,52 @@ def if_object_storage_path(path: str) -> bool:
@staticmethod
def get_object_storage_target(path: str, format: Format) -> luigi.target.FileSystemTarget:
    """Build the luigi target for an ``s3://`` or ``gs://`` path.

    The backend packages are imported at call time, inside the branch that
    needs them, rather than at module import time.

    Args:
        path: Object storage path; must start with ``s3://`` or ``gs://``.
        format: luigi format passed through to the created target.

    Returns:
        A ``luigi.contrib.s3.S3Target`` for ``s3://`` paths or a
        ``luigi.contrib.gcs.GCSTarget`` for ``gs://`` paths.

    Raises:
        ImportError: If ``boto3`` (S3) or ``googleapiclient`` (GCS) cannot be
            imported; the message names the extra to install.
        RuntimeError: If ``path`` starts with neither supported scheme.
    """
    if path.startswith('s3://'):
        try:
            import boto3  # noqa: F401
        except ImportError:
            raise ImportError('S3 support requires additional dependencies. Install them with: pip install gokart[s3]') from None

        import luigi.contrib.s3

        from gokart.s3_config import S3Config

        return luigi.contrib.s3.S3Target(path, client=S3Config().get_s3_client(), format=format)
    elif path.startswith('gs://'):
        try:
            import googleapiclient  # noqa: F401
        except ImportError:
            raise ImportError('GCS support requires additional dependencies. Install them with: pip install gokart[gcs]') from None

        import luigi.contrib.gcs

        from gokart.gcs_config import GCSConfig

        return luigi.contrib.gcs.GCSTarget(path, client=GCSConfig().get_gcs_client(), format=format)
    else:
        # Raise explicitly: a bare ``raise`` here would re-raise whatever
        # exception the caller is currently handling, or fail with an
        # uninformative "No active exception to re-raise" message.
        raise RuntimeError(f'Unsupported object storage path: {path!r}')

@staticmethod
def exists(path: str) -> bool:
    """Return whether ``path`` exists according to the matching storage client.

    The config module for the backend is imported at call time, inside the
    branch that needs it.

    Args:
        path: Object storage path; must start with ``s3://`` or ``gs://``.

    Returns:
        The result of the S3 or GCS client's ``exists(path)`` call.

    Raises:
        RuntimeError: If ``path`` starts with neither supported scheme.
    """
    if path.startswith('s3://'):
        from gokart.s3_config import S3Config

        return cast(bool, S3Config().get_s3_client().exists(path))
    elif path.startswith('gs://'):
        from gokart.gcs_config import GCSConfig

        return cast(bool, GCSConfig().get_gcs_client().exists(path))
    else:
        # Raise explicitly: a bare ``raise`` here would re-raise whatever
        # exception the caller is currently handling, or fail with an
        # uninformative "No active exception to re-raise" message.
        raise RuntimeError(f'Unsupported object storage path: {path!r}')

@staticmethod
def get_timestamp(path: str) -> datetime:
if path.startswith('s3://'):
from gokart.s3_config import S3Config

return cast(datetime, S3Config().get_s3_client().get_key(path).last_modified)
elif path.startswith('gs://'):
from gokart.gcs_config import GCSConfig

# for gcs object
# should PR to luigi
bucket, obj = GCSConfig().get_gcs_client()._path_to_bucket_and_key(path)
Expand All @@ -59,12 +79,22 @@ def get_timestamp(path: str) -> datetime:
@staticmethod
def get_zip_client(file_path: str, temporary_directory: str) -> ZipClient:
    """Create the zip client that matches the scheme of ``file_path``.

    The backend-specific zip client module is imported at call time, inside
    the branch that needs it.

    Args:
        file_path: Object storage path; must start with ``s3://`` or ``gs://``.
        temporary_directory: Passed through to the zip client's constructor.

    Returns:
        An ``S3ZipClient`` for ``s3://`` paths or a ``GCSZipClient`` for
        ``gs://`` paths.

    Raises:
        RuntimeError: If ``file_path`` starts with neither supported scheme.
    """
    if file_path.startswith('s3://'):
        from gokart.s3_zip_client import S3ZipClient

        return S3ZipClient(file_path=file_path, temporary_directory=temporary_directory)
    elif file_path.startswith('gs://'):
        from gokart.gcs_zip_client import GCSZipClient

        return GCSZipClient(file_path=file_path, temporary_directory=temporary_directory)
    else:
        # Raise explicitly: a bare ``raise`` here would re-raise whatever
        # exception the caller is currently handling, or fail with an
        # uninformative "No active exception to re-raise" message.
        raise RuntimeError(f'Unsupported object storage path: {file_path!r}')

@staticmethod
def is_buffered_reader(file: object) -> bool:
    """Report whether ``file`` is something other than a ``ReadableS3File``.

    Args:
        file: The object to inspect.

    Returns:
        ``True`` when ``boto3`` cannot be imported; otherwise ``True`` exactly
        when ``file`` is not a ``luigi.contrib.s3.ReadableS3File`` instance.
    """
    try:
        import boto3  # noqa: F401
    except ImportError:
        # The S3 check is skipped entirely when boto3 is unavailable.
        return True

    from luigi.contrib.s3 import ReadableS3File

    is_s3_file = isinstance(file, ReadableS3File)
    return not is_s3_file
11 changes: 10 additions & 1 deletion gokart/s3_config.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from __future__ import annotations

import os
from typing import TYPE_CHECKING

import luigi
import luigi.contrib.s3

if TYPE_CHECKING:
import luigi.contrib.s3


class S3Config(luigi.Config):
Expand All @@ -18,6 +21,12 @@ def get_s3_client(self) -> luigi.contrib.s3.S3Client:
return self._client

def _get_s3_client(self) -> luigi.contrib.s3.S3Client:
try:
import boto3 # noqa: F401
except ImportError:
raise ImportError('S3 support requires additional dependencies. Install them with: pip install gokart[s3]') from None
import luigi.contrib.s3

return luigi.contrib.s3.S3Client(
aws_access_key_id=os.environ.get(self.aws_access_key_id_name), aws_secret_access_key=os.environ.get(self.aws_secret_access_key_name)
)
3 changes: 2 additions & 1 deletion gokart/s3_zip_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
import shutil
from typing import cast

from gokart.s3_config import S3Config
from gokart.zip_client import ZipClient, _unzip_file


class S3ZipClient(ZipClient):
def __init__(self, file_path: str, temporary_directory: str) -> None:
from gokart.s3_config import S3Config

self._file_path = file_path
self._temporary_directory = temporary_directory
self._client = S3Config().get_s3_client()
Expand Down
9 changes: 6 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,10 @@ readme = "README.md"
requires-python = ">=3.10, <4"
dependencies = [
"luigi>=3.8.0",
"boto3",
"slack-sdk",
"pandas",
"numpy",
"google-auth",
"pyarrow",
"google-api-python-client",
"APScheduler",
"redis",
"dill",
Expand All @@ -39,6 +36,9 @@ classifiers = [
dynamic = ["version"]

[project.optional-dependencies]
s3 = ["boto3"]
gcs = ["google-auth", "google-api-python-client"]
all = ["gokart[s3]", "gokart[gcs]"]
polars = ["polars>=0.19.0"]

[project.urls]
Expand All @@ -48,6 +48,9 @@ Documentation = "https://gokart.readthedocs.io/en/latest/"

[dependency-groups]
test = [
"boto3",
"google-auth",
"google-api-python-client",
"fakeredis",
"lupa",
"matplotlib",
Expand Down
Loading
Loading