-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathcircular_buffer.py
More file actions
executable file
·43 lines (36 loc) · 1.29 KB
/
circular_buffer.py
File metadata and controls
executable file
·43 lines (36 loc) · 1.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from datastructures.circular_buffer.exceptions import (
BufferFullException,
BufferEmptyException,
)
class CircularBuffer:
    """
    Fixed-capacity FIFO ring buffer of single bytes backed by a bytearray.

    Items are written as integers in range(256) or as single characters and
    are read back, oldest first, as one-character strings.

    Occupancy is tracked with an explicit counter rather than by inspecting
    the stored bytes, so a zero byte is a legitimate item and the
    full/empty checks run in constant time.
    """

    def __init__(self, size):
        """
        Create an empty buffer able to hold ``size`` items.
        """
        self.buffer = bytearray(size)
        self.read_point = 0
        self.write_point = 0
        # Number of unread items currently stored.
        self._count = 0

    def _update_buffer(self, data):
        """
        Protected helper method for Python 2/3

        Store ``data`` in the slot at ``write_point``. The value is first
        assigned as-is; if the bytearray rejects its type, it is converted
        with ``ord`` (the single-character string case).
        """
        try:
            self.buffer[self.write_point] = data
        except TypeError:
            self.buffer[self.write_point] = ord(data)

    def _next_index(self, index):
        """
        Return the slot index following ``index``, wrapping at capacity.
        """
        return (index + 1) % len(self.buffer)

    def clear(self):
        """
        Discard every stored item and return the buffer to its initial state.
        """
        self.buffer = bytearray(len(self.buffer))
        # Realign both indices; otherwise a read after the next write could
        # return a stale, zeroed slot instead of the freshly written item.
        self.read_point = 0
        self.write_point = 0
        self._count = 0

    def write(self, data):
        """
        Append ``data`` as the newest item.

        Raises BufferFullException when no free slot is left.
        """
        if self._count == len(self.buffer):
            raise BufferFullException
        self._update_buffer(data)
        # Bookkeeping happens only after the store succeeded, so a rejected
        # value leaves the buffer unchanged.
        self.write_point = self._next_index(self.write_point)
        self._count += 1

    def read(self):
        """
        Remove the oldest item and return it as a one-character string.

        Raises BufferEmptyException when nothing is stored.
        """
        if self._count == 0:
            raise BufferEmptyException
        data = chr(self.buffer[self.read_point])
        # Zero the freed slot so the backing storage shows it as unused.
        self.buffer[self.read_point] = 0
        self.read_point = self._next_index(self.read_point)
        self._count -= 1
        return data

    def overwrite(self, data):
        """
        Append ``data``; when the buffer is full, replace the oldest item.
        """
        was_full = self._count == len(self.buffer)
        self._update_buffer(data)
        if was_full:
            # The oldest item was just replaced, so the next-oldest becomes
            # the read position and the item count stays the same.
            self.read_point = self._next_index(self.read_point)
        else:
            self._count += 1
        self.write_point = self._next_index(self.write_point)