diff --git a/src/Pulsar.Client/Pulsar.Client.fsproj b/src/Pulsar.Client/Pulsar.Client.fsproj
index 582f06b4..5a09d41d 100644
--- a/src/Pulsar.Client/Pulsar.Client.fsproj
+++ b/src/Pulsar.Client/Pulsar.Client.fsproj
@@ -7,17 +7,17 @@
Pulsar.Client
Pulsar.Client
Pulsar.Client
- 3.15.1
+ 3.15.2
F# community
.NET client library for Apache Pulsar
https://github.com/fsprojects/pulsar-client-dotnet
- Auto cluster failover refactoring
+ Fixed date representation for JSON schema
MIT
https://github.com/fsprojects/pulsar-client-dotnet
git
Apache;Pulsar;F#;FSharp
Vladimir Shchur and contributors
- 3.15.1
+ 3.15.2
portable
true
README.md
diff --git a/src/Pulsar.Client/Schema/JsonSchema.fs b/src/Pulsar.Client/Schema/JsonSchema.fs
index c6028cde..bcdcbb6f 100644
--- a/src/Pulsar.Client/Schema/JsonSchema.fs
+++ b/src/Pulsar.Client/Schema/JsonSchema.fs
@@ -2,18 +2,39 @@ namespace Pulsar.Client.Schema
open System
open System.Collections.Generic
-open System.Dynamic
open System.Text
open System.Text.Json
+open System.Text.Json.Serialization
open Avro
open Pulsar.Client.Api
open AvroSchemaGenerator
open Pulsar.Client.Common
+type internal DateTimeTimestampMillisConverter() =
+ inherit JsonConverter()
+
+ override this.Read(reader: byref, _, _) =
+ match reader.TokenType with
+ | JsonTokenType.Number -> // timestamp in milliseconds since Unix epoch
+ reader.GetInt64()
+ |> DateTimeOffset.FromUnixTimeMilliseconds
+ |> _.UtcDateTime
+ | JsonTokenType.String -> // ISO 8601 string
+ reader.GetDateTime()
+ | _ ->
+ raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}."
+
+ override this.Write(writer: Utf8JsonWriter, value: DateTime, _) =
+ value.ToUniversalTime()
+ |> DateTimeOffset
+ |> _.ToUnixTimeMilliseconds()
+ |> writer.WriteNumberValue
+
type internal JsonSchema<'T> () =
inherit ISchema<'T>()
let parameterIsClass = typeof<'T>.IsClass
- let options = JsonSerializerOptions(IgnoreNullValues = true)
+ let options = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)
+ do options.Converters.Add(DateTimeTimestampMillisConverter())
let stringSchema = typeof<'T>.GetSchema()
override this.SchemaInfo = { Name = ""; Type = SchemaType.JSON; Schema = stringSchema |> Encoding.UTF8.GetBytes; Properties = Map.empty }
override this.Encode value =
@@ -25,7 +46,7 @@ type internal JsonSchema<'T> () =
type internal GenericJsonSchema (topicSchema: TopicSchema) =
inherit ISchema()
- let dynamicSerializerOptions = JsonSerializerOptions(IgnoreNullValues = true)
+ let dynamicSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)
do dynamicSerializerOptions.Converters.Add <| DynamicJsonConverter()
let stringSchema = topicSchema.SchemaInfo.Schema |> Encoding.UTF8.GetString
let avroSchema = Schema.Parse(stringSchema) :?> RecordSchema
@@ -41,10 +62,10 @@ type internal GenericJsonSchema (topicSchema: TopicSchema) =
let doc = JsonSerializer.Deserialize>(ReadOnlySpan bytes, dynamicSerializerOptions)
let fields =
schemaFields
- |> Seq.map (fun sf -> { Name = sf.Name; Value = doc.[sf.Name]; Index = sf.Pos })
+ |> Seq.map (fun sf -> { Name = sf.Name; Value = doc[sf.Name]; Index = sf.Pos })
|> Seq.toArray
- let scemaVersionBytes =
- topicSchema.SchemaVersion
- |> Option.map _.Bytes
- |> Option.toObj
- GenericRecord(scemaVersionBytes, fields)
\ No newline at end of file
+ let schemaVersionBytes =
+ match topicSchema.SchemaVersion with
+ | Some v -> v.Bytes
+ | None -> null
+ GenericRecord(schemaVersionBytes, fields)
diff --git a/tests/IntegrationTests/Schema.fs b/tests/IntegrationTests/Schema.fs
index 19f2b4e0..fbb70c14 100644
--- a/tests/IntegrationTests/Schema.fs
+++ b/tests/IntegrationTests/Schema.fs
@@ -111,7 +111,13 @@ let tests =
.SubscriptionName("test-subscription")
.SubscribeAsync()
- let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = DateTime.UtcNow }
+ // JSON schema encodes DateTime as timestamp-millis, so keep the
+ // expected value at the same precision used by the wire payload.
+ let inputDate =
+ DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
+ |> DateTimeOffset.FromUnixTimeMilliseconds
+ |> _.UtcDateTime
+ let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = inputDate }
let! _ = producer.SendAsync(input)
let! (msg : Message) = consumer.ReceiveAsync()
@@ -556,4 +562,4 @@ let tests =
Log.Debug("Finished Auto consume with multi-version schema")
}
]
-
\ No newline at end of file
+
diff --git a/tests/UnitTests/Internal/SchemaTests.fs b/tests/UnitTests/Internal/SchemaTests.fs
index 93caa2b3..64ae75ff 100644
--- a/tests/UnitTests/Internal/SchemaTests.fs
+++ b/tests/UnitTests/Internal/SchemaTests.fs
@@ -3,6 +3,8 @@ module Pulsar.Client.UnitTests.Internal.SchemaTests
open System
open System.Collections.Generic
open System.Diagnostics
+open System.Text
+open System.Text.Json
open AvroGenerated
open Expecto
open Expecto.Flip
@@ -26,6 +28,9 @@ type ProtobufSchemaTest = {
[]
type AvroSchemaTest = { X: string; Y: ResizeArray }
+[]
+type DateTimeSchemaTest = { OccurredAt: DateTime }
+
[]
[]
type ProtobufNativeSchemaTest = {
@@ -193,6 +198,60 @@ let tests =
Expect.equal "" input.X output.X
Expect.sequenceEqual "" input.Y output.Y
}
+
+ test "JSON schema DateTime encoding works for all kinds of DateTime values" {
+ let schema = Schema.JSON()
+ let inputs = [
+ DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L)
+ DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Unspecified).AddTicks(715180L)
+ DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L)
+ ]
+
+ for input in inputs do
+ let payload = schema.Encode({ OccurredAt = input })
+ use doc = JsonDocument.Parse(payload)
+ let occuredAtProperty = doc.RootElement.GetProperty("OccurredAt")
+ Expect.equal "" JsonValueKind.Number occuredAtProperty.ValueKind
+
+ let timestamp = occuredAtProperty.GetInt64()
+ let expected =
+ input
+ |> DateTimeOffset
+ |> _.ToUnixTimeMilliseconds()
+ Expect.equal "" expected timestamp
+ }
+
+ test "JSON schema DateTime decoding accepts numeric timestamp value" {
+ let schema = Schema.JSON()
+ let expected = DateTimeOffset.FromUnixTimeMilliseconds(1776666603071L).UtcDateTime
+ let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":1776666603071}""")
+
+ let output = schema.Decode(payload)
+
+ Expect.equal "" expected output.OccurredAt
+ }
+
+ test "JSON schema DateTime decoding accepts legacy string timestamp value" {
+ let schema = Schema.JSON()
+ let expected = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L)
+ let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":"2026-04-20T06:30:03.071518Z"}""")
+
+ let output = schema.Decode(payload)
+
+ Expect.equal "" expected output.OccurredAt
+ }
+
+ test "Avro schema DateTime encoding writes numeric timestamp value" {
+ let schema = Schema.AVRO()
+ let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) }
+ use stream = new IO.MemoryStream(schema.Encode(input))
+ let decoder = Avro.IO.BinaryDecoder(stream)
+ let unionBranch = decoder.ReadLong()
+ let timestamp = decoder.ReadLong()
+
+ Expect.equal "" 1L unionBranch
+ Expect.equal "" (DateTimeOffset(input.OccurredAt).ToUnixTimeMilliseconds()) timestamp
+ }
test "Protobuf native" {
let inputs = [{ ProtobufNativeSchemaTest.foo = "X1"; bar = 1.0; time = DateTime.Now}]