Skip to content

Commit 4509add

Browse files
Fix dissemination failure despite satisfying RequiredPeerCount
Signed-off-by: Drupadh Dinesh <drupadhdinesh@gmail.com>
1 parent 86c1172 commit 4509add

1 file changed

Lines changed: 57 additions & 80 deletions

File tree

gossip/privdata/distributor.go

Lines changed: 57 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
196196
if err != nil {
197197
return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName)
198198
}
199-
disseminationPlan = append(disseminationPlan, dPlan...)
199+
disseminationPlan = append(disseminationPlan, dPlan)
200200
}
201201
}
202202
return disseminationPlan, nil
@@ -213,8 +213,7 @@ func (d *distributorImpl) getCollectionConfig(config *peer.CollectionConfigPacka
213213
return nil, errors.New(fmt.Sprint("no configuration for collection", collection.CollectionName, "found"))
214214
}
215215

216-
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) ([]*dissemination, error) {
217-
var disseminationPlan []*dissemination
216+
func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAccessPolicy, colFilter privdata.Filter, pvtDataMsg *protoext.SignedGossipMessage) (*dissemination, error) {
218217

219218
routingFilter, err := d.gossipAdapter.PeerFilter(gossipCommon.ChannelID(d.chainID), func(signature api.PeerSignature) bool {
220219
return colFilter(protoutil.SignedData{
@@ -251,110 +250,88 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
251250
peerEndpoints[string(peer.PKIid)] = epToAdd
252251
}
253252

254-
// Initialize maximumPeerRemainingCount and requiredPeerRemainingCount,
255-
// these will be decremented until we've selected enough peers for dissemination
253+
// Initialize maximumPeerRemainingCount and requiredPeerCount,
254+
// maximumPeerRemainingCount will be decremented until we've selected enough peers for dissemination
256255
maximumPeerRemainingCount := colAP.MaximumPeerCount()
257-
requiredPeerRemainingCount := colAP.RequiredPeerCount()
256+
requiredPeerCount := colAP.RequiredPeerCount()
258257

259258
remainingPeersAcrossOrgs := []api.PeerIdentityInfo{}
260259
selectedPeerEndpointsForDebug := []string{}
260+
selectedPKIDs := make(map[string]struct{})
261261

262262
var seed [32]byte
263-
_, _ = crand.Read(seed[:])
263+
if _, err := crand.Read(seed[:]); err != nil {
264+
return nil, errors.Wrap(err, "failed to generate random seed")
265+
}
264266
r := rand.New(rand.NewChaCha8(seed))
265267

266268
// PHASE 1 - Select one peer from each eligible org
267-
if maximumPeerRemainingCount > 0 {
268-
for _, selectionPeersForOrg := range identitySetsByOrg {
269-
270-
// Peers are tagged as a required peer (acksRequired=1) for RequiredPeerCount up front before dissemination.
271-
// TODO It would be better to attempt dissemination to MaxPeerCount first, and then verify that enough sends were acknowledged to meet RequiredPeerCount.
272-
acksRequired := 1
273-
if requiredPeerRemainingCount == 0 {
274-
acksRequired = 0
275-
}
269+
for _, selectionPeersForOrg := range identitySetsByOrg {
276270

277-
selectedPeerIndex := r.IntN(len(selectionPeersForOrg))
278-
peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
279-
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
280-
sc := gossipgossip.SendCriteria{
281-
Timeout: d.pushAckTimeout,
282-
Channel: gossipCommon.ChannelID(d.chainID),
283-
MaxPeers: 1,
284-
MinAck: acksRequired,
285-
IsEligible: func(member discovery.NetworkMember) bool {
286-
return bytes.Equal(member.PKIid, peer2SendPerOrg.PKIId)
287-
},
288-
}
289-
disseminationPlan = append(disseminationPlan, &dissemination{
290-
criteria: sc,
291-
msg: &protoext.SignedGossipMessage{
292-
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
293-
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
294-
},
295-
})
271+
if maximumPeerRemainingCount == 0 {
272+
break
273+
}
296274

297-
// Add unselected peers to remainingPeersAcrossOrgs
298-
for i, peer := range selectionPeersForOrg {
299-
if i != selectedPeerIndex {
300-
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, peer)
301-
}
302-
}
275+
if len(selectionPeersForOrg) == 0 {
276+
continue
277+
}
303278

304-
if requiredPeerRemainingCount > 0 {
305-
requiredPeerRemainingCount--
306-
}
279+
selectedPeerIndex := r.IntN(len(selectionPeersForOrg))
280+
peer2SendPerOrg := selectionPeersForOrg[selectedPeerIndex]
281+
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2SendPerOrg.PKIId)])
307282

308-
maximumPeerRemainingCount--
309-
if maximumPeerRemainingCount == 0 {
310-
d.logger.Debug("MaximumPeerCount satisfied")
311-
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
312-
return disseminationPlan, nil
313-
}
314-
}
283+
selectedPKIDs[string(peer2SendPerOrg.PKIId)] = struct{}{}
284+
// Safe because selectionPeersForOrg is not used after this point.
285+
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs, append(selectionPeersForOrg[:selectedPeerIndex], selectionPeersForOrg[selectedPeerIndex+1:]...)...)
286+
287+
maximumPeerRemainingCount--
315288
}
316289

317290
// PHASE 2 - Select additional peers to satisfy colAP.MaximumPeerCount() if there are still peers in the remainingPeersAcrossOrgs pool
318291
numRemainingPeersToSelect := min(len(remainingPeersAcrossOrgs), maximumPeerRemainingCount)
319292
if numRemainingPeersToSelect > 0 {
320293
d.logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
321294
}
322-
for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
323-
required := 1
324-
if requiredPeerRemainingCount == 0 {
325-
required = 0
326-
}
327-
selectedPeerIndex := r.IntN(len(remainingPeersAcrossOrgs))
328-
peer2Send := remainingPeersAcrossOrgs[selectedPeerIndex]
295+
296+
for i := range numRemainingPeersToSelect {
297+
298+
// Partial Fisher-Yates shuffle to uniformly sample peers without replacement.
299+
j := i + r.IntN(len(remainingPeersAcrossOrgs)-i)
300+
remainingPeersAcrossOrgs[i], remainingPeersAcrossOrgs[j] = remainingPeersAcrossOrgs[j], remainingPeersAcrossOrgs[i]
301+
302+
peer2Send := remainingPeersAcrossOrgs[i]
329303
selectedPeerEndpointsForDebug = append(selectedPeerEndpointsForDebug, peerEndpoints[string(peer2Send.PKIId)])
330-
sc := gossipgossip.SendCriteria{
331-
Timeout: d.pushAckTimeout,
332-
Channel: gossipCommon.ChannelID(d.chainID),
333-
MaxPeers: 1,
334-
MinAck: required,
335-
IsEligible: func(member discovery.NetworkMember) bool {
336-
return bytes.Equal(member.PKIid, peer2Send.PKIId)
337-
},
338-
}
339-
disseminationPlan = append(disseminationPlan, &dissemination{
340-
criteria: sc,
341-
msg: &protoext.SignedGossipMessage{
342-
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
343-
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
344-
},
345-
})
346-
if requiredPeerRemainingCount > 0 {
347-
requiredPeerRemainingCount--
348-
}
304+
305+
selectedPKIDs[string(peer2Send.PKIId)] = struct{}{}
349306

350307
maximumPeerRemainingCount--
308+
}
309+
310+
if maximumPeerRemainingCount == 0 {
311+
d.logger.Debugf("MaximumPeerCount satisfied")
312+
}
351313

352-
// remove the selected peer from remaining peers
353-
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...)
314+
sc := gossipgossip.SendCriteria{
315+
Timeout: d.pushAckTimeout,
316+
Channel: gossipCommon.ChannelID(d.chainID),
317+
MaxPeers: len(selectedPKIDs),
318+
MinAck: requiredPeerCount,
319+
IsEligible: func(member discovery.NetworkMember) bool {
320+
_, exists := selectedPKIDs[string(member.PKIid)]
321+
return exists
322+
},
323+
}
324+
dPlan := &dissemination{
325+
criteria: sc,
326+
msg: &protoext.SignedGossipMessage{
327+
Envelope: proto.Clone(pvtDataMsg.Envelope).(*protosgossip.Envelope),
328+
GossipMessage: proto.Clone(pvtDataMsg.GossipMessage).(*protosgossip.GossipMessage),
329+
},
354330
}
355331

356332
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
357-
return disseminationPlan, nil
333+
334+
return dPlan, nil
358335
}
359336

360337
// identitiesOfEligiblePeersByOrg returns the peers eligible for a collection (aka PeerIdentitySet) grouped in a hash map keyed by orgid

0 commit comments

Comments
 (0)