Skip to content

Commit df8379f

Browse files
Jan-Kazlouski-elasticelasticmachinecursoragent
authored
[Connectors Python] Add max_text_document_size param to configs to keep big documents from being ingested (#4013)
## Closes elastic/search-team#14454 Introduces a hard per-document size cap for the Elasticsearch bulk sink so that clusters are not overwhelmed by oversized **text** documents — for example, docs whose `body` was inflated by the Data Extraction Service (which bypasses `service.max_file_download_size`) or structured-only docs from API connectors with very long fields. The cap is **scoped to non-binary documents**: docs that carry an `_attachment` key (the binary path) remain governed by `service.max_file_download_size`, and are never affected by this cap. The intent is to give operators one knob for the text/structured path without changing how binary attachments flow. ### What changed - New config option **`elasticsearch.bulk.max_text_document_size`** (MiB, default `3`, `0` disables, must be `>= 0`). - `Sink._run`: every non-DELETE bulk op whose `doc` body has **no `_attachment` key** is measured against the cap; oversized docs are **skipped, logged at `WARNING`, and counted** as `docs_dropped_too_large`. - Size measurement matches the actual **wire bytes** the ES client sends, mirroring `elastic_transport._serializer.JsonSerializer`: `len(json.dumps(op, ensure_ascii=False, separators=(",", ":")).encode("utf-8", "surrogatepass"))`. This avoids over-counting i18n / emoji content (default `json.dumps` uses `ensure_ascii=True`, escaping each non-ASCII character into 6+ ASCII bytes — e.g. `é` → `\u00e9` measured as 6 bytes vs 2 UTF-8 bytes on the wire). - `_attachment` gate uses **key presence**, not value, so docs that pass an empty/None/falsy `_attachment` are still treated as binary path and exempted. - `Sink.__init__` rejects negative `max_text_document_size` with a clear `ValueError` at construction. Without this guard, a negative cap would be truthy and silently drop every non-attachment doc. - `DELETE` ops are exempt (no body). - Wired through `SyncOrchestrator.async_bulk` via `options["max_text_document_size"]`, defaulting to `DEFAULT_MAX_TEXT_DOCUMENT_SIZE` from `connectors.config`, and threaded into `_default_config()`. - Added a commented entry + upgrade-warning paragraph in `config.yml.example`. ### Behavior Default: documents whose serialized bulk-op JSON exceeds **3 MiB** (UTF-8 wire bytes), and that have no `_attachment` key, are dropped with a log line: ``` Dropping doc id=<id> index=<index> op=<op>: serialized text size <N>B exceeds elasticsearch.bulk.max_text_document_size (<M>B) ``` Set `elasticsearch.bulk.max_text_document_size: 0` to disable the cap entirely and restore previous behavior. ### Out of scope (deferred to follow-ups) This PR fixes the **DES-on text path** specifically. Known related issues that are **not** addressed here and need separate PRs: - Removing the `use_text_extraction_service` short-circuit in `is_file_size_within_limit` so DES-extracted file size is also bounded. - Fixing connectors (e.g. GitLab) that pass `file_size=0` and bypass the metadata-based file-size check. - Adding a streaming byte counter in `download_to_temp_file` so the actual download is bounded even when metadata is missing/wrong. - Switching `bulk_size` / `chunk_mem_size` accounting from `pympler.asizeof` to exact serialized bytes (currently the cap and the chunk flush use different units). - Feeding pre-serialized bytes to `client.bulk(...)` to skip transport-side re-serialization (this PR does serialize twice for non-`_attachment` docs: once to measure, once on the wire). ## Testing ### Unit tests `tests/test_sink.py` (cap-specific): - `test_sink_drops_doc_exceeding_max_text_document_size` — oversized text doc is dropped, warning string and counter pinned exactly. - `test_sink_does_not_drop_doc_within_max_text_document_size` — small text doc passes through. - `test_sink_max_text_document_size_disabled[None, 0]` — falsy cap disables. - `test_sink_does_not_drop_delete_op_even_if_oversized` — DELETE exempt. - `test_sink_drops_only_oversized_doc_in_mixed_batch` — only the oversized doc is dropped from a mixed batch; the rest are sent. - `test_sink_does_not_drop_doc_with_attachment_even_if_oversized` — binary path exemption. - `test_sink_attachment_gate_uses_key_presence_not_value[None, "", 0, False, []]` — gate is key-presence, not value-truthiness. - `test_sink_drops_oversized_doc_when_attachment_key_absent` — explicit key-absent sanity. - `test_sink_drops_doc_with_body_only_when_oversized` — DES-text shape (`body`, no `_attachment`). - `test_sink_drops_structured_only_doc_when_oversized` — structured-only doc shape (no `body`, no `_attachment`). - `test_sink_measures_serialized_size_in_wire_utf8_bytes` — regression for the i18n/emoji over-count: 200 000 × `é` body fits the 1 MiB cap on the wire (~0.4 MiB UTF-8) but would have been over the cap (~1.15 MiB) under the old `json.dumps` default measurement. - `test_sink_rejects_negative_max_text_document_size[-1, -3, -1024]` — negative cap is rejected at construction with `ValueError`. `tests/test_config.py`: - `test_default_max_text_document_size` asserts the default is `3`. All 119 cap+config tests pass. ### Functional smoke The `dir` source ftest still completes end-to-end with the cap enabled: ``` DATA_SIZE=medium MAX_DURATION=1800 REFRESH_RATE=2 \ make -C app/connectors_service ftest NAME=dir ``` (Earlier runs in this PR exercised the older `pympler.asizeof` measurement path; the current measurement is wire-byte and the log format is the new `serialized text size ...` form.) ## Checklists #### Pre-Review Checklist - [x] this PR does NOT contain credentials of any kind - [x] this PR has a meaningful title - [x] this PR links to all relevant github issues - [x] this PR has a thorough description - [x] Covered the changes with automated tests - [x] Tested the changes locally - [x] Added a label for each target release version - [ ] For bugfixes: backport safely to all minor branches still receiving patch releases - [x] Considered corresponding documentation changes - [x] Contributed any configuration settings changes to the configuration reference #### Changes Requiring Extra Attention - **Behavior change (default-on):** by default, non-binary docs whose serialized bulk-op JSON exceeds 3 MiB will be dropped (with a warning + `docs_dropped_too_large` counter) instead of being sent. Operators relying on ingesting very large structured/text docs must set `elasticsearch.bulk.max_text_document_size: 0` or raise the threshold. Binary attachments (`_attachment` present) are unchanged. ## Release Note Add `elasticsearch.bulk.max_text_document_size` (MiB, default `3`, `0` disables) to drop oversized **text** documents (docs without an `_attachment` key) before they hit the Elasticsearch bulk API, protecting clusters from being overwhelmed by huge extracted/structured documents. The size is measured against the actual UTF-8 wire bytes the ES client sends. Dropped docs are logged at `WARNING` and counted as `docs_dropped_too_large`. Binary attachments remain governed by `service.max_file_download_size`. **Upgrade note:** earlier connectors versions had no per-document text cap, so a sync that previously succeeded with multi-MiB structured/text docs may begin reporting `docs_dropped_too_large` after upgrade. Set `elasticsearch.bulk.max_text_document_size: 0` to preserve previous behavior, or raise the value to fit your largest legitimate doc. --------- Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com> Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 51cfce3 commit df8379f

6 files changed

Lines changed: 383 additions & 6 deletions

File tree

app/connectors_service/NOTICE.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4146,7 +4146,7 @@ Apache Software License
41464146

41474147

41484148
google-auth
4149-
2.52.0
4149+
2.53.0
41504150
Apache Software License
41514151
Apache License
41524152
Version 2.0, January 2004

app/connectors_service/config.yml.example

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,21 @@
136136
#elasticsearch.bulk.chunk_max_mem_size: 3
137137
#
138138
#
139+
## Per-document cap in MiB for docs whose `doc` body has no `_attachment`
140+
## key (i.e. text extracted by the Data Extraction Service, or
141+
## structured-only docs). Compared against the serialized bulk-op JSON
142+
## (UTF-8 wire bytes); oversized docs are dropped, logged, and counted.
143+
## Binary attachments (any doc carrying an `_attachment` key) are governed
144+
## by `service.max_file_download_size`. Set to 0 to disable.
145+
##
146+
## UPGRADE NOTE: this cap is new and active by default. Earlier connectors
147+
## versions had no per-document text cap, so a sync that previously
148+
## succeeded with multi-MiB structured/text docs may begin reporting
149+
## `docs_dropped_too_large`. Set to 0 to preserve previous behavior, or
150+
## raise the value to fit your largest legitimate doc.
151+
#elasticsearch.bulk.max_text_document_size: 3
152+
#
153+
#
139154
## The max size of the bulk operation to Elasticsearch.
140155
#elasticsearch.bulk.chunk_size: 500
141156
#

app/connectors_service/connectors/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
DEFAULT_QUEUE_REFRESH_TIMEOUT = 1300
3232
DEFAULT_CHUNK_SIZE = 500
3333
DEFAULT_CHUNK_MAX_MEM_SIZE = 3
34+
DEFAULT_MAX_TEXT_DOCUMENT_SIZE = 3
3435
DEFAULT_DISPLAY_EVERY = 100
3536
DEFAULT_MAX_CONCURRENCY = 5
3637
DEFAULT_CONCURRENT_DOWNLOADS = 10
@@ -97,6 +98,7 @@ def _default_config():
9798
"chunk_size": DEFAULT_CHUNK_SIZE,
9899
"max_concurrency": DEFAULT_MAX_CONCURRENCY,
99100
"chunk_max_mem_size": DEFAULT_CHUNK_MAX_MEM_SIZE,
101+
"max_text_document_size": DEFAULT_MAX_TEXT_DOCUMENT_SIZE,
100102
"max_retries": DEFAULT_ELASTICSEARCH_MAX_RETRIES,
101103
"retry_interval": DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
102104
"concurrent_downloads": DEFAULT_CONCURRENT_DOWNLOADS,

app/connectors_service/connectors/es/sink.py

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from connectors_sdk.utils import (
3131
iso_utc,
3232
)
33+
from elastic_transport import JsonSerializer
3334

3435
from connectors.config import (
3536
DEFAULT_CHUNK_MAX_MEM_SIZE,
@@ -39,6 +40,7 @@
3940
DEFAULT_ELASTICSEARCH_MAX_RETRIES,
4041
DEFAULT_ELASTICSEARCH_RETRY_INTERVAL,
4142
DEFAULT_MAX_CONCURRENCY,
43+
DEFAULT_MAX_TEXT_DOCUMENT_SIZE,
4244
DEFAULT_QUEUE_MAX_MEM_SIZE,
4345
DEFAULT_QUEUE_MAX_SIZE,
4446
DEFAULT_QUEUE_REFRESH_INTERVAL,
@@ -84,6 +86,7 @@
8486
DOCS_EXTRACTED = "docs_extracted"
8587
DOCS_FILTERED = "docs_filtered"
8688
DOCS_DROPPED = "docs_dropped"
89+
DOCS_DROPPED_TOO_LARGE = "docs_dropped_too_large"
8790
ID_MISSING = "_ids_missing"
8891
RESULT_ERROR = "result_errors"
8992
RESULT_SUCCESS = "result_successes"
@@ -94,6 +97,11 @@
9497
# Successful results according to the docs: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-response-body
9598
SUCCESSFUL_RESULTS = ("created", "deleted", "updated", "noop")
9699

100+
# Reuse the same serializer the elasticsearch client uses to send bulk
101+
# requests, so the cap check below sees byte-for-byte the wire payload
102+
# (incl. its `default()` handling for `datetime`/`UUID`/`Decimal`).
103+
_BULK_JSON_SERIALIZER = JsonSerializer()
104+
97105

98106
def get_mib_size(obj):
99107
"""Returns the size of ob in MiB"""
@@ -137,6 +145,11 @@ class Sink:
137145
- `pipeline` -- ingest pipeline settings to pass to the bulk API
138146
- `chunk_mem_size` -- a maximum size in MiB for each bulk request
139147
- `max_concurrency` -- a maximum number of concurrent bulk requests
148+
- `max_text_document_size` -- hard per-document cap in MiB for docs
149+
whose `doc` body has no `_attachment` key, measured against the
150+
serialized bulk-op JSON. Must be ``>= 0``; ``0`` and ``None`` disable
151+
the cap. Binary attachments (any doc carrying an `_attachment` key)
152+
are governed by ``service.max_file_download_size``.
140153
"""
141154

142155
def __init__(
@@ -152,6 +165,7 @@ def __init__(
152165
error_monitor,
153166
logger_=None,
154167
enable_bulk_operations_logging=False,
168+
max_text_document_size=None,
155169
):
156170
self.client = client
157171
self.queue = queue
@@ -166,6 +180,17 @@ def __init__(
166180
self._logger = logger_ or logger
167181
self._canceled = False
168182
self._enable_bulk_operations_logging = enable_bulk_operations_logging
183+
if max_text_document_size is not None and max_text_document_size < 0:
184+
msg = (
185+
"elasticsearch.bulk.max_text_document_size must be >= 0 "
186+
f"(got {max_text_document_size}); use 0 to disable the cap."
187+
)
188+
raise ValueError(msg)
189+
self.max_text_document_size = (
190+
max_text_document_size * 1024 * 1024
191+
if max_text_document_size
192+
else max_text_document_size
193+
)
169194
self.counters = Counters()
170195

171196
def _bulk_op(self, doc, operation=OP_INDEX):
@@ -399,12 +424,33 @@ async def _dispatch_batch():
399424
if not doc_id:
400425
self._logger.warning(f"Skip document {doc} as '_id' is missing.")
401426
continue
427+
ops = self._bulk_op(doc, operation)
428+
if (
429+
self.max_text_document_size
430+
and operation != OP_DELETE
431+
and "_attachment" not in doc["doc"]
432+
):
433+
# Reuse the elasticsearch client's own JSON serializer so
434+
# the measured size matches the actual bulk payload on the
435+
# wire, including its `default()` handling for non-JSON
436+
# native types like `datetime`/`UUID`/`Decimal`.
437+
serialized_size = sum(
438+
len(_BULK_JSON_SERIALIZER.json_dumps(op)) for op in ops
439+
)
440+
if serialized_size > self.max_text_document_size:
441+
self._logger.warning(
442+
f"Dropping doc id={doc_id} index={doc['_index']} op={operation}: "
443+
f"serialized text size {serialized_size}B exceeds "
444+
f"elasticsearch.bulk.max_text_document_size "
445+
f"({self.max_text_document_size}B)"
446+
)
447+
self.counters.increment(DOCS_DROPPED_TOO_LARGE)
448+
continue
402449
# Flush before adding if this doc would overflow either cap.
403-
# `_bulk_op` emits 1 entry for deletes and 2 for index/update,
404-
# so we compare prospective rather than current entry count.
405-
entries = 1 if operation == OP_DELETE else 2
450+
# `len(ops)` matches `_bulk_op`'s output (1 entry for delete,
451+
# 2 for index/update), so we compare prospective entry count.
406452
if batch and (
407-
len(batch) + entries > self.chunk_size
453+
len(batch) + len(ops) > self.chunk_size
408454
or bulk_size + doc_size > self.chunk_mem_size
409455
):
410456
await _dispatch_batch()
@@ -423,7 +469,7 @@ async def _dispatch_batch():
423469
overhead_size = get_size(overhead)
424470
stats[operation][doc_id] = max(doc_size - overhead_size, 0)
425471
self.counters.increment(operation, namespace=BULK_OPERATIONS)
426-
batch.extend(self._bulk_op(doc, operation))
472+
batch.extend(ops)
427473
bulk_size += doc_size
428474

429475
# Also flush when this doc fills the batch up to (or past) the
@@ -1046,6 +1092,9 @@ async def async_bulk(
10461092
mem_queue_refresh_interval = options.get(
10471093
"queue_refresh_interval", DEFAULT_QUEUE_REFRESH_INTERVAL
10481094
)
1095+
max_text_document_size = options.get(
1096+
"max_text_document_size", DEFAULT_MAX_TEXT_DOCUMENT_SIZE
1097+
)
10491098

10501099
stream = MemQueue(
10511100
maxsize=queue_size,
@@ -1088,6 +1137,7 @@ async def async_bulk(
10881137
error_monitor=self.error_monitor,
10891138
logger_=self._logger,
10901139
enable_bulk_operations_logging=enable_bulk_operations_logging,
1140+
max_text_document_size=max_text_document_size,
10911141
)
10921142
self._sink_task = asyncio.create_task(
10931143
self._sink.run(), name=f"Sink for {job_type} sync to {index}"

app/connectors_service/tests/test_config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ def test_config(set_env):
3333
assert config["elasticsearch"]["user"] == "elastic"
3434

3535

36+
def test_default_max_text_document_size(set_env):
37+
config = load_config(CONFIG_FILE)
38+
assert config["elasticsearch"]["bulk"]["max_text_document_size"] == 3
39+
40+
3641
def test_config_with_ent_search(set_env):
3742
with mock.patch.dict(os.environ, {"ENT_SEARCH_CONFIG_PATH": ES_CONFIG_FILE}):
3843
config = load_config(CONFIG_FILE)

0 commit comments

Comments
 (0)