Skip to content

Commit 9b5632d

Browse files
mw5hclaude
andcommitted
fixup! importer: set up FunctionResolver for computed column type-checking
Move FunctionResolver setup from makeInputConverter (shared semaCtx) to NewDatumRowConverter (per-instance) to eliminate a data race. Multiple parallel import workers were sharing the same semaCtx.FunctionResolver backed by a *descs.Collection, causing concurrent access to leasedDescriptors.maybeInitReadTimestamp. Each DatumRowConverter now creates its own bare-bones descs.Collection inside a short-lived txn, mirroring the existing per-instance sequence resolution pattern. Since bare-bones collections lack a lease manager, we use ByIDWithoutLeased via the new NewDistSQLFunctionResolverFromGetter constructor. This also simplifies the makeInputConverter signature back to accepting *kv.DB instead of descs.DB + *descs.Collection. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fee44f6 commit 9b5632d

5 files changed

Lines changed: 72 additions & 47 deletions

File tree

pkg/sql/catalog/descs/dist_sql_function_resolver.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,14 @@ func NewDistSQLFunctionResolver(descs *Collection, txn *kv.Txn) *DistSQLFunction
3434
}
3535
}
3636

37+
// NewDistSQLFunctionResolverFromGetter returns a new DistSQLFunctionResolver
38+
// using the provided ByIDGetter directly. This is useful when the caller
39+
// needs to control how descriptors are looked up, for example using
40+
// ByIDWithoutLeased on a bare-bones collection that lacks a lease manager.
41+
func NewDistSQLFunctionResolverFromGetter(g ByIDGetter) *DistSQLFunctionResolver {
42+
return &DistSQLFunctionResolver{g: g}
43+
}
44+
3745
// ResolveFunction implements tree.FunctionReferenceResolver interface.
3846
// It only resolves builtin functions.
3947
// TODO(chengxiong): extract resolution logics in schema_resolver.go into

pkg/sql/importer/import_processor.go

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import (
1313

1414
"github.com/cockroachdb/cockroach/pkg/base"
1515
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
16+
"github.com/cockroachdb/cockroach/pkg/kv"
1617
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1718
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
1819
"github.com/cockroachdb/cockroach/pkg/roachpb"
1920
"github.com/cockroachdb/cockroach/pkg/settings"
2021
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
2122
"github.com/cockroachdb/cockroach/pkg/sql/bulksst"
2223
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
23-
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
2424
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
2525
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
2626
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
@@ -259,8 +259,7 @@ func makeInputConverter(
259259
evalCtx *eval.Context,
260260
kvCh chan row.KVBatch,
261261
seqChunkProvider *row.SeqChunkProvider,
262-
descsDB descs.DB,
263-
descriptors *descs.Collection,
262+
db *kv.DB,
264263
) (inputConverter, error) {
265264
if len(spec.Tables) > 1 {
266265
return nil, errors.AssertionFailedf("%s only supports reading a single, pre-specified table", spec.Format.Format.String())
@@ -290,32 +289,6 @@ func makeInputConverter(
290289
}
291290
}
292291

293-
// If the table has computed columns, set up a FunctionResolver so that
294-
// any user-defined functions referenced in computed expressions can be
295-
// resolved during type-checking. This mirrors the pattern used by
296-
// ColumnBackfiller.InitForDistributedUse.
297-
hasComputedCols := false
298-
for _, col := range desc.WritableColumns() {
299-
if col.IsComputed() {
300-
hasComputedCols = true
301-
break
302-
}
303-
}
304-
if hasComputedCols {
305-
if err := descsDB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
306-
semaCtx.FunctionResolver = descs.NewDistSQLFunctionResolver(txn.Descriptors(), txn.KV())
307-
return nil
308-
}); err != nil {
309-
return nil, err
310-
}
311-
// Release leases on any accessed descriptors now that the
312-
// resolver is configured. We do this so that leases are not
313-
// held for the entire import process.
314-
if descriptors != nil {
315-
descriptors.ReleaseAll(ctx)
316-
}
317-
}
318-
319292
readerParallelism := int(spec.ReaderParallelism)
320293
if readerParallelism <= 0 {
321294
readerParallelism = int(readerParallelismSetting.Get(&evalCtx.Settings.SV))
@@ -324,7 +297,6 @@ func makeInputConverter(
324297
readerParallelism = runtime.GOMAXPROCS(0)
325298
}
326299

327-
db := descsDB.KV()
328300
switch spec.Format.Format {
329301
case roachpb.IOFileFormat_CSV:
330302
isWorkload := true

pkg/sql/importer/import_processor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestConverterFlushesBatches(t *testing.T) {
9595
params := base.TestServerArgs{}
9696
s := serverutils.StartServerOnly(t, params)
9797
defer s.Stopper().Stop(ctx)
98-
descsDB := s.ApplicationLayer().InternalDB().(descs.DB)
98+
kvDB := s.ApplicationLayer().DB()
9999

100100
tests := []testSpec{
101101
newTestSpec(ctx, t, csvFormat(), "testdata/csv/data-0"),
@@ -122,7 +122,7 @@ func TestConverterFlushesBatches(t *testing.T) {
122122
kvCh := make(chan row.KVBatch, batchSize)
123123
semaCtx := tree.MakeSemaContext(nil /* resolver */)
124124
conv, err := makeInputConverter(ctx, &semaCtx, converterSpec, &evalCtx, kvCh,
125-
nil /* seqChunkProvider */, descsDB, nil /* descriptors */)
125+
nil /* seqChunkProvider */, kvDB)
126126
if err != nil {
127127
t.Fatalf("makeInputConverter() error = %v", err)
128128
}

pkg/sql/importer/read_import_base.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func runImport(
9191
semaCtx := tree.MakeSemaContext(importResolver)
9292
conv, convErr := makeInputConverter(
9393
ctx, &semaCtx, spec, evalCtx, kvCh, seqChunkProvider,
94-
flowCtx.Cfg.DB, flowCtx.Descriptors,
94+
flowCtx.Cfg.DB.KV(),
9595
)
9696
if convErr != nil {
9797
return nil, convErr

pkg/sql/row/row_converter.go

Lines changed: 59 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -484,20 +484,65 @@ func NewDatumRowConverter(
484484
// MakeComputedExprs to map that of Datums.
485485
colsOrdered[ri.InsertColIDtoRowIndex.GetDefault(col.GetID())] = col
486486
}
487-
// Here, computeExprs will be nil if there's no computed column, or
488-
// the list of computed expressions (including nil, for those columns
489-
// that are not computed) otherwise, according to colsOrdered.
490-
c.computedExprs, _, err = schemaexpr.MakeComputedExprs(
491-
ctx,
492-
colsOrdered,
493-
c.tableDesc.PublicColumns(),
494-
c.tableDesc,
495-
tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.GetName())),
496-
c.EvalCtx,
497-
c.SemaCtx,
498-
)
499-
if err != nil {
500-
return nil, errors.Wrapf(err, "error type checking and building computed expression for IMPORT INTO")
487+
488+
// If any computed columns reference UDFs, we need a FunctionResolver
489+
// to resolve them by OID during type-checking. Set up a per-instance
490+
// resolver using a bare-bones descs.Collection inside a short-lived
491+
// txn, and call MakeComputedExprs within the txn so the resolver
492+
// remains valid. Each DatumRowConverter instance gets its own
493+
// resolver to avoid races when multiple import workers run in
494+
// parallel. This mirrors the sequence resolution pattern above.
495+
hasComputedCols := false
496+
for _, col := range colsOrdered {
497+
if col != nil && col.IsComputed() {
498+
hasComputedCols = true
499+
break
500+
}
501+
}
502+
if hasComputedCols && c.SemaCtx.FunctionResolver == nil && c.db != nil {
503+
cf := descs.NewBareBonesCollectionFactory(evalCtx.Settings, evalCtx.Codec)
504+
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(evalCtx.SessionDataStack)
505+
descsCol := cf.NewCollection(ctx, descs.WithDescriptorSessionDataProvider(dsdp))
506+
defer descsCol.ReleaseAll(ctx)
507+
err = c.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
508+
if err := txn.SetFixedTimestamp(ctx, hlc.Timestamp{WallTime: evalCtx.TxnTimestamp.UnixNano()}); err != nil {
509+
return err
510+
}
511+
// Use ByIDWithoutLeased because the bare-bones collection
512+
// does not have a lease manager.
513+
c.SemaCtx.FunctionResolver = descs.NewDistSQLFunctionResolverFromGetter(
514+
descsCol.ByIDWithoutLeased(txn).Get(),
515+
)
516+
c.computedExprs, _, err = schemaexpr.MakeComputedExprs(
517+
ctx,
518+
colsOrdered,
519+
c.tableDesc.PublicColumns(),
520+
c.tableDesc,
521+
tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.GetName())),
522+
c.EvalCtx,
523+
c.SemaCtx,
524+
)
525+
return err
526+
})
527+
if err != nil {
528+
return nil, errors.Wrapf(err, "error type checking and building computed expression for IMPORT INTO")
529+
}
530+
} else {
531+
// Here, computeExprs will be nil if there's no computed column, or
532+
// the list of computed expressions (including nil, for those columns
533+
// that are not computed) otherwise, according to colsOrdered.
534+
c.computedExprs, _, err = schemaexpr.MakeComputedExprs(
535+
ctx,
536+
colsOrdered,
537+
c.tableDesc.PublicColumns(),
538+
c.tableDesc,
539+
tree.NewUnqualifiedTableName(tree.Name(c.tableDesc.GetName())),
540+
c.EvalCtx,
541+
c.SemaCtx,
542+
)
543+
if err != nil {
544+
return nil, errors.Wrapf(err, "error type checking and building computed expression for IMPORT INTO")
545+
}
501546
}
502547

503548
// Here, partialIndexExprs will be nil if there are no partial indexes, or a

0 commit comments

Comments
 (0)