Skip to content

Commit 49c7b5a

Browse files
committed
Modified AAOW and AWOS constuctors to include blob object
1 parent 281eaae commit 49c7b5a

File tree

3 files changed

+93
-2
lines changed

3 files changed

+93
-2
lines changed

packages/google-cloud-storage/google/cloud/storage/asyncio/async_appendable_object_writer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from google.cloud import _storage_v2
2525
from google.cloud._storage_v2.types import BidiWriteObjectRedirectedError
2626
from google.cloud._storage_v2.types.storage import BidiWriteObjectRequest
27+
from google.cloud.storage import Blob
2728
from google.cloud.storage.asyncio.async_grpc_client import (
2829
AsyncGrpcClient,
2930
)
@@ -107,6 +108,7 @@ def __init__(
107108
client: AsyncGrpcClient,
108109
bucket_name: str,
109110
object_name: str,
111+
blob: Optional[Blob] = None,
110112
generation: Optional[int] = None,
111113
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
112114
writer_options: Optional[dict] = None,
@@ -185,6 +187,7 @@ def __init__(
185187
self.object_name = object_name
186188
self.write_handle = write_handle
187189
self.generation = generation
190+
self.blob = blob
188191

189192
self.write_obj_stream: Optional[_AsyncWriteObjectStream] = None
190193
self._is_stream_open: bool = False
@@ -297,6 +300,7 @@ async def _do_open():
297300
client=self.client.grpc_client,
298301
bucket_name=self.bucket_name,
299302
object_name=self.object_name,
303+
blob=self.blob,
300304
generation_number=self.generation,
301305
write_handle=self.write_handle,
302306
routing_token=self._routing_token,

packages/google-cloud-storage/google/cloud/storage/asyncio/async_write_object_stream.py

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from google.api_core.bidi_async import AsyncBidiRpc
1919

2020
from google.cloud import _storage_v2
21+
from google.cloud.storage import Blob
2122
from google.cloud.storage.asyncio import _utils
2223
from google.cloud.storage.asyncio.async_abstract_object_stream import (
2324
_AsyncAbstractObjectStream,
@@ -64,6 +65,7 @@ def __init__(
6465
client: AsyncGrpcClient.grpc_client,
6566
bucket_name: str,
6667
object_name: str,
68+
blob: Optional[Blob] = None,
6769
generation_number: Optional[int] = None, # None means new object
6870
write_handle: Optional[_storage_v2.BidiWriteHandle] = None,
6971
routing_token: Optional[str] = None,
@@ -83,7 +85,7 @@ def __init__(
8385
self.client: AsyncGrpcClient.grpc_client = client
8486
self.write_handle: Optional[_storage_v2.BidiWriteHandle] = write_handle
8587
self.routing_token: Optional[str] = routing_token
86-
88+
self.blob: Optional[Blob] = blob
8789
self._full_bucket_name = f"projects/_/buckets/{self.bucket_name}"
8890

8991
self.rpc = self.client._client._transport._wrapped_methods[
@@ -118,10 +120,19 @@ async def open(self, metadata: Optional[List[Tuple[str, str]]] = None) -> None:
118120
# if `generation_number` == 0 new object will be created only if there
119121
# isn't any existing object.
120122
if self.generation_number is None or self.generation_number == 0:
123+
resource_params = {
124+
"name": self.object_name,
125+
"bucket": self._full_bucket_name,
126+
}
127+
if self.blob:
128+
if self.blob.content_type:
129+
resource_params["content_type"] = self.blob.content_type
130+
if self.blob.metadata:
131+
resource_params["metadata"] = self.blob.metadata
121132
self.first_bidi_write_req = _storage_v2.BidiWriteObjectRequest(
122133
write_object_spec=_storage_v2.WriteObjectSpec(
123134
resource=_storage_v2.Object(
124-
name=self.object_name, bucket=self._full_bucket_name
135+
**resource_params
125136
),
126137
appendable=True,
127138
if_generation_match=self.generation_number,
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Copyright 2026 Google Inc. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the 'License');
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import argparse
16+
import asyncio
17+
18+
from google.cloud.storage.asyncio.async_appendable_object_writer import (
19+
AsyncAppendableObjectWriter,
20+
)
21+
from google.cloud.storage import Blob
22+
from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
23+
24+
25+
async def storage_create_and_write_appendable_object(
26+
bucket_name, object_name, grpc_client=None
27+
):
28+
"""Uploads an appendable object to zonal bucket.
29+
30+
grpc_client: an existing grpc_client to use, this is only for testing.
31+
"""
32+
33+
if grpc_client is None:
34+
grpc_client = AsyncGrpcClient()
35+
blob = Blob.from_uri("gs://{}/{}".format(bucket_name, object_name))
36+
blob.content_type = "text/plain"
37+
writer = AsyncAppendableObjectWriter(
38+
client=grpc_client,
39+
bucket_name=bucket_name,
40+
object_name=object_name,
41+
blob=blob,
42+
generation=0, # throws `FailedPrecondition` if object already exists.
43+
)
44+
# This creates a new appendable object of size 0 and opens it for appending.
45+
await writer.open()
46+
47+
# appends data to the object
48+
# you can perform `.append` multiple times as needed. Data will be appended
49+
# to the end of the object.
50+
await writer.append(b"Some data")
51+
52+
# Once all appends are done, close the gRPC bidirectional stream.
53+
new_object = await writer.close(finalize_on_close=True)
54+
print(new_object)
55+
print(new_object.size)
56+
print(new_object.content_type)
57+
print(
58+
f"Appended object {object_name} created of size {writer.persisted_size} bytes."
59+
)
60+
61+
62+
if __name__ == "__main__":
63+
parser = argparse.ArgumentParser(
64+
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
65+
)
66+
parser.add_argument("--bucket_name", help="Your Cloud Storage bucket name.")
67+
parser.add_argument("--object_name", help="Your Cloud Storage object name.")
68+
69+
args = parser.parse_args()
70+
71+
asyncio.run(
72+
storage_create_and_write_appendable_object(
73+
bucket_name=args.bucket_name,
74+
object_name=args.object_name,
75+
)
76+
)

0 commit comments

Comments
 (0)