Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions __tests__/halfOpen209.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import { expect, test, jest, beforeEach } from '@jest/globals';
import net from '../src/index';
import { nativeEventEmitter } from '../src/Globals';
import { NativeModules } from 'react-native';

const Sockets = NativeModules.TcpSockets;

// Mirror the Globals mock used by allowHalfOpen.test.js so socket ids
// and the native event emitter behave deterministically.
jest.mock('../src/Globals', () => {
const { EventEmitter } = require('events');
const emitter = new EventEmitter();
const originalAddListener = emitter.addListener.bind(emitter);
// @ts-ignore
emitter.addListener = (event, listener) => {
originalAddListener(event, listener);
return { remove: () => emitter.removeListener(event, listener) };
};
let idCounter = 2000;
return {
__esModule: true,
nativeEventEmitter: emitter,
getNextId: () => idCounter++,
// Socket.js guards every JSI call with `typeof fn === 'function'`,
// so omitting these here exercises the no-bridge degrade path.
};
});

beforeEach(() => {
Sockets.connect.mockClear();
Sockets.end.mockClear();
Sockets.destroy.mockClear();
Sockets.write.mockClear();
});

/**
* #209 / #183 Node `net.Socket` parity. `@libp2p/tcp` requires these
* exact APIs: it passes `allowHalfOpen` through `net.connect(cOpts)`,
* calls `socket.destroySoon()` on every graceful close, calls
* `socket.resetAndDestroy()` for resets, and reads a BOOLEAN `hadError`
* from the `'close'` event.
*/

test('connect() reads allowHalfOpen from the options object (libp2p passes it through net.connect)', () => {
const socket = new net.Socket();
expect(socket.allowHalfOpen).toBe(false);
socket.connect({ port: 1234, host: '127.0.0.1', allowHalfOpen: true });
expect(socket.allowHalfOpen).toBe(true);
// It must NOT be forwarded to the native connect args (JS lifecycle
// flag only — and arbitrary caller objects must never cross).
const customOptions = Sockets.connect.mock.calls[0][3];
expect(customOptions.allowHalfOpen).toBeUndefined();
});

test('destroySoon() exists and, when nothing is buffered after end(), destroys immediately', () => {
const socket = new net.Socket();
socket.connect({ port: 1, host: 'h' });
// simulate native connect ack
socket._setConnected({
localAddress: '127.0.0.1',
localPort: 1,
remoteAddress: 'h',
remotePort: 1,
remoteFamily: 'IPv4',
});
expect(typeof socket.destroySoon).toBe('function');
socket.end(); // writable finished, no buffered bytes
socket.destroySoon();
expect(Sockets.destroy).toHaveBeenCalled();
});

test('destroySoon() with unflushed write defers destroy until close', () => {
const socket = new net.Socket();
socket.connect({ port: 1, host: 'h' });
socket._setConnected({
localAddress: '127.0.0.1',
localPort: 1,
remoteAddress: 'h',
remotePort: 1,
remoteFamily: 'IPv4',
});
// Simulate unflushed outbound bytes. (Going through write() would hit
// the intentional loud throw — the Jest env has no JSI bridge — so
// set the buffered-size accumulator directly: destroySoon's decision
// is exactly `_writableEnded && _writeBufferSize === 0`.)
socket._writeBufferSize = 13;
socket.destroySoon();
// FIN sent, but destroy deferred until 'close'
expect(Sockets.end).toHaveBeenCalled();
expect(Sockets.destroy).not.toHaveBeenCalled();
// native reports the socket closed → now it tears down
nativeEventEmitter.emit('close', { id: socket._id });
expect(Sockets.destroy).toHaveBeenCalled();
});

test('resetAndDestroy() exists and destroys the socket', () => {
const socket = new net.Socket();
socket.connect({ port: 1, host: 'h' });
socket._setConnected({
localAddress: '127.0.0.1',
localPort: 1,
remoteAddress: 'h',
remotePort: 1,
remoteFamily: 'IPv4',
});
expect(typeof socket.resetAndDestroy).toBe('function');
socket.resetAndDestroy();
expect(Sockets.destroy).toHaveBeenCalled();
expect(socket.destroyed).toBe(true);
});

test("'close' emits a BOOLEAN hadError (Node net.Socket spec), not the error object", () => {
// A socket emits 'close' exactly once and detaches its native
// listeners on disconnect, so each case needs its own socket.
const clean = new net.Socket();
clean.connect({ port: 1, host: 'h' });
let cleanArg;
clean.on('close', (hadError) => {
cleanArg = hadError;
});
nativeEventEmitter.emit('close', { id: clean._id });

const errored = new net.Socket();
errored.connect({ port: 2, host: 'h' });
let erroredArg;
errored.on('close', (hadError) => {
erroredArg = hadError;
});
nativeEventEmitter.emit('close', { id: errored._id, error: new Error('boom') });

expect(cleanArg).toBe(false);
expect(erroredArg).toBe(true);
expect(typeof cleanArg).toBe('boolean');
expect(typeof erroredArg).toBe('boolean');
});

test('destroy(error) emits the error then marks destroyed', () => {
const socket = new net.Socket();
socket.connect({ port: 1, host: 'h' });
const errs = [];
socket.on('error', (e) => errs.push(e));
const boom = new Error('reset');
socket.destroy(boom);
expect(errs).toEqual([boom]);
expect(socket.destroyed).toBe(true);
});
34 changes: 34 additions & 0 deletions android/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# VENHO fork — Phase 1 zero-copy data-plane native build.
# Modeled on react-native-quick-base64/android/CMakeLists.txt (proven
# RN-0.83 New-Arch cxxTurboModule wiring in this same app).
cmake_minimum_required(VERSION 3.13.0)
project(TcpDataBridge)

set(PACKAGE_NAME "react-native-tcp-socket")
set(CMAKE_VERBOSE_MAKEFILE ON)

add_library(
${PACKAGE_NAME} STATIC
../cpp/TcpDataBridge.cpp
../cpp/TcpInboundRegistry.cpp
TcpBridgeJni.cpp
)

set_target_properties(
${PACKAGE_NAME} PROPERTIES
CXX_STANDARD 20
CXX_STANDARD_REQUIRED ON
CXX_EXTENSIONS OFF
)

target_include_directories(
${PACKAGE_NAME} PUBLIC
../cpp
)

target_link_libraries(
${PACKAGE_NAME} jsi
fbjni
reactnative
react_codegen_RNTcpSocketsSpec
)
189 changes: 189 additions & 0 deletions android/TcpBridgeJni.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// VENHO fork — Phase 1 JNI glue: Java socket read thread → C++ inbound
// registry. The Java TcpReceiverTask, instead of base64+WritableMap+
// RCTDeviceEventEmitter (the Phase-0 folly::dynamic OOM path), calls
// these to push raw bytes into the bounded C++ queue the JSI host
// object drains zero-copy.
//
// Registered via explicit JNI (RegisterNatives in JNI_OnLoad) for the
// Java class com.asterinet.react.tcpsocket.TcpDataBridgeNative.

#include "TcpDataBridge.h"
#include "TcpInboundRegistry.h"

#include <jni.h>

#include <cstdint>
#include <string>

namespace {

// pushInbound(int id, byte[] data, int len) -> boolean
// Returns false when the bounded queue hit the high watermark and the
// Java read loop must pause (registry fires resume via the no-op below;
// Java polls canResume()).
jboolean nativePushInbound(JNIEnv* env, jclass, jint id, jbyteArray data,
jint len) {
jbyte* buf = env->GetByteArrayElements(data, nullptr);
bool keepReading = venho::TcpInboundRegistry::instance().pushInbound(
static_cast<int32_t>(id), reinterpret_cast<const uint8_t*>(buf),
static_cast<size_t>(len));
env->ReleaseByteArrayElements(data, buf, JNI_ABORT); // no copy-back
return keepReading ? JNI_TRUE : JNI_FALSE;
}

// registerSocket(int id): create the bounded queue. The "resume" signal
// is polled by Java (canResume) rather than a JNI upcall, to keep this
// glue minimal and avoid attaching the C++ caller thread to the JVM.
void nativeRegisterSocket(JNIEnv*, jclass, jint id) {
// Java polls canResume() for backpressure — no resume callback.
venho::TcpInboundRegistry::instance().registerSocket(
static_cast<int32_t>(id));
}

void nativeUnregisterSocket(JNIEnv*, jclass, jint id) {
venho::TcpInboundRegistry::instance().unregisterSocket(
static_cast<int32_t>(id));
// Drop this socket's readable-coalescing pending entry so the
// process-global map stays bounded across many short-lived libp2p
// peer connections.
facebook::react::TcpDataBridge::forgetSocket(static_cast<int32_t>(id));
}

// canResume(int id): true once the queue drained below the low
// watermark (Java's paused read loop polls this on a short wait).
jboolean nativeCanResume(JNIEnv*, jclass, jint id) {
return venho::TcpInboundRegistry::instance().canResume(
static_cast<int32_t>(id))
? JNI_TRUE
: JNI_FALSE;
}

// waitWriteChunk(int id, int[] msgIdOut) -> byte[] | null
// Blocks the native write thread until the socket's OUTBOUND queue (fed
// zero-copy by the JSI write() host fn) has a chunk, then returns its
// bytes and writes the chunk's msgId into msgIdOut[0]. Returns null when
// the socket was unregistered (write thread must exit). One JNI call;
// the only allocation is the unavoidable jbyteArray that must cross to
// Java — NO base64, NO folly::dynamic, NO @ReactMethod.
jbyteArray nativeWaitWriteChunk(JNIEnv* env, jclass, jint id,
jintArray msgIdOut) {
venho::OutboundChunk chunk;
if (!venho::TcpInboundRegistry::instance().waitPopOutbound(
static_cast<int32_t>(id), chunk)) {
return nullptr; // socket gone — signal the write thread to exit
}
const auto len = static_cast<jsize>(chunk.bytes.size());
jbyteArray out = env->NewByteArray(len);
if (out == nullptr) {
return nullptr; // OOM on the Java heap — let the write thread exit
}
if (len > 0) {
env->SetByteArrayRegion(
out, 0, len, reinterpret_cast<const jbyte*>(chunk.bytes.data()));
}
if (msgIdOut != nullptr && env->GetArrayLength(msgIdOut) >= 1) {
jint mid = static_cast<jint>(chunk.msgId);
env->SetIntArrayRegion(msgIdOut, 0, 1, &mid);
}
return out;
}

// signalReadable(int id): "data available for socket id" — hops to the
// JS thread via the CallInvoker and calls the registered JS readable
// callback. This is the legacy-bridge-FREE replacement for the old
// RCTDeviceEventEmitter "readable" emit that still OOM'd in 1d.
void nativeSignalReadable(JNIEnv*, jclass, jint id) {
facebook::react::TcpDataBridge::signalReadable(static_cast<int32_t>(id));
}

// signalWritten(int id, int msgId, String err): per-write ACK — hops to
// the JS thread via the CallInvoker and calls the registered JS
// `written` callback with (id,msgId,err). err is "" on success (Java
// passes null → treated as ""). Legacy-bridge-FREE replacement for the
// RCTDeviceEventEmitter "written" emit that accumulated an unbounded
// folly::dynamic per write (Scenario-C OOM #2). The std::string is
// copied before the async hop, so the Java string may be released.
void nativeSignalWritten(JNIEnv* env, jclass, jint id, jint msgId,
jstring err) {
std::string errStr;
if (err != nullptr) {
const char* c = env->GetStringUTFChars(err, nullptr);
if (c != nullptr) {
errStr.assign(c);
env->ReleaseStringUTFChars(err, c);
}
}
facebook::react::TcpDataBridge::signalWritten(
static_cast<int32_t>(id), static_cast<int32_t>(msgId), errStr);
}

const JNINativeMethod kMethods[] = {
{"nativePushInbound", "(I[BI)Z",
reinterpret_cast<void*>(nativePushInbound)},
{"nativeRegisterSocket", "(I)V",
reinterpret_cast<void*>(nativeRegisterSocket)},
{"nativeUnregisterSocket", "(I)V",
reinterpret_cast<void*>(nativeUnregisterSocket)},
{"nativeCanResume", "(I)Z",
reinterpret_cast<void*>(nativeCanResume)},
{"nativeSignalReadable", "(I)V",
reinterpret_cast<void*>(nativeSignalReadable)},
{"nativeSignalWritten", "(IILjava/lang/String;)V",
reinterpret_cast<void*>(nativeSignalWritten)},
{"nativeWaitWriteChunk", "(I[I)[B",
reinterpret_cast<void*>(nativeWaitWriteChunk)},
};

} // namespace

// VENHO Phase 1 — JNI registration LIFETIME fix (replaces JNI_OnLoad).
//
// The fork's native code is autolinked as a STATIC lib and merged into
// libappmodules.so (RN New-Arch single-merged-lib model: see
// android/CMakeLists.txt + the generated Android-autolinking.cmake).
// libappmodules.so has exactly ONE JNI_OnLoad, invoked by SoLoader VERY
// early — before the JS bundle runs, with the bootstrap classloader.
// FindClass("com/asterinet/.../TcpDataBridgeNative") from THAT context
// returns null (the RN app classes aren't reachable from the bootstrap
// loader), so the old JNI_OnLoad returned JNI_ERR and the 7 natives were
// NEVER registered → UnsatisfiedLinkError when libp2p first tore a
// socket down (nativeUnregisterSocket). Per-`.so` JNI_OnLoad is NOT
// re-invoked, so ensureLoaded()'s loadLibrary couldn't recover it.
//
// Fix: ONE *implicitly* bound bootstrap method,
// Java_com_asterinet_react_tcpsocket_TcpDataBridgeNative_nativeInstallBridge.
// Implicit JNI binding resolves Java_<class>_<method> by exported-symbol
// lookup across already-loaded .so's — no JNI_OnLoad, no RegisterNatives
// needed for IT. Java's TcpDataBridgeNative.ensureLoaded() calls it once;
// because the call originates in Java, `env` carries the APP classloader
// and `clazz` IS TcpDataBridgeNative — so the explicit RegisterNatives
// for the other 7 succeeds deterministically, at first socket use.
extern "C" JNIEXPORT void JNICALL
Java_com_asterinet_react_tcpsocket_TcpDataBridgeNative_nativeInstallBridge(
JNIEnv* env, jclass clazz) {
// `clazz` is TcpDataBridgeNative itself (passed by the JVM for a
// static native). No FindClass / classloader hazard.
if (env->RegisterNatives(clazz, kMethods,
sizeof(kMethods) / sizeof(kMethods[0])) != 0) {
// Leave any pending exception for Java to surface (ensureLoaded's
// caller will see it) rather than silently swallowing — a hard,
// loud failure here is correct: the data plane cannot work without
// these natives, and a quiet failure would resurface later as the
// same opaque UnsatisfiedLinkError this fix eliminates.
}
}

// VENHO Phase 1 — link-time KEEP anchor.
//
// This TU only defines a JNI entry point that is resolved at RUNTIME by
// name (implicit binding). Nothing references it at LINK time, so the
// merged-lib link (NDK default --gc-sections) dead-strips this entire
// object file out of libappmodules.so — exactly what happened on the
// first attempt (the .a had the symbol; the merged .so did not, so
// implicit JNI lookup found nothing). TcpDataBridge.cpp IS kept (the
// codegen cxxTurboModule references it), so having its constructor touch
// this no-op creates the single link-time edge that pulls TcpBridgeJni
// .cpp.o — and therefore the nativeInstallBridge export — into the .so.
// Must be a real, externally-visible definition (not inline / not
// static) so the reference cannot be optimised away.
extern "C" void venho_tcpBridgeJniKeepAnchor() {}
Loading