This repository was archived by the owner on Sep 24, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsyncProtocol.go
More file actions
214 lines (197 loc) · 8.21 KB
/
syncProtocol.go
File metadata and controls
214 lines (197 loc) · 8.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
/*The MIT License (MIT)
* Copyright (c) 2018 Damoon Azarpazhooh
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the
* Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute,
* sublicense, and/or sell copies of the Software, and to
* permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice
* shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
* ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
* PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS
* OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR
* OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package main
import (
"bufio"
"context"
"fmt"
"io/ioutil"
"os"
"github.com/libp2p/go-libp2p-net"
"github.com/mholt/archiver"
"github.com/multiformats/go-multicodec"
cbor "github.com/multiformats/go-multicodec/cbor"
)
// Sync protocol is used to sync files in a directory and send them
// to another node.
const syncProtocol = "/sync/1.0.0"
// DataStream is a struct that is used to wrap a <net.Stream> stream.
// Since we are using <multicodec> to encode and decode streams and
// <bufio> to read to or write from streams, we put those in this struct so that
// we can easily 'carry' them with us and use them as we see fit.
type DataStream struct {
stream net.Stream
encoder multicodec.Encoder
decoder multicodec.Decoder
writer *bufio.Writer
reader *bufio.Reader
}
// WrapDataStream is the function that takes a stream and wrap it into a
// <DataStream> struct.We use cbor codec in this case
// ----------------------------------------------------------------------------
// <stream> is a parameter of <net.Stream> struct type that wee want to wrap into
// <DataStream> struct
// ----------------------------------------------------------------------------
// it returns a pointer to <DataStream> struct
func WrapDataStream(stream net.Stream) *DataStream {
var result DataStream
result.reader = bufio.NewReader(stream)
result.writer = bufio.NewWriter(stream)
result.decoder = cbor.Multicodec().Decoder(result.reader)
result.encoder = cbor.Multicodec().Encoder(result.writer)
return &result
}
// decodeTransfer is used to decode a <*DataStream> and save the files
// encoded onto the disk
// ----------------------------------------------------------------------------
// <wrappedDataStream> is a receiver of pointer type to <DataStream>.It is
// the Wrapped stream that the file was written to so that it
// can get transferred between nodes and is ready to get decoded.
// ----------------------------------------------------------------------------
// It returns an erro in case something goes wrong.
func (wrappedDataStream *DataStream) decodeTransfer() error {
// initialize variable <file> as an array of bytes
var file []byte
// use <wrappedDataStream.encoder> to decode and store to <file>
err := wrappedDataStream.decoder.Decode(&file)
if err != nil {
return err
}
// use <ioutil.WriteFile> to write <file> byte array to disk as the zip file
// it originated from.
err = ioutil.WriteFile("data.zip", file, 0644)
// if there is an error, remove the zip file and return an error
if err != nil {
os.Remove("data.zip")
return (err)
}
// use <archiver> package to unzip the zip file that was transferred in
// the stream and was stored on disk.
err = archiver.Zip.Open("data.zip", "")
// If there is an error, remove the zip file and < /data > directory in
// which contents of the file were supposed to get unzipped to.
if err != nil {
os.Remove("data.zip")
os.Remove("data")
return err
}
// if there are no issues, just remove the zip file from hard drive.
os.Remove("data.zip")
return nil
}
// RequestSync : the main function that is used in sync
// protocol to request files from another node that has the files..
// ----------------------------------------------------------------------------
// <node> is a receiver of pointer type to <PeerNode>.
// <node> is the peer node that requests the files
// ----------------------------------------------------------------------------
// <wrappedTransactionStream> is a receiver of pointer type to <TransactionStream>.It is
// the Wrapped stream that the Transsaction will be written to so that it
// can get transferred between nodes.
// <bootstrapNodeAddress> is a parameter of string type that is the
// IPFS address of the node that receiving the request to transfer the files
func (node *PeerNode) RequestSync(bootstrapNodeAddress string) {
// First, we add the peer node <bootstrapNodeAddress> string points to
// <node> local address book
peerID, err := addAddressToPeerstore(node, bootstrapNodeAddress)
if err != nil {
panic(err)
}
// <node> creates a news tream by calling <NewStream>
// function and passing background context, receiver's
// <peerID> and <paymentProtocol> (<"/sync/1.0.0">)
stream, err := node.NewStream(context.Background(), peerID, syncProtocol)
if err != nil {
panic(err)
}
// we make sure the stream gets closed at the end of the function call by
// using defer keyword before <stream.Close()>.
defer stream.Close()
// use <WrapDataStream (stream net.Stream)> function to wrap
// <stream> stream and save it in variable <wrappedDataStream>
wrappedDataStream := WrapDataStream(stream)
// Call <decodeTransfer()> to save the received Zip file on disk and
// extract it.
err = wrappedDataStream.decodeTransfer()
if err != nil {
panic(err)
}
}
// encodeFile is the function in which all the files in </data>
// are zipped and transferredover a strem.
// ----------------------------------------------------------------------------
// <wrappedDataStream> is a receiver of pointer type to <DataStream>.It is
// the Wrapped stream that the a zip file will be written to so that it
// can get transferred between nodes.
// ----------------------------------------------------------------------------
// it returns an error if something goes wrong.
func (wrappedDataStream *DataStream) encodeFile() error {
// use <archiver> package to zip the files in < /data > directory
// so that they can easily get transferred over the stream
err := archiver.Zip.Make("data.zip", []string{"data"})
if err == nil {
// use <ioutil.ReadFile> to read data.zip from disk to byte array <data>
data, err := ioutil.ReadFile("data.zip")
if err != nil {
panic(err)
}
// use <wrappedDataStream.encoder> to encode <data>
err = wrappedDataStream.encoder.Encode(data)
// Write the transaction to the stream and since output is buffered with
// <bufio> so <Flush> has to get called before exit.
wrappedDataStream.writer.Flush()
// Remove data.zip from disk
os.Remove("data.zip")
return err
}
return err
}
// SyncProtocolMultiplexer Multiplexes "/sync/1.0.0" to a node and takes care
// of the way nodes behave when they receive a stream of sync protocol.
// It is called to initialize payment protocol before any other function
func (node *PeerNode) SyncProtocolMultiplexer() {
// <SetStreamHandler> function takes a string
// (<syncProtocol>) and an anonymous function that
// takes a <net.stream> struct and Multiplexes the string
// to that anonymous functions code so whenever a stream
// with the same string attached is received by a node,that code
// inside anonymous function is executed on the receiver node
node.SetStreamHandler(syncProtocol, func(stream net.Stream) {
fmt.Println("Sync intiated!")
// it uses <WrapDataStream (stream net.Stream)> function to wrap
// <stream> stream and save it in variable <wrappedDataStream>
wrappedDataStream := WrapDataStream(stream)
// use <encodeFile()> function to zip the files in < /data > directory
// and write them to the stream.
err := wrappedDataStream.encodeFile()
if err != nil {
fmt.Println(err)
stream.Reset()
} else {
stream.Close()
}
})
fmt.Printf("Sync Protocol 1.0.0 Multiplexd!\n")
}