-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
774 lines (686 loc) · 30.9 KB
/
main.py
File metadata and controls
774 lines (686 loc) · 30.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
import sys
from pathlib import Path
# Add vendored modules to path BEFORE importing them
PLUGIN_DIR = Path(__file__).parent
sys.path.insert(0, str(PLUGIN_DIR / "py_modules"))
import asyncio
import glob
import json
import os
import re
import ssl
import time
import aiohttp
import certifi
import vdf
import decky
# Switch the backend's Worker URL to the dev environment when a sibling
# `.dev` marker file exists alongside this script. The deploy-to-deck.sh
# helper writes/removes the marker based on SQUAD_UP_BUILD; production
# users never see a `.dev` file and always hit the prod Worker.
_DEV_MARKER = PLUGIN_DIR / ".dev"
if _DEV_MARKER.exists():
WORKER_URL = "https://worker-dev.squad-up.vip"
else:
WORKER_URL = "https://worker.squad-up.vip"
DISCORD_API = "https://discord.com/api/v10"
SETTINGS_PATH = Path(decky.DECKY_PLUGIN_SETTINGS_DIR) / "settings.json"
class Plugin:
# ===== Lifecycle =====
async def _main(self):
decky.logger.info("Squad Up plugin loaded")
ssl_context = ssl.create_default_context(cafile=certifi.where())
connector = aiohttp.TCPConnector(ssl=ssl_context)
self.session = aiohttp.ClientSession(connector=connector)
self.tokens = self._load_tokens()
if self.tokens:
decky.logger.info("Loaded existing tokens from disk")
async def _unload(self):
decky.logger.info("Squad Up plugin unloaded")
await self.session.close()
# ===== Settings persistence =====
def _load_settings(self):
"""Read the plugin's on-disk settings, or {} if missing/corrupt."""
if SETTINGS_PATH.exists():
try:
return json.loads(SETTINGS_PATH.read_text())
except json.JSONDecodeError:
decky.logger.error("settings.json is corrupted; ignoring")
return {}
return {}
def _save_settings(self, settings):
"""Persist the full settings dict to disk."""
SETTINGS_PATH.parent.mkdir(parents=True, exist_ok=True)
SETTINGS_PATH.write_text(json.dumps(settings))
def _load_tokens(self):
"""Return the saved Discord token bundle, or None if not signed in."""
return (self._load_settings() or {}).get("tokens")
def _save_tokens(self, tokens):
"""Persist the Discord token bundle alongside other settings."""
settings = self._load_settings() or {}
settings["tokens"] = tokens
self._save_settings(settings)
def _clear_tokens(self):
"""Drop the in-memory and on-disk Discord tokens."""
settings = self._load_settings() or {}
settings.pop("tokens", None)
self._save_settings(settings)
self.tokens = None
# ===== Authentication =====
async def is_authenticated(self) -> bool:
"""True if we hold a Discord token bundle (not necessarily valid)."""
return self.tokens is not None
async def claim_code(self, code: str) -> bool:
"""Exchange a 6-digit short code for tokens via the Worker."""
decky.logger.info(f"Claiming code: {code}")
try:
async with self.session.get(f"{WORKER_URL}/claim?code={code}") as r:
if r.status != 200:
body = await r.text()
decky.logger.warning(f"Claim failed: {r.status} {body}")
return False
tokens = await r.json()
except Exception as e:
decky.logger.error(f"Claim exception: {e}")
return False
self.tokens = {
"access_token": tokens["access_token"],
"refresh_token": tokens["refresh_token"],
"expires_at": int(time.time()) + tokens["expires_in"],
}
self._save_tokens(self.tokens)
decky.logger.info("Tokens saved")
return True
async def get_my_user_id(self) -> str | None:
"""Return the authed user's Discord user.id (snowflake) or None.
Cached on-disk inside the tokens bundle so we hit /users/@me at most
once per token bundle. Used by move_user to address the right user."""
if not await self._refresh_if_needed():
return None
cached = self.tokens.get("user_id")
if cached:
return cached
headers = {"Authorization": f"Bearer {self.tokens['access_token']}"}
try:
async with self.session.get(f"{DISCORD_API}/users/@me", headers=headers) as r:
if r.status != 200:
decky.logger.warning(f"get_my_user_id: /users/@me {r.status}")
return None
data = await r.json()
user_id = data.get("id")
if user_id:
self.tokens["user_id"] = user_id
self._save_tokens(self.tokens)
return user_id
except Exception as e:
decky.logger.error(f"get_my_user_id exception: {e}")
return None
async def move_user(self, guild_id: str, channel_id: str | None) -> bool:
"""Move the authed user to channel_id within guild_id, or pass None
to disconnect them from voice. Requires the bot has Move Members in
that guild AND the user is already connected to voice there."""
user_id = await self.get_my_user_id()
if not user_id:
return False
try:
async with self.session.post(
f"{WORKER_URL}/move/{guild_id}/{user_id}",
json={"channel_id": channel_id},
) as r:
if r.status == 200:
return True
decky.logger.warning(
f"move_user failed: {r.status} {await r.text()}"
)
return False
except Exception as e:
decky.logger.error(f"move_user exception: {e}")
return False
async def sign_out(self) -> bool:
"""Wipe stored tokens. Always returns True."""
decky.logger.info("Signing out")
self._clear_tokens()
return True
async def _refresh_if_needed(self) -> bool:
"""Refresh the access token if it's expired or close to expiring.
Returns True if we have a usable token after this call, False if not."""
if not self.tokens:
return False
# Refresh if less than 60 seconds remaining
if time.time() < self.tokens.get("expires_at", 0) - 60:
return True
decky.logger.info("Access token near expiry, refreshing")
try:
async with self.session.post(
f"{WORKER_URL}/refresh",
json={"refresh_token": self.tokens["refresh_token"]},
headers={"Content-Type": "application/json"},
) as r:
if r.status == 401:
decky.logger.warning("Refresh token rejected; clearing session")
self._clear_tokens()
return False
if r.status != 200:
body = await r.text()
decky.logger.error(f"Refresh failed: {r.status} {body}")
return False
new_tokens = await r.json()
except Exception as e:
decky.logger.error(f"Refresh exception: {e}")
return False
self.tokens = {
"access_token": new_tokens["access_token"],
"refresh_token": new_tokens["refresh_token"],
"expires_at": int(time.time()) + new_tokens["expires_in"],
}
self._save_tokens(self.tokens)
decky.logger.info("Token refreshed successfully")
return True
# ===== Guild & voice channel listing =====
async def get_guilds_with_voice(self):
"""Fetch the user's guilds, filter to those where the bot is present
and that contain at least one voice channel. Returns a list of guild
objects with voice channels and category metadata for grouping."""
if not await self._refresh_if_needed():
return {"error": "not_authenticated"}
headers = {"Authorization": f"Bearer {self.tokens['access_token']}"}
# 1. Get user's guild list directly from Discord (user OAuth token)
try:
async with self.session.get(
f"{DISCORD_API}/users/@me/guilds", headers=headers
) as r:
if r.status == 401:
decky.logger.warning("User token rejected (401)")
self._clear_tokens()
return {"error": "token_expired"}
if r.status != 200:
decky.logger.error(f"Failed to fetch guilds: {r.status}")
return {"error": f"discord_error_{r.status}"}
user_guilds = await r.json()
except Exception as e:
decky.logger.error(f"get_guilds exception: {e}")
return {"error": "network_error"}
decky.logger.info(f"User is in {len(user_guilds)} guilds")
# 2. For each guild, probe the Worker to check bot presence + get
# channels. Run probes in parallel for speed.
async def probe(guild):
try:
async with self.session.get(
f"{WORKER_URL}/channels/{guild['id']}"
) as r:
if r.status != 200:
return None
return guild, await r.json()
except Exception as e:
decky.logger.error(f"Probe exception for {guild['id']}: {e}")
return None
probe_results = await asyncio.gather(*[probe(g) for g in user_guilds])
# 3. Filter and shape results
results = []
for item in probe_results:
if item is None:
continue
guild, data = item
if not data.get("bot_present"):
continue
channels = data.get("channels", [])
voice_channels = [c for c in channels if c.get("type") == 2]
if not voice_channels:
continue
categories = {
c["id"]: {
"name": c["name"],
"position": c.get("position", 0),
}
for c in channels if c.get("type") == 4
}
voice_channels.sort(key=lambda c: c.get("position", 0))
results.append({
"id": guild["id"],
"name": guild["name"],
"icon": guild.get("icon"),
"categories": categories,
"voice_channels": [
{
"id": c["id"],
"name": c["name"],
"parent_id": c.get("parent_id"),
"position": c.get("position", 0),
}
for c in voice_channels
],
})
# Sort by the user's saved guild_order (set via set_guild_order),
# falling back to alphabetical for any guild that isn't in the saved
# list yet (e.g. newly-joined servers since the last reorder).
saved_order = (self._load_settings() or {}).get("guild_order") or []
order_index = {gid: i for i, gid in enumerate(saved_order)}
unset_rank = len(saved_order)
results.sort(
key=lambda g: (order_index.get(g["id"], unset_rank), g["name"].lower())
)
decky.logger.info(
f"Returning {len(results)} guilds with voice channels and bot present"
)
return results
async def set_guild_order(self, order: list[str]) -> bool:
"""Persist the user's preferred guild ordering as a list of guild IDs.
Used by the frontend's reorder UI; consumed by get_guilds_with_voice
on the next fetch."""
settings = self._load_settings() or {}
settings["guild_order"] = [str(gid) for gid in order]
self._save_settings(settings)
return True
async def get_voice_occupants(self, guild_id: str):
"""Fetch current voice channel occupants for a guild from the Worker.
Returns { 'channels': { '<channelId>': [{userId, username, displayName, avatarUrl}, ...] } }
on success, or {'error': ...} on failure. Unauthenticated route on the
Worker side; the Worker proxies to the gateway bot."""
try:
async with self.session.get(f"{WORKER_URL}/voice-state/{guild_id}") as r:
if r.status != 200:
body = await r.text()
decky.logger.warning(
f"voice-state probe failed for {guild_id}: {r.status} {body}"
)
return {"error": f"worker_error_{r.status}"}
return await r.json()
except Exception as e:
decky.logger.error(f"get_voice_occupants exception: {e}")
return {"error": "network_error"}
# ===== Channel launching =====
async def open_channel(self, guild_id: str, channel_id: str) -> bool:
"""Open the Discord web client at the given guild/channel.
In Game Mode this routes through the configured non-Steam-game
browser shortcut so gamescope composites the window correctly."""
return await self.open_url(f"https://discord.com/channels/{guild_id}/{channel_id}")
async def open_url(self, url: str) -> bool:
"""Open an arbitrary URL in the user's configured browser. Same launch
flow as open_channel — used for the bot-invite link and similar."""
decky.logger.info(f"Opening: {url}")
session_env = self._get_user_session_env()
env = os.environ.copy()
env.update(session_env)
# Decky's bundled libs poison subprocess env (causes bash crashes etc.)
for var in ("LD_LIBRARY_PATH", "LD_PRELOAD", "PYTHONPATH", "PYTHONHOME"):
env.pop(var, None)
if self._is_game_mode():
return await self._open_in_game_mode(url, env)
return await self._open_in_desktop_mode(url, env)
async def is_game_mode(self) -> bool:
"""Frontend-callable: True if running under SteamOS Game Mode."""
return self._is_game_mode()
async def _open_in_desktop_mode(self, url: str, env: dict) -> bool:
"""Desktop Mode: just use xdg-open."""
decky.logger.info("Desktop Mode launch")
try:
proc = await asyncio.create_subprocess_exec(
"/usr/bin/xdg-open", url,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
start_new_session=True,
env=env,
)
decky.logger.info(f"xdg-open launched (PID {proc.pid})")
return True
except Exception as e:
decky.logger.error(f"Desktop Mode launch failed: {e}")
return False
async def _open_in_game_mode(self, url: str, env: dict) -> bool:
"""Game Mode: launch browser via Steam shortcut, then nudge it with the URL."""
decky.logger.info("Game Mode launch")
browser = (self._load_settings() or {}).get("browser")
if not browser:
decky.logger.error("No browser configured for Game Mode")
return False
try:
# Step 1: send steam://rungameid unconditionally. When the
# browser is already running but in the background, this brings
# it to the foreground (composited by gamescope) — without it,
# `flatpak run <id> <url>` opens the new tab into a window
# gamescope can't see. When the browser isn't running yet, this
# is a normal launch. Steam shows a "Game already running"
# error toast on the re-launch path; that's annoying but
# accepted as the cost of reliable window-raising — every
# alternative path we tried (skipping the call, xdotool
# windowactivate, gamescope baselayer atom checks) regressed
# to either invisible-browser behavior or unreliable
# window-raising in gamescope's nested-XWayland setup.
was_running = await self._is_browser_running(browser, env)
await asyncio.create_subprocess_exec(
"/usr/bin/steam", "-ifrunning",
f"steam://rungameid/{browser['rungameid']}",
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
start_new_session=True,
env=env,
)
decky.logger.info(
f"Sent steam://rungameid/{browser['rungameid']} (was_running={was_running})"
)
if was_running:
# Browser was already up; small settle for Steam to bring
# the existing window to the foreground.
await asyncio.sleep(0.4)
else:
# Cold start: poll for the browser process to actually
# exist before firing the navigate command, with a short
# post-detection settle so the second-instance IPC socket
# is ready.
if await self._wait_for_browser(browser, env, timeout=10.0):
decky.logger.info("Browser process detected; settling before navigate")
await asyncio.sleep(1.0)
else:
decky.logger.warning(
"Browser didn't appear within 10s; trying navigate anyway"
)
# Step 2: navigate the (now-running) browser to the URL
exe = browser.get("exe", "").strip()
flatpak_id = browser.get("flatpak_id")
if flatpak_id:
cmd = ["/usr/bin/flatpak", "run", flatpak_id, url]
elif exe.startswith("flatpak run"):
parts = exe.split()
cmd = ["/usr/bin/flatpak"] + parts[1:] + [url]
else:
cmd = [exe, url]
decky.logger.info(f"Launching: {cmd}")
await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
start_new_session=True,
env=env,
)
return True
except Exception as e:
decky.logger.error(f"Game Mode launch failed: {e}")
return False
async def _wait_for_browser(
self, browser: dict, env: dict, timeout: float = 10.0, poll_interval: float = 0.5
) -> bool:
"""Poll _is_browser_running until True or timeout. Returns True if the
browser process appeared within the timeout, False otherwise."""
deadline = time.time() + timeout
while time.time() < deadline:
if await self._is_browser_running(browser, env):
return True
await asyncio.sleep(poll_interval)
return False
async def _is_browser_running(self, browser: dict, env: dict) -> bool:
"""Return True if the configured browser already has a running process.
Used in Game Mode to skip Steam's launch step (which surfaces a
'couldn't open, already running' error when the shortcut is hot)."""
flatpak_id = browser.get("flatpak_id")
# Flatpak browsers: ask flatpak directly for its running app list.
if flatpak_id:
try:
proc = await asyncio.create_subprocess_exec(
"/usr/bin/flatpak", "ps", "--columns=application",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
env=env,
)
stdout, _ = await proc.communicate()
running = stdout.decode("utf-8", errors="replace").splitlines()
return flatpak_id.strip() in (line.strip() for line in running)
except Exception as e:
decky.logger.warning(f"flatpak ps failed: {e}")
# Native browsers: pgrep against the binary basename.
exe = browser.get("exe", "").strip()
if exe and not exe.startswith("flatpak run"):
binary = os.path.basename(exe.split()[0])
if binary:
try:
proc = await asyncio.create_subprocess_exec(
"/usr/bin/pgrep", "-x", binary,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.DEVNULL,
stdin=asyncio.subprocess.DEVNULL,
env=env,
)
await proc.wait()
return proc.returncode == 0
except Exception as e:
decky.logger.warning(f"pgrep failed: {e}")
return False
def _is_game_mode(self) -> bool:
"""True if running under SteamOS Game Mode (gamescope compositor)."""
session_env = self._get_user_session_env()
# Game Mode runs under gamescope; Desktop Mode runs KDE
desktop = session_env.get("XDG_CURRENT_DESKTOP", "").lower()
if "gamescope" in desktop:
return True
# Even if XDG_CURRENT_DESKTOP says KDE, gamescope socket presence
# alongside an active session is a Game Mode signal
runtime_dir = session_env.get("XDG_RUNTIME_DIR", "/run/user/1000")
if os.path.exists(os.path.join(runtime_dir, "gamescope-0")):
if "kde" not in desktop:
return True
return False
def _get_user_session_env(self) -> dict:
"""Pull DISPLAY/WAYLAND/DBUS env vars from a live deck-user process,
with a fallback for Game Mode. Decky strips these from subprocess env,
so we have to reconstruct them by snooping /proc."""
wanted = (
"DISPLAY",
"WAYLAND_DISPLAY",
"XDG_RUNTIME_DIR",
"DBUS_SESSION_BUS_ADDRESS",
"XDG_SESSION_TYPE",
"XDG_CURRENT_DESKTOP",
"XAUTHORITY",
)
result = {}
for env_path in glob.glob("/proc/*/environ"):
try:
st = os.stat(env_path)
if st.st_uid != 1000:
continue
with open(env_path, "rb") as f:
raw = f.read()
if not raw:
continue
pairs = {}
for chunk in raw.split(b"\0"):
if b"=" not in chunk:
continue
k, _, v = chunk.partition(b"=")
pairs[k.decode("utf-8", errors="replace")] = v.decode(
"utf-8", errors="replace"
)
if "DISPLAY" in pairs or "WAYLAND_DISPLAY" in pairs:
for key in wanted:
if key in pairs:
result[key] = pairs[key]
if result:
decky.logger.info(
f"Pulled session env from PID {env_path.split('/')[2]}: "
f"{list(result.keys())}"
)
break
except (PermissionError, FileNotFoundError, OSError):
continue
# Fallback: detect Game Mode and inject Wayland socket if missing
if "WAYLAND_DISPLAY" not in result:
xdg_runtime = result.get("XDG_RUNTIME_DIR", "/run/user/1000")
for socket_name in ("gamescope-0", "wayland-0", "wayland-1"):
socket_path = os.path.join(xdg_runtime, socket_name)
if os.path.exists(socket_path):
result["WAYLAND_DISPLAY"] = socket_name
decky.logger.info(f"Detected Wayland socket: {socket_name}")
break
if not result:
decky.logger.warning("Could not find any user-session env")
return result
# ===== Browser configuration =====
async def list_browser_shortcuts(self):
"""Scan shortcuts.vdf and return non-Steam-game entries that look
like a browser. Used in Game Mode setup so the user can pick which
shortcut to launch for navigation."""
path = self._find_shortcuts_vdf()
if not path:
return {"error": "no_shortcuts_file"}
try:
with open(path, "rb") as f:
data = vdf.binary_load(f)
except Exception as e:
decky.logger.error(f"Failed to parse shortcuts.vdf: {e}")
return {"error": "parse_failed"}
results = []
shortcuts = data.get("shortcuts", {})
for idx, entry in shortcuts.items():
exe = self._clean_vdf_string(self._ci_get(entry, "exe", ""))
appname = self._clean_vdf_string(self._ci_get(entry, "appname", ""))
flatpak_id = self._clean_vdf_string(self._ci_get(entry, "flatpakappid", ""))
launch_options = self._clean_vdf_string(
self._ci_get(entry, "launchoptions", "")
)
appid = self._ci_get(entry, "appid")
# Fallback: if FlatpakAppID is empty but Exe is flatpak, derive
# the app id from LaunchOptions.
if not flatpak_id and "flatpak" in exe.lower() and launch_options:
flatpak_id = self._extract_flatpak_id_from_launch_options(launch_options)
if appid is None:
continue
# Match priority: FlatpakAppID → AppName → Exe
browser_name = None
if flatpak_id:
browser_name = self._looks_like_browser(flatpak_id)
if not browser_name and appname:
browser_name = self._looks_like_browser(appname)
if not browser_name and exe:
browser_name = self._looks_like_browser(exe)
if not browser_name:
continue
# rungameid for non-Steam games is (appid << 32) | 0x02000000.
# appid in shortcuts.vdf is signed 32-bit; mask back to unsigned.
if appid < 0:
appid_unsigned = appid + (1 << 32)
else:
appid_unsigned = appid
rungameid = (appid_unsigned << 32) | 0x02000000
if flatpak_id:
launch_exe = f"flatpak run {flatpak_id}"
else:
launch_exe = exe.strip('"')
results.append({
"appname": appname,
"browser_name": browser_name,
"exe": launch_exe,
"flatpak_id": flatpak_id if flatpak_id else None,
"rungameid": str(rungameid),
})
decky.logger.info(f"Found {len(results)} browser shortcut(s)")
return results
async def get_browser_config(self):
"""Return the currently selected browser config, or None."""
return (self._load_settings() or {}).get("browser")
async def set_browser_config(
self, rungameid: str, exe: str, browser_name: str, flatpak_id: str = None
):
"""Persist the user's browser selection for Game Mode launching."""
settings = self._load_settings() or {}
settings["browser"] = {
"rungameid": rungameid,
"exe": exe,
"browser_name": browser_name,
"flatpak_id": flatpak_id,
}
self._save_settings(settings)
decky.logger.info(
f"Saved browser config: {browser_name} (flatpak_id={flatpak_id})"
)
return True
async def clear_browser_config(self):
"""Drop the saved browser config, forcing the user to pick again."""
settings = self._load_settings() or {}
settings.pop("browser", None)
self._save_settings(settings)
return True
def _find_shortcuts_vdf(self):
"""Locate the user's shortcuts.vdf file. Returns Path or None."""
steam_userdata = Path.home() / ".local/share/Steam/userdata"
if not steam_userdata.exists():
decky.logger.warning(f"Steam userdata not found at {steam_userdata}")
return None
# Pick the first user directory (most users only have one)
for user_dir in steam_userdata.iterdir():
if not user_dir.is_dir() or not user_dir.name.isdigit():
continue
shortcuts_path = user_dir / "config" / "shortcuts.vdf"
if shortcuts_path.exists():
return shortcuts_path
return None
@staticmethod
def _looks_like_browser(s: str) -> str | None:
"""Best-effort match: given a flatpak id, app name, or exe path,
return a human-readable browser name or None."""
s_lower = s.lower()
browsers = [
("com.google.chrome", "Google Chrome"),
("google chrome", "Google Chrome"),
("google-chrome", "Google Chrome"),
("/chrome", "Chrome"),
("org.mozilla.firefox", "Firefox"),
("firefox", "Firefox"),
("org.chromium.chromium", "Chromium"),
("chromium", "Chromium"),
("com.brave.browser", "Brave"),
("brave browser", "Brave"),
("/brave", "Brave"),
("com.vivaldi.vivaldi", "Vivaldi"),
("vivaldi", "Vivaldi"),
("com.microsoft.edge", "Microsoft Edge"),
("microsoft edge", "Microsoft Edge"),
("microsoft-edge", "Microsoft Edge"),
]
for needle, name in browsers:
if needle in s_lower:
return name
return None
@staticmethod
def _ci_get(d: dict, key: str, default=None):
"""Case-insensitive dict lookup. shortcuts.vdf keys vary in casing."""
target = key.lower()
for k, v in d.items():
if k.lower() == target:
return v
return default
@staticmethod
def _clean_vdf_string(s: str) -> str:
"""Strip surrounding quotes and any markdown-link wrapper that some
tools have been seen writing into VDF fields, e.g.
'[com.google.Chrome](http://com.google.Chrome)'."""
if not s:
return s
s = s.strip().strip('"')
m = re.match(r"^\[([^\]]+)\]\(.*\)$", s)
if m:
return m.group(1)
return s
@staticmethod
def _extract_flatpak_id_from_launch_options(launch_options: str) -> str:
"""Pull a Flatpak app ID out of a LaunchOptions string.
Some shortcuts have FlatpakAppID empty even when Exe is Flatpak."""
# Collect bare tokens that look like reverse-DNS Flatpak IDs (foo.bar.Baz)
tokens = re.findall(r'"([^"]*)"|(\S+)', launch_options)
for quoted, unquoted in tokens:
tok = quoted or unquoted
if not tok:
continue
# Skip flags and the literal "run" subcommand
if tok.startswith("-") or tok == "run":
continue
# Strip markdown wrapper if some tool added one
m = re.match(r"^\[([^\]]+)\]\(.*\)$", tok)
if m:
tok = m.group(1)
# Heuristic: looks like a reverse-DNS Flatpak ID
if re.match(r"^[a-zA-Z][a-zA-Z0-9_]*(\.[a-zA-Z][a-zA-Z0-9_-]*){2,}$", tok):
return tok
return ""