Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Pulsar.Client.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
<Title>Pulsar.Client</Title>
<RootNamespace>Pulsar.Client</RootNamespace>
<AssemblyName>Pulsar.Client</AssemblyName>
<Version>3.15.1</Version>
<Version>3.15.2</Version>
<Company>F# community</Company>
<Description>.NET client library for Apache Pulsar</Description>
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
<PackageReleaseNotes>Auto cluster failover refactoring</PackageReleaseNotes>
<PackageReleaseNotes>Fixed date representation for JSON schema</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>Apache;Pulsar;F#;FSharp</PackageTags>
<Authors>Vladimir Shchur and contributors</Authors>
<PackageVersion>3.15.1</PackageVersion>
<PackageVersion>3.15.2</PackageVersion>
<DebugType>portable</DebugType>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down
39 changes: 30 additions & 9 deletions src/Pulsar.Client/Schema/JsonSchema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,39 @@ namespace Pulsar.Client.Schema

open System
open System.Collections.Generic
open System.Dynamic
open System.Text
open System.Text.Json
open System.Text.Json.Serialization
open Avro
open Pulsar.Client.Api
open AvroSchemaGenerator
open Pulsar.Client.Common

type internal DateTimeTimestampMillisConverter() =
inherit JsonConverter<DateTime>()

override this.Read(reader: byref<Utf8JsonReader>, _, _) =
match reader.TokenType with
| JsonTokenType.Number -> // timestamp in milliseconds since Unix epoch
reader.GetInt64()
|> DateTimeOffset.FromUnixTimeMilliseconds
|> _.UtcDateTime
| JsonTokenType.String -> // ISO 8601 string
reader.GetDateTime()
Comment thread
Lanayx marked this conversation as resolved.
| _ ->
raise <| JsonException $"Unexpected token parsing DateTime. Expected Number or String, got {reader.TokenType}."

override this.Write(writer: Utf8JsonWriter, value: DateTime, _) =
value.ToUniversalTime()
|> DateTimeOffset
|> _.ToUnixTimeMilliseconds()
|> writer.WriteNumberValue

type internal JsonSchema<'T> () =
inherit ISchema<'T>()
let parameterIsClass = typeof<'T>.IsClass
let options = JsonSerializerOptions(IgnoreNullValues = true)
let options = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)
do options.Converters.Add(DateTimeTimestampMillisConverter())
let stringSchema = typeof<'T>.GetSchema()
override this.SchemaInfo = { Name = ""; Type = SchemaType.JSON; Schema = stringSchema |> Encoding.UTF8.GetBytes; Properties = Map.empty }
override this.Encode value =
Expand All @@ -25,7 +46,7 @@ type internal JsonSchema<'T> () =

type internal GenericJsonSchema (topicSchema: TopicSchema) =
inherit ISchema<GenericRecord>()
let dynamicSerializerOptions = JsonSerializerOptions(IgnoreNullValues = true)
let dynamicSerializerOptions = JsonSerializerOptions(DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull)
do dynamicSerializerOptions.Converters.Add <| DynamicJsonConverter()
let stringSchema = topicSchema.SchemaInfo.Schema |> Encoding.UTF8.GetString
let avroSchema = Schema.Parse(stringSchema) :?> RecordSchema
Expand All @@ -41,10 +62,10 @@ type internal GenericJsonSchema (topicSchema: TopicSchema) =
let doc = JsonSerializer.Deserialize<IDictionary<string, obj>>(ReadOnlySpan bytes, dynamicSerializerOptions)
let fields =
schemaFields
|> Seq.map (fun sf -> { Name = sf.Name; Value = doc.[sf.Name]; Index = sf.Pos })
|> Seq.map (fun sf -> { Name = sf.Name; Value = doc[sf.Name]; Index = sf.Pos })
|> Seq.toArray
let scemaVersionBytes =
topicSchema.SchemaVersion
|> Option.map _.Bytes
|> Option.toObj
GenericRecord(scemaVersionBytes, fields)
let schemaVersionBytes =
match topicSchema.SchemaVersion with
| Some v -> v.Bytes
| None -> null
GenericRecord(schemaVersionBytes, fields)
10 changes: 8 additions & 2 deletions tests/IntegrationTests/Schema.fs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,13 @@ let tests =
.SubscriptionName("test-subscription")
.SubscribeAsync()

let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = DateTime.UtcNow }
// JSON schema encodes DateTime as timestamp-millis, so keep the
// expected value at the same precision used by the wire payload.
let inputDate =
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds()
|> DateTimeOffset.FromUnixTimeMilliseconds
|> _.UtcDateTime
let input = { SimpleRecord3.Name = "abc"; Age = 20; Date = inputDate }
let! _ = producer.SendAsync(input)

let! (msg : Message<SimpleRecord3>) = consumer.ReceiveAsync()
Expand Down Expand Up @@ -556,4 +562,4 @@ let tests =
Log.Debug("Finished Auto consume with multi-version schema")
}
]


59 changes: 59 additions & 0 deletions tests/UnitTests/Internal/SchemaTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ module Pulsar.Client.UnitTests.Internal.SchemaTests
open System
open System.Collections.Generic
open System.Diagnostics
open System.Text
open System.Text.Json
open AvroGenerated
open Expecto
open Expecto.Flip
Expand All @@ -26,6 +28,9 @@ type ProtobufSchemaTest = {
[<CLIMutable>]
type AvroSchemaTest = { X: string; Y: ResizeArray<int> }

[<CLIMutable>]
type DateTimeSchemaTest = { OccurredAt: DateTime }

[<CLIMutable>]
[<ProtoContract>]
type ProtobufNativeSchemaTest = {
Expand Down Expand Up @@ -193,6 +198,60 @@ let tests =
Expect.equal "" input.X output.X
Expect.sequenceEqual "" input.Y output.Y
}

test "JSON schema DateTime encoding works for all kinds of DateTime values" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let inputs = [
DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Local).AddTicks(715180L)
DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Unspecified).AddTicks(715180L)
DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L)
]

for input in inputs do
let payload = schema.Encode({ OccurredAt = input })
use doc = JsonDocument.Parse(payload)
let occuredAtProperty = doc.RootElement.GetProperty("OccurredAt")
Expect.equal "" JsonValueKind.Number occuredAtProperty.ValueKind

let timestamp = occuredAtProperty.GetInt64()
let expected =
input
|> DateTimeOffset
|> _.ToUnixTimeMilliseconds()
Expect.equal "" expected timestamp
}

test "JSON schema DateTime decoding accepts numeric timestamp value" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let expected = DateTimeOffset.FromUnixTimeMilliseconds(1776666603071L).UtcDateTime
let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":1776666603071}""")

let output = schema.Decode(payload)

Expect.equal "" expected output.OccurredAt
}

test "JSON schema DateTime decoding accepts legacy string timestamp value" {
let schema = Schema.JSON<DateTimeSchemaTest>()
let expected = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L)
let payload = Encoding.UTF8.GetBytes("""{"OccurredAt":"2026-04-20T06:30:03.071518Z"}""")

let output = schema.Decode(payload)

Expect.equal "" expected output.OccurredAt
}

test "Avro schema DateTime encoding writes numeric timestamp value" {
let schema = Schema.AVRO<DateTimeSchemaTest>()
let input = { OccurredAt = DateTime(2026, 4, 20, 6, 30, 3, DateTimeKind.Utc).AddTicks(715180L) }
use stream = new IO.MemoryStream(schema.Encode(input))
let decoder = Avro.IO.BinaryDecoder(stream)
let unionBranch = decoder.ReadLong()
let timestamp = decoder.ReadLong()

Expect.equal "" 1L unionBranch
Expect.equal "" (DateTimeOffset(input.OccurredAt).ToUnixTimeMilliseconds()) timestamp
}

test "Protobuf native" {
let inputs = [{ ProtobufNativeSchemaTest.foo = "X1"; bar = 1.0; time = DateTime.Now}]
Expand Down
Loading