Skip to content

Commit 48a2021

Browse files
committed
split big file
1 parent dbefa3a commit 48a2021

4 files changed

Lines changed: 451 additions & 374 deletions

File tree

model_srv.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
// Copyright 2023 The CortexTheseus Authors
2+
// This file is part of the CortexTheseus library.
3+
//
4+
// The CortexTheseus library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The CortexTheseus library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the CortexTheseus library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package robot
18+
19+
import (
20+
"context"
21+
"errors"
22+
"github.com/CortexFoundation/CortexTheseus/common"
23+
"github.com/CortexFoundation/CortexTheseus/common/mclock"
24+
"github.com/CortexFoundation/CortexTheseus/log"
25+
"github.com/CortexFoundation/torrentfs/params"
26+
"github.com/CortexFoundation/torrentfs/types"
27+
"time"
28+
)
29+
30+
func (m *Monitor) parseFileMeta(tx *types.Transaction, meta *types.FileMeta, b *types.Block) error {
31+
log.Debug("Monitor", "FileMeta", meta)
32+
33+
receipt, err := m.getReceipt(tx.Hash.String())
34+
if err != nil {
35+
return err
36+
}
37+
38+
if receipt.ContractAddr == nil {
39+
log.Warn("contract address is nil, waiting for indexing", "tx.Hash.String()", tx.Hash.String())
40+
return errors.New("contract address is nil")
41+
}
42+
43+
log.Debug("Transaction Receipt", "address", receipt.ContractAddr.String(), "gas", receipt.GasUsed, "status", receipt.Status) //, "tx", receipt.TxHash.String())
44+
45+
if receipt.Status != 1 {
46+
log.Warn("receipt.Status is wrong", "receipt.Status", receipt.Status)
47+
return nil
48+
}
49+
50+
log.Debug("Meta data", "meta", meta)
51+
52+
info := m.fs.NewFileInfo(meta)
53+
54+
info.LeftSize = meta.RawSize
55+
info.ContractAddr = receipt.ContractAddr
56+
info.Relate = append(info.Relate, *info.ContractAddr)
57+
op, update, err := m.fs.AddFile(info)
58+
if err != nil {
59+
log.Warn("Create file failed", "err", err)
60+
return err
61+
}
62+
if update && op == 1 {
63+
log.Debug("Create new file", "ih", meta.InfoHash, "op", op)
64+
65+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
66+
defer cancel()
67+
if m.mode == params.FULL {
68+
return m.download(ctx, meta.InfoHash, 512*1024)
69+
} else {
70+
return m.download(ctx, meta.InfoHash, 0)
71+
}
72+
}
73+
return nil
74+
}
75+
76+
func (m *Monitor) parseBlockTorrentInfo(b *types.Block) (bool, error) {
77+
var (
78+
record bool
79+
start = mclock.Now()
80+
final []types.Transaction
81+
)
82+
for _, tx := range b.Txs {
83+
if meta := tx.Parse(); meta != nil {
84+
log.Debug("Data encounter", "ih", meta.InfoHash, "number", b.Number, "meta", meta)
85+
if err := m.parseFileMeta(&tx, meta, b); err != nil {
86+
log.Error("Parse file meta error", "err", err, "number", b.Number)
87+
return false, err
88+
}
89+
record = true
90+
final = append(final, tx)
91+
} else if tx.IsFlowControl() {
92+
if tx.Recipient == nil {
93+
continue
94+
}
95+
file := m.fs.GetFileByAddr(*tx.Recipient)
96+
if file == nil {
97+
continue
98+
}
99+
receipt, err := m.getReceipt(tx.Hash.String())
100+
if err != nil {
101+
return false, err
102+
}
103+
if receipt.Status != 1 || receipt.GasUsed != params.UploadGas {
104+
continue
105+
}
106+
remainingSize, err := m.getRemainingSize((*tx.Recipient).String())
107+
if err != nil {
108+
log.Error("Get remain failed", "err", err, "addr", (*tx.Recipient).String())
109+
return false, err
110+
}
111+
if file.LeftSize > remainingSize {
112+
file.LeftSize = remainingSize
113+
if _, progress, err := m.fs.AddFile(file); err != nil {
114+
return false, err
115+
} else if progress { // && progress {
116+
log.Debug("Update storage success", "ih", file.Meta.InfoHash, "left", file.LeftSize)
117+
var bytesRequested uint64
118+
if file.Meta.RawSize > file.LeftSize {
119+
bytesRequested = file.Meta.RawSize - file.LeftSize
120+
}
121+
if file.LeftSize == 0 {
122+
log.Debug("Data processing completed !!!", "ih", file.Meta.InfoHash, "addr", (*tx.Recipient).String(), "remain", common.StorageSize(remainingSize), "request", common.StorageSize(bytesRequested), "raw", common.StorageSize(file.Meta.RawSize), "number", b.Number)
123+
} else {
124+
log.Debug("Data processing ...", "ih", file.Meta.InfoHash, "addr", (*tx.Recipient).String(), "remain", common.StorageSize(remainingSize), "request", common.StorageSize(bytesRequested), "raw", common.StorageSize(file.Meta.RawSize), "number", b.Number)
125+
}
126+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
127+
defer cancel()
128+
if err := m.download(ctx, file.Meta.InfoHash, bytesRequested); err != nil {
129+
return false, err
130+
}
131+
}
132+
}
133+
record = true
134+
final = append(final, tx)
135+
}
136+
}
137+
if len(final) > 0 && len(final) < len(b.Txs) {
138+
log.Debug("Final txs layout", "total", len(b.Txs), "final", len(final), "num", b.Number, "txs", m.fs.Txs())
139+
b.Txs = final
140+
}
141+
if record {
142+
if err := m.fs.AddBlock(b); err == nil {
143+
log.Info("Root has been changed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root())
144+
} else {
145+
log.Warn("Block added failed", "number", b.Number, "hash", b.Hash, "root", m.fs.Root(), "err", err)
146+
}
147+
}
148+
if len(b.Txs) > 0 {
149+
elapsed := time.Duration(mclock.Now()) - time.Duration(start)
150+
log.Trace("Transactions scanning", "count", len(b.Txs), "number", b.Number, "elapsed", common.PrettyDuration(elapsed))
151+
}
152+
return record, nil
153+
}

0 commit comments

Comments
 (0)