Skip to content

Commit 08bc0f4

Browse files
authored
Update ObjectStoragePath for universal_pathlib>=v0.2.1 (#37524)
This updates ObjectStoragePath to be compatible with universal_pathlib >= 0.2.1 which in turn makes it compatible with Python 3.12+.
1 parent 011cd3d commit 08bc0f4

5 files changed

Lines changed: 154 additions & 165 deletions

File tree

airflow/io/path.py

Lines changed: 59 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,20 @@
1717
from __future__ import annotations
1818

1919
import contextlib
20-
import functools
2120
import os
2221
import shutil
2322
import typing
24-
from pathlib import PurePath
23+
from typing import Any, Mapping
2524
from urllib.parse import urlsplit
2625

27-
from fsspec.core import split_protocol
2826
from fsspec.utils import stringify_path
29-
from upath.implementations.cloud import CloudPath, _CloudAccessor
27+
from upath.implementations.cloud import CloudPath
3028
from upath.registry import get_upath_class
3129

3230
from airflow.io.store import attach
3331
from airflow.io.utils.stat import stat_result
3432

3533
if typing.TYPE_CHECKING:
36-
from urllib.parse import SplitResult
37-
3834
from fsspec import AbstractFileSystem
3935

4036

@@ -43,124 +39,68 @@
4339
default = "file"
4440

4541

46-
class _AirflowCloudAccessor(_CloudAccessor):
47-
__slots__ = ("_store",)
48-
49-
def __init__(
50-
self,
51-
parsed_url: SplitResult | None,
52-
conn_id: str | None = None,
53-
**kwargs: typing.Any,
54-
) -> None:
55-
# warning: we are not calling super().__init__ here
56-
# as it will try to create a new fs from a different
57-
# set if registered filesystems
58-
if parsed_url and parsed_url.scheme:
59-
self._store = attach(parsed_url.scheme, conn_id)
60-
else:
61-
self._store = attach("file", conn_id)
62-
63-
@property
64-
def _fs(self) -> AbstractFileSystem:
65-
return self._store.fs
66-
67-
def __eq__(self, other):
68-
return isinstance(other, _AirflowCloudAccessor) and self._store == other._store
69-
70-
7142
class ObjectStoragePath(CloudPath):
7243
"""A path-like object for object storage."""
7344

74-
_accessor: _AirflowCloudAccessor
75-
7645
__version__: typing.ClassVar[int] = 1
7746

78-
_default_accessor = _AirflowCloudAccessor
47+
_protocol_dispatch = False
7948

8049
sep: typing.ClassVar[str] = "/"
8150
root_marker: typing.ClassVar[str] = "/"
8251

83-
_bucket: str
84-
_key: str
85-
_protocol: str
86-
_hash: int | None
87-
88-
__slots__ = (
89-
"_bucket",
90-
"_key",
91-
"_conn_id",
92-
"_protocol",
93-
"_hash",
94-
)
95-
96-
def __new__(
97-
cls: type[PT],
98-
*args: str | os.PathLike,
99-
scheme: str | None = None,
100-
conn_id: str | None = None,
101-
**kwargs: typing.Any,
102-
) -> PT:
103-
args_list = list(args)
104-
105-
if args_list:
106-
other = args_list.pop(0) or "."
107-
else:
108-
other = "."
109-
110-
if isinstance(other, PurePath):
111-
_cls: typing.Any = type(other)
112-
drv, root, parts = _cls._parse_args(args_list)
113-
drv, root, parts = _cls._flavour.join_parsed_parts(
114-
other._drv, # type: ignore[attr-defined]
115-
other._root, # type: ignore[attr-defined]
116-
other._parts, # type: ignore[attr-defined]
117-
drv,
118-
root,
119-
parts, # type: ignore
120-
)
121-
122-
_kwargs = getattr(other, "_kwargs", {})
123-
_url = getattr(other, "_url", None)
124-
other_kwargs = _kwargs.copy()
125-
if _url and _url.scheme:
126-
other_kwargs["url"] = _url
127-
new_kwargs = _kwargs.copy()
128-
new_kwargs.update(kwargs)
129-
130-
return _cls(_cls._format_parsed_parts(drv, root, parts, **other_kwargs), **new_kwargs)
131-
132-
url = stringify_path(other)
133-
parsed_url: SplitResult = urlsplit(url)
134-
135-
if scheme: # allow override of protocol
136-
parsed_url = parsed_url._replace(scheme=scheme)
137-
138-
if not parsed_url.path: # ensure path has root
139-
parsed_url = parsed_url._replace(path="/")
140-
141-
if not parsed_url.scheme and not split_protocol(url)[0]:
142-
args_list.insert(0, url)
143-
else:
144-
args_list.insert(0, parsed_url.path)
52+
__slots__ = ("_hash_cached",)
53+
54+
@classmethod
55+
def _transform_init_args(
56+
cls,
57+
args: tuple[str | os.PathLike, ...],
58+
protocol: str,
59+
storage_options: dict[str, Any],
60+
) -> tuple[tuple[str | os.PathLike, ...], str, dict[str, Any]]:
61+
"""Extract conn_id from the URL and set it as a storage option."""
62+
if args:
63+
arg0 = args[0]
64+
parsed_url = urlsplit(stringify_path(arg0))
65+
userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
66+
if have_info:
67+
storage_options.setdefault("conn_id", userinfo or None)
68+
parsed_url = parsed_url._replace(netloc=hostinfo)
69+
args = (parsed_url.geturl(),) + args[1:]
70+
protocol = protocol or parsed_url.scheme
71+
return args, protocol, storage_options
14572

146-
# This matches the parsing logic in urllib.parse; see:
147-
# https://github.com/python/cpython/blob/46adf6b701c440e047abf925df9a75a/Lib/urllib/parse.py#L194-L203
148-
userinfo, have_info, hostinfo = parsed_url.netloc.rpartition("@")
149-
if have_info:
150-
conn_id = conn_id or userinfo or None
151-
parsed_url = parsed_url._replace(netloc=hostinfo)
73+
@classmethod
74+
def _parse_storage_options(
75+
cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
76+
) -> dict[str, Any]:
77+
fs = attach(protocol or "file", conn_id=storage_options.get("conn_id")).fs
78+
pth_storage_options = type(fs)._get_kwargs_from_urls(urlpath)
79+
return {**pth_storage_options, **storage_options}
15280

153-
return cls._from_parts(args_list, url=parsed_url, conn_id=conn_id, **kwargs) # type: ignore
81+
@classmethod
82+
def _fs_factory(
83+
cls, urlpath: str, protocol: str, storage_options: Mapping[str, Any]
84+
) -> AbstractFileSystem:
85+
return attach(protocol or "file", storage_options.get("conn_id")).fs
15486

155-
@functools.lru_cache
15687
def __hash__(self) -> int:
157-
return hash(str(self))
88+
self._hash_cached: int
89+
try:
90+
return self._hash_cached
91+
except AttributeError:
92+
self._hash_cached = hash(str(self))
93+
return self._hash_cached
15894

15995
def __eq__(self, other: typing.Any) -> bool:
16096
return self.samestore(other) and str(self) == str(other)
16197

16298
def samestore(self, other: typing.Any) -> bool:
163-
return isinstance(other, ObjectStoragePath) and self._accessor == other._accessor
99+
return (
100+
isinstance(other, ObjectStoragePath)
101+
and self.protocol == other.protocol
102+
and self.storage_options.get("conn_id") == other.storage_options.get("conn_id")
103+
)
164104

165105
@property
166106
def container(self) -> str:
@@ -186,12 +126,17 @@ def key(self) -> str:
186126
def namespace(self) -> str:
187127
return f"{self.protocol}://{self.bucket}" if self.bucket else self.protocol
188128

129+
def open(self, mode="r", **kwargs):
130+
"""Open the file pointed to by this path."""
131+
kwargs.setdefault("block_size", kwargs.pop("buffering", None))
132+
return self.fs.open(self.path, mode=mode, **kwargs)
133+
189134
def stat(self) -> stat_result: # type: ignore[override]
190135
"""Call ``stat`` and return the result."""
191136
return stat_result(
192-
self._accessor.stat(self),
137+
self.fs.stat(self.path),
193138
protocol=self.protocol,
194-
conn_id=self._accessor._store.conn_id,
139+
conn_id=self.storage_options.get("conn_id"),
195140
)
196141

197142
def samefile(self, other_path: typing.Any) -> bool:
@@ -368,7 +313,11 @@ def copy(self, dst: str | ObjectStoragePath, recursive: bool = False, **kwargs)
368313
if path == self.path:
369314
continue
370315

371-
src_obj = ObjectStoragePath(path, conn_id=self._accessor._store.conn_id)
316+
src_obj = ObjectStoragePath(
317+
path,
318+
protocol=self.protocol,
319+
conn_id=self.storage_options.get("conn_id"),
320+
)
372321

373322
# skip directories, empty directories will not be created
374323
if src_obj.is_dir():
@@ -401,7 +350,7 @@ def move(self, path: str | ObjectStoragePath, recursive: bool = False, **kwargs)
401350
self.unlink()
402351

403352
def serialize(self) -> dict[str, typing.Any]:
404-
_kwargs = self._kwargs.copy()
353+
_kwargs = {**self.storage_options}
405354
conn_id = _kwargs.pop("conn_id", None)
406355

407356
return {

airflow/providers/common/io/xcom/backend.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ def serialize_value(
132132
if not p.parent.exists():
133133
p.parent.mkdir(parents=True, exist_ok=True)
134134

135-
with p.open("wb", compression=compression) as f:
135+
with p.open(mode="wb", compression=compression) as f:
136136
f.write(s_val)
137137

138138
return BaseXCom.serialize_value(str(p))
@@ -152,7 +152,7 @@ def deserialize_value(
152152

153153
try:
154154
p = ObjectStoragePath(path) / XComObjectStoreBackend._get_key(data)
155-
return json.load(p.open("rb", compression="infer"), cls=XComDecoder)
155+
return json.load(p.open(mode="rb", compression="infer"), cls=XComDecoder)
156156
except TypeError:
157157
return data
158158
except ValueError:

pyproject.toml

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,7 @@ dependencies = [
148148
# We should also remove "licenses/LICENSE-unicodecsv.txt" file when we remove this dependency
149149
"unicodecsv>=0.14.1",
150150
# The Universal Pathlib provides Pathlib-like interface for FSSPEC
151-
# In 0.1. *It was not very well defined for extension, so the way how we use it for 0.1.*
152-
# so we used a lot of private methods and attributes that were not defined in the interface
153-
# an they are broken with version 0.2.0 which is much better suited for extension and supports
154-
# Python 3.12. We should limit it, unti we migrate to 0.2.0
155-
# See: https://github.com/fsspec/universal_pathlib/pull/173#issuecomment-1937090528
156-
# This is prerequistite to make Airflow compatible with Python 3.12
157-
# Tracked in https://github.com/apache/airflow/pull/36755
158-
"universal-pathlib>=0.1.4,<0.2.0",
151+
"universal-pathlib>=0.2.1",
159152
# Werkzug 3 breaks Flask-Login 0.6.2, also connexion needs to be updated to >= 3.0
160153
# we should remove this limitation when FAB supports Flask 2.3 and we migrate connexion to 3+
161154
"werkzeug>=2.0,<3",

0 commit comments

Comments
 (0)