-
Notifications
You must be signed in to change notification settings - Fork 188
Expand file tree
/
Copy pathimport_chatgpt.py
More file actions
258 lines (202 loc) · 7.97 KB
/
import_chatgpt.py
File metadata and controls
258 lines (202 loc) · 7.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
"""Import command for ChatGPT conversations."""
import asyncio
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Annotated, Set, Optional
import typer
from basic_memory.cli.app import import_app
from basic_memory.config import config
from basic_memory.markdown import EntityParser, MarkdownProcessor
from basic_memory.markdown.schemas import EntityMarkdown, EntityFrontmatter
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn
console = Console()
def clean_filename(text: str) -> str:
"""Convert text to safe filename."""
clean = "".join(c if c.isalnum() else "-" for c in text.lower()).strip("-")
return clean
def format_timestamp(ts: float) -> str:
"""Format Unix timestamp for display."""
dt = datetime.fromtimestamp(ts)
return dt.strftime("%Y-%m-%d %H:%M:%S")
def get_message_content(message: Dict[str, Any]) -> str:
"""Extract clean message content."""
if not message or "content" not in message:
return "" # pragma: no cover
content = message["content"]
if content.get("content_type") == "text":
return "\n".join(content.get("parts", []))
elif content.get("content_type") == "code":
return f"```{content.get('language', '')}\n{content.get('text', '')}\n```"
return "" # pragma: no cover
def traverse_messages(
mapping: Dict[str, Any], root_id: Optional[str], seen: Set[str]
) -> List[Dict[str, Any]]:
"""Traverse message tree and return messages in order."""
messages = []
node = mapping.get(root_id) if root_id else None
while node:
if node["id"] not in seen and node.get("message"):
seen.add(node["id"])
messages.append(node["message"])
# Follow children
children = node.get("children", [])
for child_id in children:
child_msgs = traverse_messages(mapping, child_id, seen)
messages.extend(child_msgs)
break # Don't follow siblings
return messages
def format_chat_markdown(
title: str,
mapping: Dict[str, Any],
root_id: Optional[str],
created_at: float,
modified_at: float,
) -> str:
"""Format chat as clean markdown."""
# Start with title
lines = [f"# {title}\n"]
# Traverse message tree
seen_msgs = set()
messages = traverse_messages(mapping, root_id, seen_msgs)
# Format each message
for msg in messages:
# Skip hidden messages
if msg.get("metadata", {}).get("is_visually_hidden_from_conversation"):
continue
# Get author and timestamp
author = msg["author"]["role"].title()
ts = format_timestamp(msg["create_time"]) if msg.get("create_time") else ""
# Add message header
lines.append(f"### {author} ({ts})")
# Add message content
content = get_message_content(msg)
if content:
lines.append(content)
# Add spacing
lines.append("")
return "\n".join(lines)
def format_chat_content(folder: str, conversation: Dict[str, Any]) -> EntityMarkdown:
"""Convert chat conversation to Basic Memory entity."""
# Extract timestamps
created_at = conversation["create_time"]
modified_at = conversation["update_time"]
root_id = None
# Find root message
for node_id, node in conversation["mapping"].items():
if node.get("parent") is None:
root_id = node_id
break
# Generate permalink
date_prefix = datetime.fromtimestamp(created_at).strftime("%Y%m%d")
clean_title = clean_filename(conversation["title"])
# Format content
content = format_chat_markdown(
title=conversation["title"],
mapping=conversation["mapping"],
root_id=root_id,
created_at=created_at,
modified_at=modified_at,
)
# Create entity
entity = EntityMarkdown(
frontmatter=EntityFrontmatter(
metadata={
"type": "conversation",
"title": conversation["title"],
"created": format_timestamp(created_at),
"modified": format_timestamp(modified_at),
"permalink": f"{folder}/{date_prefix}-{clean_title}",
}
),
content=content,
)
return entity
async def process_chatgpt_json(
json_path: Path, folder: str, markdown_processor: MarkdownProcessor
) -> Dict[str, int]:
"""Import conversations from ChatGPT JSON format."""
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
console=console,
) as progress:
read_task = progress.add_task("Reading chat data...", total=None)
# Read conversations
conversations = json.loads(json_path.read_text(encoding="utf-8"))
progress.update(read_task, total=len(conversations))
# Process each conversation
messages_imported = 0
chats_imported = 0
for chat in conversations:
# Convert to entity
entity = format_chat_content(folder, chat)
# Write file
file_path = config.home / f"{entity.frontmatter.metadata['permalink']}.md"
# logger.info(f"Writing file: {file_path.absolute()}")
await markdown_processor.write_file(file_path, entity)
# Count messages
msg_count = sum(
1
for node in chat["mapping"].values()
if node.get("message")
and not node.get("message", {})
.get("metadata", {})
.get("is_visually_hidden_from_conversation")
)
chats_imported += 1
messages_imported += msg_count
progress.update(read_task, advance=1)
return {"conversations": chats_imported, "messages": messages_imported}
async def get_markdown_processor() -> MarkdownProcessor:
"""Get MarkdownProcessor instance."""
entity_parser = EntityParser(config.home)
return MarkdownProcessor(entity_parser)
@import_app.command(name="chatgpt", help="Import conversations from ChatGPT JSON export.")
def import_chatgpt(
conversations_json: Annotated[
Path, typer.Argument(help="Path to ChatGPT conversations.json file")
] = Path("conversations.json"),
folder: Annotated[
str, typer.Option(help="The folder to place the files in.")
] = "conversations",
):
"""Import chat conversations from ChatGPT JSON format.
This command will:
1. Read the complex tree structure of messages
2. Convert them to linear markdown conversations
3. Save as clean, readable markdown files
After importing, run 'basic-memory sync' to index the new files.
"""
try:
if conversations_json:
if not conversations_json.exists():
typer.echo(f"Error: File not found: {conversations_json}", err=True)
raise typer.Exit(1)
# Get markdown processor
markdown_processor = asyncio.run(get_markdown_processor())
# Process the file
base_path = config.home / folder
console.print(f"\nImporting chats from {conversations_json}...writing to {base_path}")
results = asyncio.run(
process_chatgpt_json(conversations_json, folder, markdown_processor)
)
# Show results
console.print(
Panel(
f"[green]Import complete![/green]\n\n"
f"Imported {results['conversations']} conversations\n"
f"Containing {results['messages']} messages",
expand=False,
)
)
console.print("\nRun 'basic-memory sync' to index the new files.")
except Exception as e:
logger.error("Import failed")
typer.echo(f"Error during import: {e}", err=True)
raise typer.Exit(1)