@@ -7,13 +7,13 @@ import Share.BackgroundJobs.Diffs.Queries qualified as DQ
77import Share.BackgroundJobs.Errors (reportError )
88import Share.BackgroundJobs.Monad (Background , withTag )
99import Share.BackgroundJobs.Workers (newWorker )
10- import Share.Branch (Branch ( .. ) )
10+ import Share.Branch (branchCausals_ )
1111import Share.Codebase qualified as Codebase
1212import Share.Contribution (Contribution (.. ))
13+ import Share.Env qualified as Env
1314import Share.IDs
1415import Share.IDs qualified as IDs
1516import Share.Metrics qualified as Metrics
16- import Share.NamespaceDiffs (NamespaceDiffError (MissingEntityError ))
1717import Share.Postgres qualified as PG
1818import Share.Postgres.Contributions.Queries qualified as ContributionsQ
1919import Share.Postgres.Notifications qualified as Notif
@@ -23,6 +23,7 @@ import Share.Utils.Logging qualified as Logging
2323import Share.Web.Authorization qualified as AuthZ
2424import Share.Web.Errors (EntityMissing (.. ), ErrorID (.. ))
2525import Share.Web.Share.Diffs.Impl qualified as Diffs
26+ import System.Clock qualified as Clock
2627
2728-- | Check every 10 minutes if we haven't heard on the notifications channel.
2829-- Just in case we missed a notification.
@@ -32,43 +33,70 @@ maxPollingIntervalSeconds = 10 * 60
3233worker :: Ki. Scope -> Background ()
3334worker scope = do
3435 authZReceipt <- AuthZ. backgroundJobAuthZ
36+ badUnliftCodebaseRuntime <- Codebase. badAskUnliftCodebaseRuntime
37+ unisonRuntime <- asks Env. sandboxedRuntime
38+ let makeRuntime :: Codebase. CodebaseEnv -> IO (Codebase. CodebaseRuntime IO )
39+ makeRuntime codebase = do
40+ runtime <- Codebase. codebaseRuntimeTransaction unisonRuntime codebase
41+ pure (badUnliftCodebaseRuntime runtime)
3542 newWorker scope " diffs:contributions" $ forever do
3643 Notif. waitOnChannel Notif. ContributionDiffChannel (maxPollingIntervalSeconds * 1000000 )
37- processDiffs authZReceipt >>= \ case
38- Left (contributionId, e) ->
39- withTag " contribution-id" (IDs. toText contributionId) $ do
40- reportError e
41- Right _ -> pure ()
44+ processDiffs authZReceipt makeRuntime
4245
43- processDiffs :: AuthZ. AuthZReceipt -> Background (Either (ContributionId , NamespaceDiffError ) () )
44- processDiffs authZReceipt = Metrics. recordContributionDiffDuration . runExceptT $ do
45- mayContributionId <- PG. runTransaction DQ. claimContributionToDiff
46- for_ mayContributionId (diffContribution authZReceipt)
47- case mayContributionId of
48- Just contributionId -> do
49- Logging. textLog (" Recomputed contribution diff: " <> tShow contributionId)
50- & Logging. withTag (" contribution-id" , tShow contributionId)
51- & Logging. withSeverity Logging. Info
52- & Logging. logMsg
53- -- Keep processing releases until we run out of them.
54- either throwError pure =<< lift (processDiffs authZReceipt)
55- Nothing -> pure ()
46+ -- Process diffs until we run out of them. We claim a diff in a transaction and compute the diff in the same
47+ -- transaction, with a row lock on the contribution id (which is skipped by other workers). There's therefore no chance
48+ -- that we claim a diff but fail to write the result of computing that diff back to the database.
49+ processDiffs :: AuthZ. AuthZReceipt -> (Codebase. CodebaseEnv -> IO (Codebase. CodebaseRuntime IO )) -> Background ()
50+ processDiffs authZReceipt makeRuntime = do
51+ let loop :: Background ()
52+ loop = do
53+ result <-
54+ PG. runTransactionMode PG. RepeatableRead PG. ReadWrite do
55+ DQ. claimContributionToDiff >>= \ case
56+ Nothing -> pure Nothing
57+ Just contributionId -> do
58+ startTime <- PG. transactionUnsafeIO (Clock. getTime Clock. Monotonic )
59+ result <- PG. catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId)
60+ pure (Just (contributionId, startTime, result))
61+ whenJust result \ (contributionId, startTime, result) -> do
62+ withTag " contribution-id" (IDs. toText contributionId) do
63+ case result of
64+ Left err -> reportError err
65+ Right didWork -> do
66+ when didWork do
67+ liftIO (Metrics. recordContributionDiffDuration startTime)
68+ Logging. textLog " Computed contribution diff"
69+ & Logging. withSeverity Logging. Info
70+ & Logging. logMsg
71+ loop
72+ loop
5673
57- diffContribution :: AuthZ. AuthZReceipt -> ContributionId -> ExceptT (ContributionId , NamespaceDiffError ) Background ()
58- diffContribution authZReceipt contributionId = withExceptT (contributionId,) . mapExceptT (withTag " contribution-id" (IDs. toText contributionId)) $ do
59- ( bestCommonAncestorCausalId,
60- project,
61- newBranch@ Branch {causal = newBranchCausalId},
62- oldBranch@ Branch {causal = oldBranchCausalId}
63- ) <- ExceptT $ PG. tryRunTransaction $ do
64- Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <- ContributionsQ. contributionById contributionId
65- project <- Q. projectById projectId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " project:missing" ) " Project not found" )
66- newBranch <- Q. branchById newBranchId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " branch:missing" ) " Source branch not found" )
67- oldBranch <- Q. branchById oldBranchId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " branch:missing" ) " Target branch not found" )
68- pure (bestCommonAncestorCausalId, project, newBranch, oldBranch)
74+ -- Check whether a causal diff has already been computed, and if it hasn't, compute and store it. Otherwise, do nothing.
75+ -- Returns whether or not we did any work.
76+ maybeComputeAndStoreCausalDiff ::
77+ AuthZ. AuthZReceipt ->
78+ (Codebase. CodebaseEnv -> IO (Codebase. CodebaseRuntime IO )) ->
79+ ContributionId ->
80+ PG. Transaction EntityMissing Bool
81+ maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId = do
82+ Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <-
83+ ContributionsQ. contributionById contributionId
84+ project <- Q. projectById projectId `whenNothingM` throwError (EntityMissing (ErrorID " project:missing" ) " Project not found" )
85+ newBranch <- Q. branchById newBranchId `whenNothingM` throwError (EntityMissing (ErrorID " branch:missing" ) " Source branch not found" )
86+ oldBranch <- Q. branchById oldBranchId `whenNothingM` throwError (EntityMissing (ErrorID " branch:missing" ) " Target branch not found" )
6987 let oldCodebase = Codebase. codebaseForProjectBranch authZReceipt project oldBranch
7088 let newCodebase = Codebase. codebaseForProjectBranch authZReceipt project newBranch
71- -- This method saves the diff so it'll be there when we need it, so we don't need to do anything with it.
72- _ <-
73- Diffs. diffCausals authZReceipt (oldCodebase, oldBranchCausalId) (newCodebase, newBranchCausalId) bestCommonAncestorCausalId
74- pure ()
89+ let oldCausal = oldBranch ^. branchCausals_
90+ let newCausal = newBranch ^. branchCausals_
91+ ContributionsQ. existsPrecomputedNamespaceDiff (oldCodebase, oldCausal) (newCodebase, newCausal) >>= \ case
92+ True -> pure False
93+ False -> do
94+ oldRuntime <- PG. transactionUnsafeIO (makeRuntime oldCodebase)
95+ newRuntime <- PG. transactionUnsafeIO (makeRuntime newCodebase)
96+ _ <-
97+ Diffs. computeAndStoreCausalDiff
98+ authZReceipt
99+ (oldCodebase, oldRuntime, oldCausal)
100+ (newCodebase, newRuntime, newCausal)
101+ bestCommonAncestorCausalId
102+ pure True
0 commit comments