Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 40 additions & 24 deletions compose/compose.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,39 +266,45 @@ func getZero(idx int, raft string) service {
return svc
}

func getAlpha(idx int, raft string) service {
basename := "alpha"
// internalPort is used for Raft communication between nodes.
internalPort := alphaBasePort + opts.PortOffset + getOffset(idx)
// grpcPort is the public-facing port for clients. It is offset from the internal port.
grpcPort := internalPort + 1000
svc := initService(basename, idx, grpcPort)

if opts.TmpFS {
svc.TmpFS = append(svc.TmpFS, fmt.Sprintf("/data/%s/w", svc.name))
}

// makeZeroList builds a comma-separated --zero= value covering `count` zeros
// starting at zero index `firstIdx` (1-based). Used by main alphas (pointing
// at the main zero quorum, firstIdx=1) and by learner alphas (pointing at the
// learner zero quorum, firstIdx=opts.NumZeros+1). On Dgraph versions older
// than 1.2.3 / 20.03.0 (which only accept a single --zero), the list is
// truncated to the first address.
func makeZeroList(firstIdx, count int) string {
isMultiZeros := true
var isInvalidVersion, err = semverCompare("< 1.2.3 || 20.03.0", opts.Tag)
isInvalidVersion, err := semverCompare("< 1.2.3 || 20.03.0", opts.Tag)
if err != nil || isInvalidVersion {
if opts.Tag != "latest" {
isMultiZeros = false
}
}

maxZeros := 1
if isMultiZeros {
maxZeros = opts.NumZeros
if !isMultiZeros {
count = 1
}

zeroHostAddr := fmt.Sprintf("zero%d:%d", 1, zeroBasePort+opts.PortOffset)
zeros := []string{zeroHostAddr}
for i := 2; i <= maxZeros; i++ {
zeroHostAddr = fmt.Sprintf("zero%d:%d", i, zeroBasePort+opts.PortOffset+getOffset(i))
zeros = append(zeros, zeroHostAddr)
zeros := make([]string, 0, count)
for i := 0; i < count; i++ {
idx := firstIdx + i
port := zeroBasePort + opts.PortOffset + getOffset(idx)
zeros = append(zeros, fmt.Sprintf("zero%d:%d", idx, port))
}
return strings.Join(zeros, ",")
}

zerosOpt := strings.Join(zeros, ",")
func getAlpha(idx int, raft, zerosOpt string) service {
basename := "alpha"
// internalPort is used for Raft communication between nodes.
internalPort := alphaBasePort + opts.PortOffset + getOffset(idx)
// grpcPort is the public-facing port for clients. It is offset from the internal port.
grpcPort := internalPort + 1000
svc := initService(basename, idx, grpcPort)

if opts.TmpFS {
svc.TmpFS = append(svc.TmpFS, fmt.Sprintf("/data/%s/w", svc.name))
}

offset := getOffset(idx)
if (opts.PortOffset + offset) != 0 {
Expand Down Expand Up @@ -663,6 +669,16 @@ func main() {

services := make(map[string]service)

// Main alphas point at the main zero quorum; learner alphas point at
// the learner zero quorum (the whole reason for deploying learner zeros
// in the first place — they're the edge contact point for nearby learner
// alphas, see compose docs / customer feedback on this).
mainZerosOpt := makeZeroList(1, opts.NumZeros)
var learnerZerosOpt string
if opts.NumLearners > 0 {
learnerZerosOpt = makeZeroList(opts.NumZeros+1, opts.NumLearners)
}

for i := 1; i <= opts.NumZeros; i++ {
svc := getZero(i, fmt.Sprintf("idx=%d", i))
services[svc.name] = svc
Expand All @@ -671,7 +687,7 @@ func main() {
for i := 1; i <= opts.NumAlphas; i++ {
gid := int(math.Ceil(float64(i) / float64(opts.NumReplicas)))
rs := fmt.Sprintf("idx=%d; group=%d", i, gid)
svc := getAlpha(i, rs)
svc := getAlpha(i, rs, mainZerosOpt)
// Don't make Alphas depend on each other.
svc.DependsOn = nil
services[svc.name] = svc
Expand All @@ -690,7 +706,7 @@ func main() {
for i := 1; i <= opts.NumLearners; i++ {
lidx++
rs := fmt.Sprintf("idx=%d; group=%d; learner=true", lidx, gid)
svc := getAlpha(lidx, rs)
svc := getAlpha(lidx, rs, learnerZerosOpt)
services[svc.name] = svc
}
}
Expand Down
Loading