diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java
new file mode 100644
index 000000000..283943494
--- /dev/null
+++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemoryLimitCalculator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.common.threadpool;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
+ * does not depend on {@link java.lang.instrument.Instrumentation}
+ *
+ * @see MemorySafeLinkedBlockingQueue
+ */
+public class MemoryLimitCalculator {
+
+ private static volatile long maxAvailable;
+
+ private static final ScheduledExecutorService SCHEDULER = Executors.newSingleThreadScheduledExecutor();
+
+ static {
+ // immediately refresh when this class is loaded to prevent maxAvailable from being 0
+ refresh();
+ // check every 50 ms to improve performance
+ SCHEDULER.scheduleWithFixedDelay(MemoryLimitCalculator::refresh, 50, 50, TimeUnit.MILLISECONDS);
+ Runtime.getRuntime().addShutdownHook(new Thread(SCHEDULER::shutdown));
+ }
+
+ private static void refresh() {
+ maxAvailable = Runtime.getRuntime().freeMemory();
+ }
+
+ /**
+ * Get the maximum available memory of the current JVM.
+ *
+ * @return maximum available memory
+ */
+ public static long maxAvailable() {
+ return maxAvailable;
+ }
+
+ /**
+ * Take the current JVM's maximum available memory
+ * as a percentage of the result as the limit.
+ *
+ * @param percentage percentage
+ * @return available memory
+ */
+ public static long calculate(final float percentage) {
+ if (percentage <= 0 || percentage > 1) {
+ throw new IllegalArgumentException();
+ }
+ return (long) (maxAvailable() * percentage);
+ }
+
+ /**
+ * By default, it takes 80% of the maximum available memory of the current JVM.
+ *
+ * @return available memory
+ */
+ public static long defaultLimit() {
+ return (long) (maxAvailable() * 0.8);
+ }
+}
\ No newline at end of file
diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java
new file mode 100644
index 000000000..191f3894c
--- /dev/null
+++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueue.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.common.threadpool;
+
+import java.util.Collection;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Can completely solve the OOM problem caused by {@link java.util.concurrent.LinkedBlockingQueue},
+ * does not depend on {@link java.lang.instrument.Instrumentation}
+ *
+ * @see MemorySafeLinkedBlockingQueue
+ */
+public class MemorySafeLinkedBlockingQueue extends LinkedBlockingQueue {
+
+ private static final long serialVersionUID = 8032578371739960142L;
+
+ public static int THE_256_MB = 256 * 1024 * 1024;
+
+ private int maxFreeMemory;
+
+ public MemorySafeLinkedBlockingQueue() {
+ this(THE_256_MB);
+ }
+
+ public MemorySafeLinkedBlockingQueue(final int maxFreeMemory) {
+ super(Integer.MAX_VALUE);
+ this.maxFreeMemory = maxFreeMemory;
+ }
+
+ public MemorySafeLinkedBlockingQueue(final Collection extends E> c,
+ final int maxFreeMemory) {
+ super(c);
+ this.maxFreeMemory = maxFreeMemory;
+ }
+
+ /**
+ * set the max free memory.
+ *
+ * @param maxFreeMemory the max free memory
+ */
+ public void setMaxFreeMemory(final int maxFreeMemory) {
+ this.maxFreeMemory = maxFreeMemory;
+ }
+
+ /**
+ * get the max free memory.
+ *
+ * @return the max free memory limit
+ */
+ public int getMaxFreeMemory() {
+ return maxFreeMemory;
+ }
+
+ /**
+ * determine if there is any remaining free memory.
+ *
+ * @return true if has free memory
+ */
+ public boolean hasRemainedMemory() {
+ return MemoryLimitCalculator.maxAvailable() > maxFreeMemory;
+ }
+
+ @Override
+ public void put(final E e) throws InterruptedException {
+ if (hasRemainedMemory()) {
+ super.put(e);
+ }
+ }
+
+ @Override
+ public boolean offer(final E e, final long timeout, final TimeUnit unit) throws InterruptedException {
+ return hasRemainedMemory() && super.offer(e, timeout, unit);
+ }
+
+ @Override
+ public boolean offer(final E e) {
+ return hasRemainedMemory() && super.offer(e);
+ }
+}
\ No newline at end of file
diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java
index 4a474a2a5..f4c57019b 100644
--- a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java
+++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtils.java
@@ -16,6 +16,8 @@
*/
package com.alipay.sofa.rpc.common.utils;
+import com.alipay.sofa.rpc.common.threadpool.MemorySafeLinkedBlockingQueue;
+
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
@@ -248,7 +250,7 @@ public static BlockingQueue buildQueue(int size, boolean isPriority) {
queue = size < 0 ? new PriorityBlockingQueue()
: new PriorityBlockingQueue(size);
} else {
- queue = size < 0 ? new LinkedBlockingQueue()
+ queue = size < 0 ? new MemorySafeLinkedBlockingQueue()
: new LinkedBlockingQueue(size);
}
}
diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java
new file mode 100644
index 000000000..a1cca3fc2
--- /dev/null
+++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/threadpool/MemorySafeLinkedBlockingQueueTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.common.threadpool;
+
+import net.bytebuddy.agent.ByteBuddyAgent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.instrument.Instrumentation;
+
+public class MemorySafeLinkedBlockingQueueTest {
+
+ @Test
+ public void test() throws Exception {
+ ByteBuddyAgent.install();
+ final Instrumentation instrumentation = ByteBuddyAgent.getInstrumentation();
+ final long objectSize = instrumentation.getObjectSize((Runnable) () -> {
+ });
+ int maxFreeMemory = (int) MemoryLimitCalculator.maxAvailable();
+ MemorySafeLinkedBlockingQueue queue = new MemorySafeLinkedBlockingQueue<>(maxFreeMemory);
+
+ // all memory is reserved for JVM, so it will fail here
+ Assert.assertEquals(queue.offer(() -> {
+ }), false);
+
+ // maxFreeMemory-objectSize Byte memory is reserved for the JVM, so this will succeed
+ queue.setMaxFreeMemory((int) (MemoryLimitCalculator.maxAvailable() - objectSize));
+ Assert.assertEquals(queue.offer(() -> {
+ }), true);
+ }
+}
\ No newline at end of file
diff --git a/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java b/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java
index dcf4e38d6..6da188a44 100644
--- a/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java
+++ b/core/common/src/test/java/com/alipay/sofa/rpc/common/utils/ThreadPoolUtilsTest.java
@@ -17,6 +17,7 @@
package com.alipay.sofa.rpc.common.utils;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
+import com.alipay.sofa.rpc.common.threadpool.MemorySafeLinkedBlockingQueue;
import org.junit.Assert;
import org.junit.Test;
@@ -151,7 +152,7 @@ public void buildQueue() throws Exception {
BlockingQueue queue = ThreadPoolUtils.buildQueue(0);
Assert.assertEquals(queue.getClass(), SynchronousQueue.class);
queue = ThreadPoolUtils.buildQueue(-1);
- Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class);
+ Assert.assertEquals(queue.getClass(), MemorySafeLinkedBlockingQueue.class);
queue = ThreadPoolUtils.buildQueue(10);
Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class);
}
@@ -165,7 +166,7 @@ public void buildQueue1() throws Exception {
queue = ThreadPoolUtils.buildQueue(100, true);
Assert.assertEquals(queue.getClass(), PriorityBlockingQueue.class);
queue = ThreadPoolUtils.buildQueue(-1, false);
- Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class);
+ Assert.assertEquals(queue.getClass(), MemorySafeLinkedBlockingQueue.class);
queue = ThreadPoolUtils.buildQueue(100, false);
Assert.assertEquals(queue.getClass(), LinkedBlockingQueue.class);
}