Skip to content

Commit 5ebb6fb

Browse files
chandrasekharan-zipstack, claude, and gaya3-zipstack
authored
UN-3295 [FIX] Fix fsspec directory listing cache issue on connectors (#1826)
* UN-3295 [FIX] Fix fsspec directory listing cache issue on connectors Fixes Azure Blob Storage (and other connectors) not detecting new files or seeing deleted files. Changes span: - Disable fsspec directory listing cache: Azure uses listings_expiry_time=1 due to adlfs bug, S3/Box/Dropbox use use_listings_cache=False - Eliminate walk() + listdir() double-listing by using walk(detail=True) - Update type hints from list[str] to Collection[str] for dirs parameter - Restore success toast for "Clear File History" operation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * UN-3295 [FIX] Fix Ruff B007 lint warnings for unused loop variables Rename unused `root` loop variable to `_root` in fsspec.walk() calls across filesystem and file discovery modules. Addresses CodeRabbit review feedback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * UN-3295 Address PR review feedback for fsspec cache handling - Add on_error callback to fs_fsspec.walk() for proper error logging - Disable Azure listings cache by setting listings_expiry_time=0 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * UN-3295 [FIX] Handle on_error callback in SharePointFileSystem.walk() SharePoint's custom walk() was silently swallowing exceptions without invoking the on_error callback. Now supports callable, "raise", and "omit" (default) modes matching fsspec's contract. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * UN-3295 [FIX] Fix invalid type annotation in SharePoint walk() Use `Callable[[Exception], Any]` instead of builtin `callable` which is not a valid type for annotations without `__future__.annotations`. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: Gayathri <142381512+gaya3-zipstack@users.noreply.github.com>
1 parent 0f37b40 commit 5ebb6fb

9 files changed

Lines changed: 74 additions & 42 deletions

File tree

backend/workflow_manager/endpoint_v2/source.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import shutil
55
import uuid
6+
from collections.abc import Collection
67
from hashlib import sha256
78
from io import BytesIO
89
from itertools import islice
@@ -486,14 +487,14 @@ def _get_matched_files(
486487
max_depth = int(SourceConstant.MAX_RECURSIVE_DEPTH) if recursive else 1
487488

488489
fs_fsspec = source_fs.get_fsspec_fs()
489-
for root, dirs, _ in fs_fsspec.walk(input_directory, maxdepth=max_depth):
490-
try:
491-
fs_metadata_list: list[dict[str, Any]] = fs_fsspec.listdir(
492-
root
493-
) # Single call for file system metadata
494-
except Exception as e:
495-
logger.warning(f"Failed to list directory from path: {root}, error: {e}")
496-
continue
490+
491+
def _on_walk_error(error: Exception) -> None:
492+
logger.warning(f"Failed to list directory: {error}")
493+
494+
for _root, dirs, files in fs_fsspec.walk(
495+
input_directory, maxdepth=max_depth, detail=True, on_error=_on_walk_error
496+
):
497+
fs_metadata_list: list[dict[str, Any]] = list(files.values())
497498

498499
count = self._process_file_fs_directory(
499500
fs_metadata_list=fs_metadata_list,
@@ -517,7 +518,7 @@ def _process_file_fs_directory(
517518
matched_files: dict[str, FileHash],
518519
patterns: list[str],
519520
source_fs: UnstractFileSystem,
520-
dirs: list[str],
521+
dirs: Collection[str],
521522
) -> int:
522523
for fs_metadata in fs_metadata_list:
523524
if count >= limit:
@@ -708,7 +709,7 @@ def _is_directory(
708709
source_fs: UnstractFileSystem,
709710
file_path: str,
710711
metadata: dict[str, Any],
711-
dirs: list[str],
712+
dirs: Collection[str],
712713
) -> bool:
713714
"""Check if the given path is a directory.
714715
@@ -717,7 +718,7 @@ def _is_directory(
717718
reading the file.
718719
file_path (str): The path of the file.
719720
metadata (dict[str, Any]): The metadata of the file.
720-
dirs (list[str]): The list of directories.
721+
dirs (Collection[str]): The collection of directory names.
721722
722723
Returns:
723724
bool: True if the file is a directory, False otherwise.

frontend/src/hooks/useClearFileHistory.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ const useClearFileHistory = () => {
3535

3636
try {
3737
await workflowServiceInstance.clearFileMarkers(workflowId);
38+
setAlertDetails({
39+
type: "success",
40+
content: "File history cleared successfully",
41+
});
3842
return true;
3943
} catch (err) {
4044
setAlertDetails(handleException(err, "Failed to clear file history"));

unstract/connectors/src/unstract/connectors/filesystems/azure_cloud_storage/azure_cloud_storage.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,15 @@ def __init__(self, settings: dict[str, Any]):
3939
account_name = settings.get("account_name", "")
4040
access_key = settings.get("access_key", "")
4141
self.bucket = settings.get("bucket", "")
42+
# adlfs._ls_containers() unconditionally reads from DirCache after
43+
# populating it — use_listings_cache=False makes the write a no-op,
44+
# causing a KeyError. Setting listings_expiry_time=0 allows writes
45+
# but expires entries immediately on subsequent reads.
46+
# Check https://github.com/fsspec/adlfs/issues/230 for more context.
4247
self.azure_fs = AzureBlobFileSystem(
43-
account_name=account_name, credential=access_key
48+
account_name=account_name,
49+
credential=access_key,
50+
listings_expiry_time=0,
4451
)
4552

4653
@staticmethod

unstract/connectors/src/unstract/connectors/filesystems/box/box.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ def __init__(self, settings: dict[str, Any]):
3636
try:
3737
oauth = JWTAuth.from_settings_dictionary(settings_dict)
3838
root_id = 0
39-
self.box_fs = BoxFileSystem(oauth=oauth, root_id=root_id)
39+
self.box_fs = BoxFileSystem(
40+
oauth=oauth, root_id=root_id, use_listings_cache=False
41+
)
4042
except ValueError as e:
4143
raise ConnectorError(
4244
f"Error initialising from Box app settings: {e}",

unstract/connectors/src/unstract/connectors/filesystems/minio/minio.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ def __init__(self, settings: dict[str, Any]):
2626
anon=False,
2727
key=key,
2828
secret=secret,
29+
use_listings_cache=False,
2930
default_fill_cache=False,
3031
default_cache_type="none",
3132
skip_instance_cache=True,

unstract/connectors/src/unstract/connectors/filesystems/sharepoint/sharepoint.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import logging
1212
import os
1313
import threading
14+
from collections.abc import Callable
1415
from datetime import UTC, datetime
1516
from typing import Any, NoReturn
1617

@@ -415,6 +416,7 @@ def walk(
415416
self,
416417
path: str = "",
417418
maxdepth: int | None = None,
419+
on_error: str | Callable[[Exception], Any] = "omit",
418420
**kwargs: Any,
419421
):
420422
"""Walk directory tree."""
@@ -424,7 +426,11 @@ def walk(
424426
path = self._normalize_path(path)
425427
try:
426428
items = self.ls(path, detail=True)
427-
except Exception:
429+
except Exception as e:
430+
if callable(on_error):
431+
on_error(e)
432+
elif on_error == "raise":
433+
raise
428434
return
429435

430436
dirs = []
@@ -441,7 +447,7 @@ def walk(
441447
for d in dirs:
442448
subpath = f"{path}/{d}" if path != "root" else d
443449
new_depth = maxdepth - 1 if maxdepth is not None else None
444-
yield from self.walk(subpath, maxdepth=new_depth, **kwargs)
450+
yield from self.walk(subpath, maxdepth=new_depth, on_error=on_error, **kwargs)
445451

446452

447453
class SharePointFS(UnstractFileSystem):

unstract/connectors/src/unstract/connectors/filesystems/unstract_file_system.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -187,23 +187,31 @@ def list_files(
187187
Returns:
188188
List of file metadata dictionaries
189189
"""
190-
all_files = []
190+
all_files: list[dict[str, Any]] = []
191191
fs_fsspec = self.get_fsspec_fs()
192192

193-
for root, dirs, _ in fs_fsspec.walk(directory, maxdepth=max_depth):
194-
try:
195-
fs_metadata_list = fs_fsspec.listdir(root, detail=True)
196-
for metadata in fs_metadata_list:
197-
if not include_dirs and self.is_dir_by_metadata(metadata):
198-
continue
199-
all_files.append(metadata)
200-
201-
if limit is not None and len(all_files) >= limit:
202-
return all_files
203-
except Exception as e:
204-
logger.warning(f"Failed to list directory {root}: {e}")
205-
self._store_user_error(f"Could not access directory: {root}")
206-
continue
193+
def _on_walk_error(error: Exception) -> None:
194+
"""Callback for walk errors — preserves user error reporting."""
195+
logger.warning(f"Failed to list directory: {error}")
196+
self._store_user_error(f"Could not access directory: {error}")
197+
198+
for _root, dirs, files in fs_fsspec.walk(
199+
directory, maxdepth=max_depth, detail=True, on_error=_on_walk_error
200+
):
201+
if include_dirs:
202+
items = list(dirs.values()) + list(files.values())
203+
else:
204+
items = list(files.values())
205+
206+
for metadata in items:
207+
# Safety check: some providers (GCS zero-size objects, Azure
208+
# is_directory tag) mis-classify directories as files.
209+
if not include_dirs and self.is_dir_by_metadata(metadata):
210+
continue
211+
all_files.append(metadata)
212+
213+
if limit is not None and len(all_files) >= limit:
214+
return all_files
207215
return all_files
208216

209217
def report_errors_to_user(self) -> list[str]:

unstract/connectors/src/unstract/connectors/filesystems/zs_dropbox/zs_dropbox.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@ def __init__(self, settings: dict[str, Any]):
1616
from dropboxdrivefs import DropboxDriveFileSystem
1717

1818
super().__init__("Dropbox")
19-
self.dropbox_fs = DropboxDriveFileSystem(token=settings["token"])
19+
self.dropbox_fs = DropboxDriveFileSystem(
20+
token=settings["token"], use_listings_cache=False
21+
)
2022
self.path = "///"
2123

2224
@staticmethod

workers/shared/processing/file_discovery.py

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
"""
66

77
import time
8+
from collections.abc import Collection
89
from typing import Any
910

1011
from unstract.connectors.filesystems.unstract_file_system import UnstractFileSystem
@@ -115,22 +116,22 @@ def discover_files_streaming(
115116

116117
logger.info(f"[StreamingDiscovery] Processing directory: {directory}")
117118

119+
def _on_walk_error(error: Exception) -> None:
120+
logger.warning(
121+
f"[StreamingDiscovery] Failed to list directory: {error}"
122+
)
123+
118124
# Walk directory with max depth control
119-
for root, dirs, _ in self.fs_fsspec.walk(directory, maxdepth=max_depth):
125+
for _root, dirs, files in self.fs_fsspec.walk(
126+
directory, maxdepth=max_depth, detail=True, on_error=_on_walk_error
127+
):
120128
metrics["directories_walked"] += 1
121129

122130
# Check limit before processing directory
123131
if len(matched_files) >= file_hard_limit:
124132
break
125133

126-
try:
127-
# Get all items in directory with metadata
128-
fs_metadata_list: list[dict[str, Any]] = self.fs_fsspec.listdir(
129-
root
130-
)
131-
except Exception as e:
132-
logger.warning(f"Failed to list directory {root}: {e}")
133-
continue
134+
fs_metadata_list: list[dict[str, Any]] = list(files.values())
134135

135136
# Process files in this directory
136137
for fs_metadata in fs_metadata_list:
@@ -395,14 +396,14 @@ def _process_batch(
395396
)
396397

397398
def _is_directory(
398-
self, file_path: str, metadata: dict[str, Any], dirs: list[str]
399+
self, file_path: str, metadata: dict[str, Any], dirs: Collection[str]
399400
) -> bool:
400401
"""Check if path is a directory using multiple detection methods.
401402
402403
Args:
403404
file_path: Path to check
404405
metadata: File metadata from fsspec
405-
dirs: List of directories from walk
406+
dirs: Collection of directory names from walk
406407
407408
Returns:
408409
True if path is a directory

0 commit comments

Comments (0)