Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import org.apache.ignite.internal.pagememory.datastructure.DataStructure;
import org.apache.ignite.internal.pagememory.io.IoVersions;
Expand Down Expand Up @@ -142,6 +143,9 @@ public abstract class AbstractBplusTreePageMemoryTest extends BaseIgniteAbstract
@Nullable
protected PageMemory pageMem;

@Nullable
protected PartitionPageMemory partitionPageMemory;

@Nullable
private ReuseList reuseList;

Expand All @@ -165,8 +169,9 @@ protected void beforeEach() throws Exception {
rnd = new Random(seed);

pageMem = createPageMemory();
partitionPageMemory = pageMem.createPartitionPageMemory(GROUP_ID, 0);

reuseList = createReuseList(GROUP_ID, 0, pageMem, 0, true);
reuseList = createReuseList(GROUP_ID, 0, partitionPageMemory, 0, true);
}

@AfterEach
Expand Down Expand Up @@ -234,7 +239,7 @@ protected long getTestTimeout() {
protected abstract @Nullable ReuseList createReuseList(
int grpId,
int partId,
PageMemory pageMem,
PartitionPageMemory pageMem,
long rootId,
boolean initNew
) throws Exception;
Expand Down Expand Up @@ -2568,7 +2573,7 @@ private static void findNextForKeys(TestTree tree, long keyCount) throws Excepti
}

private TestTree reCreateTestTree(TestTree tree, long globalRmvId) throws Exception {
return new TestTree(tree.metaFullPageId(), reuseList, tree.canGetRow, pageMem, new AtomicLong(globalRmvId), false);
return new TestTree(tree.metaFullPageId(), reuseList, tree.canGetRow, partitionPageMemory, new AtomicLong(globalRmvId), false);
}

private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws Exception {
Expand Down Expand Up @@ -2762,7 +2767,7 @@ private static int size(Cursor<?> c) {
}

private TestTree createTestTree(boolean canGetRow, AtomicLong globalRmvId) throws Exception {
var tree = new TestTree(allocateMetaPage(), reuseList, canGetRow, pageMem, globalRmvId, true);
var tree = new TestTree(allocateMetaPage(), reuseList, canGetRow, partitionPageMemory, globalRmvId, true);

assertEquals(0, tree.size());
assertEquals(0, tree.rootLevel());
Expand All @@ -2775,7 +2780,7 @@ protected TestTree createTestTree(boolean canGetRow) throws Exception {
}

private FullPageId allocateMetaPage() throws Exception {
return new FullPageId(pageMem.allocatePage(reuseList, GROUP_ID, 0, FLAG_AUX), GROUP_ID);
return new FullPageId(partitionPageMemory.allocatePage(reuseList, GROUP_ID, 0, FLAG_AUX), GROUP_ID);
}

/** Test tree. */
Expand All @@ -2802,7 +2807,7 @@ protected static class TestTree extends BplusTree<Long, Long> {
FullPageId metaPageId,
@Nullable ReuseList reuseList,
boolean canGetRow,
PageMemory pageMem,
PartitionPageMemory pageMem,
AtomicLong globalRmvId,
boolean initNew
) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.freelist.FreeListImpl;
import org.apache.ignite.internal.pagememory.reuse.ReuseList;

Expand All @@ -31,7 +32,7 @@ public abstract class AbstractBplusTreeReusePageMemoryTest extends AbstractBplus
protected ReuseList createReuseList(
int grpId,
int partId,
PageMemory pageMem,
PartitionPageMemory pageMem,
long rootId,
boolean initNew
) throws IgniteInternalCheckedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import org.apache.ignite.internal.pagememory.configuration.VolatileDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.inmemory.VolatilePageMemory;
Expand Down Expand Up @@ -67,9 +68,13 @@ public class ItBplusTreeReplaceRemoveRaceTest extends BaseIgniteAbstractTest {
@Nullable
protected PageMemory pageMem;

@Nullable
protected PartitionPageMemory partitionPageMemory;

@BeforeEach
protected void beforeEach() throws Exception {
pageMem = createPageMemory();
partitionPageMemory = pageMem.createPartitionPageMemory(GROUP_ID, 0);
}

@AfterEach
Expand All @@ -92,7 +97,7 @@ protected PageMemory createPageMemory() throws Exception {
}

private FullPageId allocateMetaPage() throws IgniteInternalCheckedException {
return new FullPageId(pageMem.allocatePageNoReuse(GROUP_ID, 0, FLAG_AUX), GROUP_ID);
return new FullPageId(partitionPageMemory.allocatePageNoReuse(GROUP_ID, 0, FLAG_AUX), GROUP_ID);
}

/**
Expand Down Expand Up @@ -126,7 +131,7 @@ protected static class TestPairTree extends BplusTree<Pair, Pair> {
*/
public TestPairTree(
FullPageId metaPageId,
PageMemory pageMem
PartitionPageMemory pageMem
) throws IgniteInternalCheckedException {
super(
"test",
Expand Down Expand Up @@ -419,7 +424,7 @@ public void testConcurrentPutRemoveSameRow() throws Exception {
* @throws Exception If failed.
*/
private TestPairTree prepareBplusTree() throws Exception {
TestPairTree tree = new TestPairTree(allocateMetaPage(), pageMem);
TestPairTree tree = new TestPairTree(allocateMetaPage(), partitionPageMemory);

tree.putx(new Pair(1, 0));
tree.putx(new Pair(2, 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.ignite.internal.pagememory.tree.inmemory;

import java.util.concurrent.ConcurrentLinkedDeque;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.reuse.ReuseBag;
import org.apache.ignite.internal.pagememory.reuse.ReuseList;
Expand All @@ -32,7 +32,7 @@ public class ItBplusTreeFakeReuseVolatilePageMemoryTest extends ItBplusTreeVolat
protected ReuseList createReuseList(
int grpId,
int partId,
PageMemory pageMem,
PartitionPageMemory pageMem,
Comment thread
ibessonov marked this conversation as resolved.
long rootId,
boolean initNew
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import org.apache.ignite.internal.pagememory.configuration.VolatileDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.inmemory.VolatilePageMemory;
Expand Down Expand Up @@ -65,7 +66,7 @@ protected long acquiredPages() {
protected @Nullable ReuseList createReuseList(
int grpId,
int partId,
PageMemory pageMem,
PartitionPageMemory pageMem,
long rootId,
boolean initNew
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.lang.IgniteSystemProperties;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoRegistry;
import org.apache.ignite.internal.pagememory.configuration.PersistentDataRegionConfiguration;
import org.apache.ignite.internal.pagememory.persistence.PageHeader;
Expand Down Expand Up @@ -92,7 +93,7 @@ protected long acquiredPages() {
protected @Nullable ReuseList createReuseList(
int grpId,
int partId,
PageMemory pageMem,
PartitionPageMemory pageMem,
long rootId,
boolean initNew
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.PartitionPageMemory;
import org.apache.ignite.internal.pagememory.TestPageIoModule.TestSimpleValuePageIo;
import org.apache.ignite.internal.pagememory.configuration.ReplacementMode;
import org.apache.ignite.internal.pagememory.io.PageIo;
Expand Down Expand Up @@ -92,6 +93,7 @@ public class PageReplacementBenchmark extends PersistentPageMemoryBenchmarkBase
public CachePressure cachePressure;

private long[] pageIds;
private PartitionPageMemory[] partitionPageMemories;

private int workingSetSize;

Expand Down Expand Up @@ -262,6 +264,7 @@ public void setup(Blackhole blackhole) throws Exception {
*/
private void prepareWorkingSet() throws Exception {
pageIds = new long[workingSetSize];
partitionPageMemories = new PartitionPageMemory[workingSetSize];

// Allocate 80% of capacity at a time to avoid hitting dirty pages limit.
int batchSize = (int) Math.round(REGION_CAPACITY_PAGES * 0.8);
Expand All @@ -276,12 +279,13 @@ private void prepareWorkingSet() throws Exception {
// Distribute pages randomly across partitions.
for (int i = batchStart; i < batchEnd; i++) {
int partitionId = partitionRandom.nextInt(PARTITION_COUNT);
pageIds[i] = persistentPageMemory().allocatePage(null, GROUP_ID, partitionId, FLAG_DATA);
PartitionPageMemory partitionPageMemory = partitionPageMemory(partitionId);
pageIds[i] = partitionPageMemory.allocatePage(null, GROUP_ID, partitionId, FLAG_DATA);
partitionPageMemories[i] = partitionPageMemory;
}

for (int i = batchStart; i < batchEnd; i++) {
long pageId = pageIds[i];
writePage(pageId, pageIo);
writePage(i, pageIo);
}
} finally {
checkpointManager().checkpointTimeoutLock().checkpointReadUnlock();
Expand Down Expand Up @@ -325,9 +329,8 @@ private void warmupCache(Blackhole blackhole) throws IgniteInternalCheckedExcept
try {
for (int i = 0; i < warmupPages; i++) {
int index = warmupDistribution.next();
long pageId = pageIds[index];

accessPageReadOnly(pageId, 0, blackhole);
accessPageReadOnly(index, 0, blackhole);
}
} finally {
checkpointManager().checkpointTimeoutLock().checkpointReadUnlock();
Expand Down Expand Up @@ -387,13 +390,15 @@ public void zipfianLatencyFourThreads(ThreadState state, Blackhole blackhole)

private void benchmarkIteration(ThreadState state, Blackhole blackhole) throws IgniteInternalCheckedException {
int index = state.nextZipfianIndex();
long pageId = pageIds[index];
accessPageReadOnly(pageId, state.threadIndex(), blackhole);
accessPageReadOnly(index, state.threadIndex(), blackhole);
}

private void accessPageReadOnly(long pageId, int threadIndex, Blackhole blackhole)
private void accessPageReadOnly(int index, int threadIndex, Blackhole blackhole)
throws IgniteInternalCheckedException {
long page = persistentPageMemory().acquirePage(GROUP_ID, pageId);
long pageId = pageIds[index];
PartitionPageMemory partitionPageMemory = partitionPageMemories[index];

long page = partitionPageMemory.acquirePage(GROUP_ID, pageId);

if (page == 0) {
throw new IllegalStateException(String.format(
Expand All @@ -408,22 +413,25 @@ private void accessPageReadOnly(long pageId, int threadIndex, Blackhole blackhol
try {
blackhole.consume(page);
} finally {
persistentPageMemory().releasePage(GROUP_ID, pageId, page);
partitionPageMemory.releasePage(GROUP_ID, pageId, page);
}
}

private void writePage(long pageId, PageIo pageIo) throws IgniteInternalCheckedException {
long page = persistentPageMemory().acquirePage(GROUP_ID, pageId);
private void writePage(int index, PageIo pageIo) throws IgniteInternalCheckedException {
long pageId = pageIds[index];
PartitionPageMemory partitionPageMemory = partitionPageMemories[index];

long page = partitionPageMemory.acquirePage(GROUP_ID, pageId);
try {
long pageAddr = persistentPageMemory().writeLock(GROUP_ID, pageId, page);
long pageAddr = partitionPageMemory.writeLock(GROUP_ID, pageId, page);
try {
pageIo.initNewPage(pageAddr, pageId, PAGE_SIZE);
TestSimpleValuePageIo.setLongValue(pageAddr, pageId);
} finally {
persistentPageMemory().writeUnlock(GROUP_ID, pageId, page, true);
partitionPageMemory.writeUnlock(GROUP_ID, pageId, page, true);
}
} finally {
persistentPageMemory().releasePage(GROUP_ID, pageId, page);
partitionPageMemory.releasePage(GROUP_ID, pageId, page);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
/**
* Class responsible for allocating/freeing page IDs.
*/
// TODO IGNITE-28429 Remove "groupId" parameter from all methods.
public interface PageIdAllocator {
/**
* Flag for a Data page. Also used by partition meta and tracking pages. This type doesn't use the Page ID rotation mechanism.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@

package org.apache.ignite.internal.pagememory;

import java.nio.ByteBuffer;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.pagememory.io.PageIo;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;

/**
* Class responsible for pages storage and handling.
*/
// TODO IGNITE-16350 Improve javadoc in this class.
public interface PageMemory extends PageIdAllocator, PageSupport {
// TODO IGNITE-28429 Remove the inheritance.
public interface PageMemory {
/**
* Stops page memory.
*
* @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse on subsequent {@link #start()}
* @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse on subsequent {@code start()}
*/
void stop(boolean deallocate) throws IgniteInternalException;

Expand All @@ -39,35 +37,22 @@ public interface PageMemory extends PageIdAllocator, PageSupport {
*/
int pageSize();

/**
* Returns a page size without the encryption overhead, in bytes.
*
* @param groupId Group id.
*/
// TODO IGNITE-16350 Consider renaming.
int realPageSize(int groupId);

/**
* Returns a page's size with system overhead, in bytes.
*/
// TODO IGNITE-16350 Consider renaming.
int systemPageSize();

/**
* Wraps a page address, obtained by a {@code readLock}/{@code writeLock} or their variants, into a direct byte buffer.
*
* @param pageAddr Page address.
* @return Page byte buffer.
*/
ByteBuffer pageBuffer(long pageAddr);

/**
* Returns the total number of pages loaded into memory.
*/
long loadedPages();

/**
* Returns a registry to obtain {@link PageIo} instances for pages.
* Creates a new instance of {@link PartitionPageMemory} for a specified partition.
*
* @param groupId Group ID for the specific partition.
* @param partitionId Partition ID of the specific partition.
*/
PageIoRegistry ioRegistry();
PartitionPageMemory createPartitionPageMemory(int groupId, int partitionId);
Comment thread
ibessonov marked this conversation as resolved.
}
Loading