Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 197 additions & 106 deletions configargparse.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ def __init__(self, *args, **kwargs):
be parsed in order, with the values from each config file
taking precedence over previous ones. This allows an application
to look for config files in multiple standard locations such as
the install directory, home directory, and current directory.
the install directory, home directory, and/or current directory.
Also, shell \* syntax can be used to specify all conf files in a
directory. For example::

Expand All @@ -857,6 +857,14 @@ def __init__(self, *args, **kwargs):
"~/.my_app_config.ini",
"./app_config.txt"]

Path entries may be strings, ``os.PathLike`` objects (e.g.
``pathlib.Path``), or zero-argument callable functions that
return an open file-like object containing config file
contents. Any provided callable is invoked each time the parser opens
config files, and the returned stream is closed by the parser
after parsing. The callable must return a stream. This is useful
for sourcing config from non-filesystem locations such as in-memory
buffers, secrets managers, or HTTP responses.
ignore_unknown_config_file_keys: If true, settings that are found
in a config file but don't correspond to any defined
configargparse args will be ignored. If false, they will be
Expand Down Expand Up @@ -923,6 +931,14 @@ def __init__(self, *args, **kwargs):
hint = " (e.g. ['%s'])" % value if isinstance(value, str) else ""
raise TypeError("%s must be a list%s. Got: %r" % (name, hint, value))

for i, entry in enumerate(default_config_files):
if not (isinstance(entry, (str, bytes, os.PathLike)) or callable(entry)):
raise TypeError(
"default_config_files[%d] must be a string, bytes, or "
"os.PathLike path, or a callable that returns an open "
"file-like object. Got: %r" % (i, entry)
)

if not callable(config_file_open_func):
raise TypeError(
"config_file_open_func must be callable. Got: %r"
Expand Down Expand Up @@ -1179,64 +1195,75 @@ def parse_known_args(
for config_key in self.get_possible_config_keys(action)
}

# open the config file(s)
# open the config file(s). config_streams is a list of (stream, source_label) tuples.
config_streams = []
if config_file_contents is not None:
stream = StringIO(config_file_contents)
stream.name = "method arg"
config_streams = [stream]
config_streams = [(stream, "method arg")]
elif not skip_config_file_parsing:
config_streams = self._open_config_files(args)

# parse each config file
for stream in reversed(config_streams):
try:
config_items = self._config_file_parser.parse(stream)
except ConfigFileParserException as e:
self.error(str(e))
finally:
if hasattr(stream, "close"):
stream.close()

# add each config item to the commandline unless it's there already
config_args = []
for key, value in config_items.items():
if key in known_config_keys:
action = known_config_keys[key]
discard_this_key = already_on_command_line(
args, action.option_strings, self.prefix_chars
)
else:
action = None
discard_this_key = (
self._ignore_unknown_config_file_keys
or already_on_command_line(
args,
[
self.get_command_line_key_for_unknown_config_file_setting(
key
)
],
self.prefix_chars,
try:
for stream, source_label in reversed(config_streams):
try:
config_items = self._config_file_parser.parse(stream)
except ConfigFileParserException as e:
self.error(str(e))

# add each config item to the commandline unless it's there already
config_args = []
for key, value in config_items.items():
if key in known_config_keys:
action = known_config_keys[key]
discard_this_key = already_on_command_line(
args, action.option_strings, self.prefix_chars
)
else:
action = None
discard_this_key = (
self._ignore_unknown_config_file_keys
or already_on_command_line(
args,
[
self.get_command_line_key_for_unknown_config_file_setting(
key
)
],
self.prefix_chars,
)
)
)

# Skip empty string values for args with nargs to match YAML behavior
# where empty values are treated as None/not present (see issue #296)
if value == "" and action and action.nargs:
continue

if not discard_this_key:
config_args += self.convert_item_to_command_line_arg(
action, key, value
)
source_key = "%s|%s" % (_CONFIG_FILE_SOURCE_KEY, stream.name)
if source_key not in self._source_to_settings:
self._source_to_settings[source_key] = OrderedDict()
self._source_to_settings[source_key][key] = (action, value)
# Skip empty string values for args with nargs to match YAML behavior
# where empty values are treated as None/not present (see issue #296)
if value == "" and action and action.nargs:
continue

idx = self._find_insertion_index(args)
args = args[:idx] + config_args + args[idx:]
if not discard_this_key:
config_args += self.convert_item_to_command_line_arg(
action, key, value
)
source_key = "%s|%s" % (
_CONFIG_FILE_SOURCE_KEY,
source_label,
)
if source_key not in self._source_to_settings:
self._source_to_settings[source_key] = OrderedDict()
self._source_to_settings[source_key][key] = (action, value)

idx = self._find_insertion_index(args)
args = args[:idx] + config_args + args[idx:]
finally:
# Close every stream exactly once, regardless of whether parsing
# succeeded or aborted partway through (e.g. a custom parser
# raised a non-ConfigFileParserException).
for stream, _ in config_streams:
try:
if hasattr(stream, "close"):
stream.close()
except Exception:
pass

# save default settings for use by print_values()
default_settings = OrderedDict()
Expand Down Expand Up @@ -1517,75 +1544,122 @@ def _open_config_files(self, command_line_args):
command_line_args: List of all args

Returns:
list[io.IOBase]: open config files
list[tuple[io.IOBase, str]]: list of ``(stream, source_label)``
pairs. The ``source_label`` is unique per entry (file path for
path entries; ``"<entry_label>[<index>]"`` for callable entries),
so different entries cannot collapse into a single source key in
``format_values()``.
"""
# open any default config files
config_files = []
for files in map(
glob.glob, map(os.path.expanduser, self._default_config_files)
):
for f in files:
config_files.append(self._config_file_open_func(f))
try:
for i, entry in enumerate(self._default_config_files):
if isinstance(entry, (str, os.PathLike)):
# Path entries (str or PathLike). Checked before callable so
# objects implementing both __fspath__ and __call__ are
# treated as paths, matching the documented behavior.
for f in glob.glob(os.path.expanduser(os.fspath(entry))):
config_files.append((self._config_file_open_func(f), f))
else:
# Callable entry (validation in __init__ guarantees this).
entry_label = getattr(entry, "__name__", repr(entry))
try:
stream = entry()
except Exception as e:
raise ConfigFileParserException(
"default_config_files entry %r raised while being "
"called: %s" % (entry_label, e)
) from e
if stream is None:
raise TypeError(
"default_config_files entry %r returned None; "
"must return an open file-like object." % (entry_label,)
)
# Use a source label that always includes the entry index
# so two callables returning streams with the same .name
# (or even the same library-generated name from a previous
# iteration) cannot collide into one _source_to_settings
# entry. The label also doubles as stream.name when the
# stream lacks one, for readable parser error messages.
display_name = (
getattr(stream, "name", None)
if hasattr(stream, "name")
else None
)
source_label = "%s[%d]" % (display_name or entry_label, i)
# Append before attempting .name so the outer cleanup
# closes the stream if the assignment below raises.
config_files.append((stream, source_label))
if not hasattr(stream, "name"):
try:
stream.name = source_label
except Exception as e:
raise ConfigFileParserException(
"default_config_files entry %r returned a "
"stream whose .name attribute could not be "
"set: %s" % (entry_label, e)
) from e

# list actions with is_config_file_arg=True. Its possible there is more
# than one such arg.
user_config_file_arg_actions = [
a for a in self._actions if getattr(a, "is_config_file_arg", False)
]
# list actions with is_config_file_arg=True. Its possible there is
# more than one such arg.
user_config_file_arg_actions = [
a for a in self._actions if getattr(a, "is_config_file_arg", False)
]

if not user_config_file_arg_actions:
return config_files
if not user_config_file_arg_actions:
return config_files

for action in user_config_file_arg_actions:
# try to parse out the config file path by using a clean new
# ArgumentParser that only knows this one arg/action.
arg_parser = argparse.ArgumentParser(
prefix_chars=self.prefix_chars, add_help=False
)
for action in user_config_file_arg_actions:
# try to parse out the config file path by using a clean new
# ArgumentParser that only knows this one arg/action.
arg_parser = argparse.ArgumentParser(
prefix_chars=self.prefix_chars, add_help=False
)

arg_parser._add_action(action)
arg_parser._add_action(action)

# make parser not exit on error by replacing its error method.
# Otherwise it sys.exits(..) if, for example, config file
# is_required=True and user doesn't provide it.
def error_method(self, message):
pass
# make parser not exit on error by replacing its error method.
# Otherwise it sys.exits(..) if, for example, config file
# is_required=True and user doesn't provide it.
def error_method(self, message):
pass

arg_parser.error = types.MethodType(error_method, arg_parser)
arg_parser.error = types.MethodType(error_method, arg_parser)

# check whether the user provided a value
parsed_arg = arg_parser.parse_known_args(args=command_line_args)
if not parsed_arg:
continue
namespace, _ = parsed_arg
user_config_file = getattr(namespace, action.dest, None)
# check whether the user provided a value
namespace, _ = arg_parser.parse_known_args(args=command_line_args)
user_config_file = getattr(namespace, action.dest, None)

if not user_config_file:
continue
if not user_config_file:
continue

# open user-provided config file
user_config_file = os.path.expanduser(user_config_file)
try:
stream = self._config_file_open_func(user_config_file)
except Exception as e:
if len(e.args) == 2: # OSError
errno, msg = e.args
else:
msg = str(e)
# close previously opened config files
for config_file in config_files:
try:
config_file.close()
except Exception:
pass
self.error(
"Unable to open config file: %s. Error: %s"
% (user_config_file, msg)
)
# open user-provided config file
user_config_file = os.path.expanduser(user_config_file)
try:
stream = self._config_file_open_func(user_config_file)
except Exception as e:
self.error(
"Unable to open config file: %s. Error: %s"
% (user_config_file, str(e))
)

config_files += [stream]
config_files.append((stream, user_config_file))

return config_files
return config_files
except BaseException:
# If anything in the body above raises (callable failure, .name
# assignment failure, glob/open failure, an inner argparse type=
# callback raising during user-config-file parsing, self.error()
# exiting, etc.), close every stream we opened so we don't leak
# file handles. close() is idempotent for standard streams, so it
# is safe to call on streams already closed by a nested handler.
for cf, _ in config_files:
try:
if hasattr(cf, "close"):
cf.close()
except Exception:
pass
raise

def format_values(self):
"""Returns a string with all args and settings and where they came from
Expand All @@ -1606,7 +1680,7 @@ def format_values(self):
source,
settings,
) in self._source_to_settings.items(): # type: ignore[argument-error]
source = source.split("|")
source = source.split("|", 1)
source = source_key_to_display_value_map[source[0]] % tuple(source[1:])
r.write(source)
for key, (action, value) in settings.items():
Expand Down Expand Up @@ -1659,9 +1733,26 @@ def format_help(self):
if config_arg_string:
config_arg_string = "specified via " + config_arg_string
if default_config_files or config_arg_string:
# Mirror _open_config_files: path entries (str/PathLike)
# are checked before callable, so an object that is both
# PathLike and callable is rendered as a path here too.
# os.fsdecode() handles bytes-returning __fspath__ (PEP
# 519 allows it); the try/except handles malformed
# PathLike whose __fspath__ returns the wrong type.
def _describe_default_config_file_entry(e):
if isinstance(e, (str, os.PathLike)):
try:
return os.fsdecode(os.fspath(e))
except (TypeError, ValueError):
return repr(e)
return getattr(e, "__name__", "<callable>")

described_files = tuple(
_describe_default_config_file_entry(e)
for e in default_config_files
Comment thread
bw2 marked this conversation as resolved.
)
msg += " (%s)." % " or ".join(
tuple(map(str, default_config_files))
+ tuple(filter(None, [config_arg_string]))
described_files + tuple(filter(None, [config_arg_string]))
)
msg += " " + self._config_file_parser.get_syntax_description()

Expand Down
Loading
Loading