Skip to content

Commit a1e00b4

Browse files
committed
adding signed ingestion
1 parent b29539f commit a1e00b4

5 files changed

Lines changed: 103 additions & 1 deletion

File tree

alephclient/api.py

Lines changed: 56 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,6 @@
11
import importlib.metadata
22
import json
3+
import mimetypes
34
import uuid
45
import logging
56
from itertools import count
@@ -471,6 +472,7 @@ def ingest_upload(
471472
metadata: Optional[Dict] = None,
472473
sync: bool = False,
473474
index: bool = True,
475+
signed_url: bool = False,
474476
) -> Dict:
475477
"""
476478
Create an empty folder in a collection or upload a document to it
@@ -483,6 +485,9 @@ def ingest_upload(
483485
files, metadata contains foreign_id of the parent. Metadata for a
484486
directory contains foreign_id for itself as well as its parent and the
485487
name of the directory.
488+
signed_url: use the signed URL workflow for file uploads. When True,
489+
files are uploaded via a signed URL instead of multipart ingest.
490+
Directories always use the standard ingest endpoint.
486491
"""
487492
url_path = "collections/{0}/ingest".format(collection_id)
488493
params = {"sync": sync, "index": index}
@@ -491,6 +496,9 @@ def ingest_upload(
491496
data = {"meta": json.dumps(metadata)}
492497
return self._request("POST", url, data=data)
493498

499+
if signed_url:
500+
return self._signed_url_upload(collection_id, file_path, metadata, index)
501+
494502
for attempt in count(1):
495503
try:
496504
with file_path.open("rb") as fh:
@@ -509,6 +517,54 @@ def ingest_upload(
509517
backoff(ae, attempt)
510518
return {}
511519

520+
def _signed_url_upload(
    self,
    collection_id: str,
    file_path: Path,
    metadata: Optional[Dict],
    index: bool,
) -> Dict:
    """Upload a single file via the signed-URL workflow.

    Three-step flow: (1) POST to ``file/uploadUrl`` to obtain a signed
    upload URL and an upload id, (2) PUT the raw file bytes to that URL,
    (3) POST a document record to the collection that references the
    upload id. Transient ``AlephException`` failures are retried with
    backoff until ``self.retries`` is exceeded; other failures re-raise.

    :param collection_id: id of the collection receiving the document.
    :param file_path: local path of the file to upload.
    :param metadata: optional document metadata; ``file_name`` and
        ``mime_type`` are filled in from ``file_path`` on a copy, so the
        caller's dict is never mutated.
    :param index: whether the server should index the new document.
    :return: the server's document response, or a synthetic
        ``{"id": upload_id, "status": "ok"}`` when the server returns an
        empty body (document with the same foreign_id already exists).
    """
    # Guess the MIME type from the file name; fall back to the
    # module-level default MIME constant.
    mime_type = mimetypes.guess_type(file_path.name)[0] or MIME
    meta = dict(metadata or {})
    meta["file_name"] = file_path.name
    meta["mime_type"] = mime_type

    for attempt in count(1):
        try:
            # Step 1: request a signed upload URL. A fresh URL (and
            # upload id) is requested on every retry attempt.
            upload_url = self._make_url("file/uploadUrl")
            result = self._request("POST", upload_url)
            signed_url = result["url"]
            upload_id = result["id"]

            # Step 2: PUT the file content to the signed URL. Passing
            # the open file handle streams the body instead of loading
            # the whole file into memory.
            try:
                with file_path.open("rb") as fh:
                    response = self.session.put(
                        signed_url,
                        data=fh,
                        headers={"Content-Type": "application/octet-stream"},
                    )
                response.raise_for_status()
            except (RequestException, HTTPError) as exc:
                # Wrap transport errors so the retry loop below can
                # handle them uniformly as AlephException.
                raise AlephException(exc) from exc

            # Step 3: create the document record.
            # The server returns an empty 200 when a document with
            # the same foreign_id already exists in the collection.
            doc_url_path = f"collections/{collection_id}/document"
            doc_url = self._make_url(doc_url_path, params={"index": index})
            payload = {"upload_id": upload_id, "meta": meta}
            result = self._request("POST", doc_url, json=payload)
            if not result:
                # Empty body: synthesize a minimal success response.
                return {"id": upload_id, "status": "ok"}
            return result
        except AlephException as ae:
            # Only transient errors are retried, and only while the
            # attempt count stays within the configured retry budget.
            if not ae.transient or attempt > self.retries:
                raise ae from ae
            backoff(ae, attempt)
    # Unreachable (count(1) is infinite); kept for parity with the
    # ingest_upload method's structure and for type-checkers.
    return {}
567+
512568
def create_entityset(
513569
self, collection_id: str, type: str, label: str, summary: Optional[str]
514570
) -> Dict:

alephclient/cli.py

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -87,6 +87,12 @@ def cli(ctx, host, api_key, retries):
8787
help="maximum number of parallel uploads",
8888
)
8989
@click.option("-f", "--foreign-id", required=True, help="foreign-id of the collection")
90+
@click.option(
91+
"--signed-url",
92+
is_flag=True,
93+
default=False,
94+
help="use signed URL workflow for file uploads",
95+
)
9096
@click.argument("path", type=click.Path(exists=True))
9197
@click.pass_context
9298
def crawldir(
@@ -98,6 +104,7 @@ def crawldir(
98104
noindex=False,
99105
nojunk=False,
100106
parallel=1,
107+
signed_url=False,
101108
):
102109
"""Crawl a directory recursively and upload the documents in it to a
103110
collection."""
@@ -112,6 +119,7 @@ def crawldir(
112119
index=not noindex,
113120
nojunk=nojunk,
114121
parallel=parallel,
122+
signed_url=signed_url,
115123
)
116124
except AlephException as exc:
117125
raise click.ClickException(str(exc))

alephclient/crawldir.py

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -22,9 +22,11 @@ def __init__(
2222
path: Path,
2323
index: bool = True,
2424
nojunk: bool = False,
25+
signed_url: bool = False,
2526
):
2627
self.api = api
2728
self.index = index
29+
self.signed_url = signed_url
2830
self.exclude = (
2931
{
3032
"f": re.compile(r"\..*|thumbs\.db|desktop\.ini", re.I),
@@ -128,11 +130,15 @@ def ingest_upload(self, path: Path, parent_id: str, foreign_id: str) -> str:
128130
log.info("Upload [%s->%s]: %s", self.collection_id, parent_id, foreign_id)
129131
if parent_id is not None:
130132
metadata["parent_id"] = parent_id
133+
kwargs = {}
134+
if self.signed_url:
135+
kwargs["signed_url"] = True
131136
result = self.api.ingest_upload(
132137
self.collection_id,
133138
path,
134139
metadata=metadata,
135140
index=self.index,
141+
**kwargs,
136142
)
137143
if "id" not in result and not hasattr(result, "id"):
138144
raise AlephException("Upload failed")
@@ -147,6 +153,7 @@ def crawl_dir(
147153
index: bool = True,
148154
nojunk: bool = False,
149155
parallel: int = 1,
156+
signed_url: bool = False,
150157
):
151158
"""Crawl a directory and upload its content to a collection
152159
@@ -158,7 +165,9 @@ def crawl_dir(
158165
"""
159166
root = Path(path).resolve()
160167
collection = api.load_collection_by_foreign_id(foreign_id, config)
161-
crawler = CrawlDirectory(api, collection, root, index=index, nojunk=nojunk)
168+
crawler = CrawlDirectory(
169+
api, collection, root, index=index, nojunk=nojunk, signed_url=signed_url
170+
)
162171
consumers = []
163172

164173
# Use one thread to produce using scandir and at least one to consume

alephclient/tests/test_crawldir.py

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -48,3 +48,13 @@ def test_is_excluded_exclude_dir(self):
4848
crawldir.exclude["d"] = re.compile(r"week1\/*", re.I)
4949
is_excluded = crawldir.is_excluded(path)
5050
assert is_excluded
51+
52+
def test_signed_url_default_false(self):
    """A CrawlDirectory built without the flag leaves signed_url off."""
    target = Path(os.path.join(self.base_path, "jan/week1"))
    crawler = CrawlDirectory(AlephAPI, {}, target)
    assert crawler.signed_url is False
56+
57+
def test_signed_url_true(self):
    """Passing signed_url=True is stored on the CrawlDirectory."""
    target = Path(os.path.join(self.base_path, "jan/week1"))
    crawler = CrawlDirectory(AlephAPI, {}, target, signed_url=True)
    assert crawler.signed_url is True

alephclient/tests/test_tasks.py

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -120,3 +120,22 @@ def test_ingest(self, mocker):
120120
]
121121
for call in expected_calls:
122122
assert call in self.api.ingest_upload.mock_calls
123+
124+
def test_ingest_signed_url(self, mocker):
    """crawl_dir(signed_url=True) forwards the flag to every upload."""
    mocker.patch.object(
        self.api, "load_collection_by_foreign_id", return_value={"id": 2}
    )
    mocker.patch.object(self.api, "update_collection")
    mocker.patch.object(self.api, "ingest_upload", return_value={"id": 42})
    crawl_dir(
        self.api,
        "alephclient/tests/testdata",
        "test153",
        {},
        True,
        True,
        signed_url=True,
    )
    upload_mock = self.api.ingest_upload
    assert upload_mock.call_count == 6
    assert all(
        invocation.kwargs.get("signed_url") is True
        for invocation in upload_mock.call_args_list
    )

0 commit comments

Comments
 (0)