Skip to content

Commit d6e926c

Browse files
address more PR comments
1 parent 7ed3d75 commit d6e926c

8 files changed

Lines changed: 45 additions & 61 deletions

File tree

src/Share/BackgroundJobs/Diffs/ContributionDiffs.hs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,16 +37,15 @@ worker scope = do
3737
unisonRuntime <- asks Env.sandboxedRuntime
3838
let makeRuntime :: Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)
3939
makeRuntime codebase = do
40-
runtime <- Codebase.codebaseRuntime' unisonRuntime codebase
40+
runtime <- Codebase.codebaseRuntimeTransaction unisonRuntime codebase
4141
pure (badUnliftCodebaseRuntime runtime)
4242
newWorker scope "diffs:contributions" $ forever do
4343
Notif.waitOnChannel Notif.ContributionDiffChannel (maxPollingIntervalSeconds * 1000000)
4444
processDiffs authZReceipt makeRuntime
4545

46-
-- Process diffs until we run out of them. We claim a diff in a transaction, commit it, then proceed to compute the
47-
-- diff. There's therefore a chance we claim a diff and fail to compute it (due to e.g. server restart). The current
48-
-- solution to these "at most once" semantics is to simply re-enqueue a diff job if necessary; e.g. in the view diff
49-
-- endpoint handler.
46+
-- Process diffs until we run out of them. We claim a diff in a transaction and compute the diff in the same
47+
-- transaction, with a row lock on the contribution id (which is skipped by other workers). There's therefore no chance
48+
-- that we claim a diff but fail to write the result of computing that diff back to the database.
5049
processDiffs :: AuthZ.AuthZReceipt -> (Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)) -> Background ()
5150
processDiffs authZReceipt makeRuntime = do
5251
let loop :: Background ()

src/Share/Codebase.hs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ module Share.Codebase
1717
CodebaseRuntime (..),
1818
codebaseEnv,
1919
codebaseRuntime,
20-
codebaseRuntime',
20+
codebaseRuntimeTransaction,
2121
badAskUnliftCodebaseRuntime,
2222
codebaseForProjectBranch,
2323
codebaseLocationForUserCodebase,
@@ -189,16 +189,16 @@ codebaseEnv !_authZReceipt codebaseLoc = do
189189
codebaseRuntime :: (MonadReader (Env.Env x) m, MonadUnliftIO m) => CodebaseEnv -> m (CodebaseRuntime IO)
190190
codebaseRuntime codebase = do
191191
unisonRuntime <- asks Env.sandboxedRuntime
192-
rt <- liftIO (codebaseRuntime' unisonRuntime codebase)
192+
rt <- liftIO (codebaseRuntimeTransaction unisonRuntime codebase)
193193
unlift <- badAskUnliftCodebaseRuntime
194194
pure (unlift rt)
195195

196196
-- | Ideally, we'd use this – a runtime with lookup actions in transaction, not IO. But that will require refactoring to
197197
-- the runtime interface in ucm, so we can't use it for now. That's bad: we end up unsafely running separate
198198
-- transactions for inner calls to 'codeLookup' / 'cachedEvalResult', which can lead to deadlock due to a starved
199199
-- connection pool.
200-
codebaseRuntime' :: Runtime Symbol -> CodebaseEnv -> IO (CodebaseRuntime (PG.Transaction e))
201-
codebaseRuntime' unisonRuntime CodebaseEnv {codebaseOwner} = do
200+
codebaseRuntimeTransaction :: Runtime Symbol -> CodebaseEnv -> IO (CodebaseRuntime (PG.Transaction e))
201+
codebaseRuntimeTransaction unisonRuntime CodebaseEnv {codebaseOwner} = do
202202
cacheVar <- newTVarIO (CodeLookupCache mempty mempty)
203203
pure
204204
CodebaseRuntime
@@ -207,6 +207,9 @@ codebaseRuntime' unisonRuntime CodebaseEnv {codebaseOwner} = do
207207
unisonRuntime
208208
}
209209

210+
-- Why bad: see above comment on `codebaseRuntimeTransaction`. We don't want to use a `CodebaseRuntime IO`, because it
211+
-- will run every lookup in a separate transaction. But we can't use a `CodebaseRuntime Transaction` because we call
212+
-- back into UCM library code that expects a `CodebaseRuntime IO`.
210213
badAskUnliftCodebaseRuntime ::
211214
(MonadReader (Env.Env x) m, MonadUnliftIO m) =>
212215
m (CodebaseRuntime (PG.Transaction Void) -> CodebaseRuntime IO)

src/Share/NamespaceDiffs.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -817,10 +817,10 @@ computeThreeWayNamespaceDiff codebaseEnvs2 branchHashIds3 nameLookupReceipts3 =
817817
let termReferenceIds = Map.mapMaybe Referent.toTermReferenceId (BiMultimap.range termReferents)
818818
termIds <-
819819
PG.pFor termReferenceIds \refId ->
820-
(refId,) <$> DefnsQ.pipelinedExpectTermId refId
820+
(refId,) <$> DefnsQ.expectTermId refId
821821
v2Terms <-
822822
PG.pFor termIds \(refId, termId) ->
823-
(refId,) <$> DefnsQ.pipelinedExpectTermById codebaseUser refId termId
823+
(refId,) <$> DefnsQ.expectTermById codebaseUser refId termId
824824
v1Terms <-
825825
for v2Terms \(refId, (term, typ)) ->
826826
(refId,) <$> Codebase.convertTerm2to1 (Reference.idToHash refId) term typ
@@ -833,7 +833,7 @@ computeThreeWayNamespaceDiff codebaseEnvs2 branchHashIds3 nameLookupReceipts3 =
833833
let typeReferenceIds = Map.mapMaybe Reference.toId (BiMultimap.range typeReferences)
834834
typeIds <-
835835
PG.pFor typeReferenceIds \refId ->
836-
(refId,) <$> DefnsQ.pipelinedExpectTypeComponentElementAndTypeId codebaseUser refId
836+
(refId,) <$> DefnsQ.expectTypeComponentElementAndTypeId codebaseUser refId
837837
v1Decls <-
838838
PG.pFor typeIds \(refId, typeId) ->
839839
DefnsQ.loadDeclByTypeComponentElementAndTypeId typeId <&> \v2Decl ->

src/Share/Postgres.hs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
{-# LANGUAGE ConstraintKinds #-}
2+
{-# LANGUAGE DefaultSignatures #-}
23
{-# LANGUAGE DeriveAnyClass #-}
34
{-# LANGUAGE ImpredicativeTypes #-}
45
{-# LANGUAGE LiberalTypeSynonyms #-}
@@ -45,7 +46,6 @@ module Share.Postgres
4546
defaultIsolationLevel,
4647
pipelined,
4748
pEitherMap,
48-
pUnrecoverableEitherMap,
4949
pFor,
5050
pFor_,
5151

@@ -139,14 +139,6 @@ pEitherMap f (Pipeline p) =
139139
Right x -> mapLeft Err (f x)
140140
Left e -> Left e
141141

142-
-- | Like 'pEitherMap', but for throwing unrecoverable errors.
143-
pUnrecoverableEitherMap :: (Loggable e, Show e, ToServerError e) => (a -> Either e b) -> Pipeline e' a -> Pipeline e' b
144-
pUnrecoverableEitherMap f (Pipeline p) =
145-
Pipeline $
146-
p <&> \case
147-
Right x -> mapLeft (Unrecoverable . someServerError) (f x)
148-
Left e -> Left e
149-
150142
pFor :: (Traversable f) => f a -> (a -> Pipeline e b) -> Transaction e (f b)
151143
pFor f p = pipelined $ for f p
152144

@@ -346,6 +338,17 @@ class (Applicative m) => QueryA m where
346338
-- | Fail the transaction and whole request with an unrecoverable server error.
347339
unrecoverableError :: (HasCallStack, ToServerError e, Loggable e, Show e) => e -> m a
348340

341+
-- | Map an either-returning function over the result of an action; if it returns Left, throw an unrecoverable error.
342+
-- This is a trivial combinator for any monad, hence the default signature, but it can be implemented by our
343+
-- Pipeline applicative, too.
344+
unrecoverableEitherMap :: (HasCallStack, Loggable e, Show e, ToServerError e) => (a -> Either e b) -> m a -> m b
345+
default unrecoverableEitherMap :: (HasCallStack, Loggable e, Show e, ToServerError e, Monad m) => (a -> Either e b) -> m a -> m b
346+
unrecoverableEitherMap f m = do
347+
x <- m
348+
case f x of
349+
Right y -> pure y
350+
Left e -> unrecoverableError e
351+
349352
class (Monad m, QueryA m) => QueryM m where
350353
-- | Allow running IO actions in a transaction. These actions may be run multiple times if
351354
-- the transaction is retried.
@@ -376,6 +379,12 @@ instance QueryA (Pipeline e) where
376379

377380
unrecoverableError e = Pipeline $ pure (Left (Unrecoverable (someServerError e)))
378381

382+
unrecoverableEitherMap f (Pipeline p) =
383+
Pipeline $
384+
p <&> \case
385+
Right x -> mapLeft (Unrecoverable . someServerError) (f x)
386+
Left e -> Left e
387+
379388
instance (QueryM m) => QueryA (ReaderT e m) where
380389
statement q s = lift $ statement q s
381390

src/Share/Postgres/Causal/Queries.hs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ module Share.Postgres.Causal.Queries
2020
expectNamespaceStatsOf,
2121
expectNamespaceHashByCausalHash,
2222
HashQ.expectNamespaceIdsByCausalIdsOf,
23-
HashQ.pipelinedExpectNamespaceIdsByCausalIdsOf,
2423
importAccessibleCausals,
2524
importCausalIntoCodebase,
2625
hashCausal,

src/Share/Postgres/Definitions/Queries.hs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
module Share.Postgres.Definitions.Queries
44
( loadTerm,
55
expectTerm,
6-
pipelinedExpectTermId,
7-
pipelinedExpectTermById,
6+
expectTermId,
7+
expectTermById,
88
saveTermComponent,
99
saveEncodedTermComponent,
1010
loadTermComponent,
@@ -20,7 +20,7 @@ module Share.Postgres.Definitions.Queries
2020
loadDecl,
2121
expectDecl,
2222
loadDeclByTypeComponentElementAndTypeId,
23-
pipelinedExpectTypeComponentElementAndTypeId,
23+
expectTypeComponentElementAndTypeId,
2424
loadCachedEvalResult,
2525
saveCachedEvalResult,
2626
termReferencesByPrefix,
@@ -153,9 +153,9 @@ loadTermId (Reference.Id compHash (pgComponentIndex -> compIndex)) =
153153
AND term.component_index = #{compIndex}
154154
|]
155155

156-
pipelinedExpectTermId :: TermReferenceId -> PG.Pipeline e TermId
157-
pipelinedExpectTermId refId =
158-
pUnrecoverableEitherMap
156+
expectTermId :: QueryA m => TermReferenceId -> m TermId
157+
expectTermId refId =
158+
unrecoverableEitherMap
159159
( \case
160160
Nothing -> Left (expectedTermError refId)
161161
Just termId -> Right termId
@@ -310,9 +310,9 @@ loadTermById codebaseUser termId = do
310310
<$> loadTermComponentElementByTermId codebaseUser termId
311311
<*> termLocalReferences termId
312312

313-
pipelinedExpectTermById :: UserId -> TermReferenceId -> TermId -> Pipeline e (V2.Term Symbol, V2.Type Symbol)
314-
pipelinedExpectTermById userId refId termId =
315-
pUnrecoverableEitherMap
313+
expectTermById :: QueryA m => UserId -> TermReferenceId -> TermId -> m (V2.Term Symbol, V2.Type Symbol)
314+
expectTermById userId refId termId =
315+
unrecoverableEitherMap
316316
( \case
317317
Nothing -> Left (expectedTermError refId)
318318
Just term -> Right term
@@ -443,9 +443,9 @@ loadTypeComponentElementAndTypeId codebaseUser (Reference.Id compHash (pgCompone
443443
AND typ.component_index = #{compIndex}
444444
|]
445445

446-
pipelinedExpectTypeComponentElementAndTypeId :: UserId -> TermReferenceId -> PG.Pipeline e (TypeComponentElement, TypeId)
447-
pipelinedExpectTypeComponentElementAndTypeId codebaseUser refId =
448-
pUnrecoverableEitherMap
446+
expectTypeComponentElementAndTypeId :: QueryA m => UserId -> TermReferenceId -> m (TypeComponentElement, TypeId)
447+
expectTypeComponentElementAndTypeId codebaseUser refId =
448+
unrecoverableEitherMap
449449
( \case
450450
Nothing -> Left (expectedTypeError refId)
451451
Just decl -> Right decl

src/Share/Postgres/Hashes/Queries.hs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ module Share.Postgres.Hashes.Queries
2929
loadCausalIdByHash,
3030
expectCausalIdByHash,
3131
expectNamespaceIdsByCausalIdsOf,
32-
pipelinedExpectNamespaceIdsByCausalIdsOf,
3332
expectNamespaceHashesByNamespaceHashIdsOf,
3433
isComponentHashAllowedToBeMismatched,
3534
isCausalHashAllowedToBeMismatched,
@@ -305,31 +304,6 @@ expectNamespaceIdsByCausalIdsOf trav s = do
305304
then unrecoverableError . MissingExpectedEntity $ "expectNamespaceIdsByCausalIdsOf: Expected to get the same number of results as causal ids. " <> tShow causalIds
306305
else pure results
307306

308-
-- | Mitchell says: this could/should just have replaced 'expectNamespaceIdsByCausalIdsOf', but that function has
309-
-- many callers, so having two temporarily eases the transition.
310-
pipelinedExpectNamespaceIdsByCausalIdsOf :: Traversal s t CausalId BranchHashId -> s -> Pipeline e t
311-
pipelinedExpectNamespaceIdsByCausalIdsOf trav s = do
312-
s
313-
& unsafePartsOf trav %%~ \causalIds ->
314-
let causalIdsTable = ordered causalIds
315-
in pUnrecoverableEitherMap
316-
( \results ->
317-
if length results /= length causalIds
318-
then Left (MissingExpectedEntity $ "expectNamespaceIdsByCausalIdsOf: Expected to get the same number of results as causal ids. " <> tShow causalIds)
319-
else Right results
320-
)
321-
( queryListCol @BranchHashId
322-
[sql|
323-
WITH causal_ids(ord, causal_id) AS (
324-
SELECT ord, causal_id FROM ^{toTable causalIdsTable} as t(ord, causal_id)
325-
)
326-
SELECT c.namespace_hash_id
327-
FROM causal_ids cid
328-
JOIN causals c ON cid.causal_id = c.id
329-
ORDER BY cid.ord
330-
|]
331-
)
332-
333307
expectNamespaceHashesByNamespaceHashIdsOf :: (HasCallStack, QueryM m) => Traversal s t BranchHashId BranchHash -> s -> m t
334308
expectNamespaceHashesByNamespaceHashIdsOf trav s = do
335309
s

src/Share/Web/Share/Projects/Impl.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ diffNamespacesEndpoint (AuthN.MaybeAuthedUserID callerUserId) userHandle project
175175
unisonRuntime <- asks Env.sandboxedRuntime
176176
let makeRuntime :: Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)
177177
makeRuntime codebase = do
178-
runtime <- Codebase.codebaseRuntime' unisonRuntime codebase
178+
runtime <- Codebase.codebaseRuntimeTransaction unisonRuntime codebase
179179
pure (badUnliftCodebaseRuntime runtime)
180180
diff <-
181181
PG.runTransaction do

0 commit comments

Comments
 (0)