Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion docs/building_plugins.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ This guide walks you through the process of creating a new plugin for InkyPi.
# update value for next refresh
settings["index"] = settings["index"] + 1
```
- (Optional) If your plugin should sometimes skip playlist display, implement `skip_display_condition`.
- Return `None` to proceed with normal display.
- Return a string to skip this playlist cycle. The string is shown as the reason in the generated plugin preview image.
- This is intended for plugins that have valid periods with nothing to show, such as a sports scoreboard during the offseason.
- If this method fetches data and returns `None`, cache that data in `settings` using a plugin-private, JSON-serializable key so `generate_image` can reuse it instead of making the same external request again.
```python
def skip_display_condition(self, settings, device_config, current_dt):
games = fetch_games(settings, current_dt)
if not games:
return "No games to display"

settings["_scoreboard_games_cache"] = games
return None
```

### 3. Create a Settings Template (Optional)

Expand Down Expand Up @@ -185,4 +199,3 @@ Your repository must include:
See [InkyPi-Plugin-Template](https://github.com/fatihak/InkyPi-Plugin-Template) for a sample template of a third party plugin.

Once you're done, feel free to add your plugin to the [3rd Party Plugin List](https://github.com/fatihak/InkyPi/wiki/3rd-Party-Plugins) and share it in the [🙌 Show and Tell Discussion Board](https://github.com/fatihak/InkyPi/discussions/categories/show-and-tell).

11 changes: 11 additions & 0 deletions src/plugins/base_plugin/base_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ def __init__(self, config, **dependencies):
def generate_image(self, settings, device_config):
raise NotImplementedError("generate_image must be implemented by subclasses")

# Optional playlist self-skip hook. Most plugins should not implement this.
# Implement it only when a plugin might have valid periods with nothing to
# display, such as a sports scoreboard during the offseason. Return None to
# proceed with normal display, or return a human-readable string explaining
# why the plugin should be skipped for this playlist cycle. If this method
# fetches data and then returns None, cache that data in settings using a
# plugin-private, JSON-serializable key so generate_image can reuse it
# instead of making the same external request again.
def skip_display_condition(self, settings, device_config, current_dt):
return None

def cleanup(self, settings):
"""Optional cleanup method that plugins can override to delete associated resources.

Expand Down
80 changes: 73 additions & 7 deletions src/plugins/screenshot/screenshot.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,92 @@
from plugins.base_plugin.base_plugin import BasePlugin
from PIL import Image
from utils.image_utils import take_screenshot
import logging

logger = logging.getLogger(__name__)

class Screenshot(BasePlugin):
DEFAULT_RENDER_WAIT_MS = None

def __init__(self, config, **dependencies):
super().__init__(config, **dependencies)
self._captured_image_cache = {}

def generate_image(self, settings, device_config):
cache_key = self._get_cache_key(settings, device_config)
image = self._captured_image_cache.pop(cache_key, None)
if image:
return image

image = self._capture_screenshot(settings, device_config)

if not image:
raise RuntimeError("Failed to take screenshot, please check logs.")

return image

def skip_display_condition(self, settings, device_config, current_dt):
skip_if_blank = settings.get('skipIfBlank')
# Default to True if not specified, matching the UI default
if skip_if_blank is not None and skip_if_blank not in (True, 'true'):
return None

image = self._capture_screenshot(settings, device_config)
if not image:
raise RuntimeError("Failed to take screenshot, please check logs.")

if self._is_single_color(image):
return "Screenshot is blank"

url = settings.get('url')
self._captured_image_cache[self._get_cache_key(settings, device_config)] = image
return None

def _capture_screenshot(self, settings, device_config):
url = self._get_url(settings)
if not url:
raise RuntimeError("URL is required.")

dimensions = self._get_screenshot_dimensions(device_config)
virtual_time_budget_ms = self._get_virtual_time_budget_ms(settings)

logger.info(f"Taking screenshot of url: {url}")

return take_screenshot(url, dimensions, timeout_ms=40000, virtual_time_budget_ms=virtual_time_budget_ms)

def _get_url(self, settings):
return settings.get('url')

def _get_screenshot_dimensions(self, device_config):
dimensions = device_config.get_resolution()
if device_config.get_config("orientation") == "vertical":
dimensions = dimensions[::-1]
return dimensions

logger.info(f"Taking screenshot of url: {url}")
def _get_virtual_time_budget_ms(self, settings):
render_wait_ms = settings.get('renderWaitMs')
if render_wait_ms in (None, ''):
render_wait_ms = self.DEFAULT_RENDER_WAIT_MS

image = take_screenshot(url, dimensions, timeout_ms=40000)
virtual_time_budget_ms = None
if render_wait_ms:
try:
virtual_time_budget_ms = int(render_wait_ms)
except (TypeError, ValueError):
raise RuntimeError("Render wait must be a whole number of milliseconds.")

if not image:
raise RuntimeError("Failed to take screenshot, please check logs.")
if virtual_time_budget_ms < 0:
raise RuntimeError("Render wait must be zero or greater.")
if virtual_time_budget_ms == 0:
virtual_time_budget_ms = None

return virtual_time_budget_ms

def _get_cache_key(self, settings, device_config):
return (
id(settings),
self._get_url(settings),
settings.get('renderWaitMs'),
self._get_screenshot_dimensions(device_config),
)

return image
def _is_single_color(self, image):
return len(image.convert("RGBA").getcolors(maxcolors=2) or []) == 1
18 changes: 17 additions & 1 deletion src/plugins/screenshot/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,17 @@
<input type="text" id="url" name="url" placeholder="Type something..." required class="form-input">
</div>

<div class="form-group">
<label for="renderWaitMs" class="form-label">Render Wait (milliseconds):</label>
<input type="number" id="renderWaitMs" name="renderWaitMs" min="0" step="100" value="0" class="form-input">
<span style="color: var(--text-primary);">Optional. Allows pages with delayed JavaScript rendering to finish before the screenshot is captured. Use 0 to disable.</span>
</div>

<div class="form-group">
<input type="checkbox" id="skipIfBlank" name="skipIfBlank" value="true" checked onclick="this.value=this.checked ? 'true' : 'false';">
<span style="color: var(--text-primary);">Skip if blank: Automatically skip display if the screenshot is a single color.</span>
</div>

<div class="form-group">
<span style="color: var(--text-primary);">Warning: Do not run with untrusted URLs, as it may pose a security risk. This may also fail if the website takes too long to load.</span>
</div>
Expand All @@ -12,6 +23,11 @@
document.addEventListener('DOMContentLoaded', () => {
if (loadPluginSettings) {
document.getElementById('url').value = pluginSettings.url;
document.getElementById('renderWaitMs').value = pluginSettings.renderWaitMs || '0';

const skipIfBlank = pluginSettings.skipIfBlank !== undefined ? (pluginSettings.skipIfBlank === 'true' || pluginSettings.skipIfBlank === true) : true;
document.getElementById('skipIfBlank').checked = skipIfBlank;
document.getElementById('skipIfBlank').value = skipIfBlank ? 'true' : 'false';
}
});
</script>
</script>
144 changes: 141 additions & 3 deletions src/refresh_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from plugins.plugin_registry import get_plugin_instance
from utils.image_utils import compute_image_hash
from model import RefreshInfo, PlaylistManager
from PIL import Image
from PIL import Image, ImageDraw, ImageFont

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -106,6 +106,18 @@ def _run(self):
refresh_action = PlaylistRefresh(playlist, plugin_instance)

if refresh_action:
skipped_plugins = False
if isinstance(refresh_action, PlaylistRefresh):
if refresh_action.force:
refresh_action, skipped_plugins = self._get_forced_playlist_refresh(refresh_action, current_dt)
else:
refresh_action, skipped_plugins = self._get_displayable_playlist_refresh(refresh_action, current_dt)

if not refresh_action:
if skipped_plugins:
self.device_config.write_config()
continue

plugin_config = self.device_config.get_plugin(refresh_action.get_plugin_id())
if plugin_config is None:
logger.error(f"Plugin config not found for '{refresh_action.get_plugin_id()}'.")
Expand Down Expand Up @@ -186,6 +198,51 @@ def _determine_next_plugin(self, playlist_manager, latest_refresh_info, current_
logger.info(f"Determined next plugin. | active_playlist: {playlist.name} | plugin_instance: {plugin.name}")

return playlist, plugin

def _get_displayable_playlist_refresh(self, refresh_action, current_dt):
"""Returns the next playlist refresh action whose plugin does not self-skip."""
playlist = refresh_action.playlist
skipped_plugins = False

for _ in range(len(playlist.plugins)):
plugin_config = self.device_config.get_plugin(refresh_action.get_plugin_id())
if plugin_config is None:
logger.error(f"Plugin config not found for '{refresh_action.get_plugin_id()}'.")
return None, skipped_plugins

plugin = get_plugin_instance(plugin_config)
skip_reason = plugin.skip_display_condition(refresh_action.plugin_instance.settings, self.device_config, current_dt)
if skip_reason is None:
return refresh_action, skipped_plugins

skipped_plugins = True
logger.info(
f"Plugin skipped display. | plugin_instance: {refresh_action.plugin_instance.name} | reason: {skip_reason}"
)
refresh_action.save_skip_image(self.device_config, current_dt, skip_reason)
next_plugin = playlist.get_next_plugin()
refresh_action = PlaylistRefresh(playlist, next_plugin)

logger.info(f"All plugins skipped display. | active_playlist: {playlist.name}")
return None, skipped_plugins

def _get_forced_playlist_refresh(self, refresh_action, current_dt):
"""Returns a forced playlist refresh unless that plugin self-skips."""
plugin_config = self.device_config.get_plugin(refresh_action.get_plugin_id())
if plugin_config is None:
logger.error(f"Plugin config not found for '{refresh_action.get_plugin_id()}'.")
return None, False

plugin = get_plugin_instance(plugin_config)
skip_reason = plugin.skip_display_condition(refresh_action.plugin_instance.settings, self.device_config, current_dt)
if skip_reason is None:
return refresh_action, False

logger.info(
f"Plugin skipped forced display. | plugin_instance: {refresh_action.plugin_instance.name} | reason: {skip_reason}"
)
refresh_action.save_skip_image(self.device_config, current_dt, skip_reason)
return None, True

def log_system_stats(self):
metrics = {
Expand Down Expand Up @@ -271,18 +328,99 @@ def execute(self, plugin, device_config, current_dt: datetime):
"""Performs a refresh for the specified plugin instance within its playlist context."""
# Determine the file path for the plugin's image
plugin_image_path = os.path.join(device_config.plugin_image_dir, self.plugin_instance.get_image_path())
has_skip_preview = self.plugin_instance.settings.get("_inkypi_skip_preview", False)

# Check if a refresh is needed based on the plugin instance's criteria
if self.plugin_instance.should_refresh(current_dt) or self.force:
if self.plugin_instance.should_refresh(current_dt) or self.force or has_skip_preview:
logger.info(f"Refreshing plugin instance. | plugin_instance: '{self.plugin_instance.name}'")
# Generate a new image
image = plugin.generate_image(self.plugin_instance.settings, device_config)
image.save(plugin_image_path)
self.plugin_instance.settings.pop("_inkypi_skip_preview", None)
self.plugin_instance.latest_refresh_time = current_dt.isoformat()
else:
logger.info(f"Not time to refresh plugin instance, using latest image. | plugin_instance: {self.plugin_instance.name}.")
# Load the existing image from disk
with Image.open(plugin_image_path) as img:
image = img.copy()

return image
return image

def save_skip_image(self, device_config, current_dt, reason):
"""Save a diagnostic image explaining why this plugin skipped display."""
plugin_image_path = os.path.join(device_config.plugin_image_dir, self.plugin_instance.get_image_path())
image = self._generate_skip_image(device_config, reason)
image.save(plugin_image_path)

# The skipped preview replaces the cached plugin image. Mark it so the
# next non-skipped cycle renders fresh content instead of displaying it.
self.plugin_instance.settings["_inkypi_skip_preview"] = True
self.plugin_instance.latest_refresh_time = current_dt.isoformat()

def _generate_skip_image(self, device_config, reason):
dimensions = device_config.get_resolution()
width, height = dimensions
image = Image.new("RGB", dimensions, "white")
draw = ImageDraw.Draw(image)

title_font = self._load_skip_image_font(max(14, min(width, height) // 10), bold=True)
subtitle_font = self._load_skip_image_font(max(12, min(width, height) // 12), bold=True)
body_font = self._load_skip_image_font(max(10, min(width, height) // 16))
label_font = self._load_skip_image_font(max(10, min(width, height) // 16), bold=True)

margin = max(8, min(width, height) // 12)
max_text_width = width - (margin * 2)
lines = [
(f"Plugin: {self.plugin_instance.name}", title_font),
("Skipped Display", subtitle_font),
]
if reason:
lines.append(("Reason:", label_font))
for line in self._wrap_skip_image_text(draw, str(reason), body_font, max_text_width):
lines.append((line, body_font))

line_spacing = max(2, height // 40)
line_heights = []
for text, font in lines:
bbox = draw.textbbox((0, 0), text, font=font)
line_heights.append(bbox[3] - bbox[1])

total_height = sum(line_heights) + (line_spacing * (len(lines) - 1))
y = max(margin, (height - total_height) // 2)

for index, (text, font) in enumerate(lines):
bbox = draw.textbbox((0, 0), text, font=font)
text_width = bbox[2] - bbox[0]
x = max(margin, (width - text_width) // 2)
draw.text((x, y), text, fill="black", font=font)
y += line_heights[index] + line_spacing

return image

def _load_skip_image_font(self, size, bold=False):
font_name = "Jost-SemiBold.ttf" if bold else "Jost.ttf"
font_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static", "fonts", font_name)
try:
return ImageFont.truetype(font_path, size)
except OSError:
return ImageFont.load_default()

def _wrap_skip_image_text(self, draw, text, font, max_width):
lines = []
for paragraph in text.splitlines() or [""]:
words = paragraph.split()
if not words:
lines.append("")
continue

current_line = words[0]
for word in words[1:]:
candidate = f"{current_line} {word}"
if draw.textlength(candidate, font=font) <= max_width:
current_line = candidate
else:
lines.append(current_line)
current_line = word
lines.append(current_line)

return lines
8 changes: 5 additions & 3 deletions src/utils/image_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,15 @@ def compute_image_hash(image):
img_bytes = image.tobytes()
return hashlib.sha256(img_bytes).hexdigest()

def take_screenshot_html(html_str, dimensions, timeout_ms=None):
def take_screenshot_html(html_str, dimensions, timeout_ms=None, virtual_time_budget_ms=None):
image = None
try:
# Create a temporary HTML file
with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as html_file:
html_file.write(html_str.encode("utf-8"))
html_file_path = html_file.name

image = take_screenshot(html_file_path, dimensions, timeout_ms)
image = take_screenshot(html_file_path, dimensions, timeout_ms, virtual_time_budget_ms)

# Remove html file
os.remove(html_file_path)
Expand All @@ -117,7 +117,7 @@ def _find_chromium_binary():
return None


def take_screenshot(target, dimensions, timeout_ms=None):
def take_screenshot(target, dimensions, timeout_ms=None, virtual_time_budget_ms=None):
image = None
try:
# Find available browser binary
Expand Down Expand Up @@ -153,6 +153,8 @@ def take_screenshot(target, dimensions, timeout_ms=None):
]
if timeout_ms:
command.append(f"--timeout={timeout_ms}")
if virtual_time_budget_ms:
command.append(f"--virtual-time-budget={virtual_time_budget_ms}")
result = subprocess.run(command, capture_output=True, check=False)

# Check if the process failed or the output file is missing
Expand Down
Loading