-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathimage_utils.py
More file actions
171 lines (143 loc) · 5.73 KB
/
image_utils.py
File metadata and controls
171 lines (143 loc) · 5.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# image_utils.py — PNG fetch, decode, scale, and Floyd-Steinberg dither
# Target: 800x480, 1-bit packed output (48000 bytes, MSB-first per row)
import gc
import adafruit_pngdecoder
# Output frame geometry in pixels (width x height).
_TARGET_W, _TARGET_H = 800, 480
# Bytes in one packed 1-bit frame: eight pixels per byte.
_BUF_SIZE = (_TARGET_W * _TARGET_H) // 8  # 48000
# ---------------------------------------------------------------------------
# Network helpers
# ---------------------------------------------------------------------------
def fetch_xkcd_info(requests) -> dict:
    """Fetch the metadata of the latest XKCD comic.

    Args:
        requests: Session-like object exposing ``get(url)``; the returned
            response must provide ``json()`` and ``close()``.

    Returns:
        The decoded JSON document from ``https://xkcd.com/info.0.json``
        (expected to carry ``num``, ``img`` and ``alt`` keys).

    Any exception raised while fetching or decoding propagates to the
    caller; the response is closed in every case.
    """
    response = requests.get("https://xkcd.com/info.0.json")
    try:
        return response.json()
    finally:
        # Release the connection even when JSON decoding fails.
        response.close()
def download_png(requests, url: str) -> bytearray:
    """Stream the body at *url* into a single contiguous bytearray.

    Args:
        requests: Session-like object exposing ``get(url, stream=True)``;
            the returned response must provide ``iter_content(chunk_size=)``
            and ``close()``.
        url: Address of the PNG to download.

    Returns:
        A bytearray holding the complete response body.

    No explicit size limit is enforced: if the body does not fit in memory
    the allocator's ``MemoryError`` propagates. The response is closed
    whether or not streaming succeeds.
    """
    response = requests.get(url, stream=True)
    chunks = []
    total = 0
    try:
        for chunk in response.iter_content(chunk_size=4096):
            # Keep a private copy of each chunk (presumably because the
            # iterator may reuse its buffer between iterations).
            chunks.append(bytes(chunk))
            total += len(chunk)
    finally:
        # Close the connection even if the stream fails part-way.
        response.close()

    # Allocate the final buffer once, now that the total size is known.
    buf = bytearray(total)
    pos = 0
    for part in chunks:
        end = pos + len(part)
        buf[pos:end] = part
        pos = end

    # Drop the temporary chunk copies before returning to free memory.
    chunks.clear()
    gc.collect()
    return buf
# ---------------------------------------------------------------------------
# Image pipeline
# ---------------------------------------------------------------------------
def _to_grayscale_row(row, mode: str) -> bytearray:
    """Convert one decoded PNG row to 8-bit grayscale.

    Args:
        row: Indexable sequence of channel bytes for a single row.
        mode: 'RGB', 'RGBA', 'L', 'LA' or 'P'. Any other value is treated
            like 'L' (the bytes are copied unchanged).

    Returns:
        A bytearray with one luminance byte per pixel.

    Luminance for colour rows is ``(77*R + 150*G + 29*B) >> 8``; the weights
    sum to 256, so pure white maps to 255. Rows with an alpha channel
    ('RGBA', 'LA') are composited over a white background so transparent
    pixels come out white instead of exposing their hidden colour value;
    fully opaque pixels are unaffected.

    NOTE(review): 'P' rows are copied as-is, i.e. palette indices are used
    as gray levels without a palette lookup -- confirm this is intended.
    """
    if mode == 'RGB' or mode == 'RGBA':
        has_alpha = mode == 'RGBA'
        stride = 4 if has_alpha else 3
        out = bytearray(len(row) // stride)
        base = 0
        for i in range(len(out)):
            lum = (77 * row[base] + 150 * row[base + 1]
                   + 29 * row[base + 2]) >> 8
            if has_alpha:
                alpha = row[base + 3]
                if alpha != 255:
                    # Blend over white, rounding to nearest.
                    lum = (lum * alpha + 255 * (255 - alpha) + 127) // 255
            out[i] = lum
            base += stride
        return out

    if mode == 'LA':
        out = bytearray(len(row) // 2)
        base = 0
        for i in range(len(out)):
            lum = row[base]
            alpha = row[base + 1]
            if alpha != 255:
                # Blend over white, rounding to nearest.
                lum = (lum * alpha + 255 * (255 - alpha) + 127) // 255
            out[i] = lum
            base += 2
        return out

    # 'L', 'P' and unknown modes: bytes are already one value per pixel.
    return bytearray(row)
def decode_and_process(png_bytes: bytearray,
                       target_w: int = _TARGET_W,
                       target_h: int = _TARGET_H) -> bytearray:
    """Decode a PNG and render it as a dithered, packed 1-bit frame.

    The image is converted to grayscale, scaled with nearest-neighbour
    sampling to fit inside ``target_w`` x ``target_h`` while keeping its
    aspect ratio, centred on a white background, and reduced to black/white
    with Floyd-Steinberg error diffusion.

    Args:
        png_bytes: Complete PNG file contents.
        target_w: Output width in pixels.
        target_h: Output height in pixels.

    Returns:
        A bytearray of ``(target_w * target_h + 7) // 8`` bytes (48000 for
        the default 800x480). Pixels are packed row-major, most significant
        bit first; a set bit is white, a clear bit is black.

    NOTE(review): assumes ``adafruit_pngdecoder.PngDecoder`` exposes
    ``width``, ``height``, ``color_mode`` and a ``read_row()`` that yields
    one decoded row per call -- confirm against the decoder library.
    """
    # --- Decode PNG rows into a flat 8-bit grayscale image ---
    decoder = adafruit_pngdecoder.PngDecoder(png_bytes)
    src_w = decoder.width
    src_h = decoder.height
    mode = decoder.color_mode  # e.g. 'RGB', 'L', etc.

    # One byte per source pixel (the original author's note says this
    # relies on the board's PSRAM for headroom).
    gray = bytearray(src_w * src_h)
    for y in range(src_h):
        g_row = _to_grayscale_row(decoder.read_row(), mode)
        start = y * src_w
        # Copy at most one row's worth; never let a short row shrink the
        # buffer through a mismatched slice assignment.
        n = min(len(g_row), src_w)
        gray[start:start + n] = g_row[:n]
    del decoder
    gc.collect()

    # --- Compute letterbox scale and offsets (exact integer arithmetic) ---
    # The uniform scale factor is den / num, chosen as the smaller of
    # target_w / src_w and target_h / src_h. Integer math avoids losing a
    # pixel to float rounding in int(src_w * (target_w / src_w)).
    if src_w * target_h >= src_h * target_w:
        num, den = src_w, target_w   # width is the limiting dimension
    else:
        num, den = src_h, target_h   # height is the limiting dimension
    scaled_w = src_w * den // num
    scaled_h = src_h * den // num
    off_x = (target_w - scaled_w) // 2
    off_y = (target_h - scaled_h) // 2
    x_end = off_x + scaled_w
    y_end = off_y + scaled_h

    # Nearest-neighbour source column for every scaled column, computed
    # once instead of once per output pixel.
    last_col = src_w - 1
    col_src = [min(dx * num // den, last_col) for dx in range(scaled_w)]
    last_row = src_h - 1

    # --- Dither and pack ---
    # Size the buffer from the requested dimensions, not the module default.
    out_bits = bytearray((target_w * target_h + 7) // 8)

    # Error accumulators for the current and the next row. Index tx + 1
    # belongs to column tx; the extra cell on each side absorbs the
    # diffusion that falls off the left and right edges.
    err_cur = [0] * (target_w + 2)
    err_next = [0] * (target_w + 2)

    bit_pos = 0  # linear pixel index == ty * target_w + tx
    for ty in range(target_h):
        # Reset next-row error accumulator
        for i in range(len(err_next)):
            err_next[i] = 0

        # Offset of the source row feeding this output row, or None when
        # the whole output row lies in the letterbox padding.
        if off_y <= ty < y_end:
            row_base = min((ty - off_y) * num // den, last_row) * src_w
        else:
            row_base = None

        for tx in range(target_w):
            if row_base is not None and off_x <= tx < x_end:
                pixel = gray[row_base + col_src[tx - off_x]]
            else:
                pixel = 255  # padding: white

            # Add accumulated error (clamp to 0-255)
            pixel_e = pixel + err_cur[tx + 1]
            if pixel_e < 0:
                pixel_e = 0
            elif pixel_e > 255:
                pixel_e = 255

            # Threshold; set bits are white, MSB first within each byte.
            if pixel_e >= 128:
                out_bits[bit_pos >> 3] |= 0x80 >> (bit_pos & 7)
                quant_err = pixel_e - 255
            else:
                quant_err = pixel_e
            bit_pos += 1

            # Distribute Floyd-Steinberg error (weights 7, 3, 5, 1 over 16)
            err_cur[tx + 2] += quant_err * 7 >> 4   # right
            err_next[tx] += quant_err * 3 >> 4      # down-left
            err_next[tx + 1] += quant_err * 5 >> 4  # down
            err_next[tx + 2] += quant_err * 1 >> 4  # down-right

        # Swap error rows
        err_cur, err_next = err_next, err_cur

    del gray
    gc.collect()
    return out_bits