Skip to content

Commit daae43c

Browse files
committed
multichannel works - still a race condition - I think(!) it's because all threads from both peer runs in a single java engine. See test for details
1 parent efe8561 commit daae43c

3 files changed

Lines changed: 41 additions & 4 deletions

File tree

src/net/sharksystem/hub/hubside/HubGenericImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,17 @@ void connectionCreated(CharSequence sourcePeerID, CharSequence targetPeerID, Str
155155
StreamPairLink dataLink =
156156
new StreamPairLink(dataSessionRequest.connection, sourcePeerID, connection, targetPeerID, false);
157157

158+
/*
158159
if(dataSessionRequest.connectionPreparer != null)
159160
dataSessionRequest.connectionPreparer.prepare(dataSessionRequest.connection);
160161
161162
if(connectionPreparer != null) connectionPreparer.prepare(connection);
163+
*/
164+
165+
// tell peers e2e is established
166+
Log.writeLog(this, "send ready byte to each peer");
167+
dataSessionRequest.connection.getOutputStream().write(ConnectionPreparer.readyByte);
168+
connection.getOutputStream().write(ConnectionPreparer.readyByte);
162169

163170
// now go
164171
dataLink.start();

src/net/sharksystem/hub/peerside/SharedChannelConnectorPeerSide.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,26 @@ protected void silenceEnded() { }
239239

240240
@Override
241241
protected void dataSessionStarted(CharSequence targetPeerID, StreamPair streamPair) {
242-
Log.writeLog(this, this.toString(), "data session started to peer: " + targetPeerID);
242+
Log.writeLog(this, this.toString(), "data session started to peer " + targetPeerID);
243+
244+
Log.writeLog(this, this.toString(), "wait for ready byte " + targetPeerID);
245+
246+
byte b = 0;
247+
while(b != ConnectionPreparer.readyByte) {
248+
Log.writeLog(this, this.toString(), "no ready byte yet");
249+
try {
250+
Thread.sleep(100);
251+
} catch (InterruptedException e) {
252+
// ignore
253+
}
254+
try {
255+
b = (byte) streamPair.getInputStream().read();
256+
} catch (IOException e) {
257+
Log.writeLog(this, this.toString(), "connection gone before usage, other peer: "
258+
+ targetPeerID);
259+
}
260+
}
261+
Log.writeLog(this, this.toString(), "got ready byte from hub - notify data session can begin");
243262

244263
// tell listener
245264
if(this.listener != null) {

tests/net/sharksystem/hub/HubConnectorTester.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,13 @@ public void notifyPeerConnected(CharSequence targetPeerID, StreamPair streamPair
7373
System.out.println(peerID + " writes pure bytes: " + this.messageA);
7474
streamPair.getOutputStream().write(testBytes);
7575
byte[] receivedBytes = new byte[10];
76-
System.out.println(peerID + " available: " + streamPair.getInputStream().available());
77-
//Thread.sleep(100); // give other process a moment
76+
int available = streamPair.getInputStream().available();
77+
if(available < 1) {
78+
//System.out.println(peerID + " try to go in read block");
79+
System.out.println(peerID + " sleep");
80+
//streamPair.getInputStream().read();
81+
Thread.sleep(100); // give other process a moment
82+
}
7883
System.out.println(peerID + " available #2: " + streamPair.getInputStream().available());
7984
streamPair.getInputStream().read(receivedBytes);
8085
System.out.println(peerID + " available #3: " + streamPair.getInputStream().available());
@@ -89,12 +94,15 @@ public void notifyPeerConnected(CharSequence targetPeerID, StreamPair streamPair
8994
byte[] outBytes = baos.toByteArray();
9095
ASAPSerialization.writeByteArray(outBytes, streamPair.getOutputStream());
9196

97+
System.out.println(peerID + " sleep - other threads need to get a chance. Is that still a bug? ");
98+
Thread.sleep(100); // give other process a moment
99+
92100
// read
93101
System.out.println(peerID + " reading..");
94102
if(PeerIDHelper.sameID(peerID, ALICE_ID)) {
95103
int i = 42; // debug break
96104
}
97-
Thread.sleep(1000);
105+
//Thread.sleep(1000);
98106
byte[] inBytes = ASAPSerialization.readByteArray(streamPair.getInputStream());
99107
ByteArrayInputStream bais = new ByteArrayInputStream(inBytes);
100108
String receivedMessage = ASAPSerialization.readCharSequenceParameter(bais);
@@ -118,6 +126,9 @@ public void notifyPeerConnected(CharSequence targetPeerID, StreamPair streamPair
118126
outBytes = baos.toByteArray();
119127
ASAPSerialization.writeByteArray(outBytes, streamPair.getOutputStream());
120128

129+
System.out.println(peerID + " sleep - other threads need to get a chance. Is that still a bug? ");
130+
Thread.sleep(100); // give other process a moment
131+
121132
System.out.println(this.peerID + " reading.. ");
122133
inBytes = ASAPSerialization.readByteArray(streamPair.getInputStream());
123134
bais = new ByteArrayInputStream(inBytes);

0 commit comments

Comments
 (0)