Skip to content

Commit 3271831

Browse files
authored
feat(storage): Add support for blob object in AAOW (#16577)
Add support for setting custom metadata, content type, and KMS key name on appendable objects
1 parent aa43c83 commit 3271831

6 files changed

Lines changed: 288 additions & 5 deletions

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
# Copyright 2026 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from google.cloud import _storage_v2
16+
17+
# Map Python Blob attributes to GCS V2 Object proto field names.
# NOTE: the client-side attribute is `kms_key_name`, but the v2 proto
# field is named `kms_key` — the asymmetry below is intentional.
_BLOB_ATTR_TO_PROTO_FIELD = {
    "content_type": "content_type",
    "metadata": "metadata",
    "kms_key_name": "kms_key",
}
23+
24+
25+
def blob_to_proto(blob):
    """Converts a Blob instance to a GCS V2 Object proto message.

    Copies the object name, the fully qualified bucket path, and any
    attribute listed in ``_BLOB_ATTR_TO_PROTO_FIELD`` that is set
    (not ``None``) on the blob.
    """
    fields = {"name": blob.name}

    bucket = blob.bucket
    if bucket:
        fields["bucket"] = "projects/_/buckets/{}".format(bucket.name)

    # Only forward attributes the caller actually populated; unset
    # attributes stay absent from the proto rather than being sent empty.
    for source_attr, target_field in _BLOB_ATTR_TO_PROTO_FIELD.items():
        attr_value = getattr(blob, source_attr, None)
        if attr_value is not None:
            fields[target_field] = attr_value

    return _storage_v2.Object(**fields)

packages/google-cloud-storage/google/cloud/storage/asyncio/async_appendable_object_writer.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from google.cloud import _storage_v2
2525
from google.cloud._storage_v2.types import BidiWriteObjectRedirectedError
2626
from google.cloud._storage_v2.types.storage import BidiWriteObjectRequest
27+
from google.cloud.storage import Blob
2728
from google.cloud.storage.asyncio.async_grpc_client import (
2829
AsyncGrpcClient,
2930
)
@@ -211,6 +212,60 @@ def __init__(
211212
self._routing_token: Optional[str] = None
212213
self.object_resource: Optional[_storage_v2.Object] = None
213214
self._flush_count = 0
215+
self.blob: Optional[Blob] = None
216+
217+
@classmethod
def from_blob(
    cls,
    client: AsyncGrpcClient,
    blob: Blob,
    write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
    writer_options: Optional[dict] = None,
) -> "AsyncAppendableObjectWriter":
    """Creates an AsyncAppendableObjectWriter from an existing Blob object.

    This factory method extracts the bucket and object names directly from
    the provided blob instance.

    .. code-block:: python

        from google.cloud.storage.bucket import Bucket
        from google.cloud.storage.blob import Blob

        bucket = Bucket(client, name="my-bucket")
        blob = Blob(name="my-object.txt", bucket=bucket)

        writer = AsyncAppendableObjectWriter.from_blob(
            client=client,
            blob=blob
        )

    :type client: :class:`~google.cloud.storage.client.AsyncGrpcClient`
    :param client: The async gRPC client to use for write operations.

    :type blob: :class:`~google.cloud.storage.blob.Blob`
    :param blob: The blob instance providing the target path.

    :type write_handle: :class:`~google.storage.v2.BidiWriteHandle`
    :param write_handle: (Optional) An existing BidiWriteHandle to resume a session.

    :type writer_options: dict
    :param writer_options: (Optional) Configuration settings for the underlying
        appendable writer.

    :rtype: :class:`AsyncAppendableObjectWriter`
    :returns: An initialized writer instance.

    :raises ValueError: If ``blob`` is ``None`` or is not associated with a
        bucket (both are required to derive the target path).
    """
    # Fail fast with a clear message instead of letting the attribute
    # accesses below raise an opaque AttributeError.
    if blob is None or blob.bucket is None:
        raise ValueError("blob must be provided and associated with a bucket")

    instance = cls(
        client=client,
        bucket_name=blob.bucket.name,
        object_name=blob.name,
        generation=blob.generation,
        write_handle=write_handle,
        writer_options=writer_options,
    )
    # Keep the blob so open() can forward its metadata (content type,
    # custom metadata, KMS key) into the WriteObjectSpec resource.
    instance.blob = blob
    return instance
214269

215270
async def state_lookup(self) -> int:
216271
"""Returns the persisted_size
@@ -297,6 +352,7 @@ async def _do_open():
297352
client=self.client.grpc_client,
298353
bucket_name=self.bucket_name,
299354
object_name=self.object_name,
355+
blob=self.blob,
300356
generation_number=self.generation,
301357
write_handle=self.write_handle,
302358
routing_token=self._routing_token,

packages/google-cloud-storage/google/cloud/storage/asyncio/async_write_object_stream.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
from google.api_core.bidi_async import AsyncBidiRpc
1919

2020
from google.cloud import _storage_v2
21+
from google.cloud.storage import Blob
22+
from google.cloud.storage import _grpc_conversions
2123
from google.cloud.storage.asyncio import _utils
2224
from google.cloud.storage.asyncio.async_abstract_object_stream import (
2325
_AsyncAbstractObjectStream,
@@ -67,6 +69,7 @@ def __init__(
6769
generation_number: Optional[int] = None, # None means new object
6870
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
6971
routing_token: Optional[str] = None,
72+
blob: Optional[Blob] = None,
7073
) -> None:
7174
if client is None:
7275
raise ValueError("client must be provided")
@@ -83,7 +86,7 @@ def __init__(
8386
self.client: AsyncGrpcClient.grpc_client = client
8487
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
8588
self.routing_token: Optional[str] = routing_token
86-
89+
self.blob: Optional[Blob] = blob
8790
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
8891

8992
self.rpc = self.client._client._transport._wrapped_methods[
@@ -118,11 +121,15 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
118121
# if `generation_number` == 0 new object will be created only if there
119122
# isn't any existing object.
120123
if self.generation_number is None or self.generation_number == 0:
124+
if self.blob:
125+
resource = _grpc_conversions.blob_to_proto(self.blob)
126+
else:
127+
resource = _storage_v2.Object(
128+
name=self.object_name, bucket=self._full_bucket_name
129+
)
121130
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
122131
write_object_spec=_storage_v2.WriteObjectSpec(
123-
resource=_storage_v2.Object(
124-
name=self.object_name, bucket=self._full_bucket_name
125-
),
132+
resource=resource,
126133
appendable=True,
127134
if_generation_match=self.generation_number,
128135
),

packages/google-cloud-storage/tests/system/test_zonal.py

Lines changed: 131 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
)
1818

1919
# current library imports
20+
from google.cloud import kms
2021
from google.cloud.storage.asyncio.async_grpc_client import AsyncGrpcClient
2122
from google.cloud.storage.asyncio.async_multi_range_downloader import (
2223
AsyncMultiRangeDownloader,
@@ -29,7 +30,7 @@
2930

3031

3132
# TODO: replace this with a fixture once zonal bucket creation / deletion
32-
# is supported in grpc client or json client client.
33+
# is supported in grpc client or json client.
3334
_ZONAL_BUCKET = os.getenv("ZONAL_BUCKET")
3435
_CROSS_REGION_BUCKET = os.getenv("CROSS_REGION_BUCKET")
3536
_BYTES_TO_UPLOAD = b"dummy_bytes_to_write_read_and_delete_appendable_object"
@@ -40,6 +41,58 @@ async def create_async_grpc_client(attempt_direct_path=True):
4041
return AsyncGrpcClient(attempt_direct_path=attempt_direct_path)
4142

4243

44+
@pytest.fixture(scope="session")
def zonal_kms_key(storage_client, kms_client):
    """Provisions a KMS key in the same location as the zonal bucket.

    Creates the key ring and crypto key if they do not already exist, and
    grants the bucket project's GCS service account permission to use keys
    on the ring. Idempotent across test runs.

    :returns: The full resource path of the crypto key.
    """
    # Get the zonal bucket and extract its location; KMS keys must live in
    # the same location as the bucket they protect.
    bucket = storage_client.get_bucket(_ZONAL_BUCKET)
    location = bucket.location.lower()

    project = storage_client.project
    keyring_name = "gcs-test-zonal-ring"
    key_name = "gcs-test-zonal-key"

    keyring_path = kms_client.key_ring_path(project, location, keyring_name)

    # Create the KeyRing if it doesn't exist.
    try:
        kms_client.get_key_ring(name=keyring_path)
    except NotFound:
        parent = f"projects/{project}/locations/{location}"
        kms_client.create_key_ring(
            request={"parent": parent, "key_ring_id": keyring_name, "key_ring": {}}
        )

    # Grant the GCS service account permission to use keys on this ring.
    # Merge the binding into the existing policy instead of replacing the
    # policy wholesale, so previously granted bindings are preserved.
    service_account_email = storage_client.get_service_account_email()
    member = f"serviceAccount:{service_account_email}"
    role = "roles/cloudkms.cryptoKeyEncrypterDecrypter"
    policy = kms_client.get_iam_policy(request={"resource": keyring_path})
    binding = next((b for b in policy.bindings if b.role == role), None)
    if binding is None:
        policy.bindings.add(role=role, members=[member])
        kms_client.set_iam_policy(
            request={"resource": keyring_path, "policy": policy}
        )
    elif member not in binding.members:
        binding.members.append(member)
        kms_client.set_iam_policy(
            request={"resource": keyring_path, "policy": policy}
        )

    # Create the CryptoKey if it doesn't exist.
    key_path = kms_client.crypto_key_path(project, location, keyring_name, key_name)
    try:
        kms_client.get_crypto_key(name=key_path)
    except NotFound:
        purpose = kms.CryptoKey.CryptoKeyPurpose.ENCRYPT_DECRYPT
        kms_client.create_crypto_key(
            request={
                "parent": keyring_path,
                "crypto_key_id": key_name,
                "crypto_key": {"purpose": purpose},
            }
        )

    return key_path
94+
95+
4396
@pytest.fixture(scope="session")
4497
def event_loop():
4598
"""Redefine pytest-asyncio's event_loop fixture to be session-scoped."""
@@ -286,6 +339,83 @@ async def _run():
286339
event_loop.run_until_complete(_run())
287340

288341

342+
def test_write_from_blob(
    storage_client,
    blobs_to_delete,
    event_loop,
    grpc_client,
):
    """Verifies from_blob propagates content type and custom metadata."""
    object_name = f"test_from_blob-{str(uuid.uuid4())[:4]}"
    content_type = "text/plain"
    metadata = {"environment": "system-test"}
    test_data = b"system-test-data"

    async def _run():
        # 1. Create a Blob instance carrying the metadata to propagate.
        blob = storage_client.bucket(_ZONAL_BUCKET).blob(object_name)
        blob.content_type = content_type
        blob.metadata = metadata

        # 2. Use from_blob to create the writer and upload the payload.
        writer = AsyncAppendableObjectWriter.from_blob(grpc_client, blob)
        await writer.open()
        await writer.append(test_data)
        await writer.close(finalize_on_close=True)

        # Register the object for cleanup BEFORE asserting so a failed
        # assertion does not leak the object.
        blobs_to_delete.append(blob)

        # 3. Verify the object metadata round-tripped through the write.
        obj = await grpc_client.get_object(
            bucket_name=_ZONAL_BUCKET,
            object_name=object_name,
        )

        assert obj.content_type == content_type
        assert obj.metadata["environment"] == "system-test"

    event_loop.run_until_complete(_run())
377+
378+
379+
def test_write_from_blob_with_kms_key(
    storage_client,
    blobs_to_delete,
    event_loop,
    grpc_client,
    zonal_kms_key,
):
    """Verifies AsyncAppendableObjectWriter.from_blob correctly applies KMS encryption."""

    object_name = f"test_from_blob_kms-{str(uuid.uuid4())[:4]}"
    test_data = b"kms-protected-data"

    async def _run():
        # Create a local Blob instance with the KMS key
        blob = storage_client.bucket(_ZONAL_BUCKET).blob(
            object_name, kms_key_name=zonal_kms_key
        )

        writer = AsyncAppendableObjectWriter.from_blob(grpc_client, blob)

        await writer.open()
        await writer.append(test_data)

        await writer.close(finalize_on_close=True)

        # Register the object for cleanup BEFORE asserting so a failed
        # assertion does not leak the object.
        blobs_to_delete.append(blob)

        # Verify the encryption metadata
        obj = await grpc_client.get_object(
            bucket_name=_ZONAL_BUCKET,
            object_name=object_name,
        )

        # Assert that the object was encrypted with the correct key.
        # GCS appends a version suffix, so we use startswith().
        assert obj.kms_key.startswith(zonal_kms_key)

    event_loop.run_until_complete(_run())
417+
418+
289419
def test_read_unfinalized_appendable_object(
290420
storage_client, blobs_to_delete, event_loop, grpc_client_direct
291421
):

packages/google-cloud-storage/tests/unit/asyncio/test_async_appendable_object_writer.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import pytest
2020
from google.api_core import exceptions
2121
from google.rpc import status_pb2
22+
from google.cloud.storage import Blob
2223

2324
from google.cloud._storage_v2.types import storage as storage_type
2425
from google.cloud._storage_v2.types.storage import BidiWriteObjectRedirectedError
@@ -166,6 +167,24 @@ def test_init_raises_if_crc32c_missing(self, mock_appendable_writer):
166167
with pytest.raises(exceptions.FailedPrecondition):
167168
self._make_one(mock_appendable_writer["mock_client"])
168169

170+
def test_from_blob(self, mock_appendable_writer):
    """from_blob copies identity fields off the blob and retains the blob."""
    fake_blob = mock.Mock(spec=Blob)
    fake_blob.name = OBJECT
    fake_blob.bucket.name = BUCKET
    fake_blob.generation = GENERATION

    result = AsyncAppendableObjectWriter.from_blob(
        mock_appendable_writer["mock_client"],
        fake_blob,
        writer_options={"FLUSH_INTERVAL_BYTES": EIGHT_MIB},
    )

    # Identity fields are lifted straight off the blob.
    assert result.bucket_name == BUCKET
    assert result.object_name == OBJECT
    assert result.generation == GENERATION
    # Writer options are forwarded, and the blob itself is retained.
    assert result.flush_interval == EIGHT_MIB
    assert result.blob == fake_blob
187+
169188
# -------------------------------------------------------------------------
170189
# Stream Lifecycle Tests
171190
# -------------------------------------------------------------------------

0 commit comments

Comments
 (0)