diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java index c30128aa..c8678bd8 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SessionWindow.java @@ -114,8 +114,6 @@ public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, session = getWindow(0); } } - - } @Override diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java new file mode 100644 index 00000000..529a1474 --- /dev/null +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/SlideByTupleWindow.java @@ -0,0 +1,206 @@ +package de.tub.dima.scotty.core.windowType; + +import de.tub.dima.scotty.core.WindowCollector; +import de.tub.dima.scotty.core.windowType.windowContext.WindowContext; + +import java.util.ArrayList; +import java.util.Collections; + +public class SlideByTupleWindow implements ForwardContextAware { + + private final WindowMeasure measure; + private final long size; + private final long slide; + + /** + * the measure of the Slide-by-tuple Window is time + * @param size size of the SlideByTuple window in time + * @param slide window slide step in tuple counts + */ + public SlideByTupleWindow(long size, long slide){ + this.measure = WindowMeasure.Time; + this.size = size; + this.slide = slide; + } + + public long getSize() { + return size; + } + + public long getSlide() { + return slide; + } + + @Override + public WindowMeasure getWindowMeasure() { + return measure; + } + + @Override + public SlideByTupleContext createContext() { + return new SlideByTupleContext(); + } + + public class SlideByTupleContext extends WindowContext { + + private long nextStart = 0; //start count of next window + private long count = 0; //tuple-counter + private ArrayList timestamps = new ArrayList(); //holds timestamps of tuples for out-of-order processing + + @Override + public ActiveWindow updateContext(Object tuple, long position){ + + if (hasActiveWindows()) { + //append first window + addNewWindow(0, position, position + size); + count++; + nextStart += getSlide(); + timestamps.add(position); + return getWindow(0); + } + + if (position >= timestamps.get(timestamps.size() -1)) { + //processes in-order tuples + timestamps.add(position); + int windowIndex = getWindowIndex(position); + + if (windowIndex == -1) { + addNewWindow(0, position, position + size); + count++; + nextStart += getSlide(); + + } else { + if (count == nextStart) { //new window starts + count++; + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, position, position + size); + } else { + ActiveWindow w = getWindow(windowIndex); + if (w.getEnd() > position) { //append to active window + count++; + return w; + } else { + /* Tuple, which does not belong to current window and where count != nextStart, + does not get included in any window instance */ + count++; + } + } + } + } else { + //processes out-of-order tuples + timestamps.add(position); + Collections.sort(timestamps); + count++; + int windowIndex = getWindowIndex(position); + + if(slide == 1){ // slide == 1: add new window for every new tuple + nextStart += getSlide(); + return addNewWindow(windowIndex+1, position, position + getSize()); + } + + if (windowIndex+1 <= (numberOfActiveWindows()-1)) { + //subsequent windows have to be shifted, beginning from the next window, to which the tuples does not belong + shiftWindows(position, windowIndex); + return null; + + } else { + //no subsequent windows exist, current tuple or some tuple before may start a new window + if (timestamps.size()-1 == nextStart){ + // tuple that arrived before starts new window because of changed count + long positionOfNewWindow = timestamps.get((int)nextStart); + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, positionOfNewWindow, positionOfNewWindow + size); + } else if (timestamps.lastIndexOf(position) == nextStart) { // get count of current tuple + // current tuple starts a new window + nextStart += getSlide(); + return addNewWindow(windowIndex + 1, position, position + size); + } + } + } + return null; + } + + /** + * shifts all windows after the out-of-order tuple + * @param position of the out-of-order tuple + * @param windowIndex of the window the tuple belongs to + */ + private void shiftWindows(long position, int windowIndex){ + + for(int i = windowIndex+1; i <= (numberOfActiveWindows()-1); i++){ + ActiveWindow w = getWindow(i); + int index = timestamps.indexOf(w.getStart()); + long timestampBefore = timestamps.get(index-1); + + if(position == timestampBefore){ //start of window has to be shifted to current tuple + // shift start of window and modify slice start, otherwise insertion into wrong slice + shiftStart(w,timestampBefore); + }else{ + // shift start of window, split slice if necessary + shiftStartDontModifySlice(w,timestampBefore); + splitSlice(timestampBefore); + } + + shiftEnd(w, timestampBefore + size); + splitSlice(timestampBefore + size); + } + } + + public int getWindowIndex(long position) { + //returns newest window + int i = numberOfActiveWindows()-1; + for (; i >= 0 ; i--) { + ActiveWindow w = getWindow(i); + if (w.getStart() <= position) { + return i; + } + } + return -1; + } + + @Override + public long assignNextWindowStart(long position) { + if (count == nextStart) { + // new window starts, create new slice + return position; + } else { + // return next window end + return getNextWindowEnd(position); + } + } + + public long getNextWindowEnd(long position) { + //returns next window end after position + long nextWEnd = -1; + for (int i = numberOfActiveWindows() -1 ; i >= 0 ; i--) { + ActiveWindow w = getWindow(i); + if (position >= w.getEnd()) { + break; + } + nextWEnd = w.getEnd(); + } + return nextWEnd; + } + + @Override + public void triggerWindows(WindowCollector aggregateWindows, long lastWatermark, long currentWatermark) { + ActiveWindow w = getWindow(0); + while (w.getEnd() <= currentWatermark) { + aggregateWindows.trigger(w.getStart(), w.getEnd() , measure); + removeWindow(0); + if (hasActiveWindows()) + return; + w = getWindow(0); + } + } + } + + @Override + public String toString() { + return "SlideByTupleWindow{" + + "measure=" + measure + + ", size=" + size + + ", slide=" + slide + + '}'; + } +} diff --git a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java index bb0730ed..78f7fc23 100644 --- a/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java +++ b/core/src/main/java/de/tub/dima/scotty/core/windowType/windowContext/WindowContext.java @@ -62,6 +62,15 @@ public void shiftEnd(ActiveWindow window, long position) { window.setEnd(position); } + public void shiftStartDontModifySlice(ActiveWindow window, long position) { + // does not modify the start edge of the slice, only adjusts the window start + window.setStart(position); + } + + public void splitSlice(long position) { + // adds a AddModification to split the slice if necessary, regardless of changes to the window + modifiedWindowEdges.add(new AddModification(position)); + } public abstract ActiveWindow updateContext(Tuple tuple, long position); @@ -104,4 +113,4 @@ public int compareTo(ActiveWindow o) { return Long.compare(this.start, o.start); } } -} +} \ No newline at end of file diff --git a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java index f6ce8335..5790a6c0 100644 --- a/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java +++ b/slicing/src/main/java/de/tub/dima/scotty/slicing/SliceManager.java @@ -1,5 +1,6 @@ package de.tub.dima.scotty.slicing; +import de.tub.dima.scotty.core.windowType.SlideByTupleWindow; import de.tub.dima.scotty.core.windowType.windowContext.*; import de.tub.dima.scotty.slicing.aggregationstore.*; import de.tub.dima.scotty.slicing.slice.*; diff --git a/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java new file mode 100644 index 00000000..5b4cdc7d --- /dev/null +++ b/slicing/src/test/java/de/tub/dima/scotty/slicing/aggregationstore/test/windowTest/SlideByTupleWindowTest.java @@ -0,0 +1,467 @@ +package de.tub.dima.scotty.slicing.aggregationstore.test.windowTest; + +import de.tub.dima.scotty.core.AggregateWindow; +import de.tub.dima.scotty.core.windowFunction.ReduceAggregateFunction; +import de.tub.dima.scotty.core.windowType.SlideByTupleWindow; +import de.tub.dima.scotty.slicing.SlicingWindowOperator; +import de.tub.dima.scotty.state.memory.MemoryStateFactory; +import org.junit.Before; +import org.junit.Test; + +import java.util.List; + +public class SlideByTupleWindowTest { + + private SlicingWindowOperator slicingWindowOperator; + private MemoryStateFactory stateFactory; + + @Before + public void setup() { + this.stateFactory = new MemoryStateFactory(); + this.slicingWindowOperator = new SlicingWindowOperator(stateFactory); + } + + @Test + public void inOrderSlide1() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + // Assert for slide = 1 + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),3, 13, 3); + WindowAssert.assertEquals(resultWindows.get(2),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(3),8, 18, 3); + WindowAssert.assertEquals(resultWindows.get(4),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(5),17, 27, 3); + WindowAssert.assertEquals(resultWindows.get(6),24, 34, 2); + WindowAssert.assertEquals(resultWindows.get(7),24, 34, 2); + } + + @Test + public void inOrderSlide2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + // Assert for slide = 2 + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),24, 34, 1); + } + + + @Test + public void inOrderTest() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 7); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 8); + WindowAssert.assertEquals(resultWindows.get(3),19, 29, 6); + WindowAssert.assertEquals(resultWindows.get(4),27, 37, 2); + } + + @Test + public void inOrderTestOutlier() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 24); //outlier, should not be contained in any window + slicingWindowOperator.processElement(1, 30); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 1); + WindowAssert.assertEquals(resultWindows.get(3),30, 40, 1); + } + + @Test + public void inOrderTwoWindowsSize() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + //windows with different sizes + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 2)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1,23); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(45); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),23, 33, 2); + WindowAssert.assertEquals(resultWindows.get(4),1, 21, 6); + WindowAssert.assertEquals(resultWindows.get(5),6, 26, 6); + WindowAssert.assertEquals(resultWindows.get(6),13, 33, 4); + WindowAssert.assertEquals(resultWindows.get(7),23, 43, 2); + + } + + @Test + public void inOrderTwoWindowsSlide() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + //windows with different slides + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 3)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + /*Assert for two windows slide = 2 and slide = 3*/ + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),24, 34, 1); + WindowAssert.assertEquals(resultWindows.get(4),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(5),8, 18, 3); + WindowAssert.assertEquals(resultWindows.get(6),24, 34, 1); + } + + @Test + public void inOrderTwoWindowsTest2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + //windows with different size and slide + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(20, 5)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1,23); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(45); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + WindowAssert.assertEquals(resultWindows.get(3),23, 33, 2); + WindowAssert.assertEquals(resultWindows.get(4),1, 21, 6); + WindowAssert.assertEquals(resultWindows.get(5),17, 37, 3); + } + + @Test + public void outOfOrderTest() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 6); // out of order Tuple, shift start of window after + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 24); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 4); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 3); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 2); + } + + @Test + public void outOfOrderTest2() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 2)); + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1,26); + slicingWindowOperator.processElement(1, 16); // out of order Tuple, shift start of two windows before + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0), 1, 11, 3); + WindowAssert.assertEquals(resultWindows.get(1), 8, 18, 4); + WindowAssert.assertEquals(resultWindows.get(2), 14, 24, 4); + WindowAssert.assertEquals(resultWindows.get(3), 19,29,4); + WindowAssert.assertEquals(resultWindows.get(4),25,35,2); + } + + @Test + public void outOfOrderTest3() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + slicingWindowOperator.processElement(1,7); //out-of-order tuple, shift three windows + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 10); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 8); + WindowAssert.assertEquals(resultWindows.get(2),12, 22, 8); + WindowAssert.assertEquals(resultWindows.get(3),18, 28, 7); + WindowAssert.assertEquals(resultWindows.get(4),25, 35, 3); + } + + @Test + public void outOfOrderTest4() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); //start of window + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); //count=2 + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); //count=4 before out of order tuples -> no new window starts before out-of-order tuples arrive + + //out-of-order tuples + slicingWindowOperator.processElement(1, 17); //count=3 if in-order after 16, starts new window at ts 24 + slicingWindowOperator.processElement(1, 18); //shifts window start from 24 to 22 + slicingWindowOperator.processElement(1, 19); //count=5 -> shift window to this position + slicingWindowOperator.processElement(1, 21); //simple insert into window + + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); //starts new window + slicingWindowOperator.processElement(1, 30); + + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),6, 16, 7); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 8); + WindowAssert.assertEquals(resultWindows.get(3),19, 29, 6); + WindowAssert.assertEquals(resultWindows.get(4),27, 37, 2); + } + + @Test + public void outOfOrderTest5() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 5)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); + // same timestamp twice + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 5); //start of new window + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); //start of window + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + // same timestamp twice + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 19); //start of new window + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 24); + slicingWindowOperator.processElement(1, 25); // start of new window after out-of-order tuples arrive + slicingWindowOperator.processElement(1, 21); // out of order tuple with existing timestamp + slicingWindowOperator.processElement(1, 22); // out of order tuple + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),5, 15, 7); + WindowAssert.assertEquals(resultWindows.get(2),13, 23, 9); + WindowAssert.assertEquals(resultWindows.get(3),19, 29, 7); + WindowAssert.assertEquals(resultWindows.get(4),25, 35, 3); + } + + @Test + public void outOfOrderTest6() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 3)); + + slicingWindowOperator.processElement(1, 1); //start of window + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 4); //start of window + slicingWindowOperator.processElement(1, 5); + slicingWindowOperator.processElement(1, 6); + slicingWindowOperator.processElement(1, 8); //start of window + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); //start of window + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); //start of window + slicingWindowOperator.processElement(1, 17); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); //start of window + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); //start of window + slicingWindowOperator.processElement(1, 25); + slicingWindowOperator.processElement(1, 27); + slicingWindowOperator.processElement(1, 30); + slicingWindowOperator.processElement(1, 31); + slicingWindowOperator.processElement(1, 32); + slicingWindowOperator.processElement(1, 34); + slicingWindowOperator.processElement(1, 35); + slicingWindowOperator.processElement(1, 37); + slicingWindowOperator.processElement(1, 38); + slicingWindowOperator.processElement(1, 39); + slicingWindowOperator.processElement(1, 40); + + List resultWindows = slicingWindowOperator.processWatermark(41); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 9); + WindowAssert.assertEquals(resultWindows.get(1),4, 14, 8); + WindowAssert.assertEquals(resultWindows.get(2),8, 18, 8); + WindowAssert.assertEquals(resultWindows.get(3),12, 22, 8); + WindowAssert.assertEquals(resultWindows.get(4),16, 26, 8); + WindowAssert.assertEquals(resultWindows.get(5),19, 29, 6); + WindowAssert.assertEquals(resultWindows.get(6),24, 34, 6); + } + + @Test + public void outOfOrderTestSlide1() { + slicingWindowOperator.addWindowFunction((ReduceAggregateFunction) (currentAggregate, element) -> currentAggregate + element); + slicingWindowOperator.addWindowAssigner(new SlideByTupleWindow(10, 1)); + + slicingWindowOperator.processElement(1, 1); + slicingWindowOperator.processElement(1, 2); + slicingWindowOperator.processElement(1, 3); + slicingWindowOperator.processElement(1, 8); + slicingWindowOperator.processElement(1, 9); + slicingWindowOperator.processElement(1, 10); + slicingWindowOperator.processElement(1, 12); + slicingWindowOperator.processElement(1, 13); + slicingWindowOperator.processElement(1, 14); + slicingWindowOperator.processElement(1, 16); + slicingWindowOperator.processElement(1, 18); + slicingWindowOperator.processElement(1, 19); + slicingWindowOperator.processElement(1, 21); + slicingWindowOperator.processElement(1, 22); + slicingWindowOperator.processElement(1, 24); + + //out-of-order tuple + slicingWindowOperator.processElement(1, 17); + + slicingWindowOperator.processElement(1, 25); + + + List resultWindows = slicingWindowOperator.processWatermark(40); + + WindowAssert.assertEquals(resultWindows.get(0),1, 11, 6); + WindowAssert.assertEquals(resultWindows.get(1),2, 12, 5); + WindowAssert.assertEquals(resultWindows.get(2),3, 13, 5); + WindowAssert.assertEquals(resultWindows.get(3),8, 18, 8); + WindowAssert.assertEquals(resultWindows.get(4),9, 19, 8); + WindowAssert.assertEquals(resultWindows.get(5),10, 20, 8); + WindowAssert.assertEquals(resultWindows.get(6),12, 22, 8); + WindowAssert.assertEquals(resultWindows.get(7),13, 23, 8); + WindowAssert.assertEquals(resultWindows.get(8),14, 24, 7); + WindowAssert.assertEquals(resultWindows.get(9),16, 26, 8); + WindowAssert.assertEquals(resultWindows.get(10),17, 27, 7); + WindowAssert.assertEquals(resultWindows.get(11),18, 28, 6); + WindowAssert.assertEquals(resultWindows.get(12),19, 29, 5); + WindowAssert.assertEquals(resultWindows.get(13),21, 31, 4); + WindowAssert.assertEquals(resultWindows.get(14),22, 32, 3); + WindowAssert.assertEquals(resultWindows.get(15),24, 34, 2); + WindowAssert.assertEquals(resultWindows.get(16),25, 35, 1); + } +}