CASSGO-39 Add exec attempt interceptor (round 3) #1943

Open
abenn135 wants to merge 10 commits into apache:trunk from abenn135:abenn135/query-attempt-interceptor

Conversation

abenn135 commented Apr 9, 2026

This supersedes #1820. It is rebased on top of the v2 changes and takes them into account.

Notably, this removes the ability to mutate the Statement in the Interceptor -- it is now embedded in the internalRequest, making it inaccessible to a public API without invasive refactoring.

Additionally, I modified queryMetrics in a few ways:

  • Made it thread-safe, so that latency and attempt count are updated under lock, preventing a potential race when reading latency
  • Separated out "attempts started" from "attempts completed". Speculative execution means that several attempts could happen concurrently, and this guarantees that (a) each attempt/interception gets its own attempt count, and (b) the retry policy respects attempts started, to prevent over-retry when earlier retries haven't completed yet.

Fixes #1786.

BenEddy and others added 8 commits April 8, 2026 16:06
Removes return statement that bypassed query attempt tracking.
Remove gocql.NewIterWithErr
To facilitate chaining interceptors
Replace with read-only addr fields.
internalRequest is an intentionally package-private type. We cannot expose it directly in the public QueryAttempt API, so instead we share the statement. Unfortunately, since the statement invoked by internalRequest.execute()/conn.execute() is embedded in the internalRequest itself, it cannot be modified by an interceptor without significant refactoring. Therefore, the interceptor exposes only a copy of the statement.
Member

worryg0d left a comment

Hey,

I'm unsure if this PR is ready for review, but I just looked at it. I like the overall idea of introducing a solution generic enough to let us implement request monitors, rate limiting, etc., but I'm concerned about the ability to mutate request objects in place. What are the actual use cases where you need to modify requests in interceptors instead of using the request configuration API?

I didn't dive deep into the implementation details, though, and mostly reviewed the public API.

Comment thread cluster.go Outdated
Comment thread example_interceptor_test.go Outdated
}
}

type QueryAttemptInterceptorChain struct {
Member

I think we can make this part of the gocql API. We already expose a couple of similar types for event listeners: SessionReadyListenersMux, SchemaListenersMux, etc.

Author

We could... this was originally intended as sample code, and I don't know how many customers will actually need to compose multiple interceptors at a time. Do you want me to move this to query_executor.go?

Member

I'm actually neutral on this, but we already have two use cases for interceptors, monitoring and rate limiting, so having a chain available out of the box looks reasonable to me. If nobody objects, you may make it part of the gocql public API.

I think query_executor.go is fine.

Comment on lines +44 to +52
switch q := attempt.Statement.Statement().(type) {
case *gocql.Query:
	// Inspect query
	log.Println(q.Statement())
case *gocql.Batch:
	// Inspect batch
	log.Println(q.Entries[0].Stmt)
}
Member

Basically, this allows in-place modification of the submitted Query / Batch, which violates the immutability the driver follows, so changes made by interceptors might leave Query / Batch objects unsafe to reuse.

Is there any particular use case when you want to use the ability of interceptors to modify queries over their configuration API?

Author

I have to apologize here -- the comments in cluster.go and doc.go were out of date. Due to refactors in the v2 API, creation of the internalRequest is upstream of exec invocation, and this limits our ability to mutate the query/batch in the interceptor without re-exposing that type or doing a more dramatic refactor. The query/batch are not mutable in the interceptor.

The interceptor still provides unique value over an Observer because it is invoked before exec() and can fail, providing an opportunity for e.g. load shedding.

Member

I've just written a basic test to check whether requests are immutable, and it shows that I'm still able to mutate them in an interceptor:

type simpleInterceptor struct {
	logger StructuredLogger
}

func (s *simpleInterceptor) Intercept(ctx context.Context, attempt QueryAttempt, handler QueryAttemptHandler) (*Iter, error) {
	// Deliberately mutate the statement to check whether the change
	// leaks back to the caller's Query.
	if query, ok := attempt.Statement.Statement().(*Query); ok {
		s.logger.Info("Intercepting query", NewLogFieldString("statement", query.Statement()))
		query.Consistency(One)
	}
	return handler(ctx)
}

func Test_QueryObjectImmutabilityInInterceptor(t *testing.T) {
	interceptor := &simpleInterceptor{logger: NewLogger(LogLevelInfo)}
	session := createSession(t, func(config *ClusterConfig) {
		config.ExecAttemptInterceptor = interceptor
	})
	defer session.Close()

	expectedConsistency := Quorum
	query := session.Query("SELECT host_id FROM system.local").Consistency(expectedConsistency)

	var hostID string
	err := query.Scan(&hostID)
	if err != nil {
		t.Fatalf("Failed to scan host ID: %v", err)
	}

	require.Equal(t, expectedConsistency.String(), query.GetConsistency().String(), "Query object should not be mutated by the interceptor")
}

Output:

2026/05/05 11:28:35 logger.go:165: INF gocql: Intercepting query statement=DROP KEYSPACE IF EXISTS gocql_test
2026/05/05 11:28:37 logger.go:165: INF gocql: Intercepting query statement=CREATE KEYSPACE gocql_test
	WITH replication = {
		'class' : 'SimpleStrategy',
		'replication_factor' : 1
	}
2026/05/05 11:28:39 logger.go:165: INF gocql: Intercepting query statement=SELECT host_id FROM system.local
--- FAIL: Test_QueryObjectImmutabilityInInterceptor (3.73s)
    /home/worry/projects/go/github.com/worryg0d/gocql/integration_test.go:1013: 
        	Error Trace:	/home/worry/projects/go/github.com/worryg0d/gocql/integration_test.go:1013
        	Error:      	Not equal: 
        	            	expected: "QUORUM"
        	            	actual  : "ONE"
        	            	
        	            	Diff:
        	            	--- Expected
        	            	+++ Actual
        	            	@@ -1 +1 @@
        	            	-QUORUM
        	            	+ONE
        	Test:       	Test_QueryObjectImmutabilityInInterceptor
        	Messages:   	Query object should not be mutated by the interceptor
FAIL
FAIL	github.com/apache/cassandra-gocql-driver/v2	3.736s


coxley commented Apr 29, 2026

It would be nice if there was a way to determine whether an observed query/attempt was itself triggered by speculative execution vs. retry policy. There's no reliable way to determine that today which makes it hard to observe behavior changes at scale in relation to tweaking one or the other.

…ments.

The interceptor class is called before every exec call, not just queries. The new class name reflects this.

Also, notably, the `ExecAttemptInterceptor` cannot mutate the query/batch -- it is downstream of the creation of the `internalRequest`, and that class is package-private and therefore cannot be passed into `Intercept()` directly. This commit clarifies this fact in comments. Making the query mutable would require more invasive refactoring and would violate immutability invariants intentionally built into the v2 interface.
abenn135 changed the title from "CASSGO-39 Add query attempt interceptor (round 3)" to "CASSGO-39 Add exec attempt interceptor (round 3)" on May 1, 2026
Author

abenn135 commented May 1, 2026

> It would be nice if there was a way to determine whether an observed query/attempt was itself triggered by speculative execution vs. retry policy. There's no reliable way to determine that today which makes it hard to observe behavior changes at scale in relation to tweaking one or the other.

Sure, we could modify QueryAttempt to pass that information in. What do you think?
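
One possible shape for that, sketched under the assumption that `QueryAttempt` gains a field describing why the attempt was started. The `AttemptCause` type, its constants, the `Cause` field, and `countByCause` are all hypothetical and do not exist in gocql today.

```go
package main

import "fmt"

// AttemptCause is a hypothetical enum describing what triggered an attempt.
type AttemptCause int

const (
	CauseInitial     AttemptCause = iota // first attempt of a request
	CauseRetry                           // started by the retry policy
	CauseSpeculative                     // started by speculative execution
)

func (c AttemptCause) String() string {
	switch c {
	case CauseInitial:
		return "initial"
	case CauseRetry:
		return "retry"
	case CauseSpeculative:
		return "speculative"
	default:
		return "unknown"
	}
}

// QueryAttempt is a local stand-in; only the Cause field matters here.
type QueryAttempt struct {
	Attempts int
	Cause    AttemptCause
}

// countByCause shows how an interceptor could bucket attempts for metrics.
func countByCause(attempts []QueryAttempt) map[AttemptCause]int {
	counts := make(map[AttemptCause]int)
	for _, a := range attempts {
		counts[a.Cause]++
	}
	return counts
}

func main() {
	counts := countByCause([]QueryAttempt{
		{Attempts: 1, Cause: CauseInitial},
		{Attempts: 2, Cause: CauseSpeculative},
		{Attempts: 3, Cause: CauseRetry},
		{Attempts: 4, Cause: CauseSpeculative},
	})
	for _, c := range []AttemptCause{CauseInitial, CauseRetry, CauseSpeculative} {
		fmt.Printf("%s=%d\n", c, counts[c])
	}
}
```

With a field like this, an interceptor could emit separate counters for retries and speculative executions, which would make the effect of tuning either policy visible on its own.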

Development

Successfully merging this pull request may close these issues.

CASSGO-39 Add query attempt interceptor

4 participants