@@ -7,13 +7,13 @@ import Share.BackgroundJobs.Diffs.Queries qualified as DQ
77import Share.BackgroundJobs.Errors (reportError )
88import Share.BackgroundJobs.Monad (Background , withTag )
99import Share.BackgroundJobs.Workers (newWorker )
10- import Share.Branch (Branch ( .. ) )
10+ import Share.Branch (branchCausals_ )
1111import Share.Codebase qualified as Codebase
1212import Share.Contribution (Contribution (.. ))
13+ import Share.Env qualified as Env
1314import Share.IDs
1415import Share.IDs qualified as IDs
1516import Share.Metrics qualified as Metrics
16- import Share.NamespaceDiffs (NamespaceDiffError (MissingEntityError ))
1717import Share.Postgres qualified as PG
1818import Share.Postgres.Contributions.Queries qualified as ContributionsQ
1919import Share.Postgres.Notifications qualified as Notif
@@ -23,52 +23,81 @@ import Share.Utils.Logging qualified as Logging
2323import Share.Web.Authorization qualified as AuthZ
2424import Share.Web.Errors (EntityMissing (.. ), ErrorID (.. ))
2525import Share.Web.Share.Diffs.Impl qualified as Diffs
26+ import System.Clock qualified as Clock
2627
27- -- | Check every 10 minutes if we haven't heard on the notifications channel.
28+ -- | Check every 30 seconds if we haven't heard on the notifications channel.
2829-- Just in case we missed a notification.
2930maxPollingIntervalSeconds :: Int
30- maxPollingIntervalSeconds = 10 * 60
31+ maxPollingIntervalSeconds = 30
3132
3233worker :: Ki. Scope -> Background ()
3334worker scope = do
3435 authZReceipt <- AuthZ. backgroundJobAuthZ
36+ badUnliftCodebaseRuntime <- Codebase. badAskUnliftCodebaseRuntime
37+ unisonRuntime <- asks Env. sandboxedRuntime
38+ let makeRuntime :: Codebase. CodebaseEnv -> IO (Codebase. CodebaseRuntime IO )
39+ makeRuntime codebase = do
40+ runtime <- Codebase. codebaseRuntime' unisonRuntime codebase
41+ pure (badUnliftCodebaseRuntime runtime)
3542 newWorker scope " diffs:contributions" $ forever do
3643 Notif. waitOnChannel Notif. ContributionDiffChannel (maxPollingIntervalSeconds * 1000000 )
37- processDiffs authZReceipt >>= \ case
38- Left (contributionId, e) ->
39- withTag " contribution-id" (IDs. toText contributionId) $ do
40- reportError e
41- Right _ -> pure ()
44+ processDiffs authZReceipt makeRuntime
4245
43- processDiffs :: AuthZ. AuthZReceipt -> Background (Either (ContributionId , NamespaceDiffError ) () )
44- processDiffs authZReceipt = Metrics. recordContributionDiffDuration . runExceptT $ do
45- mayContributionId <- PG. runTransaction DQ. claimContributionToDiff
46- for_ mayContributionId (diffContribution authZReceipt)
47- case mayContributionId of
48- Just contributionId -> do
49- Logging. textLog (" Recomputed contribution diff: " <> tShow contributionId)
50- & Logging. withTag (" contribution-id" , tShow contributionId)
51- & Logging. withSeverity Logging. Info
52- & Logging. logMsg
53- -- Keep processing releases until we run out of them.
54- either throwError pure =<< lift (processDiffs authZReceipt)
55- Nothing -> pure ()
46+ -- Process diffs until we run out of them. We claim a diff in a transaction, commit it, then proceed to compute the
47+ -- diff. There's therefore a chance we claim a diff and fail to compute it (due to e.g. server restart). The current
48+ -- solution to these "at most once" semantics is to simply re-enqueue a diff job if necessary; e.g. in the view diff
49+ -- endpoint handler.
50+ processDiffs :: AuthZ. AuthZReceipt -> (Codebase. CodebaseEnv -> IO (Codebase. CodebaseRuntime IO )) -> Background ()
51+ processDiffs authZReceipt makeRuntime = do
52+ let loop :: Background ()
53+ loop = do
54+ result <-
55+ PG. runTransactionMode PG. RepeatableRead PG. ReadWrite do
56+ DQ. claimContributionToDiff >>= \ case
57+ Nothing -> pure Nothing
58+ Just contributionId -> do
59+ startTime <- PG. transactionUnsafeIO (Clock. getTime Clock. Monotonic )
60+ result <- PG. catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId)
61+ pure (Just (contributionId, startTime, result))
62+ whenJust result \ (contributionId, startTime, result) -> do
63+ withTag " contribution-id" (IDs. toText contributionId) do
64+ case result of
65+ Left err -> reportError err
66+ Right didWork -> do
67+ when didWork do
68+ liftIO (Metrics. recordContributionDiffDuration startTime)
69+ Logging. textLog " Computed contribution diff"
70+ & Logging. withSeverity Logging. Info
71+ & Logging. logMsg
72+ loop
73+ loop
5674
57- diffContribution :: AuthZ. AuthZReceipt -> ContributionId -> ExceptT (ContributionId , NamespaceDiffError ) Background ()
58- diffContribution authZReceipt contributionId = withExceptT (contributionId,) . mapExceptT (withTag " contribution-id" (IDs. toText contributionId)) $ do
59- ( bestCommonAncestorCausalId,
60- project,
61- newBranch@ Branch {causal = newBranchCausalId},
62- oldBranch@ Branch {causal = oldBranchCausalId}
63- ) <- ExceptT $ PG. tryRunTransaction $ do
64- Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <- ContributionsQ. contributionById contributionId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " contribution:missing" ) " Contribution not found" )
65- project <- Q. projectById projectId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " project:missing" ) " Project not found" )
66- newBranch <- Q. branchById newBranchId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " branch:missing" ) " Source branch not found" )
67- oldBranch <- Q. branchById oldBranchId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID " branch:missing" ) " Target branch not found" )
68- pure (bestCommonAncestorCausalId, project, newBranch, oldBranch)
75+ -- Check whether a causal diff has already been computed, and if it hasn't, compute and store it. Otherwise, do nothing.
76+ -- Returns whether or not we did any work.
77+ maybeComputeAndStoreCausalDiff ::
78+ AuthZ. AuthZReceipt ->
79+ (Codebase. CodebaseEnv -> IO (Codebase. CodebaseRuntime IO )) ->
80+ ContributionId ->
81+ PG. Transaction EntityMissing Bool
82+ maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId = do
83+ Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <-
84+ ContributionsQ. contributionById contributionId `whenNothingM` throwError (EntityMissing (ErrorID " contribution:missing" ) " Contribution not found" )
85+ project <- Q. projectById projectId `whenNothingM` throwError (EntityMissing (ErrorID " project:missing" ) " Project not found" )
86+ newBranch <- Q. branchById newBranchId `whenNothingM` throwError (EntityMissing (ErrorID " branch:missing" ) " Source branch not found" )
87+ oldBranch <- Q. branchById oldBranchId `whenNothingM` throwError (EntityMissing (ErrorID " branch:missing" ) " Target branch not found" )
6988 let oldCodebase = Codebase. codebaseForProjectBranch authZReceipt project oldBranch
7089 let newCodebase = Codebase. codebaseForProjectBranch authZReceipt project newBranch
71- -- This method saves the diff so it'll be there when we need it, so we don't need to do anything with it.
72- _ <-
73- Diffs. diffCausals authZReceipt (oldCodebase, oldBranchCausalId) (newCodebase, newBranchCausalId) bestCommonAncestorCausalId
74- pure ()
90+ let oldCausal = oldBranch ^. branchCausals_
91+ let newCausal = newBranch ^. branchCausals_
92+ ContributionsQ. existsPrecomputedNamespaceDiff (oldCodebase, oldCausal) (newCodebase, newCausal) >>= \ case
93+ True -> pure False
94+ False -> do
95+ oldRuntime <- PG. transactionUnsafeIO (makeRuntime oldCodebase)
96+ newRuntime <- PG. transactionUnsafeIO (makeRuntime newCodebase)
97+ _ <-
98+ Diffs. computeAndStoreCausalDiff
99+ authZReceipt
100+ (oldCodebase, oldRuntime, oldCausal)
101+ (newCodebase, newRuntime, newCausal)
102+ bestCommonAncestorCausalId
103+ pure True
0 commit comments