Skip to content

Commit c59a17a

Browse files
committed
update dependencies and enhance XodusQueue functionality with additional constructors and locking mechanisms
1 parent d07fea3 commit c59a17a

4 files changed

Lines changed: 151 additions & 42 deletions

File tree

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,20 +63,20 @@
6363
<dependency>
6464
<groupId>org.junit.jupiter</groupId>
6565
<artifactId>junit-jupiter-api</artifactId>
66-
<version>6.0.0</version>
66+
<version>6.0.3</version>
6767
<scope>test</scope>
6868
</dependency>
6969
<dependency>
7070
<groupId>org.junit.jupiter</groupId>
7171
<artifactId>junit-jupiter-engine</artifactId>
72-
<version>6.0.0</version>
72+
<version>6.0.3</version>
7373
<scope>test</scope>
7474
</dependency>
7575

7676
<dependency>
7777
<groupId>org.junit.platform</groupId>
7878
<artifactId>junit-platform-launcher</artifactId>
79-
<version>6.0.0</version>
79+
<version>6.0.3</version>
8080
<scope>test</scope>
8181
</dependency>
8282

src/main/java/ch/rasc/xodusqueue/XodusBlockingQueue.java

Lines changed: 132 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package ch.rasc.xodusqueue;
1717

1818
import java.util.Collection;
19+
import java.util.Iterator;
1920
import java.util.Objects;
2021
import java.util.concurrent.BlockingQueue;
2122
import java.util.concurrent.TimeUnit;
@@ -38,6 +39,19 @@ public class XodusBlockingQueue<T> extends XodusQueue<T> implements BlockingQueu
3839

3940
private long capacity;
4041

42+
public XodusBlockingQueue(String databaseDir, Class<T> entryClass) {
43+
this(databaseDir, entryClass, Long.MAX_VALUE);
44+
}
45+
46+
public XodusBlockingQueue(String databaseDir, XodusQueueSerializer<T> serializer) {
47+
this(databaseDir, serializer, Long.MAX_VALUE);
48+
}
49+
50+
public XodusBlockingQueue(LogConfig logConfig, EnvironmentConfig environmentConfig,
51+
XodusQueueSerializer<T> serializer) {
52+
this(logConfig, environmentConfig, serializer, Long.MAX_VALUE);
53+
}
54+
4155
public XodusBlockingQueue(LogConfig logConfig, EnvironmentConfig environmentConfig,
4256
XodusQueueSerializer<T> serializer, long capacity) {
4357
super(logConfig, environmentConfig, serializer);
@@ -161,7 +175,9 @@ public T poll() {
161175
lock.lock();
162176
try {
163177
T e = super.poll();
164-
this.notFull.signal();
178+
if (e != null) {
179+
this.notFull.signal();
180+
}
165181
return e;
166182
}
167183
finally {
@@ -177,7 +193,9 @@ public T take() throws InterruptedException {
177193
while (super.sizeLong() == 0) {
178194
this.notEmpty.await();
179195
}
180-
return poll();
196+
T e = super.poll();
197+
this.notFull.signal();
198+
return e;
181199
}
182200
finally {
183201
lock.unlock();
@@ -196,7 +214,9 @@ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
196214
}
197215
nanos = this.notEmpty.awaitNanos(nanos);
198216
}
199-
return poll();
217+
T e = super.poll();
218+
this.notFull.signal();
219+
return e;
200220
}
201221
finally {
202222
lock.unlock();
@@ -319,4 +339,113 @@ public boolean retainAll(Collection<?> c) {
319339
}
320340
}
321341

342+
@Override
343+
public T peek() {
344+
final ReentrantLock lock = this.reentrantLock;
345+
lock.lock();
346+
try {
347+
return super.peek();
348+
}
349+
finally {
350+
lock.unlock();
351+
}
352+
}
353+
354+
@Override
355+
public int size() {
356+
final ReentrantLock lock = this.reentrantLock;
357+
lock.lock();
358+
try {
359+
return super.size();
360+
}
361+
finally {
362+
lock.unlock();
363+
}
364+
}
365+
366+
@Override
367+
public long sizeLong() {
368+
final ReentrantLock lock = this.reentrantLock;
369+
lock.lock();
370+
try {
371+
return super.sizeLong();
372+
}
373+
finally {
374+
lock.unlock();
375+
}
376+
}
377+
378+
@Override
379+
public boolean isEmpty() {
380+
final ReentrantLock lock = this.reentrantLock;
381+
lock.lock();
382+
try {
383+
return super.isEmpty();
384+
}
385+
finally {
386+
lock.unlock();
387+
}
388+
}
389+
390+
@Override
391+
public boolean contains(Object o) {
392+
final ReentrantLock lock = this.reentrantLock;
393+
lock.lock();
394+
try {
395+
return super.contains(o);
396+
}
397+
finally {
398+
lock.unlock();
399+
}
400+
}
401+
402+
@Override
403+
public boolean containsAll(Collection<?> c) {
404+
final ReentrantLock lock = this.reentrantLock;
405+
lock.lock();
406+
try {
407+
return super.containsAll(c);
408+
}
409+
finally {
410+
lock.unlock();
411+
}
412+
}
413+
414+
@Override
415+
public Object[] toArray() {
416+
final ReentrantLock lock = this.reentrantLock;
417+
lock.lock();
418+
try {
419+
return super.toArray();
420+
}
421+
finally {
422+
lock.unlock();
423+
}
424+
}
425+
426+
@SuppressWarnings({ "unchecked", "hiding" })
427+
@Override
428+
public <T> T[] toArray(T[] a) {
429+
final ReentrantLock lock = this.reentrantLock;
430+
lock.lock();
431+
try {
432+
return super.toArray(a);
433+
}
434+
finally {
435+
lock.unlock();
436+
}
437+
}
438+
439+
@Override
440+
public Iterator<T> iterator() {
441+
final ReentrantLock lock = this.reentrantLock;
442+
lock.lock();
443+
try {
444+
return super.iterator();
445+
}
446+
finally {
447+
lock.unlock();
448+
}
449+
}
450+
322451
}

src/main/java/ch/rasc/xodusqueue/XodusQueue.java

Lines changed: 15 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import java.util.Collection;
2323
import java.util.Iterator;
2424
import java.util.List;
25+
import java.util.Collections;
2526
import java.util.Objects;
26-
import java.util.concurrent.atomic.AtomicLong;
2727

2828
import ch.rasc.xodusqueue.serializer.BigDecimalXodusQueueSerializer;
2929
import ch.rasc.xodusqueue.serializer.BigIntegerXodusQueueSerializer;
@@ -57,8 +57,6 @@ public class XodusQueue<T> extends AbstractQueue<T> implements AutoCloseable {
5757

5858
private final XodusQueueSerializer<T> serializer;
5959

60-
private final AtomicLong key = new AtomicLong(0L);
61-
6260
@SuppressWarnings("unchecked")
6361
public XodusQueue(final String databaseDir, final Class<T> entryClass) {
6462
this.env = Environments.newInstance(databaseDir);
@@ -96,42 +94,17 @@ else if (entryClass == BigDecimal.class) {
9694
else {
9795
this.serializer = new DefaultXodusQueueSerializer<>(entryClass);
9896
}
99-
100-
this.key.set(this.lastKey());
10197
}
10298

10399
public XodusQueue(final String databaseDir, final XodusQueueSerializer<T> serializer) {
104100
this.env = Environments.newInstance(databaseDir);
105101
this.serializer = serializer;
106-
107-
this.key.set(this.lastKey());
108102
}
109103

110104
public XodusQueue(final LogConfig logConfig, final EnvironmentConfig environmentConfig,
111105
final XodusQueueSerializer<T> serializer) {
112106
this.env = Environments.newInstance(logConfig, environmentConfig);
113107
this.serializer = serializer;
114-
115-
this.key.set(this.lastKey());
116-
}
117-
118-
/**
119-
* Returns the last key stored in the queue store or 0 if the store is empty.
120-
*/
121-
private long lastKey() {
122-
return this.env.computeInReadonlyTransaction(txn -> {
123-
Store store = this.env.openStore(STORE_NAME, StoreConfig.WITHOUT_DUPLICATES, txn, false);
124-
if (store == null) {
125-
return 0L;
126-
}
127-
128-
try (Cursor cursor = store.openCursor(txn)) {
129-
if (cursor.getLast()) {
130-
return LongBinding.entryToLong(cursor.getKey());
131-
}
132-
}
133-
return 0L;
134-
});
135108
}
136109

137110
@Override
@@ -149,7 +122,6 @@ public boolean offer(T e) {
149122
}
150123

151124
store.putRight(txn, LongBinding.longToEntry(nextKey), this.serializer.toEntry(e));
152-
this.key.set(nextKey);
153125
});
154126

155127
return true;
@@ -182,10 +154,6 @@ public boolean addAll(Collection<? extends T> c) {
182154
modified = true;
183155
}
184156

185-
if (modified) {
186-
this.key.set(last);
187-
}
188-
189157
return modified;
190158
});
191159
}
@@ -285,7 +253,7 @@ public Iterator<T> iterator() {
285253
}
286254
}
287255
});
288-
return snapshot.iterator();
256+
return Collections.unmodifiableList(snapshot).iterator();
289257
}
290258

291259
@Override
@@ -324,14 +292,23 @@ public <T> T[] toArray(T[] a) {
324292
r[ix++] = (T) this.serializer.fromEntry(value);
325293
}
326294
}
295+
if (r.length > ix) {
296+
r[ix] = null;
297+
}
327298
return r;
328299
}
300+
if (a.length > 0) {
301+
a[0] = null;
302+
}
329303
return a;
330304
});
331305
}
332306

333307
@Override
334308
public boolean remove(Object o) {
309+
if (o == null) {
310+
return false;
311+
}
335312
return this.env.computeInExclusiveTransaction(txn -> {
336313
Store store = this.env.openStore(STORE_NAME, StoreConfig.WITHOUT_DUPLICATES, txn, false);
337314

@@ -352,6 +329,10 @@ public boolean remove(Object o) {
352329

353330
@Override
354331
public boolean containsAll(Collection<?> c) {
332+
Objects.requireNonNull(c);
333+
if (c.isEmpty()) {
334+
return true;
335+
}
355336
return this.env.computeInReadonlyTransaction(txn -> {
356337
Store store = this.env.openStore(STORE_NAME, StoreConfig.WITHOUT_DUPLICATES, txn, false);
357338
if (store != null) {
@@ -428,7 +409,6 @@ public boolean retainAll(Collection<?> c) {
428409
public void clear() {
429410
this.env.executeInExclusiveTransaction(txn -> {
430411
this.env.truncateStore(STORE_NAME, txn);
431-
this.key.set(0L);
432412
});
433413
}
434414

src/test/java/ch/rasc/xodusqueue/XodusQueueTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ void testAdd() {
171171
@Test
172172
void testAddAllCollectionOfQextendsT() {
173173
try (XodusQueue<Integer> queue = new XodusQueue<>("./test", Integer.class)) {
174-
Assertions.assertThrows(NullPointerException.class, null);
174+
Assertions.assertThrows(NullPointerException.class, () -> queue.addAll(null));
175175

176176
Assertions.assertTrue(queue.addAll(Arrays.asList(1, 2, 3, 4, 5)));
177177
Assertions.assertEquals(5, queue.size());

0 commit comments

Comments
 (0)