Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 111 additions & 150 deletions internal/proto/v2/resolve.pb.go

Large diffs are not rendered by default.

82 changes: 82 additions & 0 deletions internal/server/adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"

pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
"github.com/datacommonsorg/mixer/internal/server/topic"
"github.com/datacommonsorg/mixer/internal/server/v2/resolve"
)

// topicExpanderAdapter adapts *topic.TopicCacheManager to the resolve.TopicExpander interface.
// Its methods forward to the wrapped manager and return zero values when the manager is nil.
type topicExpanderAdapter struct {
// m is the wrapped topic cache manager; every adapter method checks it for nil before delegating.
m *topic.TopicCacheManager
}

// newTopicExpander builds the resolve.TopicExpander used by the core handlers,
// wrapping the server's TopicCacheManager in a topicExpanderAdapter so that
// handlers never depend on the concrete adapter type.
//
// When the server has no TopicCacheManager it returns a plain nil interface
// value (not a typed nil pointer), so callers can compare the result to nil.
func (s *Server) newTopicExpander() resolve.TopicExpander {
	if mgr := s.topicCacheManager; mgr != nil {
		return &topicExpanderAdapter{m: mgr}
	}
	return nil
}

// ExpandRoots forwards to TopicCacheManager.ExpandRoots on the wrapped manager.
// With no manager configured it returns (nil, nil).
func (a *topicExpanderAdapter) ExpandRoots(ctx context.Context, expandTopics bool) ([]*pbv2.ResolveResponse_Entity_Candidate, error) {
	if mgr := a.m; mgr != nil {
		return mgr.ExpandRoots(ctx, expandTopics)
	}
	return nil, nil
}

// ExpandTopic forwards to TopicCacheManager.ExpandTopic on the wrapped manager
// to resolve the children of topicDcid. With no manager configured it returns
// (nil, nil).
func (a *topicExpanderAdapter) ExpandTopic(ctx context.Context, topicDcid string, expandTopics bool) ([]*pbv2.ResolveResponse_Entity_Candidate, error) {
	if mgr := a.m; mgr != nil {
		return mgr.ExpandTopic(ctx, topicDcid, expandTopics)
	}
	return nil, nil
}

// GetTopicDisplayName forwards to TopicCacheManager.GetTopicDisplayName on the
// wrapped manager. With no manager configured it returns the empty string.
func (a *topicExpanderAdapter) GetTopicDisplayName(ctx context.Context, topicDcid string) string {
	if mgr := a.m; mgr != nil {
		return mgr.GetTopicDisplayName(ctx, topicDcid)
	}
	return ""
}

// GetSVPropertyInfos looks up Statistical Variable metadata for svDcids through
// the wrapped manager's GetStatVarInfos and converts every non-nil entry into a
// resolve.SVPropertyInfo stored under the same key. Entries whose info is nil
// are left out of the result.
//
// With no manager configured it returns (nil, nil). A lookup error is returned
// unchanged together with a nil map.
func (a *topicExpanderAdapter) GetSVPropertyInfos(ctx context.Context, svDcids []string) (map[string]resolve.SVPropertyInfo, error) {
	mgr := a.m
	if mgr == nil {
		return nil, nil
	}

	statVarInfos, err := mgr.GetStatVarInfos(ctx, svDcids)
	if err != nil {
		return nil, err
	}

	converted := make(map[string]resolve.SVPropertyInfo, len(statVarInfos))
	for dcid, svInfo := range statVarInfos {
		if svInfo == nil {
			continue
		}
		converted[dcid] = resolve.SVPropertyInfo{
			Name:                  svInfo.Name,
			ObservationProperties: svInfo.ObservationProperties,
		}
	}
	return converted, nil
}
102 changes: 98 additions & 4 deletions internal/server/datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
"github.com/datacommonsorg/mixer/internal/nodefetcher"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)

// DataSourceType represents the type of data source.
Expand All @@ -34,6 +36,21 @@ const (
TypeMock DataSourceType = "mock"
)

const (
// defaultFetchAllPageSize is the default page size for NodeFetchAll sequential queries.
defaultFetchAllPageSize = 1000

// fetchAllChunkSize is the number of nodes per partition when a multi-node
// NodeFetchAll request is split into chunks.
//
// Why chunk at all:
//   - Spanner's NodeFetcher limits returned edges to a strict threshold of 1,000 rows per query.
//   - NodeFetchAll can page sequentially using NextToken, but doing so for a massive list of
//     independent nodes is highly inefficient.
//   - More importantly, if a query matches thousands of edges across multiple subject nodes,
//     Spanner truncates the returned results within a page, silently dropping properties for
//     alphabetically later nodes.
//
// Splitting the requested nodes into independent batches fetched in parallel keeps each query
// well below that truncation threshold (e.g., 200 nodes * 3 properties = 600 edges) while
// maximizing retrieval performance.
fetchAllChunkSize = 200
)

// DataSource interface defines the common methods for all data sources.
type DataSource interface {
Type() DataSourceType
Expand All @@ -52,16 +69,93 @@ type DataSource interface {

// NodeFetchAll fetches all NodeResponse pages for a given request by repeatedly calling ds.Node
// as long as a NextToken is returned and merges them into a single response.
// Requests with more than fetchAllChunkSize nodes are partitioned into chunks of at most
// fetchAllChunkSize nodes, which are fetched in parallel and then merged, to prevent exceeding
// database pagination/truncation limits and maximize performance. Requests with
// fetchAllChunkSize nodes or fewer are paged sequentially without chunking.
// It returns an error when pageSize is not positive.
func NodeFetchAll(ctx context.Context, ds DataSource, req *pbv2.NodeRequest, pageSize int) (*pbv2.NodeResponse, error) {
if pageSize <= 0 {
return nil, fmt.Errorf("pageSize must be positive")
}
return nodefetcher.NodeFetchAllFunc(ctx, func(ctx context.Context, req *pbv2.NodeRequest) (*pbv2.NodeResponse, error) {
return ds.Node(ctx, req, pageSize)
}, req)

nodes := req.GetNodes()

// If the request fits in a single chunk (at most fetchAllChunkSize nodes, which includes
// single-node requests), bypass chunking and page sequentially.
if len(nodes) <= fetchAllChunkSize {
return nodefetcher.NodeFetchAllFunc(ctx, func(ctx context.Context, req *pbv2.NodeRequest) (*pbv2.NodeResponse, error) {
return ds.Node(ctx, req, pageSize)
}, req)
}

// Split the node list into fixed-size partitions and fetch each one to completion in parallel.
chunks := chunkNodes(nodes, fetchAllChunkSize)
responses, err := fetchChunksParallel(ctx, ds, req, chunks, pageSize)
if err != nil {
return nil, err
}

// Each chunk covers a disjoint set of nodes, so the per-chunk responses are combined by key.
return mergeDisjointResponses(responses), nil
}

const defaultFetchAllPageSize = 1000
// fetchChunksParallel fetches every chunk of nodes concurrently and returns one
// fully paged NodeResponse per chunk, in the same order as chunks.
//
// Each chunk is sent as a copy of req whose Nodes field is replaced by that
// chunk, and is paged to completion through nodefetcher.NodeFetchAllFunc using
// ds.Node with the given pageSize. req itself is never modified.
//
// At most maxParallelChunkFetches chunks are in flight at any time, so a very
// large node list cannot launch an unbounded number of simultaneous data source
// queries. The first chunk error cancels the context shared by the remaining
// fetches and is returned; no responses are returned in that case.
func fetchChunksParallel(
	ctx context.Context,
	ds DataSource,
	req *pbv2.NodeRequest,
	chunks [][]string,
	pageSize int,
) ([]*pbv2.NodeResponse, error) {
	// Upper bound on concurrently running chunk fetches.
	const maxParallelChunkFetches = 16

	// Clone the request once and drop its node list. Cloning req for every
	// chunk would copy the complete node list each time only to overwrite it,
	// which is quadratic in the number of requested nodes.
	baseReq := proto.Clone(req).(*pbv2.NodeRequest)
	baseReq.Nodes = nil

	g, groupCtx := errgroup.WithContext(ctx)
	g.SetLimit(maxParallelChunkFetches)

	// Every goroutine writes only its own slot, so no locking is needed.
	responses := make([]*pbv2.NodeResponse, len(chunks))

	for i, c := range chunks {
		// Per-iteration copies keep the closure correct on toolchains where
		// loop variables are shared across iterations.
		slot, nodeSubset := i, c
		g.Go(func() error {
			chunkReq := proto.Clone(baseReq).(*pbv2.NodeRequest)
			chunkReq.Nodes = nodeSubset

			resp, err := nodefetcher.NodeFetchAllFunc(groupCtx, func(ctx context.Context, req *pbv2.NodeRequest) (*pbv2.NodeResponse, error) {
				return ds.Node(ctx, req, pageSize)
			}, chunkReq)
			if err != nil {
				return err
			}
			responses[slot] = resp
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}
	return responses, nil
}

// chunkNodes partitions nodes into consecutive, disjoint chunks of at most size
// elements each, preserving the input order. Only the final chunk may be
// shorter than size.
//
// An empty input yields nil. A non-positive size cannot partition anything (it
// would otherwise never advance, or slice out of range), so the whole input is
// returned as a single chunk.
//
// The chunks are views into nodes rather than copies, but each one has its
// capacity clipped to its length, so appending to a chunk allocates a new array
// instead of overwriting the elements of the following chunk.
func chunkNodes(nodes []string, size int) [][]string {
	total := len(nodes)
	if total == 0 {
		return nil
	}
	if size <= 0 {
		return [][]string{nodes[:total:total]}
	}

	// total/size+1 is an upper bound on the chunk count that cannot overflow.
	chunks := make([][]string, 0, total/size+1)
	for start := 0; start < total; start += size {
		end := total
		// Compare against the remaining length so start+size never overflows.
		if size < total-start {
			end = start + size
		}
		chunks = append(chunks, nodes[start:end:end])
	}
	return chunks
}

// mergeDisjointResponses combines the Data maps of the given responses into one
// newly allocated NodeResponse. nil responses are skipped.
//
// The merge is shallow: LinkedGraph values are shared with the inputs, not
// cloned. If a key occurs in more than one response, the entry from the later
// response replaces the earlier one; the function is meant for responses whose
// key sets are disjoint, where no such replacement happens. Only Data is
// carried over to the result.
func mergeDisjointResponses(responses []*pbv2.NodeResponse) *pbv2.NodeResponse {
	merged := make(map[string]*pbv2.LinkedGraph)
	for i := range responses {
		part := responses[i]
		if part != nil {
			for dcid, graph := range part.GetData() {
				merged[dcid] = graph
			}
		}
	}
	return &pbv2.NodeResponse{Data: merged}
}

// NewNodeFetcher returns a nodefetcher.NodeAllFetcher for the given DataSource using the default page size.
func NewNodeFetcher(ds DataSource) nodefetcher.NodeAllFetcher {
Expand Down
12 changes: 9 additions & 3 deletions internal/server/handler_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,18 @@ func (s *Server) V2ResolveCore(
ctx context.Context,
in *resolve.NormalizedResolveRequest,
) (*pbv2.ResolveResponse, error) {
// Check for explicit "indicator" resolver, otherwise default to legacy place resolver logic.
if resolver := in.Request.GetResolver(); resolver == resolve.ResolveResolverIndicator {
// Check for explicit "indicator" or "topic" resolvers, otherwise default to legacy place resolver logic.
adapter := s.newTopicExpander()

resolver := in.Request.GetResolver()
switch resolver {
case resolve.ResolveResolverIndicator:
if !s.flags.EnableEmbeddingsResolver {
return nil, status.Errorf(codes.Unimplemented, "Resolving indicators is not enabled for this environment.")
}
return resolve.ResolveUsingEmbeddings(ctx, s.httpClient, s.embeddingsServerURL, s.resolveEmbeddingsIndexes, in.Request.GetNodes(), in.TypeOfValues)
return resolve.ResolveUsingEmbeddings(ctx, s.httpClient, s.embeddingsServerURL, s.resolveEmbeddingsIndexes, in.Request.GetNodes(), in.TypeOfValues, adapter, in.Request.GetExpandTopics())
case resolve.ResolveResolverTopic:
return resolve.ResolveTopics(ctx, adapter, in.Request.GetNodes(), in.Request.GetExpandTopics())
}

// Resolve places based on property expression
Expand Down
2 changes: 1 addition & 1 deletion internal/server/handler_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (s *Server) V2Resolve(
) (*pbv2.ResolveResponse, error) {
// TODO: Remove this once embeddings search (resolver == "indicator") are
// supported through Spanner.
if s.shouldDivertV2(ctx) && (in == nil || in.GetResolver() != "indicator") {
if s.shouldDivertV2(ctx) && (in == nil || (in.GetResolver() != "indicator" && in.GetResolver() != "topic")) {
return s.dispatcher.Resolve(ctx, in)
}

Expand Down
147 changes: 147 additions & 0 deletions internal/server/topic/expansion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2026 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package topic manages the in-memory and Redis caching of Knowledge Graph topic hierarchies.
// This file (expansion.go) implements topic tree navigation and candidate expansion algorithms.
package topic

import (
"context"
"fmt"
"log/slog"

pb "github.com/datacommonsorg/mixer/internal/proto"
pbv2 "github.com/datacommonsorg/mixer/internal/proto/v2"
)

// newCandidate returns a candidate carrying only the given DCID and a
// single-element TypeOf list containing entityType. All other fields are left
// at their zero values for the caller to fill in.
func newCandidate(dcid, entityType string) *pbv2.ResolveResponse_Entity_Candidate {
	cand := new(pbv2.ResolveResponse_Entity_Candidate)
	cand.Dcid = dcid
	cand.TypeOf = []string{entityType}
	return cand
}

// newTopicCandidate returns a candidate of type "Topic" for dcid, labelled with
// the given display name (which may be empty).
func newTopicCandidate(dcid, name string) *pbv2.ResolveResponse_Entity_Candidate {
	topicCand := newCandidate(dcid, "Topic")
	topicCand.Name = name
	return topicCand
}

// newSVCandidate returns a candidate of type "StatisticalVariable" for dcid.
// When info is non-nil its Name and ObservationProperties are copied onto the
// candidate; when info is nil those fields stay empty.
func newSVCandidate(dcid string, info *StatVarInfo) *pbv2.ResolveResponse_Entity_Candidate {
	svCand := newCandidate(dcid, "StatisticalVariable")
	if info == nil {
		return svCand
	}
	svCand.Name = info.Name
	svCand.ObservationProperties = info.ObservationProperties
	return svCand
}

// ExpandRoots returns one Topic candidate per root topic of the hierarchy, in
// the hierarchy's root order, each with its Children filled in by ExpandTopic:
//   - expandTopics == true: the flattened descendant Statistical Variables of
//     the root topic.
//   - expandTopics == false: the root topic's immediate members (sub-topics
//     and Statistical Variables).
//
// It returns (nil, nil) when no hierarchy is available or it has no root
// topics. Expansion is best-effort: a failure to expand one root is logged and
// that root is still returned, just without children. The returned error is
// always nil.
func (m *TopicCacheManager) ExpandRoots(ctx context.Context, expandTopics bool) ([]*pbv2.ResolveResponse_Entity_Candidate, error) {
	// NOTE(review): the second result of GetHierarchy is discarded; only a nil
	// hierarchy is treated as "nothing to expand" — confirm this is intended.
	h, _ := m.GetHierarchy(ctx)
	if h == nil {
		return nil, nil
	}
	rootDcids := h.GetRootTopicDcids()
	if len(rootDcids) == 0 {
		return nil, nil
	}

	topics := h.GetTopics()
	roots := make([]*pbv2.ResolveResponse_Entity_Candidate, 0, len(rootDcids))
	for _, rootDcid := range rootDcids {
		// A root that has no entry in the topic map keeps an empty name.
		rootName := ""
		if node := topics[rootDcid]; node != nil {
			rootName = node.GetName()
		}
		rootCand := newTopicCandidate(rootDcid, rootName)

		children, err := m.ExpandTopic(ctx, rootDcid, expandTopics)
		if err == nil {
			rootCand.Children = children
		} else {
			slog.Error("Failed to expand root topic during resolve", "root", rootDcid, "error", err)
		}
		roots = append(roots, rootCand)
	}
	return roots, nil
}

// ExpandTopic returns the candidates found beneath topicDcid.
//
//   - expandTopics == true: a flat list of Statistical Variable candidates for
//     every SV reachable through the topic and its sub-topics, in traversal
//     order.
//   - expandTopics == false: the topic's immediate members in their listed
//     order, as Topic candidates for sub-topics and Statistical Variable
//     candidates for everything else.
//
// SV metadata for all Statistical Variables in the view is loaded with a single
// GetStatVarInfos call; SVs without metadata still get a candidate, with empty
// name and observation properties. It returns (nil, nil) when no hierarchy or
// topic map is available, and a wrapped error when the metadata lookup fails.
func (m *TopicCacheManager) ExpandTopic(ctx context.Context, topicDcid string, expandTopics bool) ([]*pbv2.ResolveResponse_Entity_Candidate, error) {
	// NOTE(review): the second result of GetHierarchy is discarded; only a nil
	// hierarchy is treated as "nothing to expand" — confirm this is intended.
	h, _ := m.GetHierarchy(ctx)
	if h == nil || h.GetTopics() == nil {
		return nil, nil
	}
	topics := h.GetTopics()

	// One batched metadata lookup covers every SV that can appear in the result.
	svDcids := collectTopicSVs(h, topicDcid, expandTopics)
	svInfos, err := m.GetStatVarInfos(ctx, svDcids)
	if err != nil {
		return nil, fmt.Errorf("failed to get statistical variable infos: %w", err)
	}

	var out []*pbv2.ResolveResponse_Entity_Candidate

	if expandTopics {
		// Flattened view: descendant SVs only, sub-topics are not listed.
		for _, svDcid := range svDcids {
			out = append(out, newSVCandidate(svDcid, svInfos[svDcid]))
		}
		return out, nil
	}

	// Immediate-members view; an unknown topic has no members.
	node := topics[topicDcid]
	if node == nil {
		return out, nil
	}
	for _, memberDcid := range node.GetRelevantVariables() {
		if !isTopicDcid(memberDcid) {
			out = append(out, newSVCandidate(memberDcid, svInfos[memberDcid]))
			continue
		}
		// Sub-topics missing from the topic map keep an empty name.
		childName := ""
		if childNode := topics[memberDcid]; childNode != nil {
			childName = childNode.GetName()
		}
		out = append(out, newTopicCandidate(memberDcid, childName))
	}
	return out, nil
}

// collectTopicSVs returns the Statistical Variable DCIDs listed under topicDcid,
// in traversal order. With expandTopics set, sub-topics are followed
// recursively; otherwise only the topic's own direct SV members are returned.
// It sets up the visited-topic set that guards the traversal against cycles.
// The result is nil when nothing is found.
func collectTopicSVs(h *pb.TopicHierarchy, topicDcid string, expandTopics bool) []string {
	visited := map[string]struct{}{}
	var svDcids []string
	collectTopicSVsInternal(h, topicDcid, expandTopics, visited, &svDcids)
	return svDcids
}

// collectTopicSVsInternal walks the members of topicDcid and appends every
// non-topic member (a Statistical Variable DCID) to *res, in listed order.
// Topic members are descended into only when expandTopics is true, at the
// position where they appear, so the output follows depth-first order.
//
// seenTopics records topics already visited: a topic reached a second time
// (through a cycle or via several parents) is skipped entirely. SV DCIDs
// themselves are not de-duplicated. Topics absent from the hierarchy
// contribute nothing. Results are appended in place to avoid allocating
// intermediate slices.
func collectTopicSVsInternal(h *pb.TopicHierarchy, topicDcid string, expandTopics bool, seenTopics map[string]struct{}, res *[]string) {
	if _, visited := seenTopics[topicDcid]; visited {
		return
	}
	seenTopics[topicDcid] = struct{}{}

	node := h.GetTopics()[topicDcid]
	if node == nil {
		return
	}
	for _, member := range node.GetRelevantVariables() {
		switch {
		case !isTopicDcid(member):
			*res = append(*res, member)
		case expandTopics:
			collectTopicSVsInternal(h, member, expandTopics, seenTopics, res)
		}
	}
}
Loading
Loading