-
-
Notifications
You must be signed in to change notification settings - Fork 34.4k
Expand file tree
/
Copy pathbase_eventqueue.py
More file actions
158 lines (135 loc) · 5.39 KB
/
base_eventqueue.py
File metadata and controls
158 lines (135 loc) · 5.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# Copyright 2000-2008 Michael Hudson-Doyle <micahel@gmail.com>
# Armin Rigo
#
# All Rights Reserved
#
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose is hereby granted without fee,
# provided that the above copyright notice appear in all copies and
# that both that copyright notice and this permission notice appear in
# supporting documentation.
#
# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""
OS-independent base for an event and VT sequence scanner
See unix_eventqueue and windows_eventqueue for subclasses.
"""
from collections import deque
from . import keymap
from .console import Event
from .trace import trace
class BaseEventQueue:
    """
    Byte-at-a-time scanner that turns raw input into queued key events.

    Bytes handed to `push` are matched against the compiled keymap; a
    complete match, an unrecognized escape sequence, or a successfully
    decoded character each append an `Event` to `events`.
    """

    # Grace period added to the current time when arming the deadline for
    # a lone ESC byte (milliseconds, going by the name).
    _ESCAPE_TIMEOUT_MS = 50

    def __init__(self, encoding: str, keymap_dict: dict[bytes, str]) -> None:
        # `compiled_keymap` is the root lookup table; `keymap` is the
        # table currently being walked and starts out at the root.
        self.compiled_keymap = keymap.compile_keymap(keymap_dict)
        self.keymap = self.compiled_keymap
        trace("keymap {k!r}", k=self.keymap)
        self.encoding = encoding
        self.events: deque[Event] = deque()
        self.buf = bytearray()
        # Set by should_emit_standalone_escape(), cleared by push() and
        # emit_standalone_escape().
        self._pending_escape_deadline: float | None = None

    def get(self) -> Event | None:
        """
        Pop and return the oldest queued event, or None if there is none.
        """
        return self.events.popleft() if self.events else None

    def empty(self) -> bool:
        """
        Return True when no events are queued.
        """
        return len(self.events) == 0

    def flush_buf(self) -> bytearray:
        """
        Swap in a fresh, empty buffer and return the previous one.
        """
        flushed, self.buf = self.buf, bytearray()
        return flushed

    def insert(self, event: Event) -> None:
        """
        Append an event to the end of the queue.
        """
        trace('added event {event}', event=event)
        self.events.append(event)

    def has_pending_escape_sequence(self) -> bool:
        """
        Tell whether a lone ESC byte is waiting for a possible continuation.

        True only when the scanner has left the root keymap (i.e. it is
        part-way through a multi-byte key) and the buffer holds exactly
        one byte, which is ESC.
        """
        if self.keymap is self.compiled_keymap:
            # Not in the middle of a keymap walk: nothing is pending.
            return False
        return len(self.buf) == 1 and self.buf[0] == 27  # 27 == ESC

    def should_emit_standalone_escape(self, current_time_ms: float) -> bool:
        """
        Decide whether the pending ESC has waited long enough to stand alone.

        The first call made while an ESC is pending only arms the deadline
        (`current_time_ms + _ESCAPE_TIMEOUT_MS`) and returns False; later
        calls return True once `current_time_ms` has reached that deadline.
        """
        if not self.has_pending_escape_sequence():
            return False

        deadline = self._pending_escape_deadline
        if deadline is None:
            # Start the clock; the caller is expected to ask again later.
            self._pending_escape_deadline = (
                current_time_ms + self._ESCAPE_TIMEOUT_MS
            )
            return False

        return current_time_ms >= deadline

    def emit_standalone_escape(self) -> None:
        """
        Queue the buffered ESC as a key event of its own and reset the scanner.
        """
        self.keymap = self.compiled_keymap
        self.insert(Event('key', '\033', b'\033'))

        # Anything buffered after the ESC is fed back through push().
        for byte in self.flush_buf()[1:]:
            self.push(byte)

        self._pending_escape_deadline = None

    def push(self, char: int | bytes) -> None:
        """
        Feed one input byte to the scanner, queuing an event when one completes.

        `char` is either an int or a single-byte `bytes` object.
        """
        assert isinstance(char, (int, bytes))
        code = char if isinstance(char, int) else ord(char)
        key = code.to_bytes()
        self.buf.append(code)

        # Any new byte cancels a running standalone-ESC deadline.
        self._pending_escape_deadline = None

        if key in self.keymap:
            if self.keymap is self.compiled_keymap:
                # Sanity check: a special key starts on an empty buffer,
                # so the byte just appended must be the only one.
                assert len(self.buf) == 1
            mapped = self.keymap[key]
            trace('found map {k!r}', k=mapped)
            if isinstance(mapped, dict):
                # Prefix of a longer sequence: descend and wait for more.
                self.keymap = mapped
                return
            self.insert(Event('key', mapped, bytes(self.flush_buf())))
            self.keymap = self.compiled_keymap
            return

        if self.buf and self.buf[0] == 27:  # escape
            # Escape sequence not recognized by our keymap: propagate it
            # outside so that it can be recognized as an M-... key (see
            # also the docstring in keymap.py).
            trace('unrecognized escape sequence, propagating...')
            self.keymap = self.compiled_keymap
            self.insert(Event('key', '\033', b'\033'))
            for byte in self.flush_buf()[1:]:
                self.push(byte)
            return

        try:
            decoded = bytes(self.buf).decode(self.encoding)
        except UnicodeError:
            # The buffered bytes do not decode yet; they stay in the
            # buffer and decoding is retried on the next push.
            return
        self.insert(Event('key', decoded, bytes(self.flush_buf())))
        self.keymap = self.compiled_keymap