Skip to content

Switch Equinox.Cosmos over to System.Text.Json #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions samples/Infrastructure/Services.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
module Samples.Infrastructure.Services

open Domain
open FsCodec
open FsCodec.SystemTextJson.Serialization
open Microsoft.Extensions.DependencyInjection
open System
open System.Text.Json

[<NoComparison>]
type StreamCodec<'event, 'context> =
| JsonElementCodec of IEventCodec<'event, JsonElement, 'context>
| Utf8ArrayCodec of IEventCodec<'event, byte[], 'context>

type StreamResolver(storage) =
member __.Resolve
( codec : FsCodec.IEventCodec<'event,byte[],_>,
member __.ResolveWithJsonElementCodec
( codec : IEventCodec<'event, JsonElement, _>,
fold: ('state -> 'event seq -> 'state),
initial: 'state,
snapshot: (('event -> bool) * ('state -> 'event))) =
Expand All @@ -15,6 +23,14 @@ type StreamResolver(storage) =
let store = Equinox.Cosmos.Context(gateway, databaseId, containerId)
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot else Equinox.Cosmos.AccessStrategy.Unoptimized
Equinox.Cosmos.Resolver<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
| _ -> failwith "Currently, only Cosmos can be used with a JsonElement codec."

member __.ResolveWithUtf8ArrayCodec
( codec : IEventCodec<'event, byte[], _>,
fold: ('state -> 'event seq -> 'state),
initial: 'state,
snapshot: (('event -> bool) * ('state -> 'event))) =
match storage with
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
Expand All @@ -23,24 +39,34 @@ type StreamResolver(storage) =
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
| _ -> failwith "Only EventStore, Memory Store, and SQL Store can be used with a byte array codec."

type ServiceBuilder(storageConfig, handlerLog) =
let resolver = StreamResolver(storageConfig)

member __.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
Backend.Favorites.Service(handlerLog, resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot))

match storageConfig with
| Storage.StorageConfig.Cosmos _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithJsonElementCodec(Favorites.Events.JsonElementCodec.codec JsonSerializer.defaultOptions, fold, initial, snapshot))
| _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(Favorites.Events.Utf8ArrayCodec.codec, fold, initial, snapshot))

member __.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
Backend.SavedForLater.Service(handlerLog, resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot), maxSavedItems=50)

match storageConfig with
| Storage.StorageConfig.Cosmos _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithJsonElementCodec(SavedForLater.Events.JsonElementCodec.codec JsonSerializer.defaultOptions,fold,initial,snapshot), maxSavedItems=50)
| _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(SavedForLater.Events.Utf8ArrayCodec.codec,fold,initial,snapshot), maxSavedItems=50)

member __.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
TodoBackend.Service(handlerLog, resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))

match storageConfig with
| Storage.StorageConfig.Cosmos _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithJsonElementCodec(TodoBackend.Events.JsonElementCodec.codec JsonSerializer.defaultOptions,fold,initial,snapshot))
| _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(TodoBackend.Events.Utf8ArrayCodec.codec,fold,initial,snapshot))

let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
Expand All @@ -49,4 +75,4 @@ let register (services : IServiceCollection, storageConfig, handlerLog) =

regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateFavoritesService()
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateSaveForLaterService()
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateTodosService()
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateTodosService()
2 changes: 1 addition & 1 deletion samples/Store/Backend/Backend.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
<ProjectReference Include="..\..\..\src\Equinox.Core\Equinox.Core.fsproj" />
<ProjectReference Include="..\Domain\Domain.fsproj" />
</ItemGroup>

Expand Down
31 changes: 29 additions & 2 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,34 @@ module Events =
| ItemQuantityChanged of ItemQuantityChangeInfo
| ItemWaiveReturnsChanged of ItemWaiveReturnsInfo
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Utf8ArrayCodec =
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module JsonElementCodec =
open FsCodec.SystemTextJson
open System.Text.Json

let private encode (options: JsonSerializerOptions) =
fun (evt: Event) ->
match evt with
| Snapshotted state -> "Snapshotted", JsonSerializer.SerializeToElement(state, options)
| ItemAdded addInfo -> "ItemAdded", JsonSerializer.SerializeToElement(addInfo, options)
| ItemRemoved removeInfo -> "ItemRemoved", JsonSerializer.SerializeToElement(removeInfo, options)
| ItemQuantityChanged changeInfo -> "ItemQuantityChanged", JsonSerializer.SerializeToElement(changeInfo, options)
| ItemWaiveReturnsChanged waiveInfo -> "ItemWaiveReturnsChanged", JsonSerializer.SerializeToElement(waiveInfo, options)

let private tryDecode (options: JsonSerializerOptions) =
fun (eventType, data: JsonElement) ->
match eventType with
| "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement<Compaction.State>(data, options))
| "ItemAdded" -> Some (ItemAdded <| JsonSerializer.DeserializeElement<ItemAddInfo>(data, options))
| "ItemRemoved" -> Some (ItemRemoved <| JsonSerializer.DeserializeElement<ItemRemoveInfo>(data, options))
| "ItemQuantityChanged" -> Some (ItemQuantityChanged <| JsonSerializer.DeserializeElement<ItemQuantityChangeInfo>(data, options))
| "ItemWaiveReturnsChanged" -> Some (ItemWaiveReturnsChanged <| JsonSerializer.DeserializeElement<ItemWaiveReturnsInfo>(data, options))
| _ -> None

let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)

module Fold =
type ItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool }
Expand Down Expand Up @@ -79,4 +106,4 @@ module Commands =
match waived with
| Some waived when itemExistsWithDifferentWaiveStatus skuId waived ->
yield Events.ItemWaiveReturnsChanged { context = c; skuId = skuId; waived = waived }
| _ -> () ]
| _ -> () ]
23 changes: 21 additions & 2 deletions samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,26 @@ module Events =
type Event =
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Utf8ArrayCodec =
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module JsonElementCodec =
open FsCodec.SystemTextJson
open System.Text.Json

let private encode (options: JsonSerializerOptions) =
fun (evt: Event) ->
match evt with
| Updated value -> "contactPreferencesChanged", JsonSerializer.SerializeToElement(value, options)

let private tryDecode (options: JsonSerializerOptions) =
fun (eventType, data: JsonElement) ->
match eventType with
| "contactPreferencesChanged" -> Some (Updated <| JsonSerializer.DeserializeElement<Value>(data, options))
| _ -> None

let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)

module Fold =

Expand All @@ -37,4 +56,4 @@ module Commands =
match command with
| Update ({ preferences = preferences } as value) ->
if state = preferences then [] else
[ Events.Updated value ]
[ Events.Updated value ]
4 changes: 2 additions & 2 deletions samples/Store/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />

<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.0" />
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
<ProjectReference Include="..\..\..\src\Equinox.Core\Equinox.Core.fsproj" />
</ItemGroup>

</Project>
25 changes: 23 additions & 2 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,28 @@ module Events =
| Favorited of Favorited
| Unfavorited of Unfavorited
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Utf8ArrayCodec =
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module JsonElementCodec =
open FsCodec.SystemTextJson
open System.Text.Json

let private encode (options: JsonSerializerOptions) = fun (evt: Event) ->
match evt with
| Snapshotted snapshotted -> "Snapshotted", JsonSerializer.SerializeToElement(snapshotted, options)
| Favorited favorited -> "Favorited", JsonSerializer.SerializeToElement(favorited, options)
| Unfavorited unfavorited -> "Unfavorited", JsonSerializer.SerializeToElement(unfavorited, options)

let private tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) ->
match eventType with
| "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement<Snapshotted>(data, options))
| "Favorited" -> Some (Favorited <| JsonSerializer.DeserializeElement<Favorited>(data, options))
| "Unfavorited" -> Some (Unfavorited <| JsonSerializer.DeserializeElement<Unfavorited>(data, options))
| _ -> None

let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)

module Fold =

Expand Down Expand Up @@ -56,4 +77,4 @@ module Commands =
yield Events.Favorited { date = date; skuId = skuId } ]
| Unfavorite skuId ->
if doesntHave skuId then [] else
[ Events.Unfavorited { skuId = skuId } ]
[ Events.Unfavorited { skuId = skuId } ]
29 changes: 27 additions & 2 deletions samples/Store/Domain/SavedForLater.fs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,32 @@ module Events =
/// Addition of a collection of skus to the list
| Added of Added
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Utf8ArrayCodec =
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module JsonElementCodec =
open FsCodec.SystemTextJson
open System.Text.Json

let private encode (options: JsonSerializerOptions) =
fun (evt: Event) ->
match evt with
| Compacted compacted -> Compaction.EventType, JsonSerializer.SerializeToElement(compacted, options)
| Merged merged -> "Merged", JsonSerializer.SerializeToElement(merged, options)
| Removed removed -> "Removed", JsonSerializer.SerializeToElement(removed, options)
| Added added -> "Added", JsonSerializer.SerializeToElement(added, options)

let private tryDecode (options: JsonSerializerOptions) =
fun (eventType, data: JsonElement) ->
match eventType with
| Compaction.EventType -> Some (Compacted <| JsonSerializer.DeserializeElement<Compaction.Compacted>(data, options))
| "Merged" -> Some (Merged <| JsonSerializer.DeserializeElement<Merged>(data, options))
| "Removed" -> Some (Removed <| JsonSerializer.DeserializeElement<Removed>(data, options))
| "Added" -> Some (Added <| JsonSerializer.DeserializeElement<Added>(data, options))
| _ -> None

let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)

module Fold =
open Events
Expand Down Expand Up @@ -104,4 +129,4 @@ module Commands =
let index = Index state
let net = skus |> Array.filter (index.DoesNotAlreadyContainSameOrMoreRecent dateSaved)
if Array.isEmpty net then true, []
else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ]
else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ]
17 changes: 9 additions & 8 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ open Equinox
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
open FsCodec.SystemTextJson.Serialization
open Swensen.Unquote

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand All @@ -15,19 +16,19 @@ let createMemoryStore () =
// we want to validate that the JSON UTF8 is working happily
VolatileStore<byte[]>()
let createServiceMemory log store =
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))

let codec = Domain.Cart.Events.codec
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.Utf8ArrayCodec.codec, fold, initial).Resolve(id,?option=opt))

let eventStoreCodec = Domain.Cart.Events.Utf8ArrayCodec.codec
let resolveGesStreamWithRollingSnapshots gateway =
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
let resolveGesStreamWithoutCustomAccessStrategy gateway =
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt)
fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial).Resolve(id,?option=opt)

let cosmosCodec = Domain.Cart.Events.JsonElementCodec.codec JsonSerializer.defaultOptions
let resolveCosmosStreamWithSnapshotStrategy gateway =
fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
fun (id,opt) -> Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
fun (id,opt) -> Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
Expand Down Expand Up @@ -82,4 +83,4 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly folding the events with With Snapshotting`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveCosmosStreamWithSnapshotStrategy
do! act service args
}
}
16 changes: 9 additions & 7 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

open Equinox
open Equinox.Cosmos.Integration
open FsCodec.SystemTextJson.Serialization
open Swensen.Unquote
open Xunit

Expand All @@ -14,19 +15,20 @@ let createMemoryStore () =
let createServiceMemory log store =
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)

let codec = Domain.ContactPreferences.Events.codec
let eventStoreCodec = Domain.ContactPreferences.Events.Utf8ArrayCodec.codec
let resolveStreamGesWithOptimizedStorageSemantics gateway =
EventStore.Resolver(gateway 1, codec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve
EventStore.Resolver(gateway 1, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve
let resolveStreamGesWithoutAccessStrategy gateway =
EventStore.Resolver(gateway defaultBatchSize, codec, fold, initial).Resolve
EventStore.Resolver(gateway defaultBatchSize, eventStoreCodec, fold, initial).Resolve

let cosmosCodec = Domain.ContactPreferences.Events.JsonElementCodec.codec JsonSerializer.defaultOptions
let resolveStreamCosmosWithLatestKnownEventSemantics gateway =
Cosmos.Resolver(gateway 1, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve
Cosmos.Resolver(gateway 1, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve
let resolveStreamCosmosUnoptimized gateway =
Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve
Cosmos.Resolver(gateway defaultBatchSize, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve
let resolveStreamCosmosRollingUnfolds gateway =
let access = Cosmos.AccessStrategy.Custom(Domain.ContactPreferences.Fold.isOrigin, Domain.ContactPreferences.Fold.transmute)
Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve
Cosmos.Resolver(gateway defaultBatchSize, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -79,4 +81,4 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds
do! act service args
}
}
Loading