|
1 | 1 | from dataclasses import dataclass |
2 | 2 |
|
3 | | -import gi |
4 | 3 | import numpy as np |
5 | 4 | from scipy.spatial.transform import Rotation |
6 | 5 |
|
7 | | -gi.require_version('Gst', '1.0') |
8 | | -gi.require_version('GstApp', '1.0') |
9 | | -from gi.repository import GLib, Gst |
10 | | - |
11 | 6 |
|
12 | 7 | def ssa(angle: float) -> float: |
13 | 8 | return (angle + np.pi) % (2 * np.pi) - np.pi |
@@ -125,108 +120,3 @@ def __add__(self, other: "State") -> "State": |
125 | 120 |
|
126 | 121 | def __sub__(self, other: "State") -> "State": |
127 | 122 | return State(pose=self.pose - other.pose, twist=self.twist - other.twist) |
128 | | - |
129 | | - |
130 | | -class H264Decoder: |
131 | | - """Decodes H.264 streams using GStreamer.""" |
132 | | - |
133 | | - _gst_initialized = False |
134 | | - |
135 | | - def __init__(self): |
136 | | - """Initializes the H.264 decoder and sets up the GStreamer pipeline.""" |
137 | | - # Ensure GStreamer is initialized only once |
138 | | - if not H264Decoder._gst_initialized: |
139 | | - Gst.init(None) |
140 | | - H264Decoder._gst_initialized = True |
141 | | - |
142 | | - pipeline_desc = ( |
143 | | - "appsrc name=mysrc is-live=true ! " |
144 | | - "h264parse ! " |
145 | | - "avdec_h264 ! " |
146 | | - "videoconvert ! video/x-raw,format=BGR ! " |
147 | | - "appsink name=appsink" |
148 | | - ) |
149 | | - |
150 | | - self._pipeline = Gst.parse_launch(pipeline_desc) |
151 | | - self.appsrc = self._pipeline.get_by_name("mysrc") |
152 | | - self._appsink = self._pipeline.get_by_name("appsink") |
153 | | - |
154 | | - self._appsink.set_property("emit-signals", True) |
155 | | - self._appsink.set_property("sync", False) |
156 | | - self._appsink.connect("new-sample", self._on_new_sample) |
157 | | - |
158 | | - self._bus = self._pipeline.get_bus() |
159 | | - self._bus.add_signal_watch() |
160 | | - self._bus.connect("message", self._on_bus_message) |
161 | | - |
162 | | - self._main_loop = None |
163 | | - |
164 | | - self.decoded_frames = [] |
165 | | - self.max_frames = 3 # Keep only the last 3 frames here |
166 | | - |
167 | | - def start(self): |
168 | | - """Starts the GStreamer pipeline and runs the main event loop.""" |
169 | | - self._pipeline.set_state(Gst.State.PLAYING) |
170 | | - self._main_loop = GLib.MainLoop() |
171 | | - try: |
172 | | - self._main_loop.run() |
173 | | - except KeyboardInterrupt: |
174 | | - pass |
175 | | - finally: |
176 | | - self.stop() |
177 | | - |
178 | | - def stop(self): |
179 | | - """Stops the GStreamer pipeline and cleans up resources.""" |
180 | | - if self._pipeline: |
181 | | - self._pipeline.set_state(Gst.State.NULL) |
182 | | - if self._main_loop is not None: |
183 | | - self._main_loop.quit() |
184 | | - self._main_loop = None |
185 | | - |
186 | | - def push_data(self, data: bytes): |
187 | | - """Pushes H.264 encoded data into the pipeline for decoding.""" |
188 | | - if not self.appsrc: |
189 | | - raise RuntimeError( |
190 | | - "The pipeline's appsrc element was not found or initialized." |
191 | | - ) |
192 | | - gst_buffer = Gst.Buffer.new_allocate(None, len(data), None) |
193 | | - gst_buffer.fill(0, data) |
194 | | - self.appsrc.emit("push-buffer", gst_buffer) |
195 | | - |
196 | | - def _on_bus_message(self, bus, message): |
197 | | - """Handles messages from the GStreamer bus.""" |
198 | | - msg_type = message.type |
199 | | - if msg_type == Gst.MessageType.ERROR: |
200 | | - err, debug = message.parse_error() |
201 | | - print(f"GStreamer ERROR: {err}, debug={debug}") |
202 | | - self.stop() |
203 | | - elif msg_type == Gst.MessageType.EOS: |
204 | | - print("End-Of-Stream reached.") |
205 | | - self.stop() |
206 | | - |
207 | | - def _on_new_sample(self, sink): |
208 | | - """Processes a new decoded video frame from the appsink.""" |
209 | | - sample = sink.emit("pull-sample") |
210 | | - if not sample: |
211 | | - return Gst.FlowReturn.ERROR |
212 | | - |
213 | | - buf = sample.get_buffer() |
214 | | - caps_format = sample.get_caps().get_structure(0) |
215 | | - width = caps_format.get_value("width") |
216 | | - height = caps_format.get_value("height") |
217 | | - |
218 | | - success, map_info = buf.map(Gst.MapFlags.READ) |
219 | | - if not success: |
220 | | - return Gst.FlowReturn.ERROR |
221 | | - |
222 | | - frame_data = np.frombuffer(map_info.data, dtype=np.uint8) |
223 | | - channels = len(frame_data) // (width * height) # typically 3 (BGR) or 4 (BGRA) |
224 | | - frame_data_reshaped = frame_data.reshape((height, width, channels)) |
225 | | - |
226 | | - self.decoded_frames.append(frame_data_reshaped.copy()) |
227 | | - |
228 | | - if len(self.decoded_frames) > self.max_frames: |
229 | | - self.decoded_frames.pop(0) |
230 | | - |
231 | | - buf.unmap(map_info) |
232 | | - return Gst.FlowReturn.OK |
0 commit comments