Skip to content

Commit d8a7dfc

Browse files
authored
Add MockAzureSchemaRegistryClient (#9322)
1 parent 72111ec commit d8a7dfc

21 files changed

Lines changed: 545 additions & 100 deletions

File tree

docs/Changelog.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,7 @@ description: Stay informed with detailed changelogs covering new features, impro
360360
Can be configured by implementing TypingConfigurationProvider and registering it as SPI.
361361
By default, the existing permissive behavior is preserved.
362362
* [#9286](https://github.com/TouK/nussknacker/pull/9286) Fixed handling of Avro records in Flink SQL component.
363+
* [#9322](https://github.com/TouK/nussknacker/pull/9322) Added mockable Azure Schema Registry client.
363364

364365
## 1.18
365366

engine/lite/components/kafka-tests/src/test/scala/pl/touk/nussknacker/engine/lite/components/AzureSchemaRegistryKafkaAvroTest.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,8 @@ import pl.touk.nussknacker.engine.schemedkafka.KafkaUniversalComponentTransforme
2828
topicParamName
2929
}
3030
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{SchemaId, SchemaVersionOption}
31-
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.{
32-
AzureSchemaRegistryClientFactory,
33-
AzureUtils,
34-
SchemaNameTopicMatchStrategy
35-
}
31+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.{AzureUtils, SchemaNameTopicMatchStrategy}
32+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.client.AzureSchemaRegistryClientFactory
3633
import pl.touk.nussknacker.engine.spel.SpelExtension._
3734
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner
3835
import pl.touk.nussknacker.engine.util.test.TestScenarioRunner.RunnerListResult

utils/kafka-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/KafkaComponentsConfig.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ import scala.concurrent.duration._
1111

1212
case class SchemaRegistryClientKafkaConfig(
1313
kafkaProperties: Map[String, String],
14-
cacheConfig: SchemaRegistryCacheConfig,
15-
avroAsJsonSerialization: Option[Boolean]
14+
cacheConfig: SchemaRegistryCacheConfig = SchemaRegistryCacheConfig(),
15+
avroAsJsonSerialization: Option[Boolean] = None
1616
)
1717

1818
case class KafkaComponentsConfig(

utils/lite-components-testkit/src/main/scala/pl/touk/nussknacker/engine/lite/util/test/LiteKafkaTestScenarioRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
2626
SchemaRegistryClientFactoryWithRegistration,
2727
SchemaRegistryClientWithRegistration
2828
}
29-
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.AzureSchemaRegistryClient
29+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.client.AzureSchemaRegistryClient
3030
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.ConfluentUtils
3131
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.confluent.client.{
3232
ConfluentSchemaRegistryClient,

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/SchemaRegistryClient.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@ trait SchemaRegistryClient {
1717

1818
/**
1919
* Latest fresh schema by topic - it should always be a fresh schema
20-
*
21-
* @param topic
22-
* @param isKey
23-
* @return
2420
*/
2521
protected def getLatestFreshSchema(
2622
topic: UnspecializedTopicName,
@@ -53,9 +49,9 @@ trait SchemaRegistryClient {
5349

5450
}
5551

56-
object EmptySchemaRegistry extends SchemaRegistryClient {
52+
object EmptySchemaRegistry extends SchemaRegistryClientWithRegistration {
5753

58-
private val errorMessage = "There is no schema in empty schema registry";
54+
private val errorMessage = "There is no schema in empty schema registry"
5955
private val error = SchemaError(errorMessage)
6056

6157
override def getSchemaById(id: SchemaId): SchemaWithMetadata = throw new IllegalStateException(errorMessage)
@@ -78,6 +74,9 @@ object EmptySchemaRegistry extends SchemaRegistryClient {
7874
isKey: Boolean
7975
): Validated[SchemaRegistryError, List[Integer]] = Validated.Invalid(error)
8076

77+
override def registerSchema(topic: UnspecializedTopicName, isKey: Boolean, schema: ParsedSchema): SchemaId =
78+
throw new IllegalStateException(errorMessage)
79+
8180
}
8281

8382
// This trait is mainly for testing mechanism purpose - in production implementation we assume that all schemas
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.client
2+
3+
import cats.data.Validated
4+
import cats.data.Validated.{Invalid, Valid}
5+
import com.azure.data.schemaregistry.models.SchemaProperties
6+
import com.typesafe.scalalogging.LazyLogging
7+
import io.confluent.kafka.schemaregistry.ParsedSchema
8+
import io.confluent.kafka.schemaregistry.avro.AvroSchema
9+
import org.apache.avro.Schema
10+
import pl.touk.nussknacker.engine.kafka.UnspecializedTopicName
11+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
12+
SchemaId,
13+
SchemaRegistryClientWithRegistration,
14+
SchemaRegistryError,
15+
SchemaTopicError
16+
}
17+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.SchemaNameTopicMatchStrategy
18+
19+
trait AzureSchemaRegistryClient extends SchemaRegistryClientWithRegistration with LazyLogging {
20+
21+
protected def checkAvroSchema(schema: ParsedSchema): AvroSchema = {
22+
schema match {
23+
case avroSchema: AvroSchema => avroSchema
24+
case _ => throw new IllegalArgumentException("Currently only avro schema is supported for Azure implementation")
25+
}
26+
}
27+
28+
protected def getOneMatchingSchemaName(
29+
topicName: UnspecializedTopicName,
30+
isKey: Boolean
31+
): Validated[SchemaRegistryError, String] = {
32+
getAllFullSchemaNames.andThen { fullSchemaNames =>
33+
val matchingFullSchemaNames = SchemaNameTopicMatchStrategy.getMatchingSchemas(topicName, fullSchemaNames, isKey)
34+
matchingFullSchemaNames match {
35+
case one :: Nil =>
36+
Valid(one)
37+
case Nil =>
38+
Invalid(SchemaTopicError(s"Schema for topic: $topicName not found"))
39+
case moreThenOnce =>
40+
// We can't pick one in this case because there is no option to recognize which one is newer.
41+
// I've tried to parse schemaId to UUID but it is saved in Version 4 UUID format (random) and has no timestamp
42+
Invalid(SchemaTopicError(s"Ambiguous schemas: ${moreThenOnce.mkString(", ")} for topic: $topicName"))
43+
}
44+
}
45+
}
46+
47+
protected def getAllFullSchemaNames: Validated[SchemaRegistryError, List[String]]
48+
49+
override def registerSchema(topicName: UnspecializedTopicName, isKey: Boolean, schema: ParsedSchema): SchemaId = {
50+
val schemaNameBasedOnTopic = SchemaNameTopicMatchStrategy.schemaNameFromTopicName(topicName, isKey)
51+
val avroSchema = checkAvroSchema(schema).rawSchema()
52+
if (avroSchema.getType == Schema.Type.RECORD) {
53+
require(
54+
avroSchema.getName == schemaNameBasedOnTopic,
55+
s"Invalid record schema name ${avroSchema.getName} Should be: $schemaNameBasedOnTopic."
56+
)
57+
} else {
58+
// for primitive types we have to register the schema under two names - one for the purpose of listing topics and a second one based on the schema name, to make it possible to serialize such an object
59+
// by KafkaAvroSerializer - it just looks up the schema registry based on the schema's full name. It can be tricky which id is returned - in most cases it is better
60+
// to return the one based on name
61+
registerSchemaVersionIfNotExists(schema, Some(schemaNameBasedOnTopic))
62+
}
63+
SchemaId.fromString(registerSchemaVersionIfNotExists(schema).getId)
64+
}
65+
66+
// forceSchemaNameOpt is for special purposes like primitive schemas when there is no name
67+
def registerSchemaVersionIfNotExists(
68+
schema: ParsedSchema,
69+
forceSchemaNameOpt: Option[String] = None
70+
): SchemaProperties
71+
72+
def getSchemaIdByContent(schema: AvroSchema): SchemaId
73+
74+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.client
2+
3+
import pl.touk.nussknacker.engine.kafka._
4+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._
5+
6+
object AzureSchemaRegistryClientFactory extends SchemaRegistryClientFactoryWithRegistration {
7+
8+
override type SchemaRegistryClientT = AzureSchemaRegistryClient
9+
10+
override def create(config: SchemaRegistryClientKafkaConfig): SchemaRegistryClientT = {
11+
new DefaultAzureSchemaRegistryClient(config)
12+
}
13+
14+
}

utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/AzureSchemaRegistryClientFactory.scala renamed to utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/azure/client/DefaultAzureSchemaRegistryClient.scala

Lines changed: 30 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure
1+
package pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.client
22

33
import cats.data.Validated
44
import cats.data.Validated.{Invalid, Valid}
@@ -9,29 +9,36 @@ import com.azure.data.schemaregistry.implementation.models.SchemasGetByIdRespons
99
import com.azure.data.schemaregistry.models.{SchemaFormat, SchemaProperties, SchemaRegistrySchema}
1010
import io.confluent.kafka.schemaregistry.ParsedSchema
1111
import io.confluent.kafka.schemaregistry.avro.AvroSchema
12-
import org.apache.avro.Schema
1312
import org.apache.commons.io.IOUtils
14-
import pl.touk.nussknacker.engine.kafka._
15-
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry._
16-
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.internal._
13+
import pl.touk.nussknacker.engine.kafka.{
14+
KafkaComponentsConfig,
15+
KafkaUtils,
16+
SchemaRegistryClientKafkaConfig,
17+
UnspecializedTopicName
18+
}
19+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.{
20+
SchemaId,
21+
SchemaRegistryError,
22+
SchemaRegistryUnknownError,
23+
SchemaVersionError,
24+
SchemaWithMetadata
25+
}
26+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.SchemaNameTopicMatchStrategy
27+
import pl.touk.nussknacker.engine.schemedkafka.schemaregistry.azure.internal.{
28+
AzureConfigurationFactory,
29+
AzureHttpPipelineFactory,
30+
AzureTokenCredentialFactory,
31+
EnhancedSchemasImpl,
32+
SchemaRegistryJsonSerializer
33+
}
1734
import reactor.core.publisher.Mono
1835

1936
import java.nio.charset.StandardCharsets
20-
import scala.compat.java8.FunctionConverters._
37+
import scala.compat.java8.FunctionConverters.asJavaFunction
2138
import scala.jdk.CollectionConverters._
2239
import scala.util.control.NonFatal
2340

24-
object AzureSchemaRegistryClientFactory extends SchemaRegistryClientFactoryWithRegistration {
25-
26-
override type SchemaRegistryClientT = AzureSchemaRegistryClient
27-
28-
override def create(config: SchemaRegistryClientKafkaConfig): SchemaRegistryClientT = {
29-
new AzureSchemaRegistryClient(config)
30-
}
31-
32-
}
33-
34-
class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends SchemaRegistryClientWithRegistration {
41+
class DefaultAzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends AzureSchemaRegistryClient {
3542

3643
private val azureConfiguration = AzureConfigurationFactory.createFromKafkaProperties(config.kafkaProperties)
3744
private val credential = AzureTokenCredentialFactory.createCredential(azureConfiguration)
@@ -54,7 +61,7 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
5461
)
5562

5663
override def getSchemaById(id: SchemaId): SchemaWithMetadata = {
57-
toSchemaWithMetada(schemaRegistryClient.getSchema(id.asString))
64+
toSchemaWithMetadata(schemaRegistryClient.getSchema(id.asString))
5865
}
5966

6067
override protected def getByTopicAndVersion(
@@ -70,11 +77,11 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
7077
case NonFatal(ex) => Invalid(SchemaVersionError(ex.getMessage))
7178
}
7279
}
73-
.map(toSchemaWithMetada)
80+
.map(toSchemaWithMetadata)
7481

7582
}
7683

77-
private def toSchemaWithMetada(result: SchemaRegistrySchema) = {
84+
private def toSchemaWithMetadata(result: SchemaRegistrySchema) = {
7885
SchemaWithMetadata(new AvroSchema(result.getDefinition), SchemaId.fromString(result.getProperties.getId))
7986
}
8087

@@ -125,56 +132,17 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
125132
.map(UnspecializedTopicName.apply)
126133
}
127134

128-
private def getOneMatchingSchemaName(
129-
topicName: UnspecializedTopicName,
130-
isKey: Boolean
131-
): Validated[SchemaRegistryError, String] = {
132-
getAllFullSchemaNames.andThen { fullSchemaNames =>
133-
val matchingFullSchemaNames = SchemaNameTopicMatchStrategy.getMatchingSchemas(topicName, fullSchemaNames, isKey)
134-
matchingFullSchemaNames match {
135-
case one :: Nil =>
136-
Valid(one)
137-
case Nil =>
138-
Invalid(SchemaTopicError(s"Schema for topic: $topicName not found"))
139-
case moreThenOnce =>
140-
// We can't pick one in this case because there is no option to recognize which one is newer.
141-
// I've tried to parse schemaId to UUID but it is saved in Version 4 UUID format (random) and has no timestamp
142-
Invalid(SchemaTopicError(s"Ambiguous schemas: ${moreThenOnce.mkString(", ")} for topic: $topicName"))
143-
}
144-
}
145-
}
146-
147135
private def getVersions(fullSchemaName: String): Validated[SchemaRegistryError, List[Integer]] = {
148136
invokeBlocking(enhancedSchemasService.getVersionsWithResponseAsync(schemaGroup, fullSchemaName, _))
149137
.map(_.getValue.getSchemaVersions().asScala.toList)
150138
}
151139

152-
private def getAllFullSchemaNames: Validated[SchemaRegistryError, List[String]] = {
140+
override protected def getAllFullSchemaNames: Validated[SchemaRegistryError, List[String]] = {
153141
invokeBlocking(enhancedSchemasService.getSchemasWithResponseAsync(schemaGroup, _))
154142
.map(_.getValue.getSchemas().asScala.toList)
155143
}
156144

157-
override def registerSchema(topicName: UnspecializedTopicName, isKey: Boolean, schema: ParsedSchema): SchemaId = {
158-
val schemaNameBasedOnTopic = SchemaNameTopicMatchStrategy.schemaNameFromTopicName(topicName, isKey)
159-
val avroSchema = checkAvroSchema(schema).rawSchema()
160-
if (avroSchema.getType == Schema.Type.RECORD) {
161-
require(
162-
avroSchema.getName == schemaNameBasedOnTopic,
163-
s"Invalid record schema name ${avroSchema.getName} Should be: $schemaNameBasedOnTopic."
164-
)
165-
SchemaId.fromString(registerSchemaVersionIfNotExists(schema).getId)
166-
} else {
167-
// for primitive types we have to register schema on two names - one for listing of topics purpose and second one based on name, to be possible to serialize such object
168-
// by KafkaAvroSerializer - it just lookup into schema registry based on schema's full name. Can be tricky which one id is returned - in most cases it easy better
169-
// to return this based on name
170-
registerSchemaVersionIfNotExists(schema, Some(schemaNameBasedOnTopic))
171-
SchemaId.fromString(registerSchemaVersionIfNotExists(schema).getId)
172-
}
173-
174-
}
175-
176-
// forceSchemaNameOpt is for special purposes like primitive schemas when there is no name
177-
def registerSchemaVersionIfNotExists(
145+
override def registerSchemaVersionIfNotExists(
178146
schema: ParsedSchema,
179147
forceSchemaNameOpt: Option[String] = None
180148
): SchemaProperties = {
@@ -189,21 +157,14 @@ class AzureSchemaRegistryClient(config: SchemaRegistryClientKafkaConfig) extends
189157
}
190158
}
191159

192-
def getSchemaIdByContent(schema: AvroSchema): SchemaId = {
160+
override def getSchemaIdByContent(schema: AvroSchema): SchemaId = {
193161
SchemaId.fromString(
194162
schemaRegistryClient
195163
.getSchemaProperties(schemaGroup, schema.rawSchema().getFullName, schema.canonicalString(), SchemaFormat.AVRO)
196164
.getId
197165
)
198166
}
199167

200-
private def checkAvroSchema(schema: ParsedSchema): AvroSchema = {
201-
schema match {
202-
case avroSchema: AvroSchema => avroSchema
203-
case _ => throw new IllegalArgumentException("Currently only avro schema is supported for Azure implementation")
204-
}
205-
}
206-
207168
private def invokeBlocking[T](f: Context => Mono[T]): Validated[SchemaRegistryError, T] = {
208169
try {
209170
Valid(FluxUtil.withContext(context => f(context)).block())

0 commit comments

Comments
 (0)