-
Notifications
You must be signed in to change notification settings - Fork 79
Expand file tree
/
Copy pathStorage.purs
More file actions
346 lines (304 loc) · 16.4 KB
/
Storage.purs
File metadata and controls
346 lines (304 loc) · 16.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
-- | An effect for reading and writing to the registry storage backend.
module Registry.App.Effect.Storage
( S3Env
, IntegrityCheck
, STORAGE
, STORAGE_CACHE
, Storage(..)
, StorageCache
, _storage
, _storageCache
, delete
, download
, handleReadOnly
, handleS3
, interpret
, upload
, query
) where
import Registry.App.Prelude
import Data.Array as Array
import Data.Exists as Exists
import Data.Int as Int
import Data.Set as Set
import Data.String as String
import Effect.Aff as Aff
import Effect.Exception as Exception
import Fetch.Retry as Fetch
import Node.Buffer as Buffer
import Node.FS.Aff as FS.Aff
import Registry.App.Effect.Cache (class FsEncodable, Cache, FsEncoding(..))
import Registry.App.Effect.Cache as Cache
import Registry.App.Effect.Env (RESOURCE_ENV)
import Registry.App.Effect.Env as Env
import Registry.App.Effect.Log (LOG)
import Registry.App.Effect.Log as Log
import Registry.Foreign.S3 as S3
import Registry.PackageName as PackageName
import Registry.Sha256 (Sha256)
import Registry.Sha256 as Sha256
import Registry.Version as Version
import Run (AFF, EFFECT, Run)
import Run as Run
import Run.Except (EXCEPT)
import Run.Except as Except
-- | The integrity a downloaded package tarball is expected to have: its
-- | SHA256 hash and its size in bytes.
type IntegrityCheck =
  { hash :: Sha256
  , bytes :: Number
  }
-- | The Storage effect, which describes uploading, downloading, and deleting
-- | tarballs from the registry storage backend, and querying which versions
-- | of a package are stored there.
-- |
-- | Every constructor ends with a reply continuation that receives either an
-- | error message (`Left`) or the operation's result (`Right`).
data Storage a
  -- Upload the tarball found at the given path for this package version.
  = Upload PackageName Version FilePath (Either String Unit -> a)
  -- Download this package version's tarball to the given path, checking it
  -- against the expected hash and size.
  | Download PackageName Version FilePath IntegrityCheck (Either String Unit -> a)
  -- Delete this package version's tarball from the backend.
  | Delete PackageName Version (Either String Unit -> a)
  -- List the versions of this package that have tarballs in the backend.
  | Query PackageName (Either String (Set Version) -> a)

derive instance Functor Storage
-- | The effect row entry for the `Storage` effect.
type STORAGE r = (storage :: Storage | r)

-- | The label of the `storage` row entry, used with `Run.lift` and `Run.on`.
_storage :: Proxy "storage"
_storage = Proxy
-- | Upload the package tarball found at the given path to the storage
-- | backend. A failure reported by the backend is rethrown as an
-- | `EXCEPT String` error.
upload :: forall r. PackageName -> Version -> FilePath -> Run (STORAGE + EXCEPT String + r) Unit
upload name version file =
  Run.lift _storage (Upload name version file identity) >>= Except.rethrow
-- | Download a package tarball from the storage backend and write it to the
-- | given path. The tarball must match the expected hash and size. A failure
-- | reported by the backend is rethrown as an `EXCEPT String` error.
download :: forall r. PackageName -> Version -> FilePath -> IntegrityCheck -> Run (STORAGE + EXCEPT String + r) Unit
download name version file integrity =
  Run.lift _storage (Download name version file integrity identity) >>= Except.rethrow
-- | Remove a package version's tarball from the storage backend. A failure
-- | reported by the backend is rethrown as an `EXCEPT String` error.
delete :: forall r. PackageName -> Version -> Run (STORAGE + EXCEPT String + r) Unit
delete name version =
  Run.lift _storage (Delete name version identity) >>= Except.rethrow
-- | Eliminate the STORAGE effect from a program. Each `Storage` request goes
-- | to the given handler, and every other effect is forwarded unchanged with
-- | `Run.send`.
interpret :: forall r a. (Storage ~> Run r) -> Run (STORAGE + r) a -> Run r a
interpret handler program = Run.interpret (Run.on _storage handler Run.send) program
-- | Ask the storage backend which versions of a package have tarballs
-- | stored. A failure reported by the backend is rethrown as an
-- | `EXCEPT String` error.
query :: forall r. PackageName -> Run (STORAGE + EXCEPT String + r) (Set Version)
query name =
  Run.lift _storage (Query name identity) >>= Except.rethrow
-- | The storage key of a package tarball, "<name>/<version>.tar.gz".
formatPackagePath :: PackageName -> Version -> String
formatPackagePath name version =
  PackageName.print name <> "/" <> Version.print version <> ".tar.gz"
-- | Parse a storage key of the form "<name>/<version>.tar.gz" back into a
-- | package name and version. This is the inverse of `formatPackagePath`.
-- | The name is parsed before the version, and the first failure is returned.
parsePackagePath :: String -> Either String { name :: PackageName, version :: Version }
parsePackagePath input = do
  stem <- note ("Missing .tar.gz suffix: " <> input) (String.stripSuffix (String.Pattern ".tar.gz") input)
  case String.split (String.Pattern "/") stem of
    [ rawName, rawVersion ] -> do
      name <- PackageName.parse rawName
      version <- Version.parse rawVersion
      pure { name, version }
    segments ->
      Left $ (if Array.length segments > 2 then "Too many parts in path: " else "Too few parts in path: ") <> input
-- | The URL of a package tarball: the configured `s3ApiUrl`, a slash, and
-- | the package's storage key from `formatPackagePath`.
formatPackageUrl :: forall r. PackageName -> Version -> Run (RESOURCE_ENV + r) URL
formatPackageUrl name version =
  map (\{ s3ApiUrl } -> s3ApiUrl <> "/" <> formatPackagePath name version) Env.askResourceEnv
-- | Connect to the "purescript-registry" bucket at the space URL
-- | (`s3BucketUrl`) read from the resource environment, using the given key.
-- | The connection attempt is wrapped in `withRetryOnTimeout`.
-- |
-- | A timeout or an exception is logged and then thrown as a `String` error.
-- | The thrown messages are generic; the details appear only in the log.
connectS3 :: forall r. S3.SpaceKey -> Run (RESOURCE_ENV + LOG + EXCEPT String + AFF + r) S3.Space
connectS3 key = do
  let bucket = "purescript-registry"
  { s3BucketUrl: space } <- Env.askResourceEnv
  Log.debug $ "Connecting to the bucket " <> bucket <> " at space " <> space <> " with public key " <> key.key
  Run.liftAff (withRetryOnTimeout (Aff.attempt (S3.connect key space bucket))) >>= case _ of
    Cancelled -> do
      -- Log before throwing, like the exception branch below and the upload /
      -- delete timeout paths, so timeouts also show up in the logs.
      Log.error $ "Timed out when attempting to connect to the bucket " <> bucket <> " at space " <> space <> "."
      Except.throw "Timed out when attempting to connect to S3 storage backend."
    Failed err -> do
      Log.error $ "Failed to connect to S3 due to an exception: " <> Aff.message err
      Except.throw "Could not connect to storage backend."
    Succeeded connection -> do
      Log.debug "Connected to S3!"
      pure connection
-- | Configuration for `handleS3`. `cache` is the path given to
-- | `Cache.handleFs` for the local tarball cache. `s3` is the key used to
-- | connect to the S3 space.
type S3Env = { cache :: FilePath, s3 :: S3.SpaceKey }
-- | Handle package storage using a remote S3 bucket.
-- |
-- | Each request runs with the `STORAGE_CACHE` effect interpreted by
-- | `Cache.handleFs` at `env.cache`, so tarballs can be cached on the local
-- | filesystem. Errors inside a request are caught with `Except.runExcept`.
-- | They are passed back through the request's reply continuation as a
-- | `Left` message rather than propagated.
handleS3 :: forall r a. S3Env -> Storage a -> Run (RESOURCE_ENV + LOG + AFF + EFFECT + r) a
handleS3 env = Cache.interpret _storageCache (Cache.handleFs env.cache) <<< case _ of
  -- List the object keys under "<name>/" and keep the version of every key
  -- that parses as a package path for exactly this package name. Keys that
  -- fail to parse are dropped, because `Array.fromFoldable` of a `Left` is empty.
  Query name reply -> map (map reply) Except.runExcept do
    s3 <- connectS3 env.s3
    resources <- Except.rethrow =<< Run.liftAff (withRetryListObjects s3 name)
    pure $ Set.fromFoldable $ resources >>= \resource -> do
      { name: parsedName, version } <- Array.fromFoldable $ parsePackagePath resource
      version <$ guard (name == parsedName)

  -- Use the cached tarball if it passes the integrity check. If it is missing
  -- or fails the check, fetch it with `downloadS3` and cache it. In both
  -- cases, write the bytes to `path`.
  Download name version path integrity reply -> map (map reply) Except.runExcept do
    let package = formatPackageVersion name version
    buffer <- Cache.get _storageCache (Package name version) >>= case _ of
      Nothing -> do
        buffer <- downloadS3 name version integrity
        Cache.put _storageCache (Package name version) buffer
        pure buffer
      Just cached -> do
        -- Re-verify the cached copy, so a corrupted cache entry is never
        -- handed out.
        archiveHash <- Run.liftEffect $ Sha256.hashBuffer cached
        archiveSize <- Run.liftEffect $ Buffer.size cached
        if archiveHash == integrity.hash && Int.toNumber archiveSize == integrity.bytes then
          pure cached
        else do
          Log.warn $ "Cached tarball for " <> package <> " failed integrity check, evicting and re-downloading..."
          Cache.delete _storageCache (Package name version)
          buffer <- downloadS3 name version integrity
          Cache.put _storageCache (Package name version) buffer
          pure buffer

    Run.liftAff (Aff.attempt (FS.Aff.writeFile path buffer)) >>= case _ of
      Left error -> do
        Log.error $ "Downloaded " <> package <> " but failed to write it to the file at path " <> path <> ":\n" <> Aff.message error
        Except.throw $ "Could not save downloaded package " <> package <> " due to an internal error."
      Right _ -> pure unit

  -- Read the tarball from `path` and upload it with a public-read ACL. The
  -- upload is refused if an object already exists at the same key. The local
  -- cache is not updated by an upload.
  Upload name version path reply -> map (map reply) Except.runExcept do
    let
      package = formatPackageVersion name version
      packagePath = formatPackagePath name version
    buffer <- Run.liftAff (Aff.attempt (FS.Aff.readFile path)) >>= case _ of
      Left error -> do
        Log.error $ "Failed to read contents of " <> package <> " at path " <> path <> ": " <> Aff.message error
        Except.throw $ "Could not upload package " <> package <> " due to a file system error."
      Right buf ->
        pure buf

    Log.debug $ "Read file for " <> package <> ", now uploading to " <> packagePath <> "..."
    s3 <- connectS3 env.s3
    published <- Except.rethrow =<< Run.liftAff (withRetryListObjects s3 name)
    -- Never overwrite an object that is already stored at this key.
    if Array.elem packagePath published then do
      Log.error $ packagePath <> " already exists on S3."
      packageUrl <- formatPackageUrl name version
      Except.throw $ "Could not upload " <> package <> " because a package at " <> packageUrl <> " already exists."
    else do
      Log.debug $ "Uploading release to the bucket at path " <> packagePath
      let putParams = { key: packagePath, body: buffer, acl: S3.PublicRead }
      Run.liftAff (withRetryOnTimeout (Aff.attempt (S3.putObject s3 putParams))) >>= case _ of
        Cancelled -> do
          Log.error "Failed to upload object to S3 because the process timed out."
          Except.throw $ "Could not upload package " <> package <> " due to an error connecting to the storage backend."
        Failed error -> do
          Log.error $ "Failed to upload object to S3 because of an exception: " <> Aff.message error
          Except.throw $ "Could not upload package " <> package <> " due to an error connecting to the storage backend."
        Succeeded _ ->
          Log.info $ "Uploaded " <> package <> " to the bucket at path " <> packagePath

  -- Delete the object only if it is currently listed in the bucket. Once S3
  -- reports success, evict the matching local cache entry.
  Delete name version reply -> map (map reply) Except.runExcept do
    let
      package = formatPackageVersion name version
      packagePath = formatPackagePath name version
    Log.debug $ "Deleting " <> package
    s3 <- connectS3 env.s3
    published <- Except.rethrow =<< Run.liftAff (withRetryListObjects s3 name)
    if Array.elem packagePath published then do
      Log.debug $ "Deleting release from the bucket at path " <> packagePath
      let deleteParams = { key: packagePath }
      Run.liftAff (withRetryOnTimeout (Aff.attempt (S3.deleteObject s3 deleteParams))) >>= case _ of
        Cancelled -> do
          Log.error $ "Timed out when attempting to delete the release of " <> package <> " from S3 at the path " <> packagePath
          Except.throw $ "Could not delete " <> package <> " from the storage backend."
        Failed error -> do
          Log.error $ "Failed to delete object from S3 because of an exception: " <> Aff.message error
          Except.throw $ "Could not delete package " <> package <> " due to an error connecting to the storage backend."
        Succeeded _ -> do
          Log.debug $ "Deleted release of " <> package <> " from S3 at the path " <> packagePath
          Cache.delete _storageCache (Package name version)
          pure unit
    else do
      Log.error $ packagePath <> " does not exist on S3 (available: " <> String.joinWith ", " published <> ")"
      Except.throw $ "Could not delete " <> package <> " because it does not exist in the storage backend."
-- | A storage effect that reads from the registry but does not write to it.
-- |
-- | Requests run with the `STORAGE_CACHE` effect interpreted by
-- | `Cache.handleFs` at `cache`. The operations behave as follows:
-- | - Downloads use the local cache and fall back to `downloadS3`.
-- | - Uploads only place the tarball in the local cache.
-- | - Deletions are logged and reported as successful without doing anything.
-- | - Queries always fail.
handleReadOnly :: forall r a. FilePath -> Storage a -> Run (RESOURCE_ENV + LOG + AFF + EFFECT + r) a
handleReadOnly cache = Cache.interpret _storageCache (Cache.handleFs cache) <<< case _ of
  -- TODO: is there a way to do this without S3 credentials?
  Query _ reply -> do
    pure $ reply $ Left "Cannot query in read-only mode."

  -- Nothing is sent to the backend; the tarball is only cached locally, so a
  -- later Download in this interpreter can be served from the cache.
  Upload name version path reply -> map (map reply) Except.runExcept do
    let package = formatPackageVersion name version
    packageUrl <- formatPackageUrl name version
    Log.warn $ "Requested upload of " <> package <> " to url " <> packageUrl <> " but this interpreter is read-only. Caching tarball locally."
    buffer <- Run.liftAff (Aff.attempt (FS.Aff.readFile path)) >>= case _ of
      Left error -> do
        Log.error $ "Failed to read tarball for " <> package <> " at path " <> path <> ": " <> Aff.message error
        Except.throw $ "Could not cache package " <> package <> " due to a file system error."
      Right buf ->
        pure buf
    Cache.put _storageCache (Package name version) buffer

  -- NOTE(review): this reports success but does not evict the local cache
  -- entry that Upload above may have written. Confirm that is intended.
  Delete name version reply -> do
    packageUrl <- formatPackageUrl name version
    Log.warn $ "Requested deletion of " <> formatPackageVersion name version <> " from url " <> packageUrl <> " but this interpreter is read-only."
    pure $ reply $ Right unit

  -- Use the cached tarball if it passes the integrity check. If it is missing
  -- or fails the check, fetch it with `downloadS3` and cache it. In both
  -- cases, write the bytes to `path`.
  Download name version path integrity reply -> map (map reply) Except.runExcept do
    let package = formatPackageVersion name version
    buffer <- Cache.get _storageCache (Package name version) >>= case _ of
      Nothing -> do
        buffer <- downloadS3 name version integrity
        Cache.put _storageCache (Package name version) buffer
        pure buffer
      Just cached -> do
        -- Re-verify the cached copy, so a corrupted cache entry is never
        -- handed out.
        archiveHash <- Run.liftEffect $ Sha256.hashBuffer cached
        archiveSize <- Run.liftEffect $ Buffer.size cached
        if archiveHash == integrity.hash && Int.toNumber archiveSize == integrity.bytes then
          pure cached
        else do
          Log.warn $ "Cached tarball for " <> package <> " failed integrity check, evicting and re-downloading..."
          Cache.delete _storageCache (Package name version)
          buffer <- downloadS3 name version integrity
          Cache.put _storageCache (Package name version) buffer
          pure buffer
    Run.liftAff (Aff.attempt (FS.Aff.writeFile path buffer)) >>= case _ of
      Left error -> do
        Log.error $ "Downloaded " <> package <> " but failed to write it to the file at path " <> path <> ":\n" <> Aff.message error
        Except.throw $ "Could not save downloaded package " <> package <> " due to an internal error."
      Right _ -> pure unit
-- | Fetch a package tarball from its URL (see `formatPackageUrl`) with
-- | `Fetch.withRetryRequest`, and load the response body into a `Buffer`.
-- |
-- | The buffer is returned only if its byte size and then its SHA256 hash
-- | match `expected`. A timeout, HTTP error, bad status, or integrity
-- | mismatch is logged and then thrown as a `String` error.
downloadS3 :: forall r. PackageName -> Version -> IntegrityCheck -> Run (RESOURCE_ENV + LOG + EXCEPT String + AFF + EFFECT + r) Buffer
downloadS3 name version expected = do
  let pkg = formatPackageVersion name version
  url <- formatPackageUrl name version
  Log.debug $ "Downloading " <> pkg <> " from " <> url
  Run.liftAff (Fetch.withRetryRequest url {}) >>= case _ of
    Succeeded { arrayBuffer: readBody } -> do
      body <- Run.liftAff readBody
      Log.debug $ "Successfully downloaded " <> pkg <> " into a buffer."
      tarball :: Buffer <- Run.liftEffect $ Buffer.fromArrayBuffer body
      -- The size check is cheap, so it runs before hashing.
      byteCount <- Run.liftEffect $ Buffer.size tarball
      unless (Int.toNumber byteCount == expected.bytes) do
        Log.error $ "Archive for " <> pkg <> " has size " <> show byteCount <> " but expected " <> show expected.bytes
        Except.throw $ "Integrity check failed for " <> pkg <> ": size mismatch"
      digest <- Run.liftEffect $ Sha256.hashBuffer tarball
      unless (digest == expected.hash) do
        Log.error $ "Archive for " <> pkg <> " has hash " <> Sha256.print digest <> " but expected " <> Sha256.print expected.hash
        Except.throw $ "Integrity check failed for " <> pkg <> ": hash mismatch"
      Log.debug $ "Verified integrity of " <> pkg
      pure tarball
    Cancelled -> do
      Log.error $ "Failed to download " <> pkg <> " from " <> url <> " because of a connection timeout."
      Except.throw $ "Failed to download " <> pkg <> " from the storage backend."
    Failed (Fetch.FetchError err) -> do
      Log.error $ "Failed to download " <> pkg <> " from " <> url <> " because of an HTTP error: " <> Exception.message err
      Except.throw $ "Could not download " <> pkg <> " from the storage backend."
    Failed (Fetch.StatusError { status, arrayBuffer: readBody }) -> do
      -- Read the error response body so it can be included in the log.
      body <- Run.liftAff readBody
      raw <- Run.liftEffect $ Buffer.fromArrayBuffer body
      bodyText <- Run.liftEffect $ Buffer.toString UTF8 (raw :: Buffer)
      Log.error $ "Failed to download " <> pkg <> " from " <> url <> " because of a bad status code (" <> show status <> ") with body " <> bodyText
      Except.throw $ "Could not download " <> pkg <> " from the storage backend."
-- | List the keys of all objects stored under "<package name>/" in the
-- | bucket. The call is wrapped in `withRetry`, whose `retryOnFailure` allows
-- | another attempt while the attempt counter is below 3. A timeout or
-- | exception is returned as a `Left` message instead of being thrown.
withRetryListObjects :: S3.Space -> PackageName -> Aff (Either String (Array String))
withRetryListObjects s3 name = do
  let printed = PackageName.print name
  outcome <- withRetry (defaultRetry { retryOnFailure = \attempt _ -> attempt < 3 }) do
    Aff.attempt (S3.listObjects s3 { prefix: printed <> "/" })
  pure $ case outcome of
    Succeeded objects -> Right (map _.key objects)
    Failed err -> Left $ "Failed to list S3 objects for " <> printed <> " because of an exception: " <> Aff.message err
    Cancelled -> Left $ "Failed to list S3 objects for " <> printed <> " because the process timed out."
-- | The key type for the storage cache. Its only entry type is a package
-- | tarball, held as a `Buffer` and identified by package name and version.
data StorageCache (c :: Type -> Type -> Type) a = Package PackageName Version (c Buffer a)

-- Mapping over a cache request maps its `c Buffer a` payload with `map2`.
instance Functor2 c => Functor (StorageCache c) where
  map f = case _ of
    Package name version inner -> Package name version (map2 f inner)
-- | Filesystem encoding for storage cache entries: each tarball is encoded
-- | with `AsBuffer` under the key "<name>-<version>".
instance FsEncodable StorageCache where
  encodeFs (Package name version next) =
    let cacheKey = PackageName.print name <> "-" <> Version.print version
    in Exists.mkExists (AsBuffer cacheKey next)
-- | The effect row entry for the storage cache, a `Cache` keyed by
-- | `StorageCache`. The handlers in this module use it to cache tarballs.
type STORAGE_CACHE r = (storageCache :: Cache StorageCache | r)

-- | The label of the `storageCache` row entry, passed to the `Cache` operations.
_storageCache :: Proxy "storageCache"
_storageCache = Proxy