Skip to content

Commit 00aef3d

Browse files
Fix dissemination failure despite satisfying RequiredPeerCount
Signed-off-by: Drupadh Dinesh <drupadhdinesh@gmail.com>
1 parent 86c1172 commit 00aef3d

1 file changed

Lines changed: 57 additions & 81 deletions

File tree

gossip/privdata/distributor.go

Lines changed: 57 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
196196
if err != nil {
197197
return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName)
198198
}
199-
disseminationPlan = append(disseminationPlan, dPlan...)
199+
disseminationPlan = append(disseminationPlan, dPlan)
200200
}
201201
}
202202
return disseminationPlan, nil
@@ -213,9 +213,7 @@ func (d *distributorImpl) getCollectionConfig(config *peer.CollectionConfigPacka
213213
return nil, errors.New(fmt.Sprint("no configuration for collection", collection.CollectionName, "found"))
214214
}
215215

216-
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) ([]*dissemination, error) {
217-
var disseminationPlan []*dissemination
218-
216+
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) (*dissemination, error) {
219217
routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChannelID(d.chainID), func(signature api.PeerSignature) bool {
220218
return colFilter(protoutil.SignedData{
221219
Data: signature.Message,
@@ -251,110 +249,88 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
251249
peerEndpoints[string(peer.PKIid)] = epToAdd
252250
}
253251

254-
// Initialize maximumPeerRemainingCount and requiredPeerRemainingCount,
255-
// these will be decremented until we've selected enough peers for dissemination
252+
// Initialize maximumPeerRemainingCount and requiredPeerCount,
253+
// maximumPeerRemainingCount will be decremented until we've selected enough peers for dissemination
256254
maximumPeerRemainingCount := colAP.MaximumPeerCount()
257-
requiredPeerRemainingCount := colAP.RequiredPeerCount()
255+
requiredPeerCount := colAP.RequiredPeerCount()
258256

259257
remainingPeersAcrossOrgs := []api.PeerIdentityInfo{}
260258
selectedPeerEndpointsForDebug := []string{}
259+
selectedPKIDs := make(map[string]struct{})
261260

262261
var seed [32]byte
263-
_, _ = crand.Read(seed[:])
262+
if _, err := crand.Read(seed[:]); err != nil {
263+
return nil, errors.Wrap(err, "failed to generate random seed")
264+
}
264265
r := rand.New(rand.NewChaCha8(seed))
265266

266267
// PHASE 1 - Select one peer from each eligible org
267-
if maximumPeerRemainingCount > 0 {
268-
for _, selectionPeersForOrg := range identitySetsByOrg {
269-
270-
// Peers are tagged as a required peer (acksRequired=1) for RequiredPeerCount up front before dissemination.
271-
// TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
272-
acksRequired := 1
273-
if requiredPeerRemainingCount == 0 {
274-
acksRequired = 0
275-
}
268+
for _, selectionPeersForOrg := range identitySetsByOrg {
276269

277-
selectedPeerIndex := r.IntN(len(selectionPeersForOrg))
278-
peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
279-
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
280-
sc := gossipgossip.SendCriteria{
281-
Timeout: d.pushAckTimeout,
282-
Channel: gossipCommon.ChannelID(d.chainID),
283-
MaxPeers: 1,
284-
MinAck: acksRequired,
285-
IsEligible: func(member discovery.NetworkMember) bool {
286-
return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId)
287-
},
288-
}
289-
disseminationPlan = append(disseminationPlan, &dissemination{
290-
criteria: sc,
291-
msg: &protoext.SignedGossipMessage{
292-
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
293-
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
294-
},
295-
})
270+
if maximumPeerRemainingCount == 0 {
271+
break
272+
}
296273

297-
// Add unselected peers to remainingPeersAcrossOrgs
298-
for i, peer := range selectionPeersForOrg {
299-
if i != selectedPeerIndex {
300-
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, peer)
301-
}
302-
}
274+
if len(selectionPeersForOrg) == 0 {
275+
continue
276+
}
303277

304-
if requiredPeerRemainingCount > 0 {
305-
requiredPeerRemainingCount--
306-
}
278+
selectedPeerIndex := r.IntN(len(selectionPeersForOrg))
279+
peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
280+
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
307281

308-
maximumPeerRemainingCount--
309-
if maximumPeerRemainingCount == 0 {
310-
d.logger.Debug("MaximumPeerCount satisfied")
311-
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
312-
return disseminationPlan, nil
313-
}
314-
}
282+
selectedPKIDs[string(peer2SendPerOrg.PKIId)] = struct{}{}
283+
// Safe because selectionPeersForOrg is not used after this point.
284+
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, append(selectionPeersForOrg[:selectedPeerIndex], selectionPeersForOrg[selectedPeerIndex+1:]...)...)
285+
286+
maximumPeerRemainingCount--
315287
}
316288

317289
// PHASE 2 - Select additional peers to satisfy colAP.MaximumPeerCount() if there are still peers in the remainingPeersAcrossOrgs pool
318290
numRemainingPeersToSelect := min(len(remainingPeersAcrossOrgs), maximumPeerRemainingCount)
319291
if numRemainingPeersToSelect > 0 {
320292
d.logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
321293
}
322-
for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
323-
required := 1
324-
if requiredPeerRemainingCount == 0 {
325-
required = 0
326-
}
327-
selectedPeerIndex := r.IntN(len(remainingPeersAcrossOrgs))
328-
peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex]
294+
295+
for i := range numRemainingPeersToSelect {
296+
297+
// Partial Fisher-Yates shuffle to uniformly sample peers without replacement.
298+
j := i + r.IntN(len(remainingPeersAcrossOrgs)-i)
299+
remainingPeersAcrossOrgs[i], remainingPeersAcrossOrgs[j] = remainingPeersAcrossOrgs[j], remainingPeersAcrossOrgs[i]
300+
301+
peer2Send := remainingPeersAcrossOrgs[i]
329302
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)])
330-
sc := gossipgossip.SendCriteria{
331-
Timeout: d.pushAckTimeout,
332-
Channel: gossipCommon.ChannelID(d.chainID),
333-
MaxPeers: 1,
334-
MinAck: required,
335-
IsEligible: func(member discovery.NetworkMember) bool {
336-
return bytes.Equal(member.PKIid, peer2Send.PKIId)
337-
},
338-
}
339-
disseminationPlan = append(disseminationPlan, &dissemination{
340-
criteria: sc,
341-
msg: &protoext.SignedGossipMessage{
342-
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
343-
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
344-
},
345-
})
346-
if requiredPeerRemainingCount > 0 {
347-
requiredPeerRemainingCount--
348-
}
303+
304+
selectedPKIDs[string(peer2Send.PKIId)] = struct{}{}
349305

350306
maximumPeerRemainingCount--
307+
}
308+
309+
if maximumPeerRemainingCount == 0 {
310+
d.logger.Debugf("MaximumPeerCount satisfied")
311+
}
351312

352-
// remove the selected peer from remaining peers
353-
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...)
313+
sc := gossipgossip.SendCriteria{
314+
Timeout: d.pushAckTimeout,
315+
Channel: gossipCommon.ChannelID(d.chainID),
316+
MaxPeers: len(selectedPKIDs),
317+
MinAck: requiredPeerCount,
318+
IsEligible: func(member discovery.NetworkMember) bool {
319+
_, exists := selectedPKIDs[string(member.PKIid)]
320+
return exists
321+
},
322+
}
323+
dPlan := &dissemination{
324+
criteria: sc,
325+
msg: &protoext.SignedGossipMessage{
326+
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
327+
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
328+
},
354329
}
355330

356331
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
357-
return disseminationPlan, nil
332+
333+
return dPlan, nil
358334
}
359335

360336
// identitiesOfEligiblePeersByOrg returns the peers eligible for a collection (aka PeerIdentitySet) grouped in a hash map keyed by orgid

0 commit comments

Comments
 (0)