Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion share-api.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ library
Share.App
Share.Backend
Share.BackgroundJobs
Share.BackgroundJobs.Diffs.ContributionDiffs
Share.BackgroundJobs.Diffs.CausalDiffs
Share.BackgroundJobs.Diffs.Queries
Share.BackgroundJobs.Diffs.Types
Share.BackgroundJobs.Errors
Share.BackgroundJobs.Monad
Share.BackgroundJobs.Search.DefinitionSync
Expand Down
31 changes: 31 additions & 0 deletions sql/2025-05-09_track_contribution_causals.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
-- Track the causals on the contribution itself; this is useful because the branches may change _after_ the
-- contribution is merged or closed.
--
-- The columns are added as nullable so that existing rows stay valid until the backfill further down in this
-- migration has populated them; they are only made NOT NULL at the very end.
-- ON DELETE NO ACTION means a causal cannot be deleted while a contribution still references it.
ALTER TABLE contributions
ADD COLUMN source_causal_id INTEGER NULL REFERENCES causals (id) ON DELETE NO ACTION,
ADD COLUMN target_causal_id INTEGER NULL REFERENCES causals (id) ON DELETE NO ACTION
;

-- Backfill the new columns with the causal that each contribution's source and target branch currently point at.
-- This will be inaccurate for contributions that were already merged or closed, because we can no longer
-- recover what the branches pointed at when that happened.
UPDATE contributions
SET source_causal_id = (
SELECT pb.causal_id
FROM project_branches pb
WHERE pb.id = contributions.source_branch
-- The lookup is by branch id, so at most one row is expected (presumably pb.id is unique -- confirm);
-- LIMIT 1 just guarantees the subquery yields a single value.
LIMIT 1
),
target_causal_id = (
SELECT pb.causal_id
FROM project_branches pb
WHERE pb.id = contributions.target_branch
LIMIT 1
)
-- Only touch rows where neither column has been filled in yet.
WHERE source_causal_id IS NULL
AND target_causal_id IS NULL
;

-- Make the new columns non-nullable.
-- NOTE(review): this statement fails if any row still has a NULL in either column after the backfill above,
-- e.g. a contribution whose source or target branch has no matching row in project_branches -- confirm that
-- cannot happen before running this migration.
ALTER TABLE contributions
ALTER COLUMN source_causal_id SET NOT NULL,
ALTER COLUMN target_causal_id SET NOT NULL
;
14 changes: 14 additions & 0 deletions sql/2025-05-12_causal_diff_queue.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Table for causal diffs we want to compute.
-- Also keyed by the codebase owner of each side of the diff since
-- the sandboxed terms may affect how the diff looks.
CREATE TABLE causal_diff_queue (
  from_causal_id INTEGER NOT NULL REFERENCES causals(id) ON DELETE CASCADE,
  to_causal_id INTEGER NOT NULL REFERENCES causals(id) ON DELETE CASCADE,
  from_codebase_owner UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  to_codebase_owner UUID NOT NULL REFERENCES users(id) ON DELETE CASCADE,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

  PRIMARY KEY (from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner)
);

-- Carry over any contributions that are still waiting in the old queue, so that work which was
-- enqueued but not yet claimed when this migration runs is not silently discarded.
-- The mapping mirrors the one used when contributions are submitted for diffing:
-- "from" is the target branch side, "to" is the source branch side, and each side's codebase owner is
-- the branch contributor when there is one, otherwise the project owner.
WITH diff_info(from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner, created_at) AS (
  SELECT c.target_causal_id,
         c.source_causal_id,
         COALESCE(target_branch.contributor_id, target_project.owner_user_id),
         COALESCE(source_branch.contributor_id, source_project.owner_user_id),
         q.created_at
    FROM contribution_diff_queue q
      JOIN contributions c ON c.id = q.contribution_id
      JOIN project_branches source_branch ON source_branch.id = c.source_branch
      JOIN project_branches target_branch ON target_branch.id = c.target_branch
      JOIN projects source_project ON source_project.id = source_branch.project_id
      JOIN projects target_project ON target_project.id = target_branch.project_id
)
INSERT INTO causal_diff_queue (from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner, created_at)
  SELECT from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner, created_at
    FROM diff_info
    -- Skip diffs that have already been computed and stored.
    WHERE NOT EXISTS (
      SELECT FROM namespace_diffs nd
        WHERE nd.left_causal_id = diff_info.from_causal_id
          AND nd.right_causal_id = diff_info.to_causal_id
          AND nd.left_codebase_owner_user_id = diff_info.from_codebase_owner
          AND nd.right_codebase_owner_user_id = diff_info.to_codebase_owner
    )
  -- Several contributions can map onto the same diff; one queue entry is enough.
  ON CONFLICT DO NOTHING
;

-- The old per-contribution queue is no longer read by anything once its entries have been carried over.
DROP TABLE contribution_diff_queue;
2 changes: 1 addition & 1 deletion src/Share/BackgroundJobs.hs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Share.BackgroundJobs (startWorkers) where

import Ki.Unlifted qualified as Ki
import Share.BackgroundJobs.Diffs.ContributionDiffs qualified as ContributionDiffs
import Share.BackgroundJobs.Diffs.CausalDiffs qualified as ContributionDiffs
import Share.BackgroundJobs.Monad (Background)
import Share.BackgroundJobs.Search.DefinitionSync qualified as DefnSearch
import Share.BackgroundJobs.Webhooks.Worker qualified as Webhooks
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
module Share.BackgroundJobs.Diffs.ContributionDiffs (worker) where
module Share.BackgroundJobs.Diffs.CausalDiffs (worker) where

import Control.Lens
import Control.Monad.Except
import Ki.Unlifted qualified as Ki
import Share.BackgroundJobs.Diffs.Queries qualified as DQ
import Share.BackgroundJobs.Diffs.Types (CausalDiffInfo (..))
import Share.BackgroundJobs.Errors (reportError)
import Share.BackgroundJobs.Monad (Background, withTag)
import Share.BackgroundJobs.Monad (Background, withTags)
import Share.BackgroundJobs.Workers (newWorker)
import Share.Branch (branchCausals_)
import Share.Codebase qualified as Codebase
import Share.Contribution (Contribution (..))
import Share.Env qualified as Env
import Share.IDs
import Share.IDs qualified as IDs
import Share.Metrics qualified as Metrics
import Share.Postgres qualified as PG
import Share.Postgres.Causal.Queries qualified as CausalQ
import Share.Postgres.Contributions.Queries qualified as ContributionsQ
import Share.Postgres.Notifications qualified as Notif
import Share.Postgres.Queries qualified as Q
import Share.Prelude
import Share.Utils.Logging qualified as Logging
import Share.Web.Authorization qualified as AuthZ
import Share.Web.Errors (EntityMissing (..), ErrorID (..))
import Share.Web.Errors (EntityMissing (..))
import Share.Web.Share.Diffs.Impl qualified as Diffs
import System.Clock qualified as Clock

Expand All @@ -39,8 +36,8 @@ worker scope = do
makeRuntime codebase = do
runtime <- Codebase.codebaseRuntimeTransaction unisonRuntime codebase
pure (badUnliftCodebaseRuntime runtime)
newWorker scope "diffs:contributions" $ forever do
Notif.waitOnChannel Notif.ContributionDiffChannel (maxPollingIntervalSeconds * 1000000)
newWorker scope "causal-diffs" $ forever do
Notif.waitOnChannel Notif.CausalDiffChannel (maxPollingIntervalSeconds * 1000000)
processDiffs authZReceipt makeRuntime

-- Process diffs until we run out of them. We claim a diff in a transaction and compute the diff in the same
Expand All @@ -52,20 +49,26 @@ processDiffs authZReceipt makeRuntime = do
loop = do
result <-
PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do
DQ.claimContributionToDiff >>= \case
DQ.claimCausalDiff >>= \case
Nothing -> pure Nothing
Just contributionId -> do
Just causalDiffInfo -> do
startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic)
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId)
pure (Just (contributionId, startTime, result))
whenJust result \(contributionId, startTime, result) -> do
withTag "contribution-id" (IDs.toText contributionId) do
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt makeRuntime causalDiffInfo)
pure (Just (causalDiffInfo, startTime, result))
whenJust result \(CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner}, startTime, result) -> do
let tags =
[ ("from-causal-id", IDs.toText fromCausalId),
("to-causal-id", IDs.toText toCausalId),
("from-codebase-owner", IDs.toText fromCodebaseOwner),
("to-codebase-owner", IDs.toText toCodebaseOwner)
]
withTags tags do
case result of
Left err -> reportError err
Right didWork -> do
when didWork do
liftIO (Metrics.recordContributionDiffDuration startTime)
Logging.textLog "Computed contribution diff"
liftIO (Metrics.recordCausalDiffDuration startTime)
Logging.textLog "Computed causal diff"
& Logging.withSeverity Logging.Info
& Logging.logMsg
loop
Expand All @@ -76,27 +79,21 @@ processDiffs authZReceipt makeRuntime = do
maybeComputeAndStoreCausalDiff ::
AuthZ.AuthZReceipt ->
(Codebase.CodebaseEnv -> IO (Codebase.CodebaseRuntime IO)) ->
ContributionId ->
CausalDiffInfo ->
PG.Transaction EntityMissing Bool
maybeComputeAndStoreCausalDiff authZReceipt makeRuntime contributionId = do
Contribution {bestCommonAncestorCausalId, sourceBranchId = newBranchId, targetBranchId = oldBranchId, projectId} <-
ContributionsQ.contributionById contributionId
project <- Q.projectById projectId `whenNothingM` throwError (EntityMissing (ErrorID "project:missing") "Project not found")
newBranch <- Q.branchById newBranchId `whenNothingM` throwError (EntityMissing (ErrorID "branch:missing") "Source branch not found")
oldBranch <- Q.branchById oldBranchId `whenNothingM` throwError (EntityMissing (ErrorID "branch:missing") "Target branch not found")
let oldCodebase = Codebase.codebaseForProjectBranch authZReceipt project oldBranch
let newCodebase = Codebase.codebaseForProjectBranch authZReceipt project newBranch
let oldCausal = oldBranch ^. branchCausals_
let newCausal = newBranch ^. branchCausals_
ContributionsQ.existsPrecomputedNamespaceDiff (oldCodebase, oldCausal) (newCodebase, newCausal) >>= \case
maybeComputeAndStoreCausalDiff authZReceipt makeRuntime (CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner}) = do
bestCommonAncestorCausalId <- CausalQ.bestCommonAncestor fromCausalId toCausalId
let fromCodebase = Codebase.codebaseEnv authZReceipt $ Codebase.codebaseLocationForUserCodebase fromCodebaseOwner
let toCodebase = Codebase.codebaseEnv authZReceipt $ Codebase.codebaseLocationForUserCodebase toCodebaseOwner
ContributionsQ.existsPrecomputedNamespaceDiff (fromCodebase, fromCausalId) (toCodebase, toCausalId) >>= \case
True -> pure False
False -> do
oldRuntime <- PG.transactionUnsafeIO (makeRuntime oldCodebase)
newRuntime <- PG.transactionUnsafeIO (makeRuntime newCodebase)
fromRuntime <- PG.transactionUnsafeIO (makeRuntime fromCodebase)
toRuntime <- PG.transactionUnsafeIO (makeRuntime toCodebase)
_ <-
Diffs.computeAndStoreCausalDiff
authZReceipt
(oldCodebase, oldRuntime, oldCausal)
(newCodebase, newRuntime, newCausal)
(fromCodebase, fromRuntime, fromCausalId)
(toCodebase, toRuntime, toCausalId)
bestCommonAncestorCausalId
pure True
48 changes: 34 additions & 14 deletions src/Share/BackgroundJobs/Diffs/Queries.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
module Share.BackgroundJobs.Diffs.Queries
( submitContributionsToBeDiffed,
claimContributionToDiff,
claimCausalDiff,
)
where

import Share.BackgroundJobs.Diffs.Types
import Share.IDs
import Share.Postgres
import Share.Postgres.Notifications qualified as Notif
Expand All @@ -15,28 +16,47 @@ submitContributionsToBeDiffed contributions = do
[sql|
WITH new_contributions(contribution_id) AS (
SELECT * FROM ^{singleColumnTable (toList contributions)}
), diff_info(from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner) AS (
SELECT c.target_causal_id, c.source_causal_id, COALESCE(target_branch.contributor_id, target_project.owner_user_id), COALESCE(source_branch.contributor_id, source_project.owner_user_id)
FROM new_contributions nc
JOIN contributions c ON c.id = nc.contribution_id
JOIN project_branches source_branch ON source_branch.id = c.source_branch
JOIN project_branches target_branch ON target_branch.id = c.target_branch
JOIN projects source_project ON source_project.id = source_branch.project_id
JOIN projects target_project ON target_project.id = target_branch.project_id
)
INSERT INTO contribution_diff_queue (contribution_id)
SELECT nc.contribution_id FROM new_contributions nc
INSERT INTO causal_diff_queue (from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner)
SELECT from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner
FROM diff_info
WHERE NOT EXISTS (
SELECT FROM namespace_diffs nd
WHERE nd.left_causal_id = diff_info.from_causal_id
AND nd.right_causal_id = diff_info.to_causal_id
AND nd.left_codebase_owner_user_id = diff_info.from_codebase_owner
AND nd.right_codebase_owner_user_id = diff_info.to_codebase_owner
)
ON CONFLICT DO NOTHING
|]
Notif.notifyChannel Notif.ContributionDiffChannel
Notif.notifyChannel Notif.CausalDiffChannel

-- | Claim the oldest causal diff in the queue to be computed.
claimContributionToDiff :: Transaction e (Maybe ContributionId)
claimContributionToDiff = do
query1Col
claimCausalDiff :: Transaction e (Maybe CausalDiffInfo)
claimCausalDiff = do
query1Row
[sql|
WITH chosen_contribution(contribution_id) AS (
SELECT q.contribution_id
FROM contribution_diff_queue q
WITH chosen(from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner) AS (
SELECT q.from_causal_id, q.to_causal_id, q.from_codebase_owner, q.to_codebase_owner
FROM causal_diff_queue q
ORDER BY q.created_at ASC
LIMIT 1
-- Skip any that are being diffed by other workers.
FOR UPDATE SKIP LOCKED
)
DELETE FROM contribution_diff_queue
USING chosen_contribution
WHERE contribution_diff_queue.contribution_id = chosen_contribution.contribution_id
RETURNING chosen_contribution.contribution_id
DELETE FROM causal_diff_queue
USING chosen
WHERE causal_diff_queue.from_causal_id = chosen.from_causal_id
AND causal_diff_queue.to_causal_id = chosen.to_causal_id
AND causal_diff_queue.from_codebase_owner = chosen.from_codebase_owner
AND causal_diff_queue.to_codebase_owner = chosen.to_codebase_owner
RETURNING chosen.from_causal_id, chosen.to_causal_id, chosen.from_codebase_owner, chosen.to_codebase_owner
|]
20 changes: 20 additions & 0 deletions src/Share/BackgroundJobs/Diffs/Types.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module Share.BackgroundJobs.Diffs.Types (CausalDiffInfo (..)) where

import Share.IDs
import Share.Postgres qualified as PG
import Share.Postgres.IDs

-- | Identifies one namespace diff to compute: the causal on each side of the
-- diff, plus the user whose codebase each side is resolved in.
--
-- For contribution diffs the \"from\" side is the contribution's target branch
-- and the \"to\" side is its source branch.
data CausalDiffInfo = CausalDiffInfo
  { -- | Causal on the \"from\" side of the diff.
    fromCausalId :: CausalId,
    -- | Causal on the \"to\" side of the diff.
    toCausalId :: CausalId,
    -- | Owner of the user codebase used for the \"from\" side.
    fromCodebaseOwner :: UserId,
    -- | Owner of the user codebase used for the \"to\" side.
    toCodebaseOwner :: UserId
  }

-- | Decodes a row positionally; the columns must arrive in the order
-- from-causal, to-causal, from-owner, to-owner.
instance PG.DecodeRow CausalDiffInfo where
  decodeRow = do
    fromCausal <- PG.decodeField
    toCausal <- PG.decodeField
    fromOwner <- PG.decodeField
    toOwner <- PG.decodeField
    pure
      CausalDiffInfo
        { fromCausalId = fromCausal,
          toCausalId = toCausal,
          fromCodebaseOwner = fromOwner,
          toCodebaseOwner = toOwner
        }
6 changes: 5 additions & 1 deletion src/Share/BackgroundJobs/Monad.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Share.BackgroundJobs.Monad
withWorkerName,
runBackground,
withTag,
withTags,
)
where

Expand All @@ -29,7 +30,10 @@ withWorkerName :: Text -> Background a -> Background a
withWorkerName name = localBackgroundCtx \ctx -> ctx {workerName = name}

withTag :: Text -> Text -> Background a -> Background a
withTag key value = localBackgroundCtx \ctx -> ctx {loggingTags = Map.insert key value (loggingTags ctx)}
withTag key value = withTags [(key, value)]

-- | Run an action with a batch of extra logging tags in scope.
--
-- Tags supplied here take precedence over tags already present in the context
-- under the same key, because 'Map.union' prefers entries from its left argument.
withTags :: [(Text, Text)] -> Background a -> Background a
withTags tags = localBackgroundCtx addTags
  where
    newTags = Map.fromList tags
    addTags ctx = ctx {loggingTags = newTags `Map.union` loggingTags ctx}

instance Logging.MonadLogger Background where
logMsg msg = do
Expand Down
4 changes: 4 additions & 0 deletions src/Share/Contribution.hs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ data Contribution = Contribution
status :: ContributionStatus,
sourceBranchId :: BranchId,
targetBranchId :: BranchId,
sourceCausalId :: CausalId,
targetCausalId :: CausalId,
bestCommonAncestorCausalId :: Maybe CausalId,
createdAt :: UTCTime,
updatedAt :: UTCTime,
Expand All @@ -94,6 +96,8 @@ instance Hasql.DecodeRow Contribution where
status <- PG.decodeField
sourceBranchId <- PG.decodeField
targetBranchId <- PG.decodeField
sourceCausalId <- PG.decodeField
targetCausalId <- PG.decodeField
bestCommonAncestorCausalId <- PG.decodeField
createdAt <- PG.decodeField
updatedAt <- PG.decodeField
Expand Down
6 changes: 3 additions & 3 deletions src/Share/Metrics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ module Share.Metrics
tickUserSignup,
recordBackgroundImportDuration,
recordDefinitionSearchIndexDuration,
recordContributionDiffDuration,
recordCausalDiffDuration,
recordWebhookSendingDuration,
)
where
Expand Down Expand Up @@ -445,8 +445,8 @@ recordBackgroundImportDuration = timeActionIntoHistogram backgroundImportDuratio
recordDefinitionSearchIndexDuration :: (MonadUnliftIO m) => m r -> m r
recordDefinitionSearchIndexDuration = timeActionIntoHistogram definitionSearchIndexDurationSeconds (deployment, service)

recordContributionDiffDuration :: Clock.TimeSpec -> IO ()
recordContributionDiffDuration startTime = do
-- | Record how long a causal diff took, measured from @startTime@ until the moment this is called.
--
-- The end time is read from the monotonic clock, so @startTime@ is expected to come from the same clock.
--
-- NOTE(review): the latency is still recorded into 'contributionDiffDurationSeconds'; the histogram keeps its
-- old contribution-based name even though this function now covers all causal diffs -- confirm that is intended.
recordCausalDiffDuration :: Clock.TimeSpec -> IO ()
recordCausalDiffDuration startTime = do
endTime <- Clock.getTime Monotonic
recordLatency contributionDiffDurationSeconds (deployment, service) startTime endTime

Expand Down
Loading
Loading