diff --git a/src/Pulsar.Client/Pulsar.Client.fsproj b/src/Pulsar.Client/Pulsar.Client.fsproj index 582f06b4..5a09d41d 100644 --- a/src/Pulsar.Client/Pulsar.Client.fsproj +++ b/src/Pulsar.Client/Pulsar.Client.fsproj @@ -7,17 +7,17 @@ Pulsar.Client Pulsar.Client Pulsar.Client - 3.15.1 + 3.15.2 F# community .NET client library for Apache Pulsar https://github.com/fsprojects/pulsar-client-dotnet - Auto cluster failover refactoring + Fixed date representation for JSON schema MIT https://github.com/fsprojects/pulsar-client-dotnet git Apache;Pulsar;F#;FSharp Vladimir Shchur and contributors - 3.15.1 + 3.15.2 portable true README.md diff --git a/src/Pulsar.Client/Schema/JsonSchema.fs b/src/Pulsar.Client/Schema/JsonSchema.fs index c6028cde..bcdcbb6f 100644 --- a/src/Pulsar.Client/Schema/JsonSchema.fs +++ b/src/Pulsar.Client/Schema/JsonSchema.fs @@ -2,18 +2,39 @@ namespace Pulsar.Client.Schema open System open System.Collections.Generic -open System.Dynamic open System.Text open System.Text.Json +open System.Text.Json.Serialization open Avro open Pulsar.Client.Api open AvroSchemaGenerator open Pulsar.Client.Common +type internal DateTimeTimestampMillisConverter() = + inherit JsonConverter() + + override this.Read(reader: byref, _, _) = + match reader.TokenType with + | JsonTokenType.Number -> // timestamp in milliseconds since Unix epoch + reader.GetInt64() + |> DateTimeOffset.FromUnixTimeMilliseconds + |> _.UtcDateTime + | JsonTokenType.String -> // ISO 8601 string + reader.GetDateTime() + | _ -> + raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}." + + override this.Write(writer: Utf8JsonWriter, value: DateTime, _) = + value.ToUniversalTime() + |> DateTimeOffset + |> _.ToUnixTimeMilliseconds() + |> writer.WriteNumberValue + type internal JsonSchema<'T> () = inherit ISchema<'T>() let parameterIsClass = typeof<'T>.IsClass - let options = JsonSerializerOptions(IgnoreNullValues = true) + let options = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull) + do options.Converters.Add(DateTimeTimestampMillisConverter()) let stringSchema = typeof<'T>.GetSchema() override this.SchemaInfo = { Name = ""; Type = SchemaType.JSON; Schema = stringSchema |> Encoding.UTF8.GetBytes; Properties = Map.empty } override this.Encode value = @@ -25,7 +46,7 @@ type internal JsonSchema<'T> () = type internal GenericJsonSchema (topicSchema: TopicSchema) = inherit ISchema() - let dynamicSerializerOptions = JsonSerializerOptions(IgnoreNullValues = true) + let dynamicSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull) do dynamicSerializerOptions.Converters.Add <| DynamicJsonConverter() let stringSchema = topicSchema.SchemaInfo.Schema |> Encoding.UTF8.GetString let avroSchema = Schema.Parse(stringSchema) :?> RecordSchema @@ -41,10 +62,10 @@ type internal GenericJsonSchema (topicSchema: TopicSchema) = let doc = JsonSerializer.Deserialize>(ReadOnlySpan bytes, dynamicSerializerOptions) let fields = schemaFields - |> Seq.map (fun sf -> { Name = sf.Name; Value = doc.[sf.Name]; Index = sf.Pos }) + |> Seq.map (fun sf -> { Name = sf.Name; Value = doc[sf.Name]; Index = sf.Pos }) |> Seq.toArray - let scemaVersionBytes = - topicSchema.SchemaVersion - |> Option.map _.Bytes - |> Option.toObj - GenericRecord(scemaVersionBytes, fields) \ No newline at end of file + let schemaVersionBytes = + match topicSchema.SchemaVersion with + | Some v -> v.Bytes + | None -> null + GenericRecord(schemaVersionBytes, fields) diff --git a/tests/IntegrationTests/Schema.fs b/tests/IntegrationTests/Schema.fs index 19f2b4e0..fbb70c14 100644 --- a/tests/IntegrationTests/Schema.fs +++ b/tests/IntegrationTests/Schema.fs @@ -111,7 +111,13 @@ let tests = .SubscriptionName("test-subscription") .SubscribeAsync() - let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = DateTime.UtcNow } + // JSON schema encodes DateTime as timestamp-millis, so keep the + // expected value at the same precision used by the wire payload. + let inputDate = + DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + |> DateTimeOffset.FromUnixTimeMilliseconds + |> _.UtcDateTime + let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = inputDate } let! _ = producer.SendAsync(input) let! (msg : Message) = consumer.ReceiveAsync() @@ -556,4 +562,4 @@ let tests = Log.Debug("Finished Auto consume with multi-version schema") } ] - \ No newline at end of file + diff --git a/tests/UnitTests/Internal/SchemaTests.fs b/tests/UnitTests/Internal/SchemaTests.fs index 93caa2b3..64ae75ff 100644 --- a/tests/UnitTests/Internal/SchemaTests.fs +++ b/tests/UnitTests/Internal/SchemaTests.fs @@ -3,6 +3,8 @@ module Pulsar.Client.UnitTests.Internal.SchemaTests open System open System.Collections.Generic open System.Diagnostics +open System.Text +open System.Text.Json open AvroGenerated open Expecto open Expecto.Flip @@ -26,6 +28,9 @@ type ProtobufSchemaTest = { [] type AvroSchemaTest = { X: string; Y: ResizeArray } +[] +type DateTimeSchemaTest = { OccurredAt: DateTime } + [] [] type ProtobufNativeSchemaTest = { @@ -193,6 +198,60 @@ let tests = Expect.equal "" input.X output.X Expect.sequenceEqual "" input.Y output.Y } + + test "JSON schema DateTime encoding works for all kinds of DateTime values" { + let schema = Schema.JSON() + let inputs = [ + DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L) + DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Unspecified).AddTicks(715180L) + DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) + ] + + for input in inputs do + let payload = schema.Encode({ OccurredAt = input }) + use doc = JsonDocument.Parse(payload) + let occuredAtProperty = doc.RootElement.GetProperty("OccurredAt") + Expect.equal "" JsonValueKind.Number occuredAtProperty.ValueKind + + let timestamp = occuredAtProperty.GetInt64() + let expected = + input + |> DateTimeOffset + |> _.ToUnixTimeMilliseconds() + Expect.equal "" expected timestamp + } + + test "JSON schema DateTime decoding accepts numeric timestamp value" { + let schema = Schema.JSON() + let expected = DateTimeOffset.FromUnixTimeMilliseconds(1776666603071L).UtcDateTime + let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":1776666603071}""") + + let output = schema.Decode(payload) + + Expect.equal "" expected output.OccurredAt + } + + test "JSON schema DateTime decoding accepts legacy string timestamp value" { + let schema = Schema.JSON() + let expected = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) + let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":"2026-04-20T06:30:03.071518Z"}""") + + let output = schema.Decode(payload) + + Expect.equal "" expected output.OccurredAt + } + + test "Avro schema DateTime encoding writes numeric timestamp value" { + let schema = Schema.AVRO() + let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) } + use stream = new IO.MemoryStream(schema.Encode(input)) + let decoder = Avro.IO.BinaryDecoder(stream) + let unionBranch = decoder.ReadLong() + let timestamp = decoder.ReadLong() + + Expect.equal "" 1L unionBranch + Expect.equal "" (DateTimeOffset(input.OccurredAt).ToUnixTimeMilliseconds()) timestamp + } test "Protobuf native" { let inputs = [{ ProtobufNativeSchemaTest.foo = "X1"; bar = 1.0; time = DateTime.Now}]