Skip to content

Commit 5a26737

Browse files
committed
Add go-tfdata integration (tests)
1 parent 21f6b3b commit 5a26737

6 files changed

Lines changed: 188 additions & 13 deletions

File tree

Makefile

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ bin/tarp: $(cmds) $(datapipes)
1111
bin/tarp -h
1212

1313
test:
14-
cd datapipes && go test -v
14+
cd dpipes && go test -v
15+
16+
test-tfdata:
17+
cd dpipes && go test -v --tags=gitlabnvidia
1518

1619
dtest:
1720
cd datapipes && debug=stdout go test -v | tee ../test.log

README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,31 @@ Future work:
135135
- TFRecord/tf.Example interoperability
136136
- add JSON input to "tarp create"
137137
- add separator option to "tarp create"
138+
139+
# Building with private repositories
140+
141+
The `dpipes` module uses external dependencies from private repositories hosted on `gitlab-master.nvidia.com`.
142+
Access to them is not required, but enables additional tarp features. At the moment tarp integrates with
143+
`go-tfdata` - a Go library helping to work with tar/tgz archives and files in
144+
[TFRecord and tf.Example formats](https://www.tensorflow.org/tutorials/load_data/tfrecord).
145+
146+
If you believe that you have the access to `gitlab-master.nvidia.com` and you wish to enable additional features
147+
you should build tarp with [build tags](https://golang.org/pkg/go/build/#hdr-Build_Constraints) and do the following steps:
148+
149+
```console
150+
# use ssh instead of https for go get gitlab-master.nvidia.com
151+
git config --global url."ssh://git@gitlab-master.nvidia.com:12051/aistorage/go-tfdata".insteadOf "https://github.com/NVIDIA/go-tfdata"
152+
# inform go that gitlab-master.nvidia.com is private repo
153+
export GOPRIVATE="gitlab-master.nvidia.com"
154+
```
155+
156+
```console
157+
go test -v --tags=gitlabnvidia
158+
```
159+
160+
```console
161+
make test-tfdata
162+
```
163+
164+
>Make sure *not* to include private dependencies in requirements in go.mod files. `go mod tidy` will include them
165+
>by default, so they have to be excluded (deleted) explicitly. More context in [this github issue](https://github.com/golang/go/issues/35832).

dpipes/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,6 @@ require (
66
github.com/Masterminds/squirrel v1.2.0
77
github.com/mattn/go-sqlite3 v2.0.3+incompatible
88
github.com/shamaton/msgpack v1.1.1
9-
github.com/stretchr/testify v1.2.2
9+
github.com/stretchr/testify v1.3.0
1010
gopkg.in/zeromq/goczmq.v4 v4.1.0
1111
)

dpipes/gotfdata_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// +build gitlabnvidia
2+
3+
package dpipes
4+
5+
import (
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"io/ioutil"
10+
"os"
11+
"testing"
12+
13+
"github.com/NVIDIA/go-tfdata/tfdata/core"
14+
"github.com/NVIDIA/go-tfdata/tfdata/transform"
15+
"github.com/stretchr/testify/assert"
16+
)
17+
18+
type (
19+
SamplesReader struct {
20+
pipe Pipe
21+
}
22+
)
23+
24+
func (r *SamplesReader) Read() (sample *core.Sample, err error) {
25+
s, ok := <-r.pipe
26+
if !ok {
27+
return nil, io.EOF
28+
}
29+
30+
return tarpSampleToTfDataSample(s), nil
31+
}
32+
33+
func TFRecordSink(t *testing.T, writer io.Writer) Sink {
34+
return func(pipe Pipe) {
35+
w := core.NewTFRecordWriter(writer)
36+
samplesReader := &SamplesReader{pipe}
37+
tfExamplesReader := transform.NewSamplesToTFExample(samplesReader)
38+
err := w.WriteMessages(tfExamplesReader)
39+
40+
assert.NoError(t, err)
41+
}
42+
}
43+
44+
func TFRecordSource(t *testing.T, reader io.Reader) Source {
45+
return func(pipe Pipe) {
46+
defer close(pipe)
47+
var (
48+
ex *core.TFExample
49+
err error
50+
r core.TFExampleReader
51+
)
52+
r = core.NewTFRecordReader(reader)
53+
for ex, err = r.Read(); err == nil; ex, err = r.Read() {
54+
pipe <- tfExampleTarpSample(ex)
55+
}
56+
if err != io.EOF {
57+
assert.Fail(t, "expected to get io.EOF, got %v instead", err)
58+
}
59+
}
60+
}
61+
62+
func SamplesChecker(t *testing.T, target int) Process {
63+
return func(in, out Pipe) {
64+
total := 0
65+
for s := range in {
66+
assert.Equal(t, s["txt"], Bytes(fmt.Sprintf("%d", total)))
67+
assert.Equal(t, s["__key__"], Bytes(fmt.Sprintf("%06d", total)))
68+
total++
69+
out <- s
70+
}
71+
close(out)
72+
assert.Equal(t, target, total)
73+
}
74+
}
75+
76+
func tarpSampleToTfDataSample(sample Sample) *core.Sample {
77+
s := core.NewSample()
78+
for k, v := range sample {
79+
s.Entries[k] = v
80+
}
81+
return s
82+
}
83+
84+
func tfExampleTarpSample(example *core.TFExample) Sample {
85+
s := make(map[string]Bytes, len(example.GetFeatures().Feature))
86+
for k, v := range example.GetFeatures().Feature {
87+
var b Bytes
88+
err := json.Unmarshal(v.GetBytesList().Value[0], &b)
89+
if err != nil {
90+
panic(err)
91+
}
92+
s[k] = b // assume that all TFExample features are just a list of bytes
93+
}
94+
return s
95+
}
96+
97+
func PrepareTarSource() Source {
98+
return func(pipe Pipe) {
99+
for i := 0; i < 1; i++ {
100+
pipe <- Sample{
101+
"__key__": Bytes(fmt.Sprintf("%06d", i)),
102+
"txt": Bytes(fmt.Sprintf("%d", i)),
103+
}
104+
}
105+
close(pipe)
106+
}
107+
}
108+
109+
func prepareTar(t *testing.T) *os.File {
110+
var (
111+
sinkFd *os.File
112+
err error
113+
)
114+
sinkFd, err = ioutil.TempFile("", "go-tfdata-*.tar")
115+
assert.NoError(t, err)
116+
117+
sink := TarSink(sinkFd)
118+
Processing(PrepareTarSource(), nil, sink)
119+
return sinkFd
120+
}
121+
122+
func TestGoTfData(t *testing.T) {
123+
var (
124+
sourceFd = prepareTar(t)
125+
sinkFd *os.File
126+
err error
127+
)
128+
129+
defer os.RemoveAll(sourceFd.Name())
130+
sourceFd, err = os.Open(sourceFd.Name())
131+
assert.NoError(t, err)
132+
133+
sinkFd, err = ioutil.TempFile("", "go-tfdata-*.tfrecord")
134+
assert.NoError(t, err)
135+
defer os.RemoveAll(sinkFd.Name())
136+
137+
Processing(TarSource(sourceFd), nil, TFRecordSink(t, sinkFd))
138+
sinkFd.Close()
139+
sourceFd, err = os.Open(sinkFd.Name())
140+
assert.NoError(t, err)
141+
sinkFd, err = os.OpenFile(os.DevNull, os.O_RDWR, os.ModeAppend)
142+
assert.NoError(t, err)
143+
144+
Processing(TFRecordSource(t, sourceFd), SamplesChecker(t, 1), TFRecordSink(t, sinkFd))
145+
}

dpipes/rawtario.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@ package dpipes
22

33
import (
44
"archive/tar"
5-
"time"
65
"bytes"
76
"fmt"
87
"io"
98
"regexp"
9+
"time"
1010
)
1111

1212
// Raw is a struct representing unaggregated data items (e.g., from a tar file).
@@ -97,15 +97,19 @@ func TarRawSource(stream io.Reader) func(RawPipe) {
9797
tr := tar.NewReader(stream)
9898
for {
9999
header, err := tr.Next()
100+
if err == io.EOF {
101+
break
102+
}
103+
if err != nil {
104+
panic(err)
105+
}
100106
if header == nil {
101107
break
102108
}
103109
if header.Typeflag != tar.TypeReg {
104110
continue
105111
}
106-
if err != nil {
107-
panic(err)
108-
}
112+
109113
var buffer bytes.Buffer
110114
io.Copy(&buffer, tr)
111115
data := buffer.Bytes()

go.mod

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@ module github.com/tmbdev/tarp
22

33
replace github.com/tmbdev/tarp/dpipes => ./dpipes
44

5-
go 1.14
5+
replace github.com/NVIDIA/go-tfdata => gitlab-master.nvidia.com/aistorage/go-tfdata v0.0.0-20200427194410-c20d8f9980a7
66

7-
require (
8-
github.com/bcicen/ctop v0.7.3 // indirect
9-
github.com/jessevdk/go-flags v1.4.0
10-
github.com/maruel/panicparse v1.3.0 // indirect
11-
github.com/tmbdev/tarp/dpipes v0.0.0-20200330012711-53823ac810b9
12-
)
7+
go 1.14

0 commit comments

Comments
 (0)