diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index 30bcaabd5..b7e0c2623 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -1,12 +1,20 @@ module Samples.Infrastructure.Services open Domain +open FsCodec +open FsCodec.SystemTextJson.Serialization open Microsoft.Extensions.DependencyInjection open System +open System.Text.Json + +[] +type StreamCodec<'event, 'context> = + | JsonElementCodec of IEventCodec<'event, JsonElement, 'context> + | Utf8ArrayCodec of IEventCodec<'event, byte[], 'context> type StreamResolver(storage) = - member __.Resolve - ( codec : FsCodec.IEventCodec<'event,byte[],_>, + member __.ResolveWithJsonElementCodec + ( codec : IEventCodec<'event, JsonElement, _>, fold: ('state -> 'event seq -> 'state), initial: 'state, snapshot: (('event -> bool) * ('state -> 'event))) = @@ -15,6 +23,14 @@ type StreamResolver(storage) = let store = Equinox.Cosmos.Context(gateway, databaseId, containerId) let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot else Equinox.Cosmos.AccessStrategy.Unoptimized Equinox.Cosmos.Resolver<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve + | _ -> failwith "Currently, only Cosmos can be used with a JsonElement codec." + + member __.ResolveWithUtf8ArrayCodec + ( codec : IEventCodec<'event, byte[], _>, + fold: ('state -> 'event seq -> 'state), + initial: 'state, + snapshot: (('event -> bool) * ('state -> 'event))) = + match storage with | Storage.StorageConfig.Es (context, caching, unfolds) -> let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve @@ -23,6 +39,7 @@ type StreamResolver(storage) = | Storage.StorageConfig.Sql (context, caching, unfolds) -> let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None Equinox.SqlStreamStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve + | _ -> failwith "Only EventStore, Memory Store, and SQL Store can be used with a byte array codec." type ServiceBuilder(storageConfig, handlerLog) = let resolver = StreamResolver(storageConfig) @@ -30,17 +47,26 @@ type ServiceBuilder(storageConfig, handlerLog) = member __.CreateFavoritesService() = let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot - Backend.Favorites.Service(handlerLog, resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot)) + + match storageConfig with + | Storage.StorageConfig.Cosmos _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithJsonElementCodec(Favorites.Events.JsonElementCodec.codec JsonSerializer.defaultOptions, fold, initial, snapshot)) + | _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(Favorites.Events.Utf8ArrayCodec.codec, fold, initial, snapshot)) member __.CreateSaveForLaterService() = let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact - Backend.SavedForLater.Service(handlerLog, resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot), maxSavedItems=50) + + match storageConfig with + | Storage.StorageConfig.Cosmos _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithJsonElementCodec(SavedForLater.Events.JsonElementCodec.codec JsonSerializer.defaultOptions,fold,initial,snapshot), maxSavedItems=50) + | _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(SavedForLater.Events.Utf8ArrayCodec.codec,fold,initial,snapshot), maxSavedItems=50) member __.CreateTodosService() = let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot - TodoBackend.Service(handlerLog, resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot)) + + match storageConfig with + | Storage.StorageConfig.Cosmos _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithJsonElementCodec(TodoBackend.Events.JsonElementCodec.codec JsonSerializer.defaultOptions,fold,initial,snapshot)) + | _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(TodoBackend.Events.Utf8ArrayCodec.codec,fold,initial,snapshot)) let register (services : IServiceCollection, storageConfig, handlerLog) = let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore @@ -49,4 +75,4 @@ let register (services : IServiceCollection, storageConfig, handlerLog) = regF <| fun sp -> sp.GetService().CreateFavoritesService() regF <| fun sp -> sp.GetService().CreateSaveForLaterService() - regF <| fun sp -> sp.GetService().CreateTodosService() \ No newline at end of file + regF <| fun sp -> sp.GetService().CreateTodosService() diff --git a/samples/Store/Backend/Backend.fsproj b/samples/Store/Backend/Backend.fsproj index 8234b4a6c..fec9bcd4f 100644 --- a/samples/Store/Backend/Backend.fsproj +++ b/samples/Store/Backend/Backend.fsproj @@ -18,7 +18,7 @@ - + diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs index 348515519..bf6ce47e2 100644 --- a/samples/Store/Domain/Cart.fs +++ b/samples/Store/Domain/Cart.fs @@ -24,7 +24,34 @@ module Events = | ItemQuantityChanged of ItemQuantityChangeInfo | ItemWaiveReturnsChanged of ItemWaiveReturnsInfo interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = + fun (evt: Event) -> + match evt with + | Snapshotted state -> "Snapshotted", JsonSerializer.SerializeToElement(state, options) + | ItemAdded addInfo -> "ItemAdded", JsonSerializer.SerializeToElement(addInfo, options) + | ItemRemoved removeInfo -> "ItemRemoved", JsonSerializer.SerializeToElement(removeInfo, options) + | ItemQuantityChanged changeInfo -> "ItemQuantityChanged", JsonSerializer.SerializeToElement(changeInfo, options) + | ItemWaiveReturnsChanged waiveInfo -> "ItemWaiveReturnsChanged", JsonSerializer.SerializeToElement(waiveInfo, options) + + let private tryDecode (options: JsonSerializerOptions) = + fun (eventType, data: JsonElement) -> + match eventType with + | "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement(data, options)) + | "ItemAdded" -> Some (ItemAdded <| JsonSerializer.DeserializeElement(data, options)) + | "ItemRemoved" -> Some (ItemRemoved <| JsonSerializer.DeserializeElement(data, options)) + | "ItemQuantityChanged" -> Some (ItemQuantityChanged <| JsonSerializer.DeserializeElement(data, options)) + | "ItemWaiveReturnsChanged" -> Some (ItemWaiveReturnsChanged <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = type ItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool } @@ -79,4 +106,4 @@ module Commands = match waived with | Some waived when itemExistsWithDifferentWaiveStatus skuId waived -> yield Events.ItemWaiveReturnsChanged { context = c; skuId = skuId; waived = waived } - | _ -> () ] \ No newline at end of file + | _ -> () ] diff --git a/samples/Store/Domain/ContactPreferences.fs b/samples/Store/Domain/ContactPreferences.fs index 263efebef..140220496 100644 --- a/samples/Store/Domain/ContactPreferences.fs +++ b/samples/Store/Domain/ContactPreferences.fs @@ -13,7 +13,26 @@ module Events = type Event = | []Updated of Value interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = + fun (evt: Event) -> + match evt with + | Updated value -> "contactPreferencesChanged", JsonSerializer.SerializeToElement(value, options) + + let private tryDecode (options: JsonSerializerOptions) = + fun (eventType, data: JsonElement) -> + match eventType with + | "contactPreferencesChanged" -> Some (Updated <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = @@ -37,4 +56,4 @@ module Commands = match command with | Update ({ preferences = preferences } as value) -> if state = preferences then [] else - [ Events.Updated value ] \ No newline at end of file + [ Events.Updated value ] diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj index 295e2de15..d2f66d3b3 100644 --- a/samples/Store/Domain/Domain.fsproj +++ b/samples/Store/Domain/Domain.fsproj @@ -21,11 +21,11 @@ - + - + \ No newline at end of file diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs index d9e0bef13..c350052ef 100644 --- a/samples/Store/Domain/Favorites.fs +++ b/samples/Store/Domain/Favorites.fs @@ -14,7 +14,28 @@ module Events = | Favorited of Favorited | Unfavorited of Unfavorited interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = fun (evt: Event) -> + match evt with + | Snapshotted snapshotted -> "Snapshotted", JsonSerializer.SerializeToElement(snapshotted, options) + | Favorited favorited -> "Favorited", JsonSerializer.SerializeToElement(favorited, options) + | Unfavorited unfavorited -> "Unfavorited", JsonSerializer.SerializeToElement(unfavorited, options) + + let private tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) -> + match eventType with + | "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement(data, options)) + | "Favorited" -> Some (Favorited <| JsonSerializer.DeserializeElement(data, options)) + | "Unfavorited" -> Some (Unfavorited <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = @@ -56,4 +77,4 @@ module Commands = yield Events.Favorited { date = date; skuId = skuId } ] | Unfavorite skuId -> if doesntHave skuId then [] else - [ Events.Unfavorited { skuId = skuId } ] \ No newline at end of file + [ Events.Unfavorited { skuId = skuId } ] diff --git a/samples/Store/Domain/SavedForLater.fs b/samples/Store/Domain/SavedForLater.fs index 994ccc1f0..f936ac535 100644 --- a/samples/Store/Domain/SavedForLater.fs +++ b/samples/Store/Domain/SavedForLater.fs @@ -29,7 +29,32 @@ module Events = /// Addition of a collection of skus to the list | Added of Added interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = + fun (evt: Event) -> + match evt with + | Compacted compacted -> Compaction.EventType, JsonSerializer.SerializeToElement(compacted, options) + | Merged merged -> "Merged", JsonSerializer.SerializeToElement(merged, options) + | Removed removed -> "Removed", JsonSerializer.SerializeToElement(removed, options) + | Added added -> "Added", JsonSerializer.SerializeToElement(added, options) + + let private tryDecode (options: JsonSerializerOptions) = + fun (eventType, data: JsonElement) -> + match eventType with + | Compaction.EventType -> Some (Compacted <| JsonSerializer.DeserializeElement(data, options)) + | "Merged" -> Some (Merged <| JsonSerializer.DeserializeElement(data, options)) + | "Removed" -> Some (Removed <| JsonSerializer.DeserializeElement(data, options)) + | "Added" -> Some (Added <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = open Events @@ -104,4 +129,4 @@ module Commands = let index = Index state let net = skus |> Array.filter (index.DoesNotAlreadyContainSameOrMoreRecent dateSaved) if Array.isEmpty net then true, [] - else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ] \ No newline at end of file + else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ] diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index cbd377a58..4d7db0cbd 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -4,6 +4,7 @@ open Equinox open Equinox.Cosmos.Integration open Equinox.EventStore open Equinox.MemoryStore +open FsCodec.SystemTextJson.Serialization open Swensen.Unquote #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -15,19 +16,19 @@ let createMemoryStore () = // we want to validate that the JSON UTF8 is working happily VolatileStore() let createServiceMemory log store = - Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) - -let codec = Domain.Cart.Events.codec + Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.Utf8ArrayCodec.codec, fold, initial).Resolve(id,?option=opt)) +let eventStoreCodec = Domain.Cart.Events.Utf8ArrayCodec.codec let resolveGesStreamWithRollingSnapshots gateway = - fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) + fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) let resolveGesStreamWithoutCustomAccessStrategy gateway = - fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt) + fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial).Resolve(id,?option=opt) +let cosmosCodec = Domain.Cart.Events.JsonElementCodec.codec JsonSerializer.defaultOptions let resolveCosmosStreamWithSnapshotStrategy gateway = - fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) + fun (id,opt) -> Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt) let resolveCosmosStreamWithoutCustomAccessStrategy gateway = - fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt) + fun (id,opt) -> Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt) let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count = service.ExecuteManyAsync(cartId, false, seq { @@ -82,4 +83,4 @@ type Tests(testOutputHelper) = let ``Can roundtrip against Cosmos, correctly folding the events with With Snapshotting`` args = Async.RunSynchronously <| async { let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveCosmosStreamWithSnapshotStrategy do! act service args - } \ No newline at end of file + } diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index b592a9d90..c04fb7360 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -2,6 +2,7 @@ open Equinox open Equinox.Cosmos.Integration +open FsCodec.SystemTextJson.Serialization open Swensen.Unquote open Xunit @@ -14,19 +15,20 @@ let createMemoryStore () = let createServiceMemory log store = Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) -let codec = Domain.ContactPreferences.Events.codec +let eventStoreCodec = Domain.ContactPreferences.Events.Utf8ArrayCodec.codec let resolveStreamGesWithOptimizedStorageSemantics gateway = - EventStore.Resolver(gateway 1, codec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve + EventStore.Resolver(gateway 1, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve let resolveStreamGesWithoutAccessStrategy gateway = - EventStore.Resolver(gateway defaultBatchSize, codec, fold, initial).Resolve + EventStore.Resolver(gateway defaultBatchSize, eventStoreCodec, fold, initial).Resolve +let cosmosCodec = Domain.ContactPreferences.Events.JsonElementCodec.codec JsonSerializer.defaultOptions let resolveStreamCosmosWithLatestKnownEventSemantics gateway = - Cosmos.Resolver(gateway 1, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve + Cosmos.Resolver(gateway 1, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve let resolveStreamCosmosUnoptimized gateway = - Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve + Cosmos.Resolver(gateway defaultBatchSize, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve let resolveStreamCosmosRollingUnfolds gateway = let access = Cosmos.AccessStrategy.Custom(Domain.ContactPreferences.Fold.isOrigin, Domain.ContactPreferences.Fold.transmute) - Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve + Cosmos.Resolver(gateway defaultBatchSize, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve type Tests(testOutputHelper) = let testOutput = TestOutputAdapter testOutputHelper @@ -79,4 +81,4 @@ type Tests(testOutputHelper) = let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async { let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds do! act service args - } \ No newline at end of file + } diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 424e4c42d..2d3e90430 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -2,6 +2,7 @@ open Equinox open Equinox.Cosmos.Integration +open FsCodec.SystemTextJson.Serialization open Swensen.Unquote #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -14,18 +15,19 @@ let createMemoryStore () = let createServiceMemory log store = Backend.Favorites.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve) -let codec = Domain.Favorites.Events.codec +let eventStoreCodec = Domain.Favorites.Events.Utf8ArrayCodec.codec let createServiceGes gateway log = - let resolve = EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve + let resolve = EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve Backend.Favorites.Service(log, resolve) +let cosmosCodec = Domain.Favorites.Events.JsonElementCodec.codec JsonSerializer.defaultOptions let createServiceCosmos gateway log = - let resolve = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve + let resolve = Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve Backend.Favorites.Service(log, resolve) let createServiceCosmosRollingState gateway log = let access = Cosmos.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot - let resolve = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve + let resolve = Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve Backend.Favorites.Service(log, resolve) type Tests(testOutputHelper) = @@ -74,4 +76,4 @@ type Tests(testOutputHelper) = let gateway = createCosmosContext conn defaultBatchSize let service = createServiceCosmosRollingState gateway log do! act service args - } \ No newline at end of file + } diff --git a/samples/TodoBackend/Todo.fs b/samples/TodoBackend/Todo.fs index 0349f1ada..133e7d40c 100644 --- a/samples/TodoBackend/Todo.fs +++ b/samples/TodoBackend/Todo.fs @@ -19,7 +19,34 @@ module Events = | Cleared | Snapshotted of Snapshotted interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = + fun (evt: Event) -> + match evt with + | Added todo -> "Added", JsonSerializer.SerializeToElement(todo, options) + | Updated todo -> "Updated", JsonSerializer.SerializeToElement(todo, options) + | Deleted deleted -> "Deleted", JsonSerializer.SerializeToElement(deleted, options) + | Cleared -> "Cleared", Unchecked.defaultof + | Snapshotted snapshotted -> "Snapshotted", JsonSerializer.SerializeToElement(snapshotted, options) + + let private tryDecode (options: JsonSerializerOptions) = + fun (eventType, data: JsonElement) -> + match eventType with + | "Added" -> Some (Added <| JsonSerializer.DeserializeElement(data, options)) + | "Updated" -> Some (Updated <| JsonSerializer.DeserializeElement(data, options)) + | "Deleted" -> Some (Deleted <| JsonSerializer.DeserializeElement(data, options)) + | "Cleared" -> Some Cleared + | "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = type State = { items : Events.Todo list; nextId : int } @@ -81,4 +108,4 @@ type Service(log, resolve, ?maxAttempts) = member __.Patch(clientId, item: Events.Todo) : Async = async { let! state' = handle clientId (Command.Update item) - return List.find (fun x -> x.id = item.id) state' } \ No newline at end of file + return List.find (fun x -> x.id = item.id) state' } diff --git a/samples/Tutorial/Gapless.fs b/samples/Tutorial/Gapless.fs index b16991b90..e4d1180bd 100644 --- a/samples/Tutorial/Gapless.fs +++ b/samples/Tutorial/Gapless.fs @@ -18,7 +18,31 @@ module Events = | Released of Item | Snapshotted of Snapshotted interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = fun (evt: Event) -> + match evt with + | Reserved item -> "Reserved", JsonSerializer.SerializeToElement(item, options) + | Confirmed item -> "Confirmed", JsonSerializer.SerializeToElement(item, options) + | Released item -> "Released", JsonSerializer.SerializeToElement(item, options) + | Snapshotted snapshot -> "Snapshotted", JsonSerializer.SerializeToElement(snapshot, options) + + let private tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) -> + match eventType with + | "Reserved" -> Some (Reserved <| JsonSerializer.DeserializeElement(data, options)) + | "Confirmed" -> Some (Confirmed <| JsonSerializer.DeserializeElement(data, options)) + | "Released" -> Some (Released <| JsonSerializer.DeserializeElement(data, options)) + | "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) + module Fold = @@ -79,9 +103,12 @@ let [] appName = "equinox-tutorial-gapless" module Cosmos = open Equinox.Cosmos + open FsCodec.SystemTextJson.Serialization + let private createService (context,cache,accessStrategy) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let codec = Events.JsonElementCodec.codec JsonSerializer.defaultOptions + let resolve = Resolver(context, codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve Service(Serilog.Log.Logger, resolve) module Snapshot = @@ -94,4 +121,4 @@ module Cosmos = let createService (context,cache) = let accessStrategy = AccessStrategy.RollingState Fold.snapshot - createService(context,cache,accessStrategy) \ No newline at end of file + createService(context,cache,accessStrategy) diff --git a/samples/Tutorial/Index.fs b/samples/Tutorial/Index.fs index c45cd944c..6d39b4ca5 100644 --- a/samples/Tutorial/Index.fs +++ b/samples/Tutorial/Index.fs @@ -13,7 +13,28 @@ module Events = | Deleted of ItemIds | Snapshotted of Items<'v> interface TypeShape.UnionContract.IUnionContract - let codec<'v> = FsCodec.NewtonsoftJson.Codec.Create>() + + module Utf8ArrayCodec = + let codec<'v> = FsCodec.NewtonsoftJson.Codec.Create>() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode<'v> (options: JsonSerializerOptions) = fun (evt: Event<'v>) -> + match evt with + | Added items -> "Added", JsonSerializer.SerializeToElement(items, options) + | Deleted itemIds -> "Deleted", JsonSerializer.SerializeToElement(itemIds, options) + | Snapshotted items -> "Snapshotted", JsonSerializer.SerializeToElement(items, options) + + let private tryDecode<'v> (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) -> + match eventType with + | "Added" -> Some (Added <| JsonSerializer.DeserializeElement>(data, options)) + | "Deleted" -> Some (Deleted <| JsonSerializer.DeserializeElement(data, options)) + | "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement>(data, options)) + | _ -> None + + let codec<'v> options = FsCodec.Codec.Create, JsonElement>(encode<'v> options, tryDecode<'v> options) module Fold = @@ -54,14 +75,17 @@ let create resolve indexId = Service(indexId, resolve, maxAttempts = 3) module Cosmos = open Equinox.Cosmos + open FsCodec.SystemTextJson.Serialization + let createService<'v> (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let accessStrategy = AccessStrategy.RollingState Fold.snapshot - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let codec = Events.JsonElementCodec.codec<'v> JsonSerializer.defaultOptions + let resolve = Resolver(context, codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve create resolve module MemoryStore = let createService store = - let resolve = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial).Resolve - create resolve \ No newline at end of file + let resolve = Equinox.MemoryStore.Resolver(store, Events.Utf8ArrayCodec.codec, Fold.fold, Fold.initial).Resolve + create resolve diff --git a/samples/Tutorial/Sequence.fs b/samples/Tutorial/Sequence.fs index 36e6aa633..264e26b50 100644 --- a/samples/Tutorial/Sequence.fs +++ b/samples/Tutorial/Sequence.fs @@ -25,7 +25,24 @@ module Events = type Event = | Reserved of Reserved interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = fun (evt: Event) -> + match evt with + | Reserved reserved -> "Reserved", JsonSerializer.SerializeToElement(reserved, options) + + let private tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) -> + match eventType with + | "Reserved" -> Some (Reserved <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options= FsCodec.Codec.Create(encode options, tryDecode options) module Fold = @@ -54,9 +71,12 @@ let create resolve = Service(Serilog.Log.ForContext(), resolve, maxAtte module Cosmos = open Equinox.Cosmos + open FsCodec.SystemTextJson.Serialization + let private createService (context,cache,accessStrategy) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let codec = Events.JsonElementCodec.codec JsonSerializer.defaultOptions + let resolve = Resolver(context, codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve create resolve module LatestKnownEvent = diff --git a/samples/Tutorial/Set.fs b/samples/Tutorial/Set.fs index 4e7437b60..c500df008 100644 --- a/samples/Tutorial/Set.fs +++ b/samples/Tutorial/Set.fs @@ -12,7 +12,28 @@ module Events = | Deleted of Items | Snapshotted of Items interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let private encode (options: JsonSerializerOptions) = fun (evt: Event) -> + match evt with + | Added items -> "Added", JsonSerializer.SerializeToElement(items, options) + | Deleted items -> "Deleted", JsonSerializer.SerializeToElement(items, options) + | Snapshotted items -> "Snapshotted", JsonSerializer.SerializeToElement(items, options) + + let private tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) -> + match eventType with + | "Added" -> Some (Added <| JsonSerializer.DeserializeElement(data, options)) + | "Deleted" -> Some (Deleted <| JsonSerializer.DeserializeElement(data, options)) + | "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = @@ -54,14 +75,17 @@ let create resolve setId = Service(Serilog.Log.ForContext(), setId, res module Cosmos = open Equinox.Cosmos + open FsCodec.SystemTextJson.Serialization + let createService (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let accessStrategy = AccessStrategy.RollingState Fold.snapshot - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let codec = Events.JsonElementCodec.codec JsonSerializer.defaultOptions + let resolve = Resolver(context, codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve create resolve module MemoryStore = let createService store = - let resolve = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial).Resolve - create resolve \ No newline at end of file + let resolve = Equinox.MemoryStore.Resolver(store, Events.Utf8ArrayCodec.codec, Fold.fold, Fold.initial).Resolve + create resolve diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index 5de5e7a6b..aa80caa5f 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -40,7 +40,24 @@ module Events = type Event = | IdAssigned of IdAssigned interface TypeShape.UnionContract.IUnionContract - let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module Utf8ArrayCodec = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + + module JsonElementCodec = + open FsCodec.SystemTextJson + open System.Text.Json + + let encode (options: JsonSerializerOptions) = fun (evt: Event) -> + match evt with + | IdAssigned id -> "IdAssigned", JsonSerializer.SerializeToElement(id, options) + + let tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) -> + match eventType with + | "IdAssigned" -> Some (IdAssigned <| JsonSerializer.DeserializeElement(data, options)) + | _ -> None + + let codec options = FsCodec.Codec.Create(encode options, tryDecode options) module Fold = @@ -69,13 +86,16 @@ let create resolve = Service(Serilog.Log.ForContext(), resolve, 3) module Cosmos = open Equinox.Cosmos + open FsCodec.SystemTextJson.Serialization + let createService (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let codec = Events.JsonElementCodec.codec JsonSerializer.defaultOptions + let resolve = Resolver(context, codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve create resolve module EventStore = open Equinox.EventStore let createService context = - let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent).Resolve + let resolve = Resolver(context, Events.Utf8ArrayCodec.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent).Resolve create resolve diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index 95dde0c5c..c0e768822 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -17,6 +17,10 @@ + + + + diff --git a/src/Equinox.Core/Infrastructure.fs b/src/Equinox.Core/Infrastructure.fs index f8d2a7df1..aaa90d24a 100755 --- a/src/Equinox.Core/Infrastructure.fs +++ b/src/Equinox.Core/Infrastructure.fs @@ -12,6 +12,8 @@ type OAttribute = System.Runtime.InteropServices.OptionalAttribute type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute #if NET461 +let isNull v = v = null + module Array = let tryHead (array : 'T[]) = if array.Length = 0 then None @@ -28,12 +30,14 @@ module Array = elif predicate array.[i] then Some i else loop (i - 1) loop (array.Length - 1) + let singleton v = Array.create 1 v module Option = let filter predicate option = match option with None -> None | Some x -> if predicate x then Some x else None let toNullable option = match option with Some x -> Nullable x | None -> Nullable () let ofObj obj = match obj with null -> None | x -> Some x let toObj option = match option with None -> null | Some x -> x + let defaultWith f = function | Some v -> v | _ -> f() #endif type Async with @@ -69,6 +73,10 @@ type Async with sc ()) |> ignore) +#if NETSTANDARD2_1 + static member inline AwaitValueTask (vtask: ValueTask<'T>) : Async<'T> = vtask.AsTask() |> Async.AwaitTaskCorrect +#endif + [] module Regex = open System.Text.RegularExpressions diff --git a/src/Equinox.Core/Json/JsonElementHelpers.fs b/src/Equinox.Core/Json/JsonElementHelpers.fs new file mode 100644 index 000000000..f753f73f8 --- /dev/null +++ b/src/Equinox.Core/Json/JsonElementHelpers.fs @@ -0,0 +1,25 @@ +namespace FsCodec.SystemTextJson + +open System +open System.Buffers +open System.Runtime.InteropServices +open System.Text.Json + +[] +module JsonSerializerExtensions = + type JsonSerializer with + static member SerializeToElement(value: 'T, [] ?options: JsonSerializerOptions) = + JsonSerializer.Deserialize(ReadOnlySpan.op_Implicit(JsonSerializer.SerializeToUtf8Bytes(value, defaultArg options null))) + + static member DeserializeElement<'T>(element: JsonElement, [] ?options: JsonSerializerOptions) = +#if NETSTANDARD2_1 + let bufferWriter = ArrayBufferWriter() + ( + use jsonWriter = new Utf8JsonWriter(bufferWriter) + element.WriteTo(jsonWriter) + ) + JsonSerializer.Deserialize<'T>(bufferWriter.WrittenSpan, defaultArg options null) +#else + let json = element.GetRawText() + JsonSerializer.Deserialize<'T>(json, defaultArg options null) +#endif diff --git a/src/Equinox.Core/Json/JsonRecordConverter.fs b/src/Equinox.Core/Json/JsonRecordConverter.fs new file mode 100644 index 000000000..079382003 --- /dev/null +++ b/src/Equinox.Core/Json/JsonRecordConverter.fs @@ -0,0 +1,161 @@ +namespace FsCodec.SystemTextJson.Serialization + +open Equinox.Core +open System +open System.Collections.Generic +open System.Linq +open System.Linq.Expressions +open System.Text.Json +open System.Text.Json.Serialization +open FSharp.Reflection + +type JsonRecordConverterActivator = delegate of JsonSerializerOptions -> JsonConverter + +type IRecordFieldConverter = + abstract member Initialize: converter: JsonConverter -> unit + abstract member Read: reader: byref * typ: Type * options: JsonSerializerOptions -> obj + abstract member Write: writer: Utf8JsonWriter * value: obj * options: JsonSerializerOptions -> unit + +type RecordFieldConverter<'F> () = + let mutable converter = Unchecked.defaultof> + + interface IRecordFieldConverter with + member __.Initialize (c) = + converter <- c :?> JsonConverter<'F> + + member __.Read (reader, typ, options) = + converter.Read(&reader, typ, options) :> obj + + member __.Write (writer, value, options) = + converter.Write(writer, value :?> 'F, options) + +[] +type RecordField = { + name: string + fieldType: Type + index: int + isIgnored: bool + converter: IRecordFieldConverter option +} + +type JsonRecordConverter<'T> (options: JsonSerializerOptions) = + inherit JsonConverter<'T> () + + let recordType = typeof<'T> + + let constructor = FSharpValue.PreComputeRecordConstructor(recordType, true) + let getFieldValues = FSharpValue.PreComputeRecordReader(typeof<'T>, true) + + let fields = + FSharpType.GetRecordFields(recordType, true) + |> Array.mapi (fun idx f -> + { + name = + f.GetCustomAttributes(typedefof, true) + |> Array.tryHead + |> Option.map (fun attr -> (attr :?> JsonPropertyNameAttribute).Name) + |> Option.defaultWith (fun () -> + if options.PropertyNamingPolicy |> isNull + then f.Name + else options.PropertyNamingPolicy.ConvertName f.Name) + + fieldType = f.PropertyType + index = idx + isIgnored = f.GetCustomAttributes(typeof, true) |> Array.isEmpty |> not + converter = + f.GetCustomAttributes(typeof, true) + |> Array.tryHead + |> Option.map (fun attr -> attr :?> JsonConverterAttribute) + |> Option.bind (fun attr -> + let baseConverter = attr.CreateConverter(f.PropertyType) + + if baseConverter |> isNull then + failwithf "Field %s is decorated with a JsonConverter attribute, but it does not implement a CreateConverter method." f.Name + + if baseConverter.CanConvert(f.PropertyType) then + let converterType = typedefof>.MakeGenericType(f.PropertyType) + let converter = Activator.CreateInstance(converterType) :?> IRecordFieldConverter + converter.Initialize(baseConverter) + Some converter + else + None + ) + }) + + let fieldsByName = + fields + |> Array.map (fun f -> f.name, f) +#if NETSTANDARD2_1 + |> Array.map KeyValuePair.Create + |> (fun kvp -> Dictionary(kvp, StringComparer.OrdinalIgnoreCase)) +#else + |> Array.map KeyValuePair + |> (fun kvp -> kvp.ToDictionary((fun item -> item.Key), (fun item -> item.Value), StringComparer.OrdinalIgnoreCase)) +#endif + + let tryGetFieldByName name = + match fieldsByName.TryGetValue(name) with + | true, field -> Some field + | _ -> None + + let getFieldByName name = + match tryGetFieldByName name with + | Some field -> field + | _ -> KeyNotFoundException(sprintf "Failed to find a field named '%s' on record type '%s'." name recordType.Name) |> raise + + override __.Read (reader, typ, options) = + reader.ValidateTokenType(JsonTokenType.StartObject) + + let fields = Array.zeroCreate <| fields.Length + + while reader.Read() && reader.TokenType <> JsonTokenType.EndObject do + reader.ValidateTokenType(JsonTokenType.PropertyName) + + match tryGetFieldByName <| reader.GetString() with + | Some field -> + fields.[field.index] <- + match field.converter with + | Some converter -> + reader.Read() |> ignore + converter.Read(&reader, field.fieldType, options) + | None -> + JsonSerializer.Deserialize(&reader, field.fieldType, options) + | _ -> + reader.Skip() + + constructor fields :?> 'T + + override __.Write (writer, record, options) = + writer.WriteStartObject() + + let fieldValues = getFieldValues record + + (fields, fieldValues) + ||> Array.iter2 (fun field value -> + match value with + | :? JsonElement as je when je.ValueKind = JsonValueKind.Undefined -> () + | _ -> + if not field.isIgnored && not (options.IgnoreNullValues && isNull value) then + writer.WritePropertyName(field.name) + + match field.converter with + | Some converter -> converter.Write(writer, value, options) + | None -> JsonSerializer.Serialize(writer, value, options)) + + writer.WriteEndObject() + +type JsonRecordConverter () = + inherit JsonConverterFactory() + + override __.CanConvert typ = + FSharpType.IsRecord (typ, true) + + override __.CreateConverter (typ, options) = + let constructor = typedefof>.MakeGenericType(typ).GetConstructor(typeof |> Array.singleton) + let optionsParameter = Expression.Parameter(typeof, "options") + + let newExpression = Expression.New(constructor, optionsParameter) + let lambda = Expression.Lambda(typeof, newExpression, optionsParameter) + + let activator = lambda.Compile() :?> JsonRecordConverterActivator + activator.Invoke(options) diff --git a/src/Equinox.Core/Json/Options.fs b/src/Equinox.Core/Json/Options.fs new file mode 100644 index 000000000..6867c76f6 --- /dev/null +++ b/src/Equinox.Core/Json/Options.fs @@ -0,0 +1,14 @@ +namespace FsCodec.SystemTextJson.Serialization + +open System.Text.Json + +[] +module JsonSerializerOptionExtensions = + type JsonSerializerOptions with + static member Create() = + let options = JsonSerializerOptions() + options.Converters.Add(new JsonRecordConverter()) + options + +module JsonSerializer = + let defaultOptions = JsonSerializerOptions.Create() diff --git a/src/Equinox.Core/Json/Utf8JsonReaderExtensions.fs b/src/Equinox.Core/Json/Utf8JsonReaderExtensions.fs new file mode 100644 index 000000000..9e29bb5d3 --- /dev/null +++ b/src/Equinox.Core/Json/Utf8JsonReaderExtensions.fs @@ -0,0 +1,22 @@ +namespace FsCodec.SystemTextJson.Serialization + +open System.Text.Json +open System.Runtime.CompilerServices + +[] +type Utf8JsonReaderExtension = + [] + static member ValidateTokenType(reader: Utf8JsonReader, expectedTokenType) = + if reader.TokenType <> expectedTokenType then + sprintf "Expected a %A token, but encountered a %A token when parsing JSON." expectedTokenType (reader.TokenType) + |> JsonException + |> raise + + [] + static member ValidatePropertyName(reader: Utf8JsonReader, expectedPropertyName: string) = + reader.ValidateTokenType(JsonTokenType.PropertyName) + + if not <| reader.ValueTextEquals expectedPropertyName then + sprintf "Expected a property named '%s', but encountered property with name '%s'." expectedPropertyName (reader.GetString()) + |> JsonException + |> raise diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 334653586..7dbd01ef8 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -1,16 +1,17 @@ namespace Equinox.Cosmos.Store -open Equinox.Core -open FsCodec open Azure open Azure.Cosmos -open Newtonsoft.Json +open Equinox.Core +open FsCodec open Serilog open System open System.IO +open System.Text.Json +open System.Text.Json.Serialization /// A single Domain Event from the array held in a Batch -type [] +type [] // TODO for STJ v5: All fields required unless explicitly optional Event = { /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) t: DateTimeOffset // ISO 8601 @@ -19,24 +20,19 @@ type [] c: string // required /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB - [)>] - [] - d: byte[] // Required, but can be null so Nullary cases can work + d: JsonElement // TODO for STJ v5: Required, but can be null so Nullary cases can work - /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing) - [)>] - [] - m: byte[] + /// Optional metadata, as UTF-8 encoded json, ready to emit directly + m: JsonElement // TODO for STJ v5: Optional, not serialized if missing - /// Optional correlationId (can be null, not written if missing) - [] - correlationId : string + /// Optional correlationId + correlationId : string // TODO for STJ v5: Optional, not serialized if missing - /// Optional causationId (can be null, not written if missing) - [] - causationId : string } + /// Optional causationId + causationId : string // TODO for STJ v5: Optional, not serialized if missing + } - interface IEventData with + interface IEventData with member __.EventType = __.c member __.Data = __.d member __.Meta = __.m @@ -46,12 +42,11 @@ type [] member __.Timestamp = __.t /// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds) -type [] +type [] // TODO for STJ v5: All fields required unless explicitly optional Batch = { /// CosmosDB-mandated Partition Key, must be maintained within the document /// Not actually required if running in single partition mode, but for simplicity, we always write it - [] // Not requested in queries - p: string // "{streamName}" + p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries /// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string /// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it @@ -61,8 +56,7 @@ type [] /// When we read, we need to capture the value so we can retain it for caching purposes /// NB this is not relevant to fill in when we pass it to the writing stored procedure /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - [] - _etag: string + _etag: string // TODO for STJ v5: Optional, not serialized if missing /// base 'i' value for the Events held herein i: int64 // {index} @@ -78,7 +72,43 @@ type [] /// As one cannot sort by the implicit `id` field, we have an indexed `i` field for sort and range query use static member internal IndexedFields = [Batch.PartitionKeyField; "i"; "n"] +/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc +/// Only applied to snapshots in the Tip +type JsonCompressedBase64Converter() = + inherit JsonConverter() + + override __.Read (reader, _typeToConvert, options) = + if reader.TokenType = JsonTokenType.Null then + JsonSerializer.Deserialize(&reader, options) + else + let compressedBytes = reader.GetBytesFromBase64() + use input = new MemoryStream(compressedBytes) + use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress) + use output = new MemoryStream() + decompressor.CopyTo(output) + JsonSerializer.Deserialize(ReadOnlySpan.op_Implicit(output.ToArray()), options) + + override __.Write (writer, value, _options) = + if value.ValueKind = JsonValueKind.Null || value.ValueKind = JsonValueKind.Undefined then + writer.WriteNullValue() + else + let input = System.Text.Encoding.UTF8.GetBytes(value.GetRawText()) + use output = new MemoryStream() + use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal) + compressor.Write(input, 0, input.Length) + compressor.Close() + writer.WriteBase64StringValue(ReadOnlySpan.op_Implicit(output.ToArray())) + +type JsonCompressedBase64ConverterAttribute () = + inherit JsonConverterAttribute(typeof) + + static let converter = JsonCompressedBase64Converter() + + override __.CreateConverter _typeToConvert = + converter :> JsonConverter + /// Compaction/Snapshot/Projection Event based on the state at a given point in time `i` +[] type Unfold = { /// Base: Stream Position (Version) of State from which this Unfold Event was generated i: int64 @@ -90,61 +120,30 @@ type Unfold = c: string // required /// Event body - Json -> UTF-8 -> Deflate -> Base64 - [)>] - d: byte[] // required + [] + d: JsonElement // required /// Optional metadata, same encoding as `d` (can be null; not written if missing) - [)>] - [] - m: byte[] } // optional - -/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc -/// Only applied to snapshots in the Tip -and Base64DeflateUtf8JsonConverter() = - inherit JsonConverter() - let pickle (input : byte[]) : string = - if input = null then null else - - use output = new MemoryStream() - use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal) - compressor.Write(input,0,input.Length) - compressor.Close() - System.Convert.ToBase64String(output.ToArray()) - let unpickle str : byte[] = - if str = null then null else - - let compressedBytes = System.Convert.FromBase64String str - use input = new MemoryStream(compressedBytes) - use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress) - use output = new MemoryStream() - decompressor.CopyTo(output) - output.ToArray() - - override __.CanConvert(objectType) = - typeof.Equals(objectType) - override __.ReadJson(reader, _, _, serializer) = - //( if reader.TokenType = JsonToken.Null then null else - serializer.Deserialize(reader, typedefof) :?> string |> unpickle |> box - override __.WriteJson(writer, value, serializer) = - let pickled = value |> unbox |> pickle - serializer.Serialize(writer, pickled) + [] + m: JsonElement // TODO for STJ v5: Optional, not serialized if missing + } /// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document /// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`) /// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc -type [] +type [] // TODO for STJ v5: All fields required unless explicitly optional Tip = - { [] // Not requested in queries + { /// Partition key, as per Batch - p: string // "{streamName}" + p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries + /// Document Id within partition, as per Batch id: string // "{-1}" - Well known IdConstant used while this remains the pending batch /// When we read, we need to capture the value so we can retain it for caching purposes /// NB this is not relevant to fill in when we pass it to the writing stored procedure /// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed - [] - _etag: string + _etag: string // TODO for STJ v5: Optional, not serialized if missing /// base 'i' value for the Events held herein i: int64 @@ -172,7 +171,7 @@ module internal Position = let fromAppendAtEnd = fromI -1L // sic - needs to yield -1 let fromEtag (value : string) = { fromI -2L with etag = Some value } /// NB very inefficient compared to FromDocument or using one already returned to you - let fromMaxIndex (xs: ITimelineEvent[]) = + let fromMaxIndex (xs: ITimelineEvent[]) = if Array.isEmpty xs then fromKnownEmpty else fromI (1L + Seq.max (seq { for x in xs -> x.Index })) /// Create Position from Tip record context (facilitating 1 RU reads) @@ -186,9 +185,9 @@ module internal Position = type Direction = Forward | Backward override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" type internal Enum() = - static member internal Events(b: Tip) : ITimelineEvent seq = + static member internal Events(b: Tip) : ITimelineEvent seq = b.e |> Seq.mapi (fun offset x -> FsCodec.Core.TimelineEvent.Create(b.i + int64 offset, x.c, x.d, x.m, Guid.Empty, x.correlationId, x.causationId, x.t)) - static member Events(i: int64, e: Event[], startPos : Position option, direction) : ITimelineEvent seq = seq { + static member Events(i: int64, e: Event[], startPos : Position option, direction) : ITimelineEvent seq = seq { // If we're loading from a nominated position, we need to discard items in the batch before/after the start on the start page let isValidGivenStartPos i = match startPos with @@ -203,9 +202,9 @@ type internal Enum() = static member internal Events(b: Batch, startPos, direction) = Enum.Events(b.i, b.e, startPos, direction) |> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id - static member Unfolds(xs: Unfold[]) : ITimelineEvent seq = seq { + static member Unfolds(xs: Unfold[]) : ITimelineEvent seq = seq { for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold=true) } - static member EventsAndUnfolds(x: Tip): ITimelineEvent seq = + static member EventsAndUnfolds(x: Tip): ITimelineEvent seq = Enum.Events x |> Seq.append (Enum.Unfolds x.u) // where Index is equal, unfolds get delivered after the events so the fold semantics can be 'idempotent' @@ -232,8 +231,8 @@ module Log = | SyncResync of Measurement | SyncConflict of Measurement let prop name value (log : ILogger) = log.ForContext(name, value) - let propData name (events: #IEventData seq) (log : ILogger) = - let render = function null -> "null" | bytes -> System.Text.Encoding.UTF8.GetString bytes + let propData name (events: #IEventData seq) (log : ILogger) = + let render = function (j: JsonElement) when j.ValueKind <> JsonValueKind.Null -> j.GetRawText() | _ -> "null" let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (render e.Data) } log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) let propEvents = propData "events" @@ -255,7 +254,7 @@ module Log = let event (value : Event) (log : ILogger) = let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("cosmosEvt", ScalarValue(value))) log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt }) - let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length + let (|BlobLen|) = function (j: JsonElement) when j.ValueKind <> JsonValueKind.Null && j.ValueKind <> JsonValueKind.Undefined -> j.GetRawText().Length | _ -> 0 let (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes+metaBytes let (|BatchLen|) = Seq.sumBy (|EventLen|) @@ -451,7 +450,7 @@ function sync(req, expIndex, expEtag) { [] type Result = | Written of Position - | Conflict of Position * events: ITimelineEvent[] + | Conflict of Position * events: ITimelineEvent[] | ConflictUnknown of Position type [] Exp = Version of int64 | Etag of string | Any @@ -600,7 +599,7 @@ module internal Tip = let log = log |> Log.prop "_etag" tip._etag |> Log.prop "n" tip.n log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru) return ru, res } - type [] Result = NotModified | NotFound | Found of Position * ITimelineEvent[] + type [] Result = NotModified | NotFound | Found of Position * ITimelineEvent[] /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.NotModified let tryLoad (log : ILogger) retryPolicy containerStream (maybePos: Position option): Async = async { let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGet get containerStream maybePos) log @@ -625,7 +624,7 @@ module internal Tip = // Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index let private processNextPage direction (streamName: string) startPos (enumerator: IAsyncEnumerator>) (log: ILogger) - : Async[] * Position option * float>> = async { + : Async[] * Position option * float>> = async { let! t, res = enumerator.MoveNext() |> Stopwatch.Time return @@ -644,19 +643,19 @@ module internal Tip = let maybePosition = batches |> Array.tryPick Position.tryFromBatch events, maybePosition, ru) } - let private run (log : ILogger) (readNextPage: IAsyncEnumerator> -> ILogger -> Async[] * Position option * float>>) + let private run (log : ILogger) (readNextPage: IAsyncEnumerator> -> ILogger -> Async[] * Position option * float>>) (maxPermittedBatchReads: int option) (query: AsyncSeq>) = let e = query.GetEnumerator() - let rec loop batchCount : AsyncSeq[] * Position option * float> = asyncSeq { + let rec loop batchCount : AsyncSeq[] * Position option * float> = asyncSeq { match maxPermittedBatchReads with | Some mpbr when batchCount >= mpbr -> log.Information "batch Limit exceeded"; invalidOp "batch Limit exceeded" | _ -> () let batchLog = log |> Log.prop "batchIndex" batchCount - let! (page : Option[] * Position option * float>) = readNextPage e batchLog + let! (page : Option[] * Position option * float>) = readNextPage e batchLog if page |> Option.isSome then yield page.Value @@ -664,7 +663,7 @@ module internal Tip = loop 0 - let private logQuery direction batchSize streamName interval (responsesCount, events : ITimelineEvent[]) n (ru: float) (log : ILogger) = + let private logQuery direction batchSize streamName interval (responsesCount, events : ITimelineEvent[]) n (ru: float) (log : ILogger) = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } let evt = Log.Event.Query (direction, responsesCount, reqMetric) @@ -673,7 +672,7 @@ module internal Tip = "EqxCosmos {action:l} {stream} v{n} {count}/{responses} {ms}ms rc={ru}", action, streamName, n, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) - let private calculateUsedVersusDroppedPayload stopIndex (xs: ITimelineEvent[]) : int * int = + let private calculateUsedVersusDroppedPayload stopIndex (xs: ITimelineEvent[]) : int * int = let mutable used, dropped = 0, 0 let mutable found = false for x in xs do @@ -684,10 +683,10 @@ module internal Tip = used, dropped let walk<'event> (log : ILogger) (container,stream) retryPolicy maxItems maxRequests direction startPos - (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) + (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) : Async = async { let responseCount = ref 0 - let mergeBatches (log : ILogger) (batchesBackward: AsyncSeq[] * Position option * float>) = async { + let mergeBatches (log : ILogger) (batchesBackward: AsyncSeq[] * Position option * float>) = async { let mutable lastResponse, maybeTipPos, ru = None, None, 0. let! events = batchesBackward @@ -714,7 +713,7 @@ module internal Tip = let retryingLoggingReadPage e = Log.withLoggedRetries retryPolicy "readAttempt" (readPage e) let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream let readlog = log |> Log.prop "direction" direction - let batches : AsyncSeq[] * Position option * float> = run readlog retryingLoggingReadPage maxRequests query + let batches : AsyncSeq[] * Position option * float> = run readlog retryingLoggingReadPage maxRequests query let! t, (events, maybeTipPos, ru) = mergeBatches log batches |> Stopwatch.Time let raws, decoded = (Array.map fst events), (events |> Seq.choose snd |> Array.ofSeq) let pos = match maybeTipPos with Some p -> p | None -> Position.fromMaxIndex raws @@ -723,7 +722,7 @@ module internal Tip = return pos, decoded } let walkLazy<'event> (log : ILogger) (container,stream) retryPolicy maxItems maxRequests direction startPos - (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) + (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) : AsyncSeq<'event[]> = asyncSeq { let responseCount = ref 0 let query = mkQuery (container,stream) maxItems direction startPos @@ -787,22 +786,24 @@ module Token = [] module Internal = [] - type InternalSyncResult = Written of StreamToken | ConflictUnknown of StreamToken | Conflict of StreamToken * ITimelineEvent[] + type InternalSyncResult = Written of StreamToken | ConflictUnknown of StreamToken | Conflict of StreamToken * ITimelineEvent[] [] type LoadFromTokenResult<'event> = Unchanged | Found of StreamToken * 'event[] namespace Equinox.Cosmos +open Azure.Cosmos open Equinox open Equinox.Core open Equinox.Cosmos.Store open FsCodec +open FsCodec.SystemTextJson.Serialization open FSharp.Control -open Azure.Cosmos open Serilog open System open System.Collections.Concurrent +open System.Text.Json /// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) type Connection(client: CosmosClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) = @@ -878,8 +879,8 @@ type Gateway(conn : Connection, batching : BatchingPolicy) = | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create containerStream pos') | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create containerStream pos') } -type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEventCodec<'event,byte[],'context>) = - let (|TryDecodeFold|) (fold: 'state -> 'event seq -> 'state) initial (events: ITimelineEvent seq) : 'state = Seq.choose codec.TryDecode events |> fold initial +type private Category<'event, 'state, 'context>(gateway : Gateway, codec : IEventCodec<'event,JsonElement,'context>) = + let (|TryDecodeFold|) (fold: 'state -> 'event seq -> 'state) initial (events: ITimelineEvent seq) : 'state = Seq.choose codec.TryDecode events |> fold initial member __.Load includeUnfolds containerStream fold initial isOrigin (log : ILogger): Async = async { let! token, events = if not includeUnfolds then gateway.LoadBackwardsStopping log containerStream (codec.TryDecode,isOrigin) @@ -1140,7 +1141,7 @@ type Connector /// ClientOptions for this Connector as configured member val ClientOptions = let maxAttempts, maxWait, timeout = Nullable maxRetryAttemptsOnRateLimitedRequests, Nullable maxRetryWaitTimeOnRateLimitedRequests, requestTimeout - let co = CosmosClientOptions(MaxRetryAttemptsOnRateLimitedRequests = maxAttempts, MaxRetryWaitTimeOnRateLimitedRequests = maxWait, RequestTimeout = timeout, Serializer = NewtonsoftJsonSerializer()) + let co = CosmosClientOptions(MaxRetryAttemptsOnRateLimitedRequests = maxAttempts, MaxRetryWaitTimeOnRateLimitedRequests = maxWait, RequestTimeout = timeout, Serializer = CosmosJsonSerializer(JsonSerializer.defaultOptions)) match mode with | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct | None | Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // default; only supports Https @@ -1186,12 +1187,13 @@ open Equinox.Cosmos.Store open FsCodec open FSharp.Control open System.Runtime.InteropServices +open System.Text.Json /// Outcome of appending events, specifying the new and/or conflicting events, together with the updated Target write position [] type AppendResult<'t> = | Ok of pos: 't - | Conflict of index: 't * conflictingEvents: ITimelineEvent[] + | Conflict of index: 't * conflictingEvents: ITimelineEvent[] | ConflictUnknown of index: 't /// Encapsulates the core facilities Equinox.Cosmos offers for operating directly on Events in Streams. @@ -1226,7 +1228,7 @@ type Context member __.ResolveStream(streamName) = containers.Resolve(conn.Client, null, streamName, gateway.CreateSyncStoredProcIfNotExists (Some log)) member __.CreateStream(streamName) = __.ResolveStream streamName |> fst - member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : AsyncSeq[]> = + member internal __.GetLazy((stream, startPos), ?batchSize, ?direction) : AsyncSeq[]> = let direction = defaultArg direction Direction.Forward let batching = BatchingPolicy(defaultArg batchSize batching.MaxItems) gateway.ReadLazy batching log stream direction startPos (Some,fun _ -> false) @@ -1251,11 +1253,11 @@ type Context /// Reads in batches of `batchSize` from the specified `Position`, allowing the reader to efficiently walk away from a running query /// ... NB as long as they Dispose! - member __.Walk(stream, batchSize, ?position, ?direction) : AsyncSeq[]> = + member __.Walk(stream, batchSize, ?position, ?direction) : AsyncSeq[]> = __.GetLazy((stream, position), batchSize, ?direction=direction) /// Reads all Events from a `Position` in a given `direction` - member __.Read(stream, ?position, ?maxCount, ?direction) : Async[]> = + member __.Read(stream, ?position, ?maxCount, ?direction) : Async[]> = __.GetInternal((stream, position), ?maxCount=maxCount, ?direction=direction) |> yieldPositionAndData /// Appends the supplied batch of events, subject to a consistency check based on the `position` @@ -1300,7 +1302,7 @@ module Events = let private stripPosition (f: Async): Async = async { let! (PositionIndex index) = f return index } - let private dropPosition (f: Async[]>): Async[]> = async { + let private dropPosition (f: Async[]>): Async[]> = async { let! _,xs = f return xs } let (|MinPosition|) = function @@ -1314,14 +1316,14 @@ module Events = /// reading in batches of the specified size. /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest /// sequence number in the stream. - let getAll (ctx: Context) (streamName: string) (MinPosition index: int64) (batchSize: int): FSharp.Control.AsyncSeq[]> = + let getAll (ctx: Context) (streamName: string) (MinPosition index: int64) (batchSize: int): FSharp.Control.AsyncSeq[]> = ctx.Walk(ctx.CreateStream streamName, batchSize, ?position=index) /// Returns an async array of events in the stream starting at the specified sequence number, /// number of events to read is specified by batchSize /// Returns an empty sequence if the stream is empty or if the sequence number is larger than the largest /// sequence number in the stream. - let get (ctx: Context) (streamName: string) (MinPosition index: int64) (maxCount: int): Async[]> = + let get (ctx: Context) (streamName: string) (MinPosition index: int64) (maxCount: int): Async[]> = ctx.Read(ctx.CreateStream streamName, ?position=index, maxCount=maxCount) |> dropPosition /// Appends a batch of events to a stream at the specified expected sequence number. @@ -1341,14 +1343,14 @@ module Events = /// reading in batches of the specified size. /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getAllBackwards (ctx: Context) (streamName: string) (MaxPosition index: int64) (batchSize: int): AsyncSeq[]> = + let getAllBackwards (ctx: Context) (streamName: string) (MaxPosition index: int64) (batchSize: int): AsyncSeq[]> = ctx.Walk(ctx.CreateStream streamName, batchSize, ?position=index, direction=Direction.Backward) /// Returns an async array of events in the stream backwards starting from the specified sequence number, /// number of events to read is specified by batchSize /// Returns an empty sequence if the stream is empty or if the sequence number is smaller than the smallest /// sequence number in the stream. - let getBackwards (ctx: Context) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async[]> = + let getBackwards (ctx: Context) (streamName: string) (MaxPosition index: int64) (maxCount: int): Async[]> = ctx.Read(ctx.CreateStream streamName, ?position=index, maxCount=maxCount, direction=Direction.Backward) |> dropPosition /// Obtains the `index` from the current write Position diff --git a/src/Equinox.Cosmos/CosmosJsonSerializer.fs b/src/Equinox.Cosmos/CosmosJsonSerializer.fs new file mode 100644 index 000000000..6b1a5a310 --- /dev/null +++ b/src/Equinox.Cosmos/CosmosJsonSerializer.fs @@ -0,0 +1,31 @@ +namespace Equinox.Cosmos.Store + +open Azure.Cosmos.Serialization +open Equinox.Core +open System.IO +open System.Text.Json + +type CosmosJsonSerializer (options: JsonSerializerOptions) = + inherit CosmosSerializer() + + override __.FromStream<'T> (stream) = + using (stream) (fun stream -> + if stream.Length = 0L then + Unchecked.defaultof<'T> + elif typeof.IsAssignableFrom(typeof<'T>) then + stream :> obj :?> 'T + else + JsonSerializer.DeserializeAsync<'T>(stream, options) + |> Async.AwaitValueTask + |> Async.RunSynchronously + ) + + override __.ToStream<'T> (input: 'T) = + let memoryStream = new MemoryStream() + + JsonSerializer.SerializeAsync(memoryStream, input, input.GetType(), options) + |> Async.AwaitTaskCorrect + |> Async.RunSynchronously + + memoryStream.Position <- 0L + memoryStream :> Stream diff --git a/src/Equinox.Cosmos/Equinox.Cosmos.fsproj b/src/Equinox.Cosmos/Equinox.Cosmos.fsproj index 902cfcd2f..4659eff70 100644 --- a/src/Equinox.Cosmos/Equinox.Cosmos.fsproj +++ b/src/Equinox.Cosmos/Equinox.Cosmos.fsproj @@ -11,7 +11,7 @@ - + @@ -26,10 +26,10 @@ - + \ No newline at end of file diff --git a/src/Equinox.Cosmos/NewtonsoftJsonSerializer.fs b/src/Equinox.Cosmos/NewtonsoftJsonSerializer.fs deleted file mode 100644 index 71bc33a10..000000000 --- a/src/Equinox.Cosmos/NewtonsoftJsonSerializer.fs +++ /dev/null @@ -1,39 +0,0 @@ -namespace Equinox.Cosmos.Store - -open System.IO; -open System.Text; -open Azure.Cosmos.Serialization; -open Newtonsoft.Json; -open Newtonsoft.Json.Serialization; - -type NewtonsoftJsonSerializer () = - inherit CosmosSerializer () - - let encoding = new UTF8Encoding(false, true) - let serializer = JsonSerializer.Create() - - override __.FromStream<'T> (stream: Stream): 'T = - use stream = stream - - if typeof.IsAssignableFrom(typeof<'T>) then - stream :> obj :?> 'T - else - use streamReader = new StreamReader(stream) - use jsonReader = new JsonTextReader(streamReader) - serializer.Deserialize<'T>(jsonReader) - - override __.ToStream<'T> (input: 'T): Stream = - let payload = new MemoryStream() - - ( - use streamWriter = new StreamWriter(payload, encoding = encoding, bufferSize = 1024, leaveOpen = true) - use jsonWriter = new JsonTextWriter(streamWriter) - - jsonWriter.Formatting <- Formatting.None - serializer.Serialize(jsonWriter, input) - jsonWriter.Flush() - streamWriter.Flush() - ) - - payload.Position <- 0L - payload :> Stream diff --git a/src/Equinox.EventStore/Equinox.EventStore.fsproj b/src/Equinox.EventStore/Equinox.EventStore.fsproj index 9394721b3..15847dde4 100644 --- a/src/Equinox.EventStore/Equinox.EventStore.fsproj +++ b/src/Equinox.EventStore/Equinox.EventStore.fsproj @@ -26,7 +26,7 @@ - + diff --git a/src/Equinox.MemoryStore/Equinox.MemoryStore.fsproj b/src/Equinox.MemoryStore/Equinox.MemoryStore.fsproj index 5eeae410f..4895d6ccb 100644 --- a/src/Equinox.MemoryStore/Equinox.MemoryStore.fsproj +++ b/src/Equinox.MemoryStore/Equinox.MemoryStore.fsproj @@ -24,7 +24,7 @@ - + \ No newline at end of file diff --git a/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj b/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj index 8f5feeeea..e7addd071 100644 --- a/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj +++ b/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj @@ -24,7 +24,7 @@ - + diff --git a/src/Equinox/Equinox.fsproj b/src/Equinox/Equinox.fsproj index 8ab6f3d94..417cbffcb 100644 --- a/src/Equinox/Equinox.fsproj +++ b/src/Equinox/Equinox.fsproj @@ -20,6 +20,7 @@ + \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index e7e726e9d..0e0e6138d 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -8,7 +8,7 @@ open Newtonsoft.Json.Linq open Swensen.Unquote open Serilog open System -open System.Text +open System.Text.Json #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -16,8 +16,8 @@ type TestEvents() = static member private Create(i, ?eventType, ?json) = EventData.FromUtf8Bytes ( sprintf "%s:%d" (defaultArg eventType "test_event") i, - Encoding.UTF8.GetBytes(defaultArg json "{\"d\":\"d\"}"), - Encoding.UTF8.GetBytes "{\"m\":\"m\"}") + IntegrationJsonSerializer.deserialize(defaultArg json "{\"d\":\"d\"}"), + IntegrationJsonSerializer.deserialize("{\"m\":\"m\"}") ) static member Create(i, c) = Array.init c (fun x -> TestEvents.Create(x+i)) type Tests(testOutputHelper) = @@ -69,8 +69,8 @@ type Tests(testOutputHelper) = test <@ match res with Choice2Of2 ((:? InvalidOperationException) as ex) -> ex.Message.StartsWith "Must write either events or unfolds." | x -> failwithf "%A" x @> } - let blobEquals (x: byte[]) (y: byte[]) = System.Linq.Enumerable.SequenceEqual(x,y) - let stringOfUtf8 (x: byte[]) = Encoding.UTF8.GetString(x) + let blobEquals (x: JsonElement) (y: JsonElement) = x.GetRawText().Equals(y.GetRawText()) + let stringOfUtf8 (x: JsonElement) = x.GetRawText() let xmlDiff (x: string) (y: string) = match JsonDiffPatchDotNet.JsonDiffPatch().Diff(JToken.Parse x,JToken.Parse y) with | null -> "" @@ -91,7 +91,7 @@ type Tests(testOutputHelper) = return TestEvents.Create(0,6) } - let verifyCorrectEventsEx direction baseIndex (expected: IEventData<_>[]) (xs: ITimelineEvent[]) = + let verifyCorrectEventsEx direction baseIndex (expected: IEventData<_>[]) (xs: ITimelineEvent[]) = let xs, baseIndex = if direction = Equinox.Cosmos.Store.Direction.Forward then xs, baseIndex else Array.rev xs, baseIndex - int64 (Array.length expected) + 1L diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 0b254b5f0..b74282e46 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -11,7 +11,7 @@ open System.Threading module Cart = let fold, initial = Domain.Cart.Fold.fold, Domain.Cart.Fold.initial let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot - let codec = Domain.Cart.Events.codec + let codec = Domain.Cart.Events.JsonElementCodec.codec IntegrationJsonSerializer.options let createServiceWithoutOptimization connection batchSize log = let store = createCosmosContext connection batchSize let resolve (id,opt) = Resolver(store, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve(id,?option=opt) @@ -40,7 +40,7 @@ module Cart = module ContactPreferences = let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferences.Fold.initial - let codec = Domain.ContactPreferences.Events.codec + let codec = Domain.ContactPreferences.Events.JsonElementCodec.codec IntegrationJsonSerializer.options let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate = let gateway = createGateway defaultBatchSize let resolve = Resolver(gateway, codec, fold, initial, CachingStrategy.NoCaching, AccessStrategy.Unoptimized).Resolve @@ -375,4 +375,4 @@ type Tests(testOutputHelper) = capture.Clear() do! addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service1 1 test <@ [EqxAct.Append] = capture.ExternalCalls @> - } \ No newline at end of file + } diff --git a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj index 01422e7c0..09c104a32 100644 --- a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj +++ b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj @@ -8,6 +8,7 @@ + diff --git a/tests/Equinox.Cosmos.Integration/Json.fs b/tests/Equinox.Cosmos.Integration/Json.fs new file mode 100644 index 000000000..398679399 --- /dev/null +++ b/tests/Equinox.Cosmos.Integration/Json.fs @@ -0,0 +1,27 @@ +[] +module Equinox.Cosmos.Integration.Json + +open System +open System.Text.Json +open System.Text.Json.Serialization +open Domain +open FsCodec.SystemTextJson +open FsCodec.SystemTextJson.Serialization + +type JsonSkuIdConverter () = + inherit JsonConverter() + + override __.Read (reader, _typ, _options) = + reader.GetString() |> Guid.Parse |> SkuId + + override __.Write (writer, value, _options) = + writer.WriteStringValue(string value) + +module IntegrationJsonSerializer = + let options = JsonSerializer.defaultOptions + options.Converters.Add(JsonSkuIdConverter()) + + let serialize (value: 'T) = JsonSerializer.Serialize(value, options) + let serializeToElement (value: 'T) = JsonSerializer.SerializeToElement(value, options) + let deserialize<'T> (json: string) = JsonSerializer.Deserialize<'T>(json, options) + let deserializeElement<'T> (jsonElement: JsonElement) = JsonSerializer.DeserializeElement<'T>(jsonElement, options) diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs index fc283b40c..7bcbecb31 100644 --- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs +++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs @@ -2,9 +2,9 @@ open Equinox.Cosmos open FsCheck.Xunit -open Newtonsoft.Json open Swensen.Unquote open System +open System.Text.Json open Xunit type Embedded = { embed : string } @@ -15,8 +15,19 @@ type Union = let defaultSettings = FsCodec.NewtonsoftJson.Settings.CreateDefault() +let encode (evt: Union) = + match evt with + | A e -> "A", IntegrationJsonSerializer.serializeToElement(e) + | B e -> "B", IntegrationJsonSerializer.serializeToElement(e) + +let tryDecode (eventType, data: JsonElement) = + match eventType with + | "A" -> Some (A <| IntegrationJsonSerializer.deserializeElement(data)) + | "B" -> Some (B <| IntegrationJsonSerializer.deserializeElement(data)) + | _ -> None + type Base64ZipUtf8Tests() = - let eventCodec = FsCodec.NewtonsoftJson.Codec.Create(defaultSettings) + let eventCodec = FsCodec.Codec.Create(encode, tryDecode) [] let ``serializes, achieving compression`` () = @@ -25,10 +36,10 @@ type Base64ZipUtf8Tests() = { i = 42L c = encoded.EventType d = encoded.Data - m = null + m = Unchecked.defaultof t = DateTimeOffset.MinValue } - let res = JsonConvert.SerializeObject e - test <@ res.Contains("\"d\":\"") && res.Length < 128 @> + let res = IntegrationJsonSerializer.serialize(e) + test <@ res.Contains("\"d\":\"") && res.Length < 138 @> [] let roundtrips value = @@ -43,11 +54,11 @@ type Base64ZipUtf8Tests() = { i = 42L c = encoded.EventType d = encoded.Data - m = null + m = Unchecked.defaultof t = DateTimeOffset.MinValue } - let ser = JsonConvert.SerializeObject(e) + let ser = IntegrationJsonSerializer.serialize(e) test <@ ser.Contains("\"d\":\"") @> - let des = JsonConvert.DeserializeObject(ser) + let des = IntegrationJsonSerializer.deserialize(ser) let d = FsCodec.Core.TimelineEvent.Create(-1L, des.c, des.d) let decoded = eventCodec.TryDecode d |> Option.get - test <@ value = decoded @> \ No newline at end of file + test <@ value = decoded @> diff --git a/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj b/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj index 4ea55867f..1aa5f6f2c 100644 --- a/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj +++ b/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj @@ -22,7 +22,7 @@ - + diff --git a/tests/Equinox.EventStore.Integration/StoreIntegration.fs b/tests/Equinox.EventStore.Integration/StoreIntegration.fs index 16370df83..f2e4c76c2 100644 --- a/tests/Equinox.EventStore.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStore.Integration/StoreIntegration.fs @@ -48,10 +48,10 @@ let createGesGateway connection batchSize = Context(connection, BatchingPolicy(m module Cart = let fold, initial = Domain.Cart.Fold.fold, Domain.Cart.Fold.initial - let codec = Domain.Cart.Events.codec + let codec = Domain.Cart.Events.Utf8ArrayCodec.codec let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot let createServiceWithoutOptimization log gateway = - Backend.Cart.Service(log, fun (id,opt) -> Resolver(gateway, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt)) + Backend.Cart.Service(log, fun (id,opt) -> Resolver(gateway, Domain.Cart.Events.Utf8ArrayCodec.codec, fold, initial).Resolve(id,?option=opt)) let createServiceWithCompaction log gateway = let resolve (id,opt) = Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt) Backend.Cart.Service(log, resolve) @@ -64,7 +64,7 @@ module Cart = module ContactPreferences = let fold, initial = Domain.ContactPreferences.Fold.fold, Domain.ContactPreferences.Fold.initial - let codec = Domain.ContactPreferences.Events.codec + let codec = Domain.ContactPreferences.Events.Utf8ArrayCodec.codec let createServiceWithoutOptimization log connection = let gateway = createGesGateway connection defaultBatchSize Backend.ContactPreferences.Service(log, Resolver(gateway, codec, fold, initial).Resolve) @@ -382,4 +382,4 @@ type Tests(testOutputHelper) = let! _ = service2.Read cartId let suboptimalExtraSlice = [singleSliceForward] test <@ singleBatchBackwards @ batchBackwardsAndAppend @ suboptimalExtraSlice @ singleBatchForward = capture.ExternalCalls @> - } \ No newline at end of file + } diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 72bb5aa7a..171c06656 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -11,6 +11,7 @@ open Serilog.Events open System open System.Net.Http open System.Threading +open System.Text.Json let [] appName = "equinox-tool" @@ -362,16 +363,24 @@ module CosmosStats = | _ -> failwith "please specify a `cosmos` endpoint" } module Dump = - let run (log : ILogger, verboseConsole, maybeSeq) (args : ParseResults) = - let a = DumpInfo args - let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq - let storeLog, storeConfig = a.ConfigureStore(log,createStoreLog) - let doU,doE = not(args.Contains EventsOnly),not(args.Contains UnfoldsOnly) - let doC,doJ,doP,doT = args.Contains Correlation,not(args.Contains JsonSkip),not(args.Contains PrettySkip),not(args.Contains TimeRegular) - let resolver = Samples.Infrastructure.Services.StreamResolver(storeConfig) + let logEvent (log: ILogger) (prevTs: DateTimeOffset option) doC doT (event: FsCodec.ITimelineEvent<'format>) (renderer: 'format -> string) = + let ty = if event.IsUnfold then "U" else "E" + let interval = + match prevTs with Some p when not event.IsUnfold -> Some (event.Timestamp - p) | _ -> None + |> function + | None -> if doT then "n/a" else "0" + | Some (i : TimeSpan) when not doT -> i.ToString() + | Some (i : TimeSpan) when i.TotalDays >= 1. -> i.ToString "d\dhh\hmm\m" + | Some i when i.TotalHours >= 1. -> i.ToString "h\hmm\mss\s" + | Some i when i.TotalMinutes >= 1. -> i.ToString "m\mss\.ff\s" + | Some i -> i.ToString("s\.fff\s") + if not doC then log.Information("{i,4}@{t:u}+{d,9} {u:l} {e:l} {data:l} {meta:l}", + event.Index, event.Timestamp, interval, ty, event.EventType, renderer event.Data, renderer event.Meta) + else log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}", + event.Index, event.Timestamp, interval, event.CorrelationId, event.CausationId, ty, event.EventType, renderer event.Data, renderer event.Meta) + event.Timestamp - let streams = args.GetResults DumpArguments.Stream - log.ForContext("streams",streams).Information("Reading...") + let dumpUtf8ArrayStorage (log: ILogger) (storeLog: ILogger) doU doE doC doJ doP doT (resolver: Services.StreamResolver) (streams: FsCodec.StreamName list) = let initial = List.empty let fold state events = (events,state) ||> Seq.foldBack (fun e l -> e :: l) let mutable unfolds = List.empty @@ -388,31 +397,59 @@ module Dump = | _ -> sprintf "(%d chars)" (System.Text.Encoding.UTF8.GetString(data).Length) with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "Parse failure"); reraise() let readStream (streamName : FsCodec.StreamName) = async { - let stream = resolver.Resolve(idCodec,fold,initial,isOriginAndSnapshot) streamName + let stream = resolver.ResolveWithUtf8ArrayCodec(idCodec,fold,initial,isOriginAndSnapshot) streamName let! _token,events = stream.Load storeLog let source = if not doE && not (List.isEmpty unfolds) then Seq.ofList unfolds else Seq.append events unfolds let mutable prevTs = None for x in source |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do - let ty,render = if x.IsUnfold then "U", render Newtonsoft.Json.Formatting.Indented else "E", render fo - let interval = - match prevTs with Some p when not x.IsUnfold -> Some (x.Timestamp - p) | _ -> None - |> function - | None -> if doT then "n/a" else "0" - | Some (i : TimeSpan) when not doT -> i.ToString() - | Some (i : TimeSpan) when i.TotalDays >= 1. -> i.ToString "d\dhh\hmm\m" - | Some i when i.TotalHours >= 1. -> i.ToString "h\hmm\mss\s" - | Some i when i.TotalMinutes >= 1. -> i.ToString "m\mss\.ff\s" - | Some i -> i.ToString("s\.fff\s") - prevTs <- Some x.Timestamp - if not doC then log.Information("{i,4}@{t:u}+{d,9} {u:l} {e:l} {data:l} {meta:l}", - x.Index, x.Timestamp, interval, ty, x.EventType, render x.Data, render x.Meta) - else log.Information("{i,4}@{t:u}+{d,9} Corr {corr} Cause {cause} {u:l} {e:l} {data:l} {meta:l}", - x.Index, x.Timestamp, interval, x.CorrelationId, x.CausationId, ty, x.EventType, render x.Data, render x.Meta) } + let render = if x.IsUnfold then render Newtonsoft.Json.Formatting.Indented else render fo + prevTs <- Some (logEvent log prevTs doC doT x render) } streams |> Seq.map readStream |> Async.Parallel |> Async.Ignore + let dumpJsonElementStorage (log: ILogger) (storeLog: ILogger) doU doE doC doJ _doP doT (resolver: Services.StreamResolver) (streams: FsCodec.StreamName list) = + let initial = List.empty + let fold state events = (events,state) ||> Seq.foldBack (fun e l -> e :: l) + let mutable unfolds = List.empty + let tryDecode (x : FsCodec.ITimelineEvent) = + if x.IsUnfold then unfolds <- x :: unfolds + Some x + let idCodec = FsCodec.Codec.Create((fun _ -> failwith "No encoding required"), tryDecode, (fun _ -> failwith "No mapCausation")) + let isOriginAndSnapshot = (fun (event : FsCodec.ITimelineEvent<_>) -> not doE && event.IsUnfold),fun _state -> failwith "no snapshot required" + let render (data : JsonElement) = + match data.ValueKind with + | JsonValueKind.Null | JsonValueKind.Undefined -> null + | _ when doJ -> data.GetRawText() + | _ -> sprintf "(%d chars)" (data.GetRawText().Length) + let readStream (streamName : FsCodec.StreamName) = async { + let stream = resolver.ResolveWithJsonElementCodec(idCodec,fold,initial,isOriginAndSnapshot) streamName + let! _token,events = stream.Load storeLog + let source = if not doE && not (List.isEmpty unfolds) then Seq.ofList unfolds else Seq.append events unfolds + let mutable prevTs = None + for x in source |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do + prevTs <- Some (logEvent log prevTs doC doT x render) } + streams + |> Seq.map readStream + |> Async.Parallel + |> Async.Ignore + + let run (log : ILogger, verboseConsole, maybeSeq) (args : ParseResults) = + let a = DumpInfo args + let createStoreLog verboseStore = createStoreLog verboseStore verboseConsole maybeSeq + let storeLog, storeConfig = a.ConfigureStore(log,createStoreLog) + let doU,doE = not(args.Contains EventsOnly),not(args.Contains UnfoldsOnly) + let doC,doJ,doP,doT = args.Contains Correlation,not(args.Contains JsonSkip),not(args.Contains PrettySkip),not(args.Contains TimeRegular) + let resolver = Samples.Infrastructure.Services.StreamResolver(storeConfig) + + let streams = args.GetResults DumpArguments.Stream + log.ForContext("streams",streams).Information("Reading...") + + match storeConfig with + | Storage.StorageConfig.Cosmos _ -> dumpJsonElementStorage log storeLog doU doE doC doJ doP doT resolver streams + | _ -> dumpUtf8ArrayStorage log storeLog doU doE doC doJ doP doT resolver streams + [] let main argv = let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name