diff --git a/IPython/core/displayhook.py b/IPython/core/displayhook.py index b1de9df7ee1..88ec0310d89 100644 --- a/IPython/core/displayhook.py +++ b/IPython/core/displayhook.py @@ -51,7 +51,7 @@ def __init__(self, shell=None, cache_size=1000, **kwargs): # we need a reference to the user-level namespace self.shell = shell - + self._,self.__,self.___ = '','','' # these are deliberately global: @@ -84,13 +84,13 @@ def check_for_underscore(self): def quiet(self): """Should we silence the display hook because of ';'?""" # do not print output if input ends in ';' - + try: cell = self.shell.history_manager.input_hist_parsed[-1] except IndexError: # some uses of ipshellembed may fail here return False - + return self.semicolon_at_end_of_expression(cell) @staticmethod @@ -242,6 +242,7 @@ def fill_exec_result(self, result): def log_output(self, format_dict): """Log the output.""" + self.shell.history_manager.output_mime_bundles[self.prompt_count] = format_dict if 'text/plain' not in format_dict: # nothing to do return @@ -280,13 +281,12 @@ def cull_cache(self): cull_count = max(int(sz * self.cull_fraction), 2) warn('Output cache limit (currently {sz} entries) hit.\n' 'Flushing oldest {cull_count} entries.'.format(sz=sz, cull_count=cull_count)) - + for i, n in enumerate(sorted(oh)): if i >= cull_count: break self.shell.user_ns.pop('_%i' % n, None) oh.pop(n, None) - def flush(self): if not self.do_full_cache: diff --git a/IPython/core/displaypub.py b/IPython/core/displaypub.py index 1428c68ea78..d547d7db65f 100644 --- a/IPython/core/displaypub.py +++ b/IPython/core/displaypub.py @@ -129,6 +129,18 @@ def publish( if self.shell is not None: handlers = getattr(self.shell, "mime_renderers", {}) + output_bundles = self.shell.history_manager.output_mime_bundles + exec_count = self.shell.execution_count + + if exec_count in output_bundles: + for key, value in data.items(): + if key in output_bundles[exec_count]: + output_bundles[exec_count][key] += "\n" + value + else: + output_bundles[exec_count][key] = value + else: + output_bundles[exec_count] = data + for mime, handler in handlers.items(): if mime in data: handler(data[mime], metadata.get(mime, None)) diff --git a/IPython/core/history.py b/IPython/core/history.py index cdfd25760a7..f6e3da2946b 100644 --- a/IPython/core/history.py +++ b/IPython/core/history.py @@ -610,7 +610,11 @@ def _dir_hist_default(self) -> list[Path]: # execution count. output_hist = Dict() # The text/plain repr of outputs. - output_hist_reprs: dict[int, str] = Dict() # type: ignore [assignment] + output_hist_reprs: typing.Dict[int, str] = Dict() # type: ignore [assignment] + # Maps execution_count to MIME bundles + output_mime_bundles: typing.Dict[int, typing.Dict[str, str]] = Dict() # type: ignore [assignment] + # Maps execution_count to exception tracebacks + exceptions: typing.Dict[int, typing.Dict[str, Any]] = Dict() # type: ignore [assignment] # The number of the current session in the history database session_number: int = Integer() # type: ignore [assignment] @@ -749,6 +753,9 @@ def reset(self, new_session: bool = True) -> None: """Clear the session history, releasing all object references, and optionally open a new session.""" self.output_hist.clear() + self.output_mime_bundles.clear() + self.exceptions.clear() + # The directory history can't be completely empty self.dir_hist[:] = [Path.cwd()] diff --git a/IPython/core/historyapp.py b/IPython/core/historyapp.py index d555447b5d4..220ae9ad96e 100644 --- a/IPython/core/historyapp.py +++ b/IPython/core/historyapp.py @@ -59,9 +59,9 @@ def start(self): print("There are already at most %d entries in the history database." % self.keep) print("Not doing anything. Use --keep= argument to keep fewer entries") return - + print("Trimming history to the most recent %d entries." % self.keep) - + inputs.pop() # Remove the extra element we got to check the length. inputs.reverse() if inputs: @@ -71,7 +71,7 @@ def start(self): sessions = list(con.execute('SELECT session, start, end, num_cmds, remark FROM ' 'sessions WHERE session >= ?', (first_session,))) con.close() - + # Create the new history database. new_hist_file = profile_dir / "history.sqlite.new" i = 0 diff --git a/IPython/core/interactiveshell.py b/IPython/core/interactiveshell.py index 67cca752d25..ba7a3428cb9 100644 --- a/IPython/core/interactiveshell.py +++ b/IPython/core/interactiveshell.py @@ -3200,6 +3200,11 @@ async def run_cell_async( def error_before_exec(value): if store_history: + if self.history_manager: + # Store formatted traceback and error details + self.history_manager.exceptions[self.execution_count] = ( + self._format_exception_for_storage(value) + ) self.execution_count += 1 result.error_before_exec = value self.last_execution_succeeded = False @@ -3310,11 +3315,73 @@ def error_before_exec(value): # Write output to the database. Does nothing unless # history output logging is enabled. self.history_manager.store_output(self.execution_count) + exec_count = self.execution_count + if result.error_in_exec: + # Store formatted traceback and error details + self.history_manager.exceptions[exec_count] = ( + self._format_exception_for_storage(result.error_in_exec) + ) + # Each cell is a *single* input, regardless of how many lines it has self.execution_count += 1 return result + def _format_exception_for_storage( + self, exception, filename=None, running_compiled_code=False + ): + """ + Format an exception's traceback and details for storage, with special handling + for different types of errors. + """ + etype = type(exception) + evalue = exception + tb = exception.__traceback__ + + # Handle SyntaxError and IndentationError with specific formatting + if issubclass(etype, (SyntaxError, IndentationError)): + if filename and isinstance(evalue, SyntaxError): + try: + evalue.filename = filename + except: + pass # Keep the original filename if modification fails + + # Extract traceback if the error happened during compiled code execution + elist = traceback.extract_tb(tb) if running_compiled_code else [] + stb = self.SyntaxTB.structured_traceback(etype, evalue, elist) + + # Handle UsageError with a simple message + elif etype is UsageError: + stb = [f"UsageError: {evalue}"] + + else: + # Check if the exception (or its context) is an ExceptionGroup. + def contains_exceptiongroup(val): + if val is None: + return False + return isinstance(val, BaseExceptionGroup) or contains_exceptiongroup( + val.__context__ + ) + + if contains_exceptiongroup(evalue): + # Fallback: use the standard library's formatting for exception groups. + stb = traceback.format_exception(etype, evalue, tb) + else: + try: + # If the exception has a custom traceback renderer, use it. + if hasattr(evalue, "_render_traceback_"): + stb = evalue._render_traceback_() + else: + # Otherwise, use InteractiveTB to format the traceback. + stb = self.InteractiveTB.structured_traceback( + etype, evalue, tb, tb_offset=1 + ) + except Exception: + # In case formatting fails, fallback to Python's built-in formatting. + stb = traceback.format_exception(etype, evalue, tb) + + return {"ename": etype.__name__, "evalue": str(evalue), "traceback": stb} + def transform_cell(self, raw_cell): """Transform an input cell before parsing it. diff --git a/IPython/core/magics/basic.py b/IPython/core/magics/basic.py index d68020075b1..b96144e0d4c 100644 --- a/IPython/core/magics/basic.py +++ b/IPython/core/magics/basic.py @@ -21,7 +21,7 @@ class MagicsDisplay: def __init__(self, magics_manager, ignore=None): self.ignore = ignore if ignore else [] self.magics_manager = magics_manager - + def _lsmagic(self): """The main implementation of the %lsmagic""" mesc = magic_escapes['line'] @@ -39,13 +39,13 @@ def _lsmagic(self): def _repr_pretty_(self, p, cycle): p.text(self._lsmagic()) - + def __repr__(self): return self.__str__() def __str__(self): return self._lsmagic() - + def _jsonable(self): """turn magics dict into jsonable dict of the same structure @@ -62,10 +62,10 @@ def _jsonable(self): classname = obj.__self__.__class__.__name__ except AttributeError: classname = 'Other' - + d[name] = classname return magic_dict - + def _repr_json_(self): return self._jsonable() @@ -561,13 +561,40 @@ def notebook(self, s): cells = [] hist = list(self.shell.history_manager.get_range()) + output_mime_bundles = self.shell.history_manager.output_mime_bundles + exceptions = self.shell.history_manager.exceptions + if(len(hist)<=1): raise ValueError('History is empty, cannot export') for session, execution_count, source in hist[:-1]: - cells.append(v4.new_code_cell( - execution_count=execution_count, - source=source - )) + cell = v4.new_code_cell(execution_count=execution_count, source=source) + # Check if this execution_count is in exceptions (current session) + if execution_count in output_mime_bundles: + mime_bundle = output_mime_bundles[execution_count] + for mime_type, data in mime_bundle.items(): + if mime_type == "text/plain": + cell.outputs.append( + v4.new_output( + "execute_result", + data={mime_type: data}, + execution_count=execution_count, + ) + ) + else: + cell.outputs.append( + v4.new_output( + "display_data", + data={mime_type: data}, + ) + ) + + # Check if this execution_count is in exceptions (current session) + if execution_count in exceptions: + cell.outputs.append( + v4.new_output("error", **exceptions[execution_count]) + ) + cells.append(cell) + nb = v4.new_notebook(cells=cells) with io.open(outfname, "w", encoding="utf-8") as f: write(nb, f, version=4) diff --git a/tests/test_magic.py b/tests/test_magic.py index a2e6f80e5ee..e5e5b038bb8 100644 --- a/tests/test_magic.py +++ b/tests/test_magic.py @@ -914,6 +914,48 @@ def test_notebook_export_json(): _ip.run_line_magic("notebook", "%s" % outfile) +def test_notebook_export_json_with_output(): + """Tests if notebook export correctly captures outputs, errors, display outputs, and stream outputs.""" + pytest.importorskip("nbformat") + import nbformat + _ip = get_ipython() + _ip.history_manager.reset() + + cmds = [ + "display('test')", + "1+1", + "1/0", + "print('test')", + "1", + ] # Last cmd isn't stored in history + for cmd in cmds: + _ip.run_cell(cmd, store_history=True) + + with TemporaryDirectory() as td: + outfile = os.path.join(td, "nb.ipynb") + _ip.run_line_magic("notebook", outfile) + nb = nbformat.read(outfile, as_version=4) + + two_found, errors, display_output, stream_output = False, False, False, False + for cell in nb["cells"]: + print("\nCell -> ", cell) + for output in cell.get("outputs", []): + if output["output_type"] == "error": + assert _ip.history_manager.exceptions[3]["ename"] in output["ename"] + errors = True + elif output["output_type"] in ("execute_result", "display_data"): + if output["data"].get("text/plain") == "2": + two_found = True + elif output["data"].get("text/plain") == "'test'": + display_output = True + elif output["output_type"] == "stream" and output.get("text") == ["test\n"]: + stream_output = True + + assert two_found, "Expected output '2' missing" + assert errors, "Expected error output missing" + assert display_output, "Expected display output missing" + assert stream_output, "Expected print output missing" + class TestEnv(TestCase): def test_env(self):