From a3cc8deefc5000a18b7f5818f04ea4a467ae9256 Mon Sep 17 00:00:00 2001 From: julianev Date: Sat, 30 May 2020 18:28:33 +0200 Subject: [PATCH 01/10] Slide-By-Tuple Window Implementation and Test --- .../scotty/core/windowType/SessionWindow.java | 6 + .../core/windowType/SlideByTupleWindow.java | 189 ++++++++++ .../windowContext/WindowContext.java | 11 + .../tub/dima/scotty/slicing/SliceManager.java | 10 +- .../windowTest/SlideByTupleWindowTest.java | 332 ++++++++++++++++++ 5 files changed, 547 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java create mode 100644 slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java index c30128aa..8929f682 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java @@ -3,6 +3,8 @@ import de.tub.dima.scotty.core.*; import de.tub.dima.scotty.core.windowType.windowContext.*; +import java.util.ArrayList; + public class SessionWindow implements ForwardContextAware { private final WindowMeasure measure; @@ -115,6 +117,10 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, } } + @Override + public ActiveWindow updateContextWindows(Object element, long ts, ArrayList listOfTs) { + return null; + } } diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java new file mode 100644 index 00000000..161098d4 --- /dev/null +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java @@ -0,0 +1,189 @@ +package de.tub.dima.scotty.core.windowType; + +import de.tub.dima.scotty.core.WindowCollector; +import de.tub.dima.scotty.core.windowType.windowContext.WindowContext; + +import java.util.ArrayList; + +public class SlideByTupleWindow implements ForwardContextAware { + + private final WindowMeasure measure; + private final long size; + private final long slide; + private final boolean outOfOrder; + + /** + * @param size size of the SlideByTuple window + * @param slide window slide step in tuple counts + */ + public SlideByTupleWindow(long size, long slide, boolean outOfOrder){ + this.measure = WindowMeasure.Time; + this.size = size; + this.slide = slide; + this.outOfOrder = outOfOrder; + } + + public long getSize() { + return size; + } + + public long getSlide() { + return slide; + } + + @Override + public WindowMeasure getWindowMeasure() { + return measure; + } + + @Override + public SlideByTupleContext createContext() { + return new SlideByTupleContext(); + } + + public class SlideByTupleContext extends WindowContext { + + private long nextStart = 0; //start count of next window + private long count = 0; //tuple-counter + + @Override + public ActiveWindow updateContext(Object tuple, long position){ + //processes in-order tuples + if (hasActiveWindows()) { + //append first window + addNewWindow(0, position, position + size); + count++; + nextStart += getSlide(); + return getWindow(0); + } + + int windowIndex = getWindowIndex(position); + + if (windowIndex == -1) { + addNewWindow(0, position, position + size); + count++; + nextStart += getSlide(); + } else { + if (count == nextStart) { //new window starts + count++; + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, position, position + size); + } else { + ActiveWindow w = getWindow(windowIndex); + if (w.getEnd() > position) { //append to active window + count++; + return w; + } else{ + /* Tuple, which does not belong to current window and where count != nextStart, + does not get included in any window instance */ + count++; + } + } + } + return null; + } + + @Override + public ActiveWindow updateContextWindows(Object tuple, long position, ArrayList listOfTs){ + //processes out-of-order tuples + int windowIndex = getWindowIndex(position); + if(windowIndex+1 <= (numberOfActiveWindows()-1)){ + //windows after the tuple exist, they have to be shifted + //beginning from the next window, to which the tuples does not belong + for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){ + ActiveWindow w = getWindow(i); + int index = listOfTs.indexOf(w.getStart()); + long timestampBefore = (long) listOfTs.get(index-1); + + if(position == timestampBefore){ //start of window has to be shifted to current tuple + // shift start and modify slice, otherwise insertion into wrong slice + shiftStart(w,timestampBefore); + }else{ + // shift start and dont mofify slice, simple insertion into existing slice possible + shiftStartDontModify(w,timestampBefore); + } + shiftEnd(w,timestampBefore+size); + } + count++; + return null; + }else{ + //no subsequent windows exist, out-of-order tuple may starts a new window + long lateCount = listOfTs.indexOf(position); //count of current tuple + long windowStart = getWindow(windowIndex).getStart(); + int indexOfStart = listOfTs.indexOf(windowStart); //tuple count on position of start of last window + long lateNextStart = indexOfStart + getSlide(); //tuple count of next start + + if(lateCount==lateNextStart){ // tuple starts a new window + nextStart += getSlide(); //update next start + addNewWindow(windowIndex + 1, position, position + size); + } + count++; + } + return null; + } + + public int getWindowIndex(long position) { + //returns newest window + int i = numberOfActiveWindows()-1; + for (; i >= 0 ; i--) { + ActiveWindow p = getWindow(i); + if (p.getStart() <= position) { + return i; + } + } + return -1; + } + + @Override + public long assignNextWindowStart(long position) { + if(slide > 1 && !outOfOrder) { + if (count == nextStart) { + // new Window starts, append new Slice + return position; + } else { + // determine if new slice is needed + int windowIndex = getWindowIndex(position); + if(windowIndex == -1){ + return position; + } + ActiveWindow w = getWindow(windowIndex); + if (windowIndex == 0) { // first window: all tuples belong to the first slice + return w.getEnd(); + } else { + ActiveWindow wBefore = getWindow(windowIndex - 1); + if (wBefore.getEnd() <= position) { + //tuple before does not belong to window before -> append current value to slice + return w.getEnd(); + } else { + //tuple before also belongs to the window before or is outlier -> new slice + return wBefore.getEnd(); + } + } + } + }else { //append one slice for each tuple for out-of-order streams or slide == 1 + return position; + } + } + + @Override + public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark) { + ActiveWindow w = getWindow(0); + while (w.getEnd() <= currentWatermark) { + aggregateWindows.trigger(w.getStart(), w.getEnd() , measure); + removeWindow(0); + if (hasActiveWindows()) + return; + w = getWindow(0); + } + } + } + + @Override + public String toString() { + return "SlideByTupleWindow{" + + "measure=" + measure + + ", size=" + size + + ", slide=" + slide + + '}'; + } +} diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index bb0730ed..3611bbe6 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -57,6 +57,11 @@ public void shiftStart(ActiveWindow window, long position) { window.setStart(position); } + public void shiftStartDontModify(ActiveWindow window, long position) { // does not modify the start edge + //modifiedWindowEdges.add(new ShiftModification(window.start, position)); + window.setStart(position); + } + public void shiftEnd(ActiveWindow window, long position) { //modifiedWindowEdges.add(new ShiftModification(window.end, position)); window.setEnd(position); @@ -70,6 +75,12 @@ public final ActiveWindow updateContext(Tuple tuple, long position, Set listOfTs); //For out-of-order processing + + public ActiveWindow updateContextWindows(Tuple element, long ts, ArrayList listOfTs, Set windowModifications) { + this.modifiedWindowEdges = windowModifications; + return updateContextWindows(element, ts, listOfTs); + } public abstract long assignNextWindowStart(long position); public abstract void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark); diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index 3c9a9a6f..db585f0c 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -1,5 +1,6 @@ package de.tub.dima.scotty.slicing; +import de.tub.dima.scotty.core.windowType.SlideByTupleWindow; import de.tub.dima.scotty.core.windowType.windowContext.*; import de.tub.dima.scotty.slicing.aggregationstore.*; import de.tub.dima.scotty.slicing.slice.*; @@ -11,6 +12,7 @@ public class SliceManager { private final SliceFactory sliceFactory; private final AggregationStore aggregationStore; private final WindowManager windowManager; + public ArrayList listOfTs = new ArrayList(); public SliceManager(final SliceFactory sliceFactory, final AggregationStore aggregationStore, final WindowManager windowManager) { this.sliceFactory = sliceFactory; @@ -51,6 +53,7 @@ public void processElement(InputType element, long ts) { } final Slice currentSlice = this.aggregationStore.getCurrentSlice(); + listOfTs.add(ts); // is element in order? if (ts >= currentSlice.getTLast()) { @@ -66,7 +69,12 @@ public void processElement(InputType element, long ts) { for (WindowContext windowContext : this.windowManager.getContextAwareWindows()) { Set windowModifications = new HashSet<>(); - WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); + if (!(windowContext instanceof SlideByTupleWindow.SlideByTupleContext)){ + WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); + }else { + Collections.sort(listOfTs); // for out-of-order processing of Slide-By-Tuple Windows + WindowContext.ActiveWindow assignedWindow = windowContext.updateContextWindows(element, ts, listOfTs, windowModifications); + } checkSliceEdges(windowModifications); } diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java new file mode 100644 index 00000000..b1d8b8c7 --- /dev/null +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java @@ -0,0 +1,332 @@ +package de.tub.dima.scotty.slicing.aggregationstore.test.windowTest; + +import de.tub.dima.scotty.core.AggregateWindow; +import de.tub.dima.scotty.core.windowFunction.ReduceAggregateFunction; +import de.tub.dima.scotty.core.windowType.SlideByTupleWindow; +import de.tub.dima.scotty.slicing.SlicingWindowOperator; +import de.tub.dima.scotty.state.memory.MemoryStateFactory; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class SlideByTupleWindowTest { + + private SlicingWindowOperator slicingWindowOperator; + private MemoryStateFactory stateFactory; + + @Before + public void setup() { + this.stateFactory = new MemoryStateFactory(); + this.slicingWindowOperator = new SlicingWindowOperator(stateFactory); + } + + @Test + public void inOrderSlide1() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1, false)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + // Assert for slide = 1 + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),3, 13, 3); + WindowAssert.assertEquals(resultWindows.get(2),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(3),8, 18, 3); + WindowAssert.assertEquals(resultWindows.get(4),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(5),17, 27, 3); + WindowAssert.assertEquals(resultWindows.get(6),24, 34, 2); + WindowAssert.assertEquals(resultWindows.get(7),24, 34, 2); + } + + @Test + public void inOrderSlide2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2, false)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + // Assert for slide = 2 + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),24, 34, 1); + } + + + @Test + public void inOrderTest() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5, false)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 7); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 8); + WindowAssert.assertEquals(resultWindows.get(3),19, 29, 6); + WindowAssert.assertEquals(resultWindows.get(4),27, 37, 2); + } + + @Test + public void inOrderTestOutlier() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2, false)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 24); //outlier, should not be contained in any window + slicingWindowOperator.processElement(1, 30); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 1); + WindowAssert.assertEquals(resultWindows.get(3),30, 40, 1); + } + + @Test + public void inOrderTwoWindowsSize() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + //windows with different sizes + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 2, false)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1,23); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(45); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),23, 33, 2); + WindowAssert.assertEquals(resultWindows.get(4),1, 21, 6); + WindowAssert.assertEquals(resultWindows.get(5),6, 26, 6); + WindowAssert.assertEquals(resultWindows.get(6),13, 33, 4); + WindowAssert.assertEquals(resultWindows.get(7),23, 43, 2); + + } + + @Test + public void inOrderTwoWindowsSlide() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + //windows with different slides + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 3, false)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + /*Assert for two windows slide = 2 and slide = 3*/ + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),24, 34, 1); + WindowAssert.assertEquals(resultWindows.get(4),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(5),8, 18, 3); + WindowAssert.assertEquals(resultWindows.get(6),24, 34, 1); + } + + @Test + public void inOrderTwoWindowsTest2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + //windows with different size and slide + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 5,false)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1,23); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(45); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),23, 33, 2); + WindowAssert.assertEquals(resultWindows.get(4),1, 21, 6); + WindowAssert.assertEquals(resultWindows.get(5),17, 37, 3); + } + + @Test + public void outOfOrderTest() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,true)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 6); // out of order Tuple, shift start of window after + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + } + + @Test + public void outOfOrderTest2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,true)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1,26); + slicingWindowOperator.processElement(1, 16); // out of order Tuple, shift start of two windows before + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0), 1, 11, 3); + WindowAssert.assertEquals(resultWindows.get(1), 8, 18, 4); + WindowAssert.assertEquals(resultWindows.get(2), 14, 24, 4); + WindowAssert.assertEquals(resultWindows.get(3), 19,29,4); + WindowAssert.assertEquals(resultWindows.get(4),25,35,2); + } + + @Test + public void outOfOrderTest3() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5, true)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + slicingWindowOperator.processElement(1,7); //out-of-order tuple, shift three windows + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 10); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 8); + WindowAssert.assertEquals(resultWindows.get(2),12, 22, 8); + WindowAssert.assertEquals(resultWindows.get(3),18, 28, 7); + WindowAssert.assertEquals(resultWindows.get(4),25, 35, 3); + } + + @Test + public void outOfOrderTest4() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5, true)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); //start of window + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); //count=2 + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); //count=4 before out of order tuples -> no new window starts before out-of-order tuples arrive + + //out-of-order tuples + slicingWindowOperator.processElement(1, 17); //count=3 if in-order after 16 + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); //count=5 -> start of new window instead of shift + slicingWindowOperator.processElement(1, 21); //count=6 + + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 7); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 8); + WindowAssert.assertEquals(resultWindows.get(3),19, 29, 6); + WindowAssert.assertEquals(resultWindows.get(4),27, 37, 2); + } +} From f703118d1c1622fc286b7fc62ff4628d4e405a4c Mon Sep 17 00:00:00 2001 From: julianev Date: Sat, 1 May 2021 10:29:36 +0200 Subject: [PATCH 02/10] remove added function in WindowContext --- .../tub/dima/scotty/core/windowType/SessionWindow.java | 5 ----- .../core/windowType/windowContext/WindowContext.java | 8 +------- .../java/de/tub/dima/scotty/slicing/SliceManager.java | 9 +-------- 3 files changed, 2 insertions(+), 20 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java index 8929f682..da040113 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java @@ -117,11 +117,6 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, } } - @Override - public ActiveWindow updateContextWindows(Object element, long ts, ArrayList listOfTs) { - return null; - } - } @Override diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index 3611bbe6..66321908 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -75,12 +75,6 @@ public final ActiveWindow updateContext(Tuple tuple, long position, Set listOfTs); //For out-of-order processing - - public ActiveWindow updateContextWindows(Tuple element, long ts, ArrayList listOfTs, Set windowModifications) { - this.modifiedWindowEdges = windowModifications; - return updateContextWindows(element, ts, listOfTs); - } public abstract long assignNextWindowStart(long position); public abstract void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark); @@ -115,4 +109,4 @@ public int compareTo(ActiveWindow o) { return Long.compare(this.start, o.start); } } -} +} \ No newline at end of file diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index db585f0c..99f06d31 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -12,7 +12,6 @@ public class SliceManager { private final SliceFactory sliceFactory; private final AggregationStore aggregationStore; private final WindowManager windowManager; - public ArrayList listOfTs = new ArrayList(); public SliceManager(final SliceFactory sliceFactory, final AggregationStore aggregationStore, final WindowManager windowManager) { this.sliceFactory = sliceFactory; @@ -53,7 +52,6 @@ public void processElement(InputType element, long ts) { } final Slice currentSlice = this.aggregationStore.getCurrentSlice(); - listOfTs.add(ts); // is element in order? if (ts >= currentSlice.getTLast()) { @@ -69,12 +67,7 @@ public void processElement(InputType element, long ts) { for (WindowContext windowContext : this.windowManager.getContextAwareWindows()) { Set windowModifications = new HashSet<>(); - if (!(windowContext instanceof SlideByTupleWindow.SlideByTupleContext)){ - WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); - }else { - Collections.sort(listOfTs); // for out-of-order processing of Slide-By-Tuple Windows - WindowContext.ActiveWindow assignedWindow = windowContext.updateContextWindows(element, ts, listOfTs, windowModifications); - } + WindowContext.ActiveWindow assignedWindow = windowContext.updateContext(element, ts, windowModifications); checkSliceEdges(windowModifications); } From c4e68377ab9dc3b878dce80fa676adbae78c4ff4 Mon Sep 17 00:00:00 2001 From: julianev Date: Sat, 1 May 2021 10:30:05 +0200 Subject: [PATCH 03/10] adapt SlideByTuple Window --- .../core/windowType/SlideByTupleWindow.java | 113 ++++++++++-------- 1 file changed, 62 insertions(+), 51 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java index 161098d4..8643ba1b 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java @@ -4,6 +4,7 @@ import de.tub.dima.scotty.core.windowType.windowContext.WindowContext; import java.util.ArrayList; +import java.util.Collections; public class SlideByTupleWindow implements ForwardContextAware { @@ -13,7 +14,8 @@ public class SlideByTupleWindow implements ForwardContextAware { private final boolean outOfOrder; /** - * @param size size of the SlideByTuple window + * the measure of the Slide-by-tuple Window is time + * @param size size of the SlideByTuple window in time * @param slide window slide step in tuple counts */ public SlideByTupleWindow(long size, long slide, boolean outOfOrder){ @@ -45,79 +47,88 @@ public class SlideByTupleContext extends WindowContext { private long nextStart = 0; //start count of next window private long count = 0; //tuple-counter + private ArrayList timestamps = new ArrayList(); //holds timestamps of tuples for out-of-order processing @Override public ActiveWindow updateContext(Object tuple, long position){ - //processes in-order tuples + if (hasActiveWindows()) { //append first window addNewWindow(0, position, position + size); count++; nextStart += getSlide(); + timestamps.add(position); return getWindow(0); } - int windowIndex = getWindowIndex(position); + if (position >= timestamps.get(timestamps.size() -1)) { + //processes in-order tuples + timestamps.add(position); + int windowIndex = getWindowIndex(position); - if (windowIndex == -1) { - addNewWindow(0, position, position + size); - count++; - nextStart += getSlide(); - } else { - if (count == nextStart) { //new window starts + if (windowIndex == -1) { + addNewWindow(0, position, position + size); count++; nextStart += getSlide(); - return addNewWindow(windowIndex + 1, position, position + size); + } else { - ActiveWindow w = getWindow(windowIndex); - if (w.getEnd() > position) { //append to active window + if (count == nextStart) { //new window starts count++; - return w; - } else{ + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, position, position + size); + + } else { + ActiveWindow w = getWindow(windowIndex); + if (w.getEnd() > position) { //append to active window + count++; + return w; + } else { /* Tuple, which does not belong to current window and where count != nextStart, does not get included in any window instance */ - count++; + count++; + } } } - } - return null; - } + } else { + //processes out-of-order tuples + + timestamps.add(position); + Collections.sort(timestamps); + + int windowIndex = getWindowIndex(position); + if (windowIndex+1 <= (numberOfActiveWindows()-1)) { + //windows after the tuple exist, they have to be shifted + //beginning from the next window, to which the tuples does not belong + for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){ + ActiveWindow w = getWindow(i); + int index = timestamps.indexOf(w.getStart()); + long timestampBefore = (long) timestamps.get(index-1); + + if(position == timestampBefore){ //start of window has to be shifted to current tuple + // shift start and modify slice, otherwise insertion into wrong slice + shiftStart(w,timestampBefore); + }else{ + // shift start and dont mofify slice, simple insertion into existing slice possible + shiftStartDontModify(w,timestampBefore); + } + shiftEnd(w,timestampBefore+size); + } + count++; + return null; - @Override - public ActiveWindow updateContextWindows(Object tuple, long position, ArrayList listOfTs){ - //processes out-of-order tuples - int windowIndex = getWindowIndex(position); - if(windowIndex+1 <= (numberOfActiveWindows()-1)){ - //windows after the tuple exist, they have to be shifted - //beginning from the next window, to which the tuples does not belong - for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){ - ActiveWindow w = getWindow(i); - int index = listOfTs.indexOf(w.getStart()); - long timestampBefore = (long) listOfTs.get(index-1); - - if(position == timestampBefore){ //start of window has to be shifted to current tuple - // shift start and modify slice, otherwise insertion into wrong slice - shiftStart(w,timestampBefore); - }else{ - // shift start and dont mofify slice, simple insertion into existing slice possible - shiftStartDontModify(w,timestampBefore); + } else { + //no subsequent windows exist, out-of-order tuple may starts a new window + long lateCount = timestamps.indexOf(position); //count of current tuple + long windowStart = getWindow(windowIndex).getStart(); + int indexOfStart = timestamps.indexOf(windowStart); //tuple count on position of start of last window + long lateNextStart = indexOfStart + getSlide(); //tuple count of next start + + if (lateCount==lateNextStart) { // tuple starts a new window + nextStart += getSlide(); //update next start + addNewWindow(windowIndex + 1, position, position + size); } - shiftEnd(w,timestampBefore+size); - } - count++; - return null; - }else{ - //no subsequent windows exist, out-of-order tuple may starts a new window - long lateCount = listOfTs.indexOf(position); //count of current tuple - long windowStart = getWindow(windowIndex).getStart(); - int indexOfStart = listOfTs.indexOf(windowStart); //tuple count on position of start of last window - long lateNextStart = indexOfStart + getSlide(); //tuple count of next start - - if(lateCount==lateNextStart){ // tuple starts a new window - nextStart += getSlide(); //update next start - addNewWindow(windowIndex + 1, position, position + size); + count++; } - count++; } return null; } From 935c513e09fce7ebe8691f1a3a422184f11f39bd Mon Sep 17 00:00:00 2001 From: julianev Date: Sat, 15 May 2021 11:17:23 +0200 Subject: [PATCH 04/10] changes in SlideByTupleWindow and Test for out-of-order processing --- .../core/windowType/SlideByTupleWindow.java | 26 ++++--- .../windowTest/SlideByTupleWindowTest.java | 76 +++++++++++++++---- 2 files changed, 79 insertions(+), 23 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java index 8643ba1b..86318b5b 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java @@ -11,18 +11,16 @@ public class SlideByTupleWindow implements ForwardContextAware { private final WindowMeasure measure; private final long size; private final long slide; - private final boolean outOfOrder; /** * the measure of the Slide-by-tuple Window is time * @param size size of the SlideByTuple window in time * @param slide window slide step in tuple counts */ - public SlideByTupleWindow(long size, long slide, boolean outOfOrder){ + public SlideByTupleWindow(long size, long slide){ this.measure = WindowMeasure.Time; this.size = size; this.slide = slide; - this.outOfOrder = outOfOrder; } public long getSize() { @@ -96,6 +94,13 @@ public ActiveWindow updateContext(Object tuple, long position){ Collections.sort(timestamps); int windowIndex = getWindowIndex(position); + + if(slide == 1){ // slide == 1: add new window for every new tuple + count ++; + nextStart += getSlide(); + return addNewWindow(windowIndex+1, position, position + getSize()); + } + if (windowIndex+1 <= (numberOfActiveWindows()-1)) { //windows after the tuple exist, they have to be shifted //beginning from the next window, to which the tuples does not belong @@ -105,19 +110,22 @@ public ActiveWindow updateContext(Object tuple, long position){ long timestampBefore = (long) timestamps.get(index-1); if(position == timestampBefore){ //start of window has to be shifted to current tuple - // shift start and modify slice, otherwise insertion into wrong slice + // shift start of window and modify slice start, otherwise insertion into wrong slice shiftStart(w,timestampBefore); }else{ - // shift start and dont mofify slice, simple insertion into existing slice possible + // shift start of window, split slice if necessary shiftStartDontModify(w,timestampBefore); + splitSlice(timestampBefore); } - shiftEnd(w,timestampBefore+size); + + shiftEnd(w, timestampBefore + size); + splitSlice(timestampBefore+size); } count++; return null; } else { - //no subsequent windows exist, out-of-order tuple may starts a new window + //no subsequent windows exist, out-of-order tuple may start a new window long lateCount = timestamps.indexOf(position); //count of current tuple long windowStart = getWindow(windowIndex).getStart(); int indexOfStart = timestamps.indexOf(windowStart); //tuple count on position of start of last window @@ -147,7 +155,7 @@ public int getWindowIndex(long position) { @Override public long assignNextWindowStart(long position) { - if(slide > 1 && !outOfOrder) { + if (slide > 1) { if (count == nextStart) { // new Window starts, append new Slice return position; @@ -171,7 +179,7 @@ public long assignNextWindowStart(long position) { } } } - }else { //append one slice for each tuple for out-of-order streams or slide == 1 + } else { //append one slice for each tuple for slide == 1 return position; } } diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java index b1d8b8c7..8d9107b4 100644 --- a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java @@ -24,7 +24,7 @@ public void setup() { @Test public void inOrderSlide1() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); @@ -51,7 +51,7 @@ public void inOrderSlide1() { @Test public void inOrderSlide2() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); @@ -74,7 +74,7 @@ public void inOrderSlide2() { @Test public void inOrderTest() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 2); @@ -111,7 +111,7 @@ public void inOrderTest() { @Test public void inOrderTestOutlier() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); @@ -133,8 +133,8 @@ public void inOrderTestOutlier() { public void inOrderTwoWindowsSize() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); //windows with different sizes - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2, false)); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 2, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 2)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); slicingWindowOperator.processElement(1, 6); @@ -161,8 +161,8 @@ public void inOrderTwoWindowsSize() { public void inOrderTwoWindowsSlide() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); //windows with different slides - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,false)); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 3, false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 3)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); slicingWindowOperator.processElement(1, 6); @@ -187,8 +187,8 @@ public void inOrderTwoWindowsSlide() { public void inOrderTwoWindowsTest2() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); //windows with different size and slide - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,false)); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 5,false)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 5)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); slicingWindowOperator.processElement(1, 6); @@ -211,7 +211,7 @@ public void inOrderTwoWindowsTest2() { @Test public void outOfOrderTest() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,true)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); slicingWindowOperator.processElement(1, 8); @@ -230,7 +230,7 @@ public void outOfOrderTest() { @Test public void outOfOrderTest2() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2,true)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 3); slicingWindowOperator.processElement(1, 8); @@ -254,7 +254,7 @@ public void outOfOrderTest2() { @Test public void outOfOrderTest3() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5, true)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 2); @@ -292,7 +292,7 @@ public void outOfOrderTest3() { @Test public void outOfOrderTest4() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); - slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5, true)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); slicingWindowOperator.processElement(1, 1); slicingWindowOperator.processElement(1, 2); @@ -329,4 +329,52 @@ public void outOfOrderTest4() { WindowAssert.assertEquals(resultWindows.get(3),19, 29, 6); WindowAssert.assertEquals(resultWindows.get(4),27, 37, 2); } + + @Test + public void outOfOrderTest5() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); + + //out-of-order tuple + slicingWindowOperator.processElement(1, 17); + + slicingWindowOperator.processElement(1, 25); + + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 6); + WindowAssert.assertEquals(resultWindows.get(1),2, 12, 5); + WindowAssert.assertEquals(resultWindows.get(2),3, 13, 5); + WindowAssert.assertEquals(resultWindows.get(3),8, 18, 8); // + WindowAssert.assertEquals(resultWindows.get(4),9, 19, 8); // + WindowAssert.assertEquals(resultWindows.get(5),10, 20, 8); // + WindowAssert.assertEquals(resultWindows.get(6),12, 22, 8); // + WindowAssert.assertEquals(resultWindows.get(7),13, 23, 8); + WindowAssert.assertEquals(resultWindows.get(8),14, 24, 7); + WindowAssert.assertEquals(resultWindows.get(9),16, 26, 8); + WindowAssert.assertEquals(resultWindows.get(10),17, 27, 7); + WindowAssert.assertEquals(resultWindows.get(11),18, 28, 6); // + WindowAssert.assertEquals(resultWindows.get(12),19, 29, 5); + WindowAssert.assertEquals(resultWindows.get(13),21, 31, 4); + WindowAssert.assertEquals(resultWindows.get(14),22, 32, 3); + WindowAssert.assertEquals(resultWindows.get(15),24, 34, 2); + WindowAssert.assertEquals(resultWindows.get(16),25, 35, 1); + } } From b1cc3885c89cd81b5586bdd4eac3397c002ff165 Mon Sep 17 00:00:00 2001 From: julianev Date: Sat, 15 May 2021 11:19:47 +0200 Subject: [PATCH 05/10] shift tuples in split slice --- .../core/windowType/windowContext/WindowContext.java | 8 ++++++-- .../de/tub/dima/scotty/slicing/SliceManager.java | 12 ++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index 66321908..c8d29842 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -57,8 +57,8 @@ public void shiftStart(ActiveWindow window, long position) { window.setStart(position); } - public void shiftStartDontModify(ActiveWindow window, long position) { // does not modify the start edge - //modifiedWindowEdges.add(new ShiftModification(window.start, position)); + public void shiftStartDontModify(ActiveWindow window, long position) { + // does not modify the start edge of the slice, only adjusts the window start window.setStart(position); } @@ -67,6 +67,10 @@ public void shiftEnd(ActiveWindow window, long position) { window.setEnd(position); } + public void splitSlice(long position) { + //adds a AddModification for end of window ShiftModification(oldPosition, position) + modifiedWindowEdges.add(new AddModification(position)); + } public abstract ActiveWindow updateContext(Tuple tuple, long position); diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index 99f06d31..f173001d 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -145,5 +145,17 @@ public void splitSlice(int sliceIndex, long timestamp) { sliceA.setTEnd(timestamp); sliceA.setType(new Slice.Flexible()); this.aggregationStore.addSlice(sliceIndex + 1, sliceB); + + //move records to new slice + if (sliceA instanceof LazySlice) { + while (true){ + if(((LazySlice)sliceA).getTLast() >= timestamp){ + StreamRecord lastElement = ((LazySlice)sliceA).dropLastElement(); + ((LazySlice)sliceB).prependElement(lastElement); + }else{ + break; + } + } + } } } From 268f60f50a2de26b9a981eac8efbbedef481774e Mon Sep 17 00:00:00 2001 From: julianev Date: Sun, 23 May 2021 10:42:43 +0200 Subject: [PATCH 06/10] fix in SlideByTuple Window and Test, adaption of window context --- .../core/windowType/SlideByTupleWindow.java | 73 ++++++++++--------- .../windowContext/WindowContext.java | 12 +-- .../windowTest/SlideByTupleWindowTest.java | 51 +++++++++++-- 3 files changed, 91 insertions(+), 45 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java index 86318b5b..43db047f 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java @@ -74,7 +74,6 @@ public ActiveWindow updateContext(Object tuple, long position){ count++; nextStart += getSlide(); return addNewWindow(windowIndex + 1, position, position + size); - } else { ActiveWindow w = getWindow(windowIndex); if (w.getEnd() > position) { //append to active window @@ -89,58 +88,64 @@ public ActiveWindow updateContext(Object tuple, long position){ } } else { //processes out-of-order tuples - timestamps.add(position); Collections.sort(timestamps); - + count++; int windowIndex = getWindowIndex(position); if(slide == 1){ // slide == 1: add new window for every new tuple - count ++; nextStart += getSlide(); return addNewWindow(windowIndex+1, position, position + getSize()); } if (windowIndex+1 <= (numberOfActiveWindows()-1)) { - //windows after the tuple exist, they have to be shifted - //beginning from the next window, to which the tuples does not belong - for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){ - ActiveWindow w = getWindow(i); - int index = timestamps.indexOf(w.getStart()); - long timestampBefore = (long) timestamps.get(index-1); - - if(position == timestampBefore){ //start of window has to be shifted to current tuple - // shift start of window and modify slice start, otherwise insertion into wrong slice - shiftStart(w,timestampBefore); - }else{ - // shift start of window, split slice if necessary - shiftStartDontModify(w,timestampBefore); - splitSlice(timestampBefore); - } - - shiftEnd(w, timestampBefore + size); - splitSlice(timestampBefore+size); - } - count++; + //subsequent windows have to be shifted, beginning from the next window, to which the tuples does not belong + shiftWindows(position, windowIndex); return null; } else { - //no subsequent windows exist, out-of-order tuple may start a new window - long lateCount = timestamps.indexOf(position); //count of current tuple - long windowStart = getWindow(windowIndex).getStart(); - int indexOfStart = timestamps.indexOf(windowStart); //tuple count on position of start of last window - long lateNextStart = indexOfStart + getSlide(); //tuple count of next start - - if (lateCount==lateNextStart) { // tuple starts a new window - nextStart += getSlide(); //update next start - addNewWindow(windowIndex + 1, position, position + size); + //no subsequent windows exist, current tuple or some tuple before may start a new window + if (timestamps.size()-1 == nextStart){ + // tuple that arrived before starts new window because of changed count + long positionOfNewWindow = timestamps.get((int)nextStart); + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, positionOfNewWindow, positionOfNewWindow + size); + } else if (timestamps.lastIndexOf(position) == nextStart) { // get count of current tuple + // current tuple starts a new window + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, position, position + size); } - count++; } } return null; } + /** + * shifts all windows after the out-of-order tuple + * @param position of the out-of-order tuple + * @param windowIndex of the window the tuple belongs to + */ + private void shiftWindows(long position, int windowIndex){ + + for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){ + ActiveWindow w = getWindow(i); + int index = timestamps.indexOf(w.getStart()); + long timestampBefore = timestamps.get(index-1); + + if(position == timestampBefore){ //start of window has to be shifted to current tuple + // shift start of window and modify slice start, otherwise insertion into wrong slice + shiftStart(w,timestampBefore); + }else{ + // shift start of window, split slice if necessary + shiftStartDontModifySlice(w,timestampBefore); + splitSlice(timestampBefore); + } + + shiftEnd(w, timestampBefore + size); + splitSlice(timestampBefore + size); + } + } + public int getWindowIndex(long position) { //returns newest window int i = numberOfActiveWindows()-1; diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index c8d29842..78f7fc23 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -57,18 +57,18 @@ public void shiftStart(ActiveWindow window, long position) { window.setStart(position); } - public void shiftStartDontModify(ActiveWindow window, long position) { - // does not modify the start edge of the slice, only adjusts the window start - window.setStart(position); - } - public void shiftEnd(ActiveWindow window, long position) { //modifiedWindowEdges.add(new ShiftModification(window.end, position)); window.setEnd(position); } + public void shiftStartDontModifySlice(ActiveWindow window, long position) { + // does not modify the start edge of the slice, only adjusts the window start + window.setStart(position); + } + public void splitSlice(long position) { - //adds a AddModification for end of window ShiftModification(oldPosition, position) + // adds a AddModification to split the slice if necessary, regardless of changes to the window modifiedWindowEdges.add(new AddModification(position)); } diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java index 8d9107b4..f2720c23 100644 --- a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java @@ -332,6 +332,47 @@ public void outOfOrderTest4() { @Test public void outOfOrderTest5() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + // same timestamp twice + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 5); //start of new window + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); //start of window + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + // same timestamp twice + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 19); //start of new window + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 25); // start of new window after out-of-order tuples arrive + slicingWindowOperator.processElement(1, 21); // out of order tuple with existing timestamp + slicingWindowOperator.processElement(1, 22); // out of order tuple + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),5, 15, 7); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 9); + WindowAssert.assertEquals(resultWindows.get(3),19, 29, 7); + WindowAssert.assertEquals(resultWindows.get(4),25, 35, 3); + } + + @Test + public void outOfOrderTestSlide1() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1)); @@ -362,15 +403,15 @@ public void outOfOrderTest5() { WindowAssert.assertEquals(resultWindows.get(0),1, 11, 6); WindowAssert.assertEquals(resultWindows.get(1),2, 12, 5); WindowAssert.assertEquals(resultWindows.get(2),3, 13, 5); - WindowAssert.assertEquals(resultWindows.get(3),8, 18, 8); // - WindowAssert.assertEquals(resultWindows.get(4),9, 19, 8); // - WindowAssert.assertEquals(resultWindows.get(5),10, 20, 8); // - WindowAssert.assertEquals(resultWindows.get(6),12, 22, 8); // + WindowAssert.assertEquals(resultWindows.get(3),8, 18, 8); + WindowAssert.assertEquals(resultWindows.get(4),9, 19, 8); + WindowAssert.assertEquals(resultWindows.get(5),10, 20, 8); + WindowAssert.assertEquals(resultWindows.get(6),12, 22, 8); WindowAssert.assertEquals(resultWindows.get(7),13, 23, 8); WindowAssert.assertEquals(resultWindows.get(8),14, 24, 7); WindowAssert.assertEquals(resultWindows.get(9),16, 26, 8); WindowAssert.assertEquals(resultWindows.get(10),17, 27, 7); - WindowAssert.assertEquals(resultWindows.get(11),18, 28, 6); // + WindowAssert.assertEquals(resultWindows.get(11),18, 28, 6); WindowAssert.assertEquals(resultWindows.get(12),19, 29, 5); WindowAssert.assertEquals(resultWindows.get(13),21, 31, 4); WindowAssert.assertEquals(resultWindows.get(14),22, 32, 3); From d8ef49275d0405b4366a7c882edd96feba648d9c Mon Sep 17 00:00:00 2001 From: julianev Date: Mon, 24 May 2021 11:11:59 +0200 Subject: [PATCH 07/10] fix in SliceManager --- .../java/de/tub/dima/scotty/slicing/SliceManager.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index f173001d..b362020f 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -148,13 +148,9 @@ public void splitSlice(int sliceIndex, long timestamp) { //move records to new slice if (sliceA instanceof LazySlice) { - while (true){ - if(((LazySlice)sliceA).getTLast() >= timestamp){ - StreamRecord lastElement = ((LazySlice)sliceA).dropLastElement(); - ((LazySlice)sliceB).prependElement(lastElement); - }else{ - break; - } + while (((LazySlice)sliceA).getTLast() >= timestamp){ + StreamRecord lastElement = ((LazySlice)sliceA).dropLastElement(); + ((LazySlice)sliceB).prependElement(lastElement); } } } From 6cb528f4124b6bae80585d8cb2e4a55cc1829e61 Mon Sep 17 00:00:00 2001 From: julianev Date: Fri, 4 Jun 2021 13:31:36 +0200 Subject: [PATCH 08/10] add Testcase --- .../windowTest/SlideByTupleWindowTest.java | 58 +++++++++++++++++-- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java index f2720c23..5b4cdc7d 100644 --- a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java @@ -311,13 +311,13 @@ public void outOfOrderTest4() { slicingWindowOperator.processElement(1, 24); //count=4 before out of order tuples -> no new window starts before out-of-order tuples arrive //out-of-order tuples - slicingWindowOperator.processElement(1, 17); //count=3 if in-order after 16 - slicingWindowOperator.processElement(1, 18); - slicingWindowOperator.processElement(1, 19); //count=5 -> start of new window instead of shift - slicingWindowOperator.processElement(1, 21); //count=6 + slicingWindowOperator.processElement(1, 17); //count=3 if in-order after 16, starts new window at ts 24 + slicingWindowOperator.processElement(1, 18); //shifts window start from 24 to 22 + slicingWindowOperator.processElement(1, 19); //count=5 -> shift window to this position + slicingWindowOperator.processElement(1, 21); //simple insert into window slicingWindowOperator.processElement(1, 25); - slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 27); //starts new window slicingWindowOperator.processElement(1, 30); @@ -361,7 +361,6 @@ public void outOfOrderTest5() { slicingWindowOperator.processElement(1, 27); slicingWindowOperator.processElement(1, 30); - List resultWindows = slicingWindowOperator.processWatermark(40); WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); @@ -371,6 +370,53 @@ public void outOfOrderTest5() { WindowAssert.assertEquals(resultWindows.get(4),25, 35, 3); } + @Test + public void outOfOrderTest6() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 3)); + + slicingWindowOperator.processElement(1, 1); //start of window + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); //start of window + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); //start of window + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); //start of window + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); //start of window + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); //start of window + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); //start of window + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + slicingWindowOperator.processElement(1, 31); + slicingWindowOperator.processElement(1, 32); + slicingWindowOperator.processElement(1, 34); + slicingWindowOperator.processElement(1, 35); + slicingWindowOperator.processElement(1, 37); + slicingWindowOperator.processElement(1, 38); + slicingWindowOperator.processElement(1, 39); + slicingWindowOperator.processElement(1, 40); + + List resultWindows = slicingWindowOperator.processWatermark(41); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),4, 14, 8); + WindowAssert.assertEquals(resultWindows.get(2),8, 18, 8); + WindowAssert.assertEquals(resultWindows.get(3),12, 22, 8); + WindowAssert.assertEquals(resultWindows.get(4),16, 26, 8); + WindowAssert.assertEquals(resultWindows.get(5),19, 29, 6); + WindowAssert.assertEquals(resultWindows.get(6),24, 34, 6); + } + @Test public void outOfOrderTestSlide1() { slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); From 2c87d55f3956cc392562dfd1e5421586ea5c549e Mon Sep 17 00:00:00 2001 From: julianev Date: Mon, 9 Aug 2021 12:22:10 +0200 Subject: [PATCH 09/10] revert changes in SliceManager --- .../de/tub/dima/scotty/core/windowType/SessionWindow.java | 3 --- .../java/de/tub/dima/scotty/slicing/SliceManager.java | 8 -------- 2 files changed, 11 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java index da040113..c8678bd8 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java @@ -3,8 +3,6 @@ import de.tub.dima.scotty.core.*; import de.tub.dima.scotty.core.windowType.windowContext.*; -import java.util.ArrayList; - public class SessionWindow implements ForwardContextAware { private final WindowMeasure measure; @@ -116,7 +114,6 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, session = getWindow(0); } } - } @Override diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index b362020f..99f06d31 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -145,13 +145,5 @@ public void splitSlice(int sliceIndex, long timestamp) { sliceA.setTEnd(timestamp); sliceA.setType(new Slice.Flexible()); this.aggregationStore.addSlice(sliceIndex + 1, sliceB); - - //move records to new slice - if (sliceA instanceof LazySlice) { - while (((LazySlice)sliceA).getTLast() >= timestamp){ - StreamRecord lastElement = ((LazySlice)sliceA).dropLastElement(); - ((LazySlice)sliceB).prependElement(lastElement); - } - } } } From 2b2194a91ece3486e99105ef41c9371cc498023c Mon Sep 17 00:00:00 2001 From: julianev Date: Wed, 21 Sep 2022 10:45:49 +0200 Subject: [PATCH 10/10] changes in assignNextWindowStart --- .../core/windowType/SlideByTupleWindow.java | 47 ++++++++----------- 1 file changed, 20 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java index 43db047f..529a1474 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java @@ -150,8 +150,8 @@ public int getWindowIndex(long position) { //returns newest window int i = numberOfActiveWindows()-1; for (; i >= 0 ; i--) { - ActiveWindow p = getWindow(i); - if (p.getStart() <= position) { + ActiveWindow w = getWindow(i); + if (w.getStart() <= position) { return i; } } @@ -160,33 +160,26 @@ public int getWindowIndex(long position) { @Override public long assignNextWindowStart(long position) { - if (slide > 1) { - if (count == nextStart) { - // new Window starts, append new Slice - return position; - } else { - // determine if new slice is needed - int windowIndex = getWindowIndex(position); - if(windowIndex == -1){ - return position; - } - ActiveWindow w = getWindow(windowIndex); - if (windowIndex == 0) { // first window: all tuples belong to the first slice - return w.getEnd(); - } else { - ActiveWindow wBefore = getWindow(windowIndex - 1); - if (wBefore.getEnd() <= position) { - //tuple before does not belong to window before -> append current value to slice - return w.getEnd(); - } else { - //tuple before also belongs to the window before or is outlier -> new slice - return wBefore.getEnd(); - } - } - } - } else { //append one slice for each tuple for slide == 1 + if (count == nextStart) { + // new window starts, create new slice return position; + } else { + // return next window end + return getNextWindowEnd(position); + } + } + + public long getNextWindowEnd(long position) { + //returns next window end after position + long nextWEnd = -1; + for (int i = numberOfActiveWindows() -1 ; i >= 0 ; i--) { + ActiveWindow w = getWindow(i); + if (position >= w.getEnd()) { + break; + } + nextWEnd = w.getEnd(); } + return nextWEnd; } @Override