-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommit.go
More file actions
75 lines (62 loc) · 1.92 KB
/
commit.go
File metadata and controls
75 lines (62 loc) · 1.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package consumer
import (
"context"
"fmt"
"time"
"github.com/ipfs/go-cid"
"github.com/areknoster/public-distributed-commit-log/pdcl"
"github.com/areknoster/public-distributed-commit-log/pdclpb"
"github.com/areknoster/public-distributed-commit-log/storage"
)
// commit is the consumer-side view of a single commit in the log.
type commit struct {
	// Cid is the commit's own CID; Previous is the CID parsed from the
	// commit's reference to the commit before it.
	Cid, Previous cid.Cid

	// Messages holds the decoded CIDs of the messages listed by the commit.
	Messages []cid.Cid
}
// newCommit converts a protobuf commit into the consumer's commit value.
//
// itsOwnCID is stored unchanged as the resulting commit's Cid. Each entry of
// pbCommit.MessagesCids is decoded with cid.Decode, and PreviousCommitCid is
// parsed with pdcl.ParseCID. The first failure stops the conversion and
// returns a zero commit together with an error wrapping the cause.
func newCommit(itsOwnCID cid.Cid, pbCommit *pdclpb.Commit) (commit, error) {
	decoded := make([]cid.Cid, 0, len(pbCommit.MessagesCids))
	for _, raw := range pbCommit.MessagesCids {
		msgCID, err := cid.Decode(raw)
		if err != nil {
			return commit{}, fmt.Errorf("decode message cid: %w", err)
		}
		decoded = append(decoded, msgCID)
	}

	prev, err := pdcl.ParseCID(pbCommit.PreviousCommitCid)
	if err != nil {
		return commit{}, fmt.Errorf("decode previous commit cid: %w", err)
	}

	result := commit{Cid: itsOwnCID, Previous: prev, Messages: decoded}
	return result, nil
}
// commitReader fetches a single commit identified by its CID.
type commitReader interface {
	// GetCommit returns the commit stored under cid, or an error when it
	// cannot be produced.
	GetCommit(ctx context.Context, cid cid.Cid) (commit, error)
}
// storageCommitReader is the commitReader implementation that loads commits
// through a storage.MessageReader.
type storageCommitReader struct {
	// reader is the storage backend that GetCommit reads from.
	reader storage.MessageReader

	// timeout is the duration GetCommit passes to context.WithTimeout for
	// the storage read.
	timeout time.Duration
}
// newStorageCommitReader builds a commitReader backed by reader. The given
// timeout is stored on the reader and applied by GetCommit to its storage
// read.
func newStorageCommitReader(reader storage.MessageReader, timeout time.Duration) commitReader {
	cr := new(storageCommitReader)
	cr.reader = reader
	cr.timeout = timeout
	return cr
}
// GetCommit reads the message stored under cid, decodes it into a
// pdclpb.Commit and converts that into a consumer commit.
//
// The storage read runs under a context derived from ctx with cr.timeout
// applied; the derived context is cancelled when GetCommit returns. Any
// failure (read, decode or conversion) yields a zero commit and an error
// wrapping the underlying cause.
func (cr *storageCommitReader) GetCommit(ctx context.Context, cid cid.Cid) (commit, error) {
	readCtx, stop := context.WithTimeout(ctx, cr.timeout)
	defer stop()

	msg, readErr := cr.reader.Read(readCtx, cid)
	if readErr != nil {
		return commit{}, fmt.Errorf("read commit message from storage: %w", readErr)
	}

	pb := new(pdclpb.Commit)
	if decodeErr := msg.Decode(pb); decodeErr != nil {
		return commit{}, fmt.Errorf("unmarshall to commit proto: %w", decodeErr)
	}

	result, mapErr := newCommit(cid, pb)
	if mapErr != nil {
		return commit{}, fmt.Errorf("map proto commit to consumer commit: %w", mapErr)
	}
	return result, nil
}