diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java new file mode 100644 index 000000000..283943494 --- /dev/null +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}, + * does not depend on {@link java.lang.instrument.Instrumentation} + * + * @see MemorySafeLinkedBlockingQueue + */ +public class MemoryLimitCalculator { + + private static volatile long maxAvailable; + + private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor(); + + static { + // immediately refresh when this class is loaded to prevent maxAvailable from being 0 + refresh(); + // check every 50 ms to improve performance + SCHEDULER.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS); + Runtime.getRuntime().addShutdownHook(new Thread(SCHEDULER::shutdown)); + } + + private static void refresh() { + maxAvailable = Runtime.getRuntime().freeMemory(); + } + + /** + * Get the maximum available memory of the current JVM. + * + * @return maximum available memory + */ + public static long maxAvailable() { + return maxAvailable; + } + + /** + * Take the current JVM's maximum available memory + * as a percentage of the result as the limit. + * + * @param percentage percentage + * @return available memory + */ + public static long calculate(final float percentage) { + if (percentage <= 0 || percentage > 1) { + throw new IllegalArgumentException(); + } + return (long) (maxAvailable() * percentage); + } + + /** + * By default, it takes 80% of the maximum available memory of the current JVM. + * + * @return available memory + */ + public static long defaultLimit() { + return (long) (maxAvailable() * 0.8); + } +} \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java new file mode 100644 index 000000000..191f3894c --- /dev/null +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool; + +import java.util.Collection; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue}, + * does not depend on {@link java.lang.instrument.Instrumentation} + * + * @see MemorySafeLinkedBlockingQueue + */ +public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue { + + private static final long serialVersionUID = 8032578371739960142L; + + public static int THE_256_MB = 256 * 1024 * 1024; + + private int maxFreeMemory; + + public MemorySafeLinkedBlockingQueue() { + this(THE_256_MB); + } + + public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) { + super(Integer.MAX_VALUE); + this.maxFreeMemory = maxFreeMemory; + } + + public MemorySafeLinkedBlockingQueue(final Collection c, + final int maxFreeMemory) { + super(c); + this.maxFreeMemory = maxFreeMemory; + } + + /** + * set the max free memory. + * + * @param maxFreeMemory the max free memory + */ + public void setMaxFreeMemory(final int maxFreeMemory) { + this.maxFreeMemory = maxFreeMemory; + } + + /** + * get the max free memory. + * + * @return the max free memory limit + */ + public int getMaxFreeMemory() { + return maxFreeMemory; + } + + /** + * determine if there is any remaining free memory. + * + * @return true if has free memory + */ + public boolean hasRemainedMemory() { + return MemoryLimitCalculator.maxAvailable() > maxFreeMemory; + } + + @Override + public void put(final E e) throws InterruptedException { + if (hasRemainedMemory()) { + super.put(e); + } + } + + @Override + public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException { + return hasRemainedMemory() && super.offer(e, timeout, unit); + } + + @Override + public boolean offer(final E e) { + return hasRemainedMemory() && super.offer(e); + } +} \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java index 4a474a2a5..f4c57019b 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.common.utils; +import com.alipay.sofa.rpc.common.threadpool.MemorySafeLinkedBlockingQueue; + import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; @@ -248,7 +250,7 @@ public static BlockingQueue buildQueue(int size, boolean isPriority) { queue = size < 0 ? new PriorityBlockingQueue() : new PriorityBlockingQueue(size); } else { - queue = size < 0 ? new LinkedBlockingQueue() + queue = size < 0 ? new MemorySafeLinkedBlockingQueue() : new LinkedBlockingQueue(size); } } diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java new file mode 100644 index 000000000..a1cca3fc2 --- /dev/null +++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.common.threadpool; + +import net.bytebuddy.agent.ByteBuddyAgent; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.instrument.Instrumentation; + +public class MemorySafeLinkedBlockingQueueTest { + + @Test + public void test() throws Exception { + ByteBuddyAgent.install(); + final Instrumentation instrumentation = ByteBuddyAgent.getInstrumentation(); + final long objectSize = instrumentation.getObjectSize((Runnable) () -> { + }); + int maxFreeMemory = (int) MemoryLimitCalculator.maxAvailable(); + MemorySafeLinkedBlockingQueue queue = new MemorySafeLinkedBlockingQueue<>(maxFreeMemory); + + // all memory is reserved for JVM, so it will fail here + Assert.assertEquals(queue.offer(() -> { + }), false); + + // maxFreeMemory-objectSize Byte memory is reserved for the JVM, so this will succeed + queue.setMaxFreeMemory((int) (MemoryLimitCalculator.maxAvailable() - objectSize)); + Assert.assertEquals(queue.offer(() -> { + }), true); + } +} \ No newline at end of file diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java index dcf4e38d6..6da188a44 100644 --- a/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java +++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java @@ -17,6 +17,7 @@ package com.alipay.sofa.rpc.common.utils; import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; +import com.alipay.sofa.rpc.common.threadpool.MemorySafeLinkedBlockingQueue; import org.junit.Assert; import org.junit.Test; @@ -151,7 +152,7 @@ public void buildQueue() throws Exception { BlockingQueue queue = ThreadPoolUtils.buildQueue(0); Assert.assertEquals(queue.getClass(), SynchronousQueue.class); queue = ThreadPoolUtils.buildQueue(-1); - Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); + Assert.assertEquals(queue.getClass(), MemorySafeLinkedBlockingQueue.class); queue = ThreadPoolUtils.buildQueue(10); Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); } @@ -165,7 +166,7 @@ public void buildQueue1() throws Exception { queue = ThreadPoolUtils.buildQueue(100, true); Assert.assertEquals(queue.getClass(), PriorityBlockingQueue.class); queue = ThreadPoolUtils.buildQueue(-1, false); - Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); + Assert.assertEquals(queue.getClass(), MemorySafeLinkedBlockingQueue.class); queue = ThreadPoolUtils.buildQueue(100, false); Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class); }