Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1402503
overhaul contribution diff worker
mitchellwrosen Apr 8, 2025
35c8ccf
generalize CodebaseRuntime
mitchellwrosen Apr 9, 2025
275055d
make diffCausals a single transaction
mitchellwrosen Apr 9, 2025
acf63d1
compute diff while holding lock on job row
mitchellwrosen Apr 9, 2025
6c7d2ad
break diffCausals up
mitchellwrosen Apr 23, 2025
3318aa0
model diff error and diff still computing states explicitly in the API
mitchellwrosen Apr 24, 2025
738c7c1
⅄ main → diff-api-tweaks
mitchellwrosen Apr 24, 2025
e0dee5e
ping background worker in contribution diffs endpoint
mitchellwrosen Apr 24, 2025
f87012e
flatten json
mitchellwrosen Apr 28, 2025
9fd2d38
⅄ main → diff-api-tweaks
mitchellwrosen Apr 28, 2025
1fb0602
hydrate terms with more parallelism
mitchellwrosen Apr 28, 2025
d1cb4d4
tweak indentation
mitchellwrosen Apr 28, 2025
ffc243c
fetch type decls in parallel
mitchellwrosen Apr 28, 2025
165d503
⅄ main → diff-api-tweaks
mitchellwrosen Apr 29, 2025
126e1e2
⅄ main → diff-api-tweaks
mitchellwrosen Apr 29, 2025
169c1e5
store namespace diff errors
mitchellwrosen May 1, 2025
76fe8d8
change "errorKind" to "tag" in error json
mitchellwrosen May 1, 2025
afde807
⅄ main → diff-api-tweaks
mitchellwrosen May 5, 2025
4c256d1
don't ping background worker in get contribution diff endpoint
mitchellwrosen May 6, 2025
7ed3d75
check notification channels on startup
mitchellwrosen May 6, 2025
d6e926c
address more PR comments
mitchellwrosen May 6, 2025
c5c79f8
⅄ main → diff-api-tweaks
mitchellwrosen May 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sql/2025-04-30-drop-namespace-diffs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Delete all previously-computed namespace diffs, because the diff payload is different now (we explicitly store
-- errors).
TRUNCATE namespace_diffs;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on whether you choose to re-queue diffs when the page is loaded and it's not cached, you may want to repopulate the queue with all existing contribution diffs after wiping them out.

If you have it set to requeue on page load then no need

3 changes: 2 additions & 1 deletion src/Share/Backend.hs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ displayType = \case
pure (UserObject decl)

evalDocRef ::
Codebase.CodebaseRuntime ->
Codebase.CodebaseRuntime IO ->
V2.TermReference ->
Codebase.CodebaseM e (Doc.EvaluatedDoc Symbol)
evalDocRef (CodebaseRuntime {codeLookup, cachedEvalResult, unisonRuntime}) termRef = do
Expand All @@ -245,6 +245,7 @@ evalDocRef (CodebaseRuntime {codeLookup, cachedEvalResult, unisonRuntime}) termR

typeOf :: Referent.Referent -> Codebase.CodebaseM e (Maybe (V1.Type Symbol ()))
typeOf termRef = fmap void <$> Codebase.loadTypeOfReferent (Cv.referent1to2 termRef)

eval :: V1.Term Symbol a -> Codebase.CodebaseM e (Maybe (V1.Term Symbol ()))
eval (Term.amap (const mempty) -> tm) = do
-- We use an empty ppe for evalutation, it's only used for adding additional context to errors.
Expand Down
100 changes: 64 additions & 36 deletions src/Share/BackgroundJobs/Diffs/ContributionDiffs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@ import Share.BackgroundJobs.Diffs.Queries qualified as DQ
import Share.BackgroundJobs.Errors (reportError)
import Share.BackgroundJobs.Monad (Background, withTag)
import Share.BackgroundJobs.Workers (newWorker)
import Share.Branch (Branch (..))
import Share.Branch (branchCausals_)
import Share.Codebase qualified as Codebase
import Share.Contribution (Contribution (..))
import Share.Env qualified as Env
import Share.IDs
import Share.IDs qualified as IDs
import Share.Metrics qualified as Metrics
import Share.NamespaceDiffs (NamespaceDiffError (MissingEntityError))
import Share.Postgres qualified as PG
import Share.Postgres.Contributions.Queries qualified as ContributionsQ
import Share.Postgres.Notifications qualified as Notif
Expand All @@ -23,6 +23,7 @@ import Share.Utils.Logging qualified as Logging
import Share.Web.Authorization qualified as AuthZ
import Share.Web.Errors (EntityMissing (..), ErrorID (..))
import Share.Web.Share.Diffs.Impl qualified as Diffs
import System.Clock qualified as Clock

-- | Check every 10 minutes if we haven't heard on the notifications channel.
-- Just in case we missed a notification.
Expand All @@ -32,43 +33,70 @@ maxPollingIntervalSeconds = 10 * 60
worker :: Ki.Scope -> Background ()
worker scope = do
authZReceipt <- AuthZ.backgroundJobAuthZ
badUnliftCodebaseRuntime <- Codebase.badAskUnliftCodebaseRuntime
unisonRuntime <- asks Env.sandboxedRuntime
let makeRuntime :: Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)
makeRuntime codebase = do
runtime <- Codebase.codebaseRuntimeTransaction unisonRuntime codebase
pure (badUnliftCodebaseRuntime runtime)
newWorker scope "diffs:contributions" $ forever do
Notif.waitOnChannel Notif.ContributionDiffChannel (maxPollingIntervalSeconds * 1000000)
processDiffs authZReceipt >>= \case
Left (contributionId, e) ->
withTag "contribution-id" (IDs.toText contributionId) $ do
reportError e
Right _ -> pure ()
processDiffs authZReceipt makeRuntime

processDiffs :: AuthZ.AuthZReceipt -> Background (Either (ContributionId, NamespaceDiffError) ())
processDiffs authZReceipt = Metrics.recordContributionDiffDuration . runExceptT $ do
mayContributionId <- PG.runTransaction DQ.claimContributionToDiff
for_ mayContributionId (diffContribution authZReceipt)
case mayContributionId of
Just contributionId -> do
Logging.textLog ("Recomputed contribution diff: " <> tShow contributionId)
& Logging.withTag ("contribution-id", tShow contributionId)
& Logging.withSeverity Logging.Info
& Logging.logMsg
-- Keep processing releases until we run out of them.
either throwError pure =<< lift (processDiffs authZReceipt)
Nothing -> pure ()
-- Process diffs until we run out of them. We claim a diff in a transaction and compute the diff in the same
-- transaction, with a row lock on the contribution id (which is skipped by other workers). There's therefore no chance
-- that we claim a diff but fail to write the result of computing that diff back to the database.
processDiffs :: AuthZ.AuthZReceipt -> (Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)) -> Background ()
processDiffs authZReceipt makeRuntime = do
let loop :: Background ()
loop = do
result <-
PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do
DQ.claimContributionToDiff >>= \case
Nothing -> pure Nothing
Just contributionId -> do
startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic)
Comment thread
ChrisPenner marked this conversation as resolved.
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId)
pure (Just (contributionId, startTime, result))
whenJust result \(contributionId, startTime, result) -> do
withTag "contribution-id" (IDs.toText contributionId) do
case result of
Left err -> reportError err
Right didWork -> do
when didWork do
liftIO (Metrics.recordContributionDiffDuration startTime)
Logging.textLog "Computed contribution diff"
& Logging.withSeverity Logging.Info
& Logging.logMsg
loop
loop

diffContribution :: AuthZ.AuthZReceipt -> ContributionId -> ExceptT (ContributionId, NamespaceDiffError) Background ()
diffContribution authZReceipt contributionId = withExceptT (contributionId,) . mapExceptT (withTag "contribution-id" (IDs.toText contributionId)) $ do
( bestCommonAncestorCausalId,
project,
newBranch@Branch {causal = newBranchCausalId},
oldBranch@Branch {causal = oldBranchCausalId}
) <- ExceptT $ PG.tryRunTransaction $ do
Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <- ContributionsQ.contributionById contributionId
project <- Q.projectById projectId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID "project:missing") "Project not found")
newBranch <- Q.branchById newBranchId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID "branch:missing") "Source branch not found")
oldBranch <- Q.branchById oldBranchId `whenNothingM` throwError (MissingEntityError $ EntityMissing (ErrorID "branch:missing") "Target branch not found")
pure (bestCommonAncestorCausalId, project, newBranch, oldBranch)
-- Check whether a causal diff has already been computed, and if it hasn't, compute and store it. Otherwise, do nothing.
-- Returns whether or not we did any work.
maybeComputeAndStoreCausalDiff ::
AuthZ.AuthZReceipt ->
(Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)) ->
ContributionId ->
PG.Transaction EntityMissing Bool
maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId = do
Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <-
ContributionsQ.contributionById contributionId
project <- Q.projectById projectId `whenNothingM` throwError (EntityMissing (ErrorID "project:missing") "Project not found")
newBranch <- Q.branchById newBranchId `whenNothingM` throwError (EntityMissing (ErrorID "branch:missing") "Source branch not found")
oldBranch <- Q.branchById oldBranchId `whenNothingM` throwError (EntityMissing (ErrorID "branch:missing") "Target branch not found")
let oldCodebase = Codebase.codebaseForProjectBranch authZReceipt project oldBranch
let newCodebase = Codebase.codebaseForProjectBranch authZReceipt project newBranch
-- This method saves the diff so it'll be there when we need it, so we don't need to do anything with it.
_ <-
Diffs.diffCausals authZReceipt (oldCodebase, oldBranchCausalId) (newCodebase, newBranchCausalId) bestCommonAncestorCausalId
pure ()
let oldCausal = oldBranch ^. branchCausals_
let newCausal = newBranch ^. branchCausals_
ContributionsQ.existsPrecomputedNamespaceDiff (oldCodebase, oldCausal) (newCodebase, newCausal) >>= \case
True -> pure False
False -> do
oldRuntime <- PG.transactionUnsafeIO (makeRuntime oldCodebase)
newRuntime <- PG.transactionUnsafeIO (makeRuntime newCodebase)
_ <-
Diffs.computeAndStoreCausalDiff
authZReceipt
(oldCodebase, oldRuntime, oldCausal)
(newCodebase, newRuntime, newCausal)
bestCommonAncestorCausalId
pure True
38 changes: 18 additions & 20 deletions src/Share/BackgroundJobs/Diffs/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@ module Share.BackgroundJobs.Diffs.Queries
)
where

import Data.Foldable (toList)
import Data.Set (Set)
import Share.IDs
import Share.Postgres
import Share.Postgres.Notifications qualified as Notif
import Unison.Prelude

submitContributionsToBeDiffed :: (QueryM m) => Set ContributionId -> m ()
submitContributionsToBeDiffed contributions = do
execute_
[sql|
WITH new_contributions(contribution_id) AS (
SELECT * FROM ^{singleColumnTable (toList contributions)}
)
INSERT INTO contribution_diff_queue (contribution_id)
SELECT nc.contribution_id FROM new_contributions nc
ON CONFLICT DO NOTHING
WITH new_contributions(contribution_id) AS (
SELECT * FROM ^{singleColumnTable (toList contributions)}
)
INSERT INTO contribution_diff_queue (contribution_id)
SELECT nc.contribution_id FROM new_contributions nc
|]
Notif.notifyChannel Notif.ContributionDiffChannel

Expand All @@ -28,16 +26,16 @@ claimContributionToDiff :: Transaction e (Maybe ContributionId)
claimContributionToDiff = do
query1Col
[sql|
WITH chosen_contribution(contribution_id) AS (
SELECT q.contribution_id
FROM contribution_diff_queue q
ORDER BY q.created_at ASC
LIMIT 1
-- Skip any that are being synced by other workers.
FOR UPDATE SKIP LOCKED
)
DELETE FROM contribution_diff_queue
USING chosen_contribution
WHERE contribution_diff_queue.contribution_id = chosen_contribution.contribution_id
RETURNING chosen_contribution.contribution_id
WITH chosen_contribution(contribution_id) AS (
SELECT q.contribution_id
FROM contribution_diff_queue q
ORDER BY q.created_at ASC
LIMIT 1
-- Skip any that are being synced by other workers.
FOR UPDATE SKIP LOCKED
)
DELETE FROM contribution_diff_queue
USING chosen_contribution
WHERE contribution_diff_queue.contribution_id = chosen_contribution.contribution_id
RETURNING chosen_contribution.contribution_id
|]
2 changes: 1 addition & 1 deletion src/Share/Branch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ instance (Hasql.DecodeValue causal) => Hasql.DecodeRow (Branch causal) where
creatorId <- PG.decodeField
pure $ Branch {..}

branchCausals_ :: Traversal (Branch causal) (Branch causal') causal causal'
branchCausals_ :: Lens (Branch causal) (Branch causal') causal causal'
branchCausals_ f Branch {..} = (\causal -> Branch {causal, ..}) <$> f causal

branchCodebaseUser :: Branch causal -> UserId
Expand Down
Loading
Loading