Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions mini_copilot/commands/search_provider.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
SEARCH_PROVIDERS = ["duckduckgo"]
SEARCH_PROVIDERS = ["duckduckgo", "startpage"]


def handle_search_provider_command(search_provider):
print(f"\nCurrent search provider: {search_provider}")
print("Available providers:")
for i, prov in enumerate(SEARCH_PROVIDERS, 1):
marker = "*" if prov == search_provider else " "
print(f" {marker} {i}. {prov}")

try:
choice = input("Select search provider (number, Enter to keep current): ").strip()
choice = input(
"Select search provider (number, Enter to keep current): "
).strip()
if choice:
if choice.isdigit():
n = int(choice)
Expand All @@ -21,5 +24,5 @@ def handle_search_provider_command(search_provider):
print("Please enter a number.\n")
except (EOFError, KeyboardInterrupt):
print()

return search_provider
74 changes: 49 additions & 25 deletions mini_copilot/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,13 @@

try:
import readline

COMMANDS = ["/login", "/help", "/copy", "/model", "/search_provider", ".exit"]

def completer(text, state):
matches = [c for c in COMMANDS if c.startswith(text)]
return matches[state] if state < len(matches) else None

readline.set_completer(completer)
readline.parse_and_bind("tab: complete")
except ImportError:
Expand All @@ -31,8 +34,6 @@ def completer(text, state):
(".exit", "Quit"),
]

SEARCH_PROVIDERS = ["duckduckgo"]

CONFIG_PATH = Path.home() / ".config" / "mini-copilot" / "config.json"
TOKEN_REFRESH_INTERVAL = 24 * 60 # seconds

Expand Down Expand Up @@ -60,11 +61,14 @@ def completer(text, state):
}
TOOLS = [WEB_SEARCH_TOOL]


def load_github_token():
if not CONFIG_PATH.exists(): return None
if not CONFIG_PATH.exists():
return None
config = json.loads(CONFIG_PATH.read_text())
return config.get("github_token")


def main():
github_token = load_github_token()
copilot_token = None
Expand Down Expand Up @@ -93,33 +97,43 @@ def main():
try:
user_input = input("> ").strip()
except (EOFError, KeyboardInterrupt):
print("\nGoodbye!"); break
print("\nGoodbye!")
break

if not user_input: continue
if not user_input:
continue
if user_input in ("/", "/help"):
print("\nAvailable commands:")
for cmd, desc in COMMANDS_HELP:
print(f" {cmd:<20} {desc}")
print(); continue
if user_input == ".exit": print("Goodbye!"); break
print()
continue
if user_input == ".exit":
print("Goodbye!")
break
if user_input == "/copy":
handle_copy_command(last_reply); continue
handle_copy_command(last_reply)
continue
if user_input == "/model":
current_model = handle_model_command(copilot_token, current_model); continue
current_model = handle_model_command(copilot_token, current_model)
continue
if user_input == "/search_provider":
search_provider = handle_search_provider_command(search_provider); continue
search_provider = handle_search_provider_command(search_provider)
continue
if user_input == "/login":
github_token = handle_login_command(CONFIG_PATH, TOKEN_REFRESH_INTERVAL)
if github_token:
try:
copilot_token = get_copilot_token(github_token)
token_expiry = time.monotonic() + TOKEN_REFRESH_INTERVAL
print("Connected to GitHub Copilot.\n")
except Exception as e: print(f"Error: {e}", file=sys.stderr)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
continue

if not copilot_token:
print("Not authenticated. Type /login first.", file=sys.stderr); continue
print("Not authenticated. Type /login first.", file=sys.stderr)
continue

try:
if time.monotonic() >= token_expiry:
Expand All @@ -128,32 +142,42 @@ def main():

messages.append({"role": "user", "content": user_input})
response_message = chat(messages, copilot_token, current_model, tools=TOOLS)

while response_message.get("tool_calls"):
messages.append(response_message)
for tool_call in response_message["tool_calls"]:
function_name = tool_call["function"]["name"]
function_args = json.loads(tool_call["function"]["arguments"])

if function_name == "web_search":
search_query = function_args.get("query")
num_results = function_args.get("num_results", 20)

search_context = web_search(search_query, num_results=num_results)

messages.append({
"tool_call_id": tool_call["id"],
"role": "tool",
"name": function_name,
"content": search_context,
})
response_message = chat(messages, copilot_token, current_model, tools=TOOLS)

search_context = web_search(
search_query,
num_results=num_results,
provider=search_provider,
)

messages.append(
{
"tool_call_id": tool_call["id"],
"role": "tool",
"name": function_name,
"content": search_context,
}
)
response_message = chat(
messages, copilot_token, current_model, tools=TOOLS
)

reply = response_message["content"]
messages.append({"role": "assistant", "content": reply})
last_reply = reply
print(f"\n{reply}\n")
except Exception as e: print(f"Error: {e}", file=sys.stderr)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)


if __name__ == "__main__":
main()
59 changes: 54 additions & 5 deletions mini_copilot/web_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,43 @@ def search_ddg(query, num_results=10):
return results[:num_results]


def search_startpage(query, num_results=20):
"""Using Startpage search"""
url = "https://www.startpage.com/sp/search"
params = {
"query": query,
"cat": "web",
"language": "english",
}

try:
res = requests.get(
url, params=params, headers=HEADERS, proxies=PROXY, timeout=10
)
res.raise_for_status()
except Exception as e:
print(f"[web search] Error searching Startpage: {e}")
return []

soup = BeautifulSoup(res.text, "html.parser")
results = []

search_items = soup.select(".result")

for item in search_items:
link_tag = item.select_one("a.result-link")
title_tag = item.select_one(".wgl-title")

if link_tag and link_tag.has_attr("href") and title_tag:
href = link_tag["href"]
title = title_tag.get_text(strip=True)
# Ensure it's an external link
if isinstance(href, str) and href.startswith("http"):
results.append({"title": title, "url": href})

return results[:num_results]


def extract_text_from_url(url):
try:
session = requests.Session()
Expand Down Expand Up @@ -130,10 +167,15 @@ def format_llm_output(results):
return "\n\n".join(blocks)


def web_search(query, num_results=5):
def web_search(query, num_results=5, provider="duckduckgo"):
"""Function to be called as a tool."""
print(f"[web search] Searching: {query}")
search_results = search_ddg(query, num_results=num_results)
print(f"[web search] Searching ({provider}): {query}")

if provider == "startpage":
search_results = search_startpage(query, num_results=num_results)
else:
search_results = search_ddg(query, num_results=num_results)

processed_results = []

if not search_results:
Expand All @@ -151,7 +193,9 @@ def web_search(query, num_results=5):
processed_results.append({**info, "content": content})
print(f"[web search] Fetched: {info['url']}")
except Exception:
processed_results.append({**info, "content": "Failed to extract content."})
processed_results.append(
{**info, "content": "Failed to extract content."}
)

# Sort results to match original search order
url_order = {res["url"]: i for i, res in enumerate(search_results)}
Expand All @@ -162,12 +206,17 @@ def web_search(query, num_results=5):

if __name__ == "__main__":
import argparse

parser = argparse.ArgumentParser(
description="Optimized DDG Search & Extract for LLMs."
)
parser.add_argument("query", help="The search query")
parser.add_argument(
"-n", "--num_results", type=int, default=5, help="Number of results (default: 5)"
"-n",
"--num_results",
type=int,
default=5,
help="Number of results (default: 5)",
)
args = parser.parse_args()

Expand Down
Loading