Skip to content

Commit 4cfb0a0

Browse files
committed
Construct ChunkState for efficiently managing concurrent updated chunks
1 parent 53af939 commit 4cfb0a0

5 files changed

Lines changed: 328 additions & 0 deletions

File tree

frontend/src/host_orchestrator/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.17
44

55
require (
66
github.com/google/android-cuttlefish/frontend/src/liboperator v0.0.0-20240822182916-7bea0dafdbde
7+
github.com/google/btree v1.1.3
78
github.com/google/go-cmp v0.5.9
89
github.com/google/uuid v1.3.0
910
github.com/gorilla/mux v1.8.0

frontend/src/host_orchestrator/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM
3333
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
3434
github.com/google/android-cuttlefish/frontend/src/liboperator v0.0.0-20240822182916-7bea0dafdbde h1:laMjSvqBHvDZ9DB+YaM6I4y5t0a7zYQRorYLM1VJsyM=
3535
github.com/google/android-cuttlefish/frontend/src/liboperator v0.0.0-20240822182916-7bea0dafdbde/go.mod h1:iA3V13fYW7It3n+LqvgOkXAOhw56XAjS1vH43+kU/4o=
36+
github.com/google/btree v1.1.3 h1:CVpQJjYgC4VbzxeGVHfvZrv1ctoYCAI8vbl07Fcxlyg=
37+
github.com/google/btree v1.1.3/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
3638
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
3739
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
3840
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=

frontend/src/host_orchestrator/orchestrator/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test")
33
go_library(
44
name = "orchestrator",
55
srcs = [
6+
"chunkstate.go",
67
"controller.go",
78
"createcvdaction.go",
89
"createcvdbugreportaction.go",
@@ -33,6 +34,7 @@ go_library(
3334
go_test(
3435
name = "orchestrator_test",
3536
srcs = [
37+
"chunkstate_test.go",
3638
"controller_test.go",
3739
"createcvdaction_test.go",
3840
"instancemanager_test.go",
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package orchestrator
16+
17+
import (
18+
"sync"
19+
"fmt"
20+
21+
"github.com/google/btree"
22+
)
23+
24+
// Structure for managing the state of updated chunk for efficiently knowing whether the user
25+
// artifact may need to calculate hash sum or not.
26+
type ChunkState struct {
27+
fileSize int64
28+
items *btree.BTree
29+
mutex sync.RWMutex
30+
}
31+
32+
type chunkStateItem struct {
33+
// Starting byte offset of the continuous range having same state whether updated or not.
34+
offset int64
35+
// State description whether given byte offset of the user artifact is updated or not.
36+
isUpdated bool
37+
}
38+
39+
func (i chunkStateItem) Less(item btree.Item) bool {
40+
return i.offset < item.(chunkStateItem).offset
41+
}
42+
43+
func NewChunkState(fileSize int64) *ChunkState {
44+
cs := ChunkState{
45+
fileSize: fileSize,
46+
items: btree.New(2),
47+
mutex: sync.RWMutex{},
48+
}
49+
cs.items.ReplaceOrInsert(chunkStateItem{offset: 0, isUpdated: false})
50+
return &cs
51+
}
52+
53+
func (cs *ChunkState) getItem(offset int64) *chunkStateItem {
54+
entry := cs.items.Get(chunkStateItem{offset: offset})
55+
if item, ok := entry.(chunkStateItem); ok {
56+
return &item
57+
} else {
58+
return nil
59+
}
60+
}
61+
62+
func (cs *ChunkState) getPrevItem(offset int64) *chunkStateItem {
63+
var record *chunkStateItem
64+
cs.items.DescendLessOrEqual(chunkStateItem{offset: offset - 1}, func(item btree.Item) bool {
65+
entry := item.(chunkStateItem)
66+
record = &entry
67+
return false
68+
})
69+
return record
70+
}
71+
72+
func (cs *ChunkState) getNextItem(offset int64) *chunkStateItem {
73+
var record *chunkStateItem
74+
cs.items.AscendGreaterOrEqual(chunkStateItem{offset: offset + 1}, func(item btree.Item) bool {
75+
entry := item.(chunkStateItem)
76+
record = &entry
77+
return false
78+
})
79+
return record
80+
}
81+
82+
func (cs *ChunkState) Update(start int64, end int64) error {
83+
if start < 0 {
84+
return fmt.Errorf("invalid start offset of the range")
85+
}
86+
if end > cs.fileSize {
87+
return fmt.Errorf("invalid end offset of the range")
88+
}
89+
cs.mutex.Lock()
90+
defer cs.mutex.Unlock()
91+
if item := cs.getItem(start); item != nil {
92+
if !item.isUpdated {
93+
cs.items.Delete(*item)
94+
if item.offset == 0 {
95+
cs.items.ReplaceOrInsert(chunkStateItem{offset: start, isUpdated: true})
96+
}
97+
}
98+
} else if prev := cs.getPrevItem(start); prev != nil {
99+
if !prev.isUpdated {
100+
cs.items.ReplaceOrInsert(chunkStateItem{offset: start, isUpdated: true})
101+
}
102+
} else {
103+
return fmt.Errorf("previous item should exist")
104+
}
105+
106+
for next := cs.getNextItem(start); ; next = cs.getNextItem(start) {
107+
if next == nil {
108+
cs.items.ReplaceOrInsert(chunkStateItem{offset: end, isUpdated: false})
109+
break
110+
}
111+
if next.offset < end {
112+
cs.items.Delete(*next)
113+
} else if next.offset == end {
114+
if next.isUpdated {
115+
cs.items.Delete(*next)
116+
} else {
117+
break
118+
}
119+
} else {
120+
if next.isUpdated {
121+
cs.items.ReplaceOrInsert(chunkStateItem{offset: end, isUpdated: false})
122+
}
123+
break
124+
}
125+
}
126+
return nil
127+
}
128+
129+
func (cs *ChunkState) IsCompleted() bool {
130+
cs.mutex.RLock()
131+
defer cs.mutex.RUnlock()
132+
if cs.items.Len() != 2 {
133+
return false
134+
}
135+
first := chunkStateItem{offset: 0, isUpdated: true}
136+
last := chunkStateItem{offset: cs.fileSize, isUpdated: false}
137+
return cs.items.Min() == first && cs.items.Max() == last
138+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package orchestrator
16+
17+
import (
18+
"testing"
19+
20+
"github.com/google/btree"
21+
"github.com/google/go-cmp/cmp"
22+
)
23+
24+
func getChunkStateItemList(cs *ChunkState) []chunkStateItem {
25+
var items []chunkStateItem
26+
cs.items.Ascend(func(item btree.Item) bool {
27+
items = append(items, item.(chunkStateItem))
28+
return true
29+
})
30+
return items
31+
}
32+
33+
func TestChunkStateUpdateSucceedsForSeparatedChunks(t *testing.T) {
34+
cs := NewChunkState(100)
35+
cs.Update(0, 10)
36+
cs.Update(40, 50)
37+
cs.Update(90, 100)
38+
items := getChunkStateItemList(cs)
39+
expected := []chunkStateItem{
40+
{offset: 0, isUpdated: true},
41+
{offset: 10, isUpdated: false},
42+
{offset: 40, isUpdated: true},
43+
{offset: 50, isUpdated: false},
44+
{offset: 90, isUpdated: true},
45+
{offset: 100, isUpdated: false},
46+
}
47+
if diff := cmp.Diff(expected, items, cmp.AllowUnexported(chunkStateItem{})); diff != "" {
48+
t.Fatalf("chunk state mismatch (-want +got):\n%s", diff)
49+
}
50+
}
51+
52+
func TestChunkStateUpdateSucceedsForSubranges(t *testing.T) {
53+
cs := NewChunkState(100)
54+
cs.Update(40, 50)
55+
cs.Update(60, 70)
56+
cs.Update(30, 80)
57+
cs.Update(50, 60)
58+
cs.Update(65, 75)
59+
items := getChunkStateItemList(cs)
60+
expected := []chunkStateItem{
61+
{offset: 0, isUpdated: false},
62+
{offset: 30, isUpdated: true},
63+
{offset: 80, isUpdated: false},
64+
}
65+
if diff := cmp.Diff(expected, items, cmp.AllowUnexported(chunkStateItem{})); diff != "" {
66+
t.Fatalf("chunk state mismatch (-want +got):\n%s", diff)
67+
}
68+
}
69+
70+
func TestChunkStateUpdateSucceedsForOverlappingRanges(t *testing.T) {
71+
cs := NewChunkState(100)
72+
cs.Update(10, 30)
73+
cs.Update(20, 40)
74+
cs.Update(70, 90)
75+
cs.Update(60, 80)
76+
items := getChunkStateItemList(cs)
77+
expected := []chunkStateItem{
78+
{offset: 0, isUpdated: false},
79+
{offset: 10, isUpdated: true},
80+
{offset: 40, isUpdated: false},
81+
{offset: 60, isUpdated: true},
82+
{offset: 90, isUpdated: false},
83+
}
84+
if diff := cmp.Diff(expected, items, cmp.AllowUnexported(chunkStateItem{})); diff != "" {
85+
t.Fatalf("chunk state mismatch (-want +got):\n%s", diff)
86+
}
87+
}
88+
89+
func TestChunkStateUpdateSucceedsForSameStartOrEnd(t *testing.T) {
90+
cs := NewChunkState(100)
91+
cs.Update(10, 15)
92+
cs.Update(10, 12)
93+
cs.Update(20, 22)
94+
cs.Update(20, 25)
95+
cs.Update(55, 60)
96+
cs.Update(58, 60)
97+
cs.Update(65, 70)
98+
cs.Update(68, 70)
99+
items := getChunkStateItemList(cs)
100+
expected := []chunkStateItem{
101+
{offset: 0, isUpdated: false},
102+
{offset: 10, isUpdated: true},
103+
{offset: 15, isUpdated: false},
104+
{offset: 20, isUpdated: true},
105+
{offset: 25, isUpdated: false},
106+
{offset: 55, isUpdated: true},
107+
{offset: 60, isUpdated: false},
108+
{offset: 65, isUpdated: true},
109+
{offset: 70, isUpdated: false},
110+
}
111+
if diff := cmp.Diff(expected, items, cmp.AllowUnexported(chunkStateItem{})); diff != "" {
112+
t.Fatalf("chunk state mismatch (-want +got):\n%s", diff)
113+
}
114+
}
115+
116+
func TestChunkStateUpdateSucceedsForMergingChunks(t *testing.T) {
117+
cs := NewChunkState(100)
118+
cs.Update(10, 20)
119+
cs.Update(30, 40)
120+
cs.Update(50, 60)
121+
cs.Update(20, 30)
122+
cs.Update(40, 50)
123+
items := getChunkStateItemList(cs)
124+
expected := []chunkStateItem{
125+
{offset: 0, isUpdated: false},
126+
{offset: 10, isUpdated: true},
127+
{offset: 60, isUpdated: false},
128+
}
129+
if diff := cmp.Diff(expected, items, cmp.AllowUnexported(chunkStateItem{})); diff != "" {
130+
t.Fatalf("chunk state mismatch (-want +got):\n%s", diff)
131+
}
132+
}
133+
134+
func TestChunkStateUpdateSucceedsForInvalidRanges(t *testing.T) {
135+
cs := NewChunkState(100)
136+
if err := cs.Update(-1, 10); err == nil {
137+
t.Fatalf("expected an error");
138+
}
139+
if err := cs.Update(90, 101); err == nil {
140+
t.Fatalf("expected an error");
141+
}
142+
items := getChunkStateItemList(cs)
143+
expected := []chunkStateItem{
144+
{offset: 0, isUpdated: false},
145+
}
146+
if diff := cmp.Diff(expected, items, cmp.AllowUnexported(chunkStateItem{})); diff != "" {
147+
t.Fatalf("chunk state mismatch (-want +got):\n%s", diff)
148+
}
149+
}
150+
151+
func TestChunkStateIsCompleted(t *testing.T) {
152+
cs := NewChunkState(100)
153+
cs.Update(0, 20)
154+
cs.Update(80, 100)
155+
cs.Update(20, 80)
156+
if !cs.IsCompleted() {
157+
t.Fatalf("expected as completed")
158+
}
159+
}
160+
161+
func TestChunkStateIsNotCompletedWithMissingStart(t *testing.T) {
162+
cs := NewChunkState(100)
163+
cs.Update(1, 100)
164+
if cs.IsCompleted() {
165+
t.Fatalf("expected as not completed")
166+
}
167+
}
168+
169+
func TestChunkStateIsNotCompletedWithMissingIntermediate(t *testing.T) {
170+
cs := NewChunkState(100)
171+
cs.Update(0, 20)
172+
cs.Update(80, 100)
173+
cs.Update(20, 79)
174+
if cs.IsCompleted() {
175+
t.Fatalf("expected as not completed")
176+
}
177+
}
178+
179+
func TestChunkStateIsNotCompletedWithMissingEnd(t *testing.T) {
180+
cs := NewChunkState(100)
181+
cs.Update(0, 99)
182+
if cs.IsCompleted() {
183+
t.Fatalf("expected as not completed")
184+
}
185+
}

0 commit comments

Comments
 (0)