Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions src/Pulsar.Client/Schema/JsonSchema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,37 @@
open System.Dynamic
open System.Text
open System.Text.Json
open System.Text.Json.Serialization
open Avro
open Pulsar.Client.Api
open AvroSchemaGenerator
open Pulsar.Client.Common

type internal DateTimeTimestampMillisConverter() =
inherit JsonConverter<DateTime>()

override this.Read(reader: byref<Utf8JsonReader>, _, _) =
match reader.TokenType with
| JsonTokenType.Number -> // timestamp in milliseconds since Unix epoch
reader.GetInt64()
|> DateTimeOffset.FromUnixTimeMilliseconds
|> _.UtcDateTime
| JsonTokenType.String -> // ISO 8601 string
reader.GetDateTime()
Comment thread
Lanayx marked this conversation as resolved.
| _ ->
raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}."

override this.Write(writer: Utf8JsonWriter, value: DateTime, _) =
value.ToUniversalTime()
|> DateTimeOffset
|> _.ToUnixTimeMilliseconds()
|> writer.WriteNumberValue

type internal JsonSchema<'T> () =
inherit ISchema<'T>()
let parameterIsClass = typeof<'T>.IsClass
let options = JsonSerializerOptions(IgnoreNullValues = true)

Check warning on line 37 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.

Check warning on line 37 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.

Check warning on line 37 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.

Check warning on line 37 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.
do options.Converters.Add(DateTimeTimestampMillisConverter())
let stringSchema = typeof<'T>.GetSchema()
override this.SchemaInfo = { Name = ""; Type = SchemaType.JSON; Schema = stringSchema |> Encoding.UTF8.GetBytes; Properties = Map.empty }
override this.Encode value =
Expand All @@ -25,7 +47,7 @@

type internal GenericJsonSchema (topicSchema: TopicSchema) =
inherit ISchema<GenericRecord>()
let dynamicSerializerOptions = JsonSerializerOptions(IgnoreNullValues = true)

Check warning on line 50 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.

Check warning on line 50 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.

Check warning on line 50 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.

Check warning on line 50 in src/Pulsar.Client/Schema/JsonSchema.fs

View workflow job for this annotation

GitHub Actions / build

This construct is deprecated. JsonSerializerOptions.IgnoreNullValues is obsolete. To ignore null values when serializing, set DefaultIgnoreCondition to JsonIgnoreCondition.WhenWritingNull.
do dynamicSerializerOptions.Converters.Add <| DynamicJsonConverter()
let stringSchema = topicSchema.SchemaInfo.Schema |> Encoding.UTF8.GetString
let avroSchema = Schema.Parse(stringSchema) :?> RecordSchema
Expand Down
10 changes: 8 additions & 2 deletions tests/IntegrationTests/Schema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ let tests =
.SubscriptionName("test-subscription")
.SubscribeAsync()

let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = DateTime.UtcNow }
// JSON schema encodes DateTime as timestamp-millis, so keep the
// expected value at the same precision used by the wire payload.
let inputDate =
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
|> DateTimeOffset.FromUnixTimeMilliseconds
|> _.UtcDateTime
let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = inputDate }
let! _ = producer.SendAsync(input)

let! (msg : Message<SimpleRecord3>) = consumer.ReceiveAsync()
Expand Down Expand Up @@ -556,4 +562,4 @@ let tests =
Log.Debug("Finished Auto consume with multi-version schema")
}
]


65 changes: 65 additions & 0 deletions tests/UnitTests/Internal/SchemaTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module Pulsar.Client.UnitTests.Internal.SchemaTests
open System
open System.Collections.Generic
open System.Diagnostics
open System.Text
open System.Text.Json
open AvroGenerated
open Expecto
open Expecto.Flip
Expand All @@ -26,6 +28,9 @@ type ProtobufSchemaTest = {
[<CLIMutable>]
type AvroSchemaTest = { X: string; Y: ResizeArray<int> }

[<CLIMutable>]
type DateTimeSchemaTest = { OccurredAt: DateTime }

[<CLIMutable>]
[<ProtoContract>]
type ProtobufNativeSchemaTest = {
Expand Down Expand Up @@ -193,6 +198,66 @@ let tests =
Expect.equal "" input.X output.X
Expect.sequenceEqual "" input.Y output.Y
}

test "JSON schema DateTime encoding writes numeric timestamp value" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) }
let payload = schema.Encode(input)
use doc = JsonDocument.Parse(payload)

Expect.equal "" JsonValueKind.Number (doc.RootElement.GetProperty("OccurredAt").ValueKind)
Comment thread
Lanayx marked this conversation as resolved.
Outdated
}

test "JSON schema DateTime encoding treats non-UTC values as UTC" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let inputs = [
DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L)
DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Unspecified).AddTicks(715180L)
]

for input in inputs do
let payload = schema.Encode({ OccurredAt = input })
use doc = JsonDocument.Parse(payload)
let timestamp = doc.RootElement.GetProperty("OccurredAt").GetInt64()
let expected =
DateTime.SpecifyKind(input, DateTimeKind.Utc)
|> DateTimeOffset
|> _.ToUnixTimeMilliseconds()

Expect.equal "" expected timestamp
}

test "JSON schema DateTime decoding accepts numeric timestamp value" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let expected = DateTimeOffset.FromUnixTimeMilliseconds(1776666603071L).UtcDateTime
let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":1776666603071}""")

let output = schema.Decode(payload)

Expect.equal "" expected output.OccurredAt
}

test "JSON schema DateTime decoding accepts legacy string timestamp value" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let expected = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L)
let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":"2026-04-20T06:30:03.071518Z"}""")

let output = schema.Decode(payload)

Expect.equal "" expected output.OccurredAt
}

test "Avro schema DateTime encoding writes numeric timestamp value" {
let schema = Schema.AVRO<DateTimeSchemaTest>()
let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) }
use stream = new IO.MemoryStream(schema.Encode(input))
let decoder = Avro.IO.BinaryDecoder(stream)
let unionBranch = decoder.ReadLong()
let timestamp = decoder.ReadLong()

Expect.equal "" 1L unionBranch
Expect.equal "" (DateTimeOffset(input.OccurredAt).ToUnixTimeMilliseconds()) timestamp
}

test "Protobuf native" {
let inputs = [{ ProtobufNativeSchemaTest.foo = "X1"; bar = 1.0; time = DateTime.Now}]
Expand Down
Loading