@@ -109,41 +109,50 @@ async def upload_arc(request: Request) -> dict[str, str | dict[str, str]]:
def _generate_random_arc_id() -> str:
    """Return a random ARC identifier of the form ``arc_<8 hex digits>``."""
    # Four random bytes from the OS render as eight lowercase hex characters.
    token = os.urandom(4).hex()
    return f"arc_{token}"
111111
112- if isinstance (raw_arc_id , str ) and raw_arc_id .strip ():
113- candidate_id = raw_arc_id .strip ()
114- # Reduce to a single path component and normalize it.
115- safe_name = os .path .normpath (Path (candidate_id ).name )
116- # Reject empty names, current/parent directory markers, or anything that
117- # would reintroduce directory components on this platform.
118- if not safe_name or safe_name in {"." , ".." } or "/" in safe_name or "\\ " in safe_name :
119- arc_id = _generate_random_arc_id ()
120- else :
121- # Build the full path and ensure it stays within the output_path root.
122- full_path = (output_path / safe_name ).resolve ()
123- try :
124- common_root = os .path .commonpath ([str (output_path .resolve ()), str (full_path )])
125- except ValueError :
126- # On error (e.g., different drives), fall back to a random ID.
127- arc_id = _generate_random_arc_id ()
112+ def _derive_safe_arc_id (base_dir : Path , raw_id : object ) -> tuple [str , Path ] | tuple [None , None ]:
113+ """
114+ Derive a safe ARC identifier and corresponding directory path that
115+ is guaranteed to stay within the given base_dir. Returns (None, None)
116+ if no safe identifier can be derived.
117+ """
118+ base_resolved = base_dir .resolve ()
119+
120+ def _fallback () -> tuple [str , Path ]:
121+ rid = _generate_random_arc_id ()
122+ target = (base_resolved / rid ).resolve ()
123+ return rid , target
124+
125+ if isinstance (raw_id , str ) and raw_id .strip ():
126+ candidate_id = raw_id .strip ()
127+ # Reduce to a single path component and normalize it.
128+ safe_name = os .path .normpath (Path (candidate_id ).name )
129+ # Reject empty names, current/parent directory markers, or anything that
130+ # would reintroduce directory components on this platform.
131+ if not safe_name or safe_name in {"." , ".." } or "/" in safe_name or "\\ " in safe_name :
132+ arc_id , candidate_dir = _fallback ()
128133 else :
129- if common_root != str (output_path .resolve ()):
130- arc_id = _generate_random_arc_id ()
131- else :
132- arc_id = safe_name
133- else :
134- arc_id = _generate_random_arc_id ()
134+ candidate_dir = (base_resolved / safe_name ).resolve ()
135+ arc_id = safe_name
136+ else :
137+ arc_id , candidate_dir = _fallback ()
138+
139+ try :
140+ common_root = os .path .commonpath ([str (base_resolved ), str (candidate_dir )])
141+ except ValueError :
142+ return None , None
143+
144+ if common_root != str (base_resolved ):
145+ return None , None
146+
147+ return arc_id , candidate_dir
135148
136149 now = datetime .now (UTC ).isoformat ()
137- arc_dir = output_path / arc_id
138150
139- # Ensure the resolved target directory stays within the intended output root.
140- output_root_resolved = output_path .resolve ()
141- arc_dir_resolved = arc_dir .resolve ()
142- common_root = Path (os .path .commonpath ([str (output_root_resolved ), str (arc_dir_resolved )]))
143- if common_root != output_root_resolved :
151+ arc_id , arc_dir = _derive_safe_arc_id (output_path , raw_arc_id )
152+ if arc_id is None or arc_dir is None :
144153 # Reject paths that would escape the output root (for example via symlinks).
145154 return {
146- "arc_id" : arc_id ,
155+ "arc_id" : "invalid" ,
147156 "status" : "error" ,
148157 "metadata" : {
149158 "rdi" : rdi ,
@@ -153,8 +162,6 @@ def _generate_random_arc_id() -> str:
153162 "last_seen" : now ,
154163 },
155164 }
156-
157- arc_dir = arc_dir_resolved
158165 payload_path = arc_dir .with_suffix (".payload.json" )
159166
160167 with open (payload_path , "w" , encoding = "utf-8" ) as handle :
0 commit comments