Skip to content

Commit 4b9a4cc

Browse files
authored
Add files via upload
1 parent da0fa28 commit 4b9a4cc

1 file changed

Lines changed: 178 additions & 1 deletion

File tree

pycatfile.py

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import binascii
3939
import platform
4040
from io import StringIO, BytesIO
41+
import posixpath as pp # POSIX-safe joins/normpaths
4142
try:
4243
from backports import tempfile
4344
except ImportError:
@@ -392,7 +393,7 @@ def decode_unicode_escape(value):
392393
__version_date__ = str(__version_date_info__[0]) + "." + str(
393394
__version_date_info__[1]).zfill(2) + "." + str(__version_date_info__[2]).zfill(2)
394395
__revision__ = __version_info__[3]
395-
__revision_id__ = "$Id: 4b73b24d1d9cb1fb5011cf0090b2e853058cd6fe $"
396+
__revision_id__ = "$Id$"
396397
if(__version_info__[4] is not None):
397398
__version_date_plusrc__ = __version_date__ + \
398399
"-" + str(__version_date_info__[4])
@@ -622,6 +623,182 @@ def _normalize_initial_data(data, isbytes, encoding):
622623
return str(data)
623624

624625

626+
def _split_posix(path_text):
627+
"""Split POSIX paths regardless of OS; return list of components."""
628+
# Normalize leading './'
629+
if path_text.startswith(u'./'):
630+
path_text = path_text[2:]
631+
# Strip redundant slashes
632+
path_text = re.sub(u'/+', u'/', path_text)
633+
# Drop trailing '/' so 'dir/' -> ['dir']
634+
if path_text.endswith(u'/'):
635+
path_text = path_text[:-1]
636+
return path_text.split(u'/') if path_text else []
637+
638+
def _is_abs_like(s):
639+
"""Absolute targets (POSIX or Windows-drive style)."""
640+
return s.startswith(u'/') or s.startswith(u'\\') or re.match(u'^[A-Za-z]:[/\\\\]', s)
641+
642+
def _resolves_outside(base_rel, target_rel):
643+
"""
644+
Given a base directory (relative, POSIX) and a target (relative),
645+
return True if base/target resolves outside of base.
646+
We anchor under '/' so normpath is root-anchored and portable.
647+
"""
648+
base_clean = u'/'.join(_split_posix(base_rel))
649+
target_clean = u'/'.join(_split_posix(target_rel))
650+
base_abs = u'/' + base_clean if base_clean else u'/'
651+
combined = pp.normpath(pp.join(base_abs, target_clean))
652+
if combined == base_abs or combined.startswith(base_abs + u'/'):
653+
return False
654+
return True
655+
656+
def DetectTarbombCatfileArray(listarchivefiles,
                              top_file_ratio_threshold=0.6,
                              min_members_for_ratio=4,
                              symlink_policy="escape-only",  # 'escape-only' | 'deny' | 'single-folder-only'
                              to_text=to_text):
    """
    Detect 'tarbomb-like' archives from ArchiveFileToArray/TarFileToArray dicts.

    Each entry name is checked for absolute paths and '..' traversal,
    symlink entries are checked for targets that leave their parent
    directory, and the top-level layout is inspected to see whether the
    archive would scatter entries into the extraction directory.

    Parameters:
        listarchivefiles: dict with key 'ffilelist' -> list of entry dicts.
            Entries are read for 'fname', 'ftype' and 'flinkname'. None or
            a missing/empty list is treated as an empty archive.
        top_file_ratio_threshold: float; when the fraction of entries that
            sit directly at the archive root exceeds this, the archive is
            flagged.
        min_members_for_ratio: int, minimum number of entries before the
            ratio heuristic applies.
        symlink_policy:
            - 'escape-only': only symlinks that are absolute or escape
              their parent directory are unsafe
            - 'deny': any symlink is unsafe
            - 'single-folder-only': symlinks are allowed only if the archive
              has exactly one top-level entry
            Any other value adds no policy check beyond escape detection.
        to_text: function used to normalize names and link targets to text.

    Returns dict with:
        - is_tarbomb (bool), reasons (list of str)
        - total_members (int): entries with a non-empty normalized name
        - top_level_entries (sorted list of first path components)
        - top_level_files_count (int): entries with a single path component
        - has_absolute_paths (bool), has_parent_traversal (bool)
        - symlink_escapes_root (bool)
        - symlink_issues (list of {'entry', 'target', 'reason'} dicts)
    """
    files = listarchivefiles or {}
    members = files.get('ffilelist') or []

    names = []
    has_abs = False
    has_parent = False

    # Symlink tracking
    has_any_symlink = False
    symlink_issues = []
    any_symlink_escape = False

    for m in members:
        m = m or {}
        name = to_text(m.get('fname', u""))

        if _is_abs_like(name):
            has_abs = True

        parts = _split_posix(name)
        # Backslash-separated '..' is traversal on Windows extractors, and
        # backslashes are already treated as separators for absolute paths,
        # so check both spellings here.
        if u'..' in parts or u'..' in _split_posix(name.replace(u'\\', u'/')):
            has_parent = True

        if not parts:
            # Names such as '', './' or '/' carry no layout information.
            continue

        norm_name = u'/'.join(parts)
        names.append(norm_name)

        # ---- Symlink detection ----
        # ftype is accepted either as the numeric code 2 or as the text
        # 'symlink' (case-insensitive).
        ftype = m.get('ftype')
        is_symlink = (ftype == 2) or (
            ftype is not None and to_text(ftype).lower() == u'symlink')
        if not is_symlink:
            continue

        has_any_symlink = True
        target = to_text(m.get('flinkname', u""))
        if _is_abs_like(target):
            # Absolute symlink target is unsafe
            any_symlink_escape = True
            symlink_issues.append({'entry': norm_name, 'target': target,
                                   'reason': 'absolute symlink target'})
        else:
            parent = u'/'.join(parts[:-1])  # may be ''
            if _resolves_outside(parent, target):
                any_symlink_escape = True
                symlink_issues.append({'entry': norm_name, 'target': target,
                                       'reason': 'symlink escapes parent directory'})

    total = len(names)
    reasons = []
    is_tarbomb = False

    # Path-based dangers. These are evaluated before the empty-archive
    # shortcut so an archive whose only entry is e.g. '/' is still flagged.
    if has_abs:
        is_tarbomb = True
        reasons.append("contains absolute paths (dangerous)")
    if has_parent:
        is_tarbomb = True
        reasons.append("contains parent-traversal ('..') entries (dangerous)")
    if any_symlink_escape:
        is_tarbomb = True
        reasons.append("contains symlinks that escape their parent directory")

    if total == 0:
        return {
            "is_tarbomb": bool(is_tarbomb),
            "reasons": ["archive contains no members"] + reasons,
            "total_members": 0,
            "top_level_entries": [],
            "top_level_files_count": 0,
            "has_absolute_paths": has_abs,
            "has_parent_traversal": has_parent,
            "symlink_escapes_root": any_symlink_escape,
            "symlink_issues": symlink_issues,
        }

    # Layout counts: entries per first path component, plus how many
    # entries consist of a single component (sit directly at the root).
    top_counts = {}
    top_level_files_count = 0
    for name in names:
        parts = name.split(u'/')
        first = parts[0]
        top_counts[first] = top_counts.get(first, 0) + 1
        if len(parts) == 1:  # directly at archive root
            top_level_files_count += 1

    top_keys = sorted(top_counts)

    # Symlink policy enforcement
    if symlink_policy == "deny" and has_any_symlink:
        is_tarbomb = True
        reasons.append("symlinks present and policy is 'deny'")
    elif symlink_policy == "single-folder-only" and has_any_symlink and len(top_keys) != 1:
        is_tarbomb = True
        reasons.append("symlinks present but archive lacks a single top-level folder")

    # Tarbomb layout heuristics
    if len(top_keys) == 1:
        reasons.append("single top-level entry '{0}'".format(top_keys[0]))
    else:
        ratio = float(top_level_files_count) / float(total)
        if total >= min_members_for_ratio and ratio > float(top_file_ratio_threshold):
            is_tarbomb = True
            reasons.append("high fraction of members ({0:.0%}) at archive root".format(ratio))
        else:
            # No single top-level entry holds at least 90% of the members.
            max_bucket = max(top_counts.values())
            if max_bucket < total * 0.9:
                is_tarbomb = True
                reasons.append("multiple top-level entries with no dominant folder: {0}".format(
                    u", ".join(top_keys[:10])))
            else:
                reasons.append("multiple top-level entries but one dominates")

    return {
        "is_tarbomb": bool(is_tarbomb),
        "reasons": reasons,
        "total_members": total,
        "top_level_entries": top_keys,
        "top_level_files_count": top_level_files_count,
        "has_absolute_paths": has_abs,
        "has_parent_traversal": has_parent,
        "symlink_escapes_root": any_symlink_escape,
        "symlink_issues": symlink_issues,
    }
799+
800+
801+
625802
def MkTempFile(data=None, inmem=__use_inmemfile__, isbytes=True, prefix=__project__,
626803
delete=True, encoding="utf-8"):
627804
"""

0 commit comments

Comments
 (0)