Skip to content

Commit b0a3a9b

Browse files
authored
Merge pull request #200 from jet/stj
Switch Equinox.Cosmos over to System.Text.Json
2 parents a36e742 + 3fabd8f commit b0a3a9b

38 files changed

+815
-245
lines changed

Diff for: samples/Infrastructure/Services.fs

+32-6
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
module Samples.Infrastructure.Services
22

33
open Domain
4+
open FsCodec
5+
open FsCodec.SystemTextJson.Serialization
46
open Microsoft.Extensions.DependencyInjection
57
open System
8+
open System.Text.Json
9+
10+
[<NoComparison>]
11+
type StreamCodec<'event, 'context> =
12+
| JsonElementCodec of IEventCodec<'event, JsonElement, 'context>
13+
| Utf8ArrayCodec of IEventCodec<'event, byte[], 'context>
614

715
type StreamResolver(storage) =
8-
member __.Resolve
9-
( codec : FsCodec.IEventCodec<'event,byte[],_>,
16+
member __.ResolveWithJsonElementCodec
17+
( codec : IEventCodec<'event, JsonElement, _>,
1018
fold: ('state -> 'event seq -> 'state),
1119
initial: 'state,
1220
snapshot: (('event -> bool) * ('state -> 'event))) =
@@ -15,6 +23,14 @@ type StreamResolver(storage) =
1523
let store = Equinox.Cosmos.Context(gateway, databaseId, containerId)
1624
let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot else Equinox.Cosmos.AccessStrategy.Unoptimized
1725
Equinox.Cosmos.Resolver<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
26+
| _ -> failwith "Currently, only Cosmos can be used with a JsonElement codec."
27+
28+
member __.ResolveWithUtf8ArrayCodec
29+
( codec : IEventCodec<'event, byte[], _>,
30+
fold: ('state -> 'event seq -> 'state),
31+
initial: 'state,
32+
snapshot: (('event -> bool) * ('state -> 'event))) =
33+
match storage with
1834
| Storage.StorageConfig.Es (context, caching, unfolds) ->
1935
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
2036
Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
@@ -23,24 +39,34 @@ type StreamResolver(storage) =
2339
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
2440
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
2541
Equinox.SqlStreamStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
42+
| _ -> failwith "Only EventStore, Memory Store, and SQL Store can be used with a byte array codec."
2643

2744
type ServiceBuilder(storageConfig, handlerLog) =
2845
let resolver = StreamResolver(storageConfig)
2946

3047
member __.CreateFavoritesService() =
3148
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
3249
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
33-
Backend.Favorites.Service(handlerLog, resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot))
50+
51+
match storageConfig with
52+
| Storage.StorageConfig.Cosmos _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithJsonElementCodec(Favorites.Events.JsonElementCodec.codec JsonSerializer.defaultOptions, fold, initial, snapshot))
53+
| _ -> Backend.Favorites.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(Favorites.Events.Utf8ArrayCodec.codec, fold, initial, snapshot))
3454

3555
member __.CreateSaveForLaterService() =
3656
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
3757
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
38-
Backend.SavedForLater.Service(handlerLog, resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot), maxSavedItems=50)
58+
59+
match storageConfig with
60+
| Storage.StorageConfig.Cosmos _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithJsonElementCodec(SavedForLater.Events.JsonElementCodec.codec JsonSerializer.defaultOptions,fold,initial,snapshot), maxSavedItems=50)
61+
| _ -> Backend.SavedForLater.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(SavedForLater.Events.Utf8ArrayCodec.codec,fold,initial,snapshot), maxSavedItems=50)
3962

4063
member __.CreateTodosService() =
4164
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
4265
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
43-
TodoBackend.Service(handlerLog, resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))
66+
67+
match storageConfig with
68+
| Storage.StorageConfig.Cosmos _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithJsonElementCodec(TodoBackend.Events.JsonElementCodec.codec JsonSerializer.defaultOptions,fold,initial,snapshot))
69+
| _ -> TodoBackend.Service(handlerLog, resolver.ResolveWithUtf8ArrayCodec(TodoBackend.Events.Utf8ArrayCodec.codec,fold,initial,snapshot))
4470

4571
let register (services : IServiceCollection, storageConfig, handlerLog) =
4672
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
@@ -49,4 +75,4 @@ let register (services : IServiceCollection, storageConfig, handlerLog) =
4975

5076
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateFavoritesService()
5177
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateSaveForLaterService()
52-
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateTodosService()
78+
regF <| fun sp -> sp.GetService<ServiceBuilder>().CreateTodosService()

Diff for: samples/Store/Backend/Backend.fsproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
</ItemGroup>
1919

2020
<ItemGroup>
21-
<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
21+
<ProjectReference Include="..\..\..\src\Equinox.Core\Equinox.Core.fsproj" />
2222
<ProjectReference Include="..\Domain\Domain.fsproj" />
2323
</ItemGroup>
2424

Diff for: samples/Store/Domain/Cart.fs

+29-2
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,34 @@ module Events =
2424
| ItemQuantityChanged of ItemQuantityChangeInfo
2525
| ItemWaiveReturnsChanged of ItemWaiveReturnsInfo
2626
interface TypeShape.UnionContract.IUnionContract
27-
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
27+
28+
module Utf8ArrayCodec =
29+
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
30+
31+
module JsonElementCodec =
32+
open FsCodec.SystemTextJson
33+
open System.Text.Json
34+
35+
let private encode (options: JsonSerializerOptions) =
36+
fun (evt: Event) ->
37+
match evt with
38+
| Snapshotted state -> "Snapshotted", JsonSerializer.SerializeToElement(state, options)
39+
| ItemAdded addInfo -> "ItemAdded", JsonSerializer.SerializeToElement(addInfo, options)
40+
| ItemRemoved removeInfo -> "ItemRemoved", JsonSerializer.SerializeToElement(removeInfo, options)
41+
| ItemQuantityChanged changeInfo -> "ItemQuantityChanged", JsonSerializer.SerializeToElement(changeInfo, options)
42+
| ItemWaiveReturnsChanged waiveInfo -> "ItemWaiveReturnsChanged", JsonSerializer.SerializeToElement(waiveInfo, options)
43+
44+
let private tryDecode (options: JsonSerializerOptions) =
45+
fun (eventType, data: JsonElement) ->
46+
match eventType with
47+
| "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement<Compaction.State>(data, options))
48+
| "ItemAdded" -> Some (ItemAdded <| JsonSerializer.DeserializeElement<ItemAddInfo>(data, options))
49+
| "ItemRemoved" -> Some (ItemRemoved <| JsonSerializer.DeserializeElement<ItemRemoveInfo>(data, options))
50+
| "ItemQuantityChanged" -> Some (ItemQuantityChanged <| JsonSerializer.DeserializeElement<ItemQuantityChangeInfo>(data, options))
51+
| "ItemWaiveReturnsChanged" -> Some (ItemWaiveReturnsChanged <| JsonSerializer.DeserializeElement<ItemWaiveReturnsInfo>(data, options))
52+
| _ -> None
53+
54+
let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)
2855

2956
module Fold =
3057
type ItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool }
@@ -79,4 +106,4 @@ module Commands =
79106
match waived with
80107
| Some waived when itemExistsWithDifferentWaiveStatus skuId waived ->
81108
yield Events.ItemWaiveReturnsChanged { context = c; skuId = skuId; waived = waived }
82-
| _ -> () ]
109+
| _ -> () ]

Diff for: samples/Store/Domain/ContactPreferences.fs

+21-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,26 @@ module Events =
1313
type Event =
1414
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
1515
interface TypeShape.UnionContract.IUnionContract
16-
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
16+
17+
module Utf8ArrayCodec =
18+
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
19+
20+
module JsonElementCodec =
21+
open FsCodec.SystemTextJson
22+
open System.Text.Json
23+
24+
let private encode (options: JsonSerializerOptions) =
25+
fun (evt: Event) ->
26+
match evt with
27+
| Updated value -> "contactPreferencesChanged", JsonSerializer.SerializeToElement(value, options)
28+
29+
let private tryDecode (options: JsonSerializerOptions) =
30+
fun (eventType, data: JsonElement) ->
31+
match eventType with
32+
| "contactPreferencesChanged" -> Some (Updated <| JsonSerializer.DeserializeElement<Value>(data, options))
33+
| _ -> None
34+
35+
let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)
1736

1837
module Fold =
1938

@@ -37,4 +56,4 @@ module Commands =
3756
match command with
3857
| Update ({ preferences = preferences } as value) ->
3958
if state = preferences then [] else
40-
[ Events.Updated value ]
59+
[ Events.Updated value ]

Diff for: samples/Store/Domain/Domain.fsproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
2222
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
2323

24-
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.0" />
24+
<PackageReference Include="FsCodec.NewtonsoftJson" Version="2.0.1" />
2525
</ItemGroup>
2626

2727
<ItemGroup>
28-
<ProjectReference Include="..\..\..\src\Equinox\Equinox.fsproj" />
28+
<ProjectReference Include="..\..\..\src\Equinox.Core\Equinox.Core.fsproj" />
2929
</ItemGroup>
3030

3131
</Project>

Diff for: samples/Store/Domain/Favorites.fs

+23-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,28 @@ module Events =
1414
| Favorited of Favorited
1515
| Unfavorited of Unfavorited
1616
interface TypeShape.UnionContract.IUnionContract
17-
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
17+
18+
module Utf8ArrayCodec =
19+
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
20+
21+
module JsonElementCodec =
22+
open FsCodec.SystemTextJson
23+
open System.Text.Json
24+
25+
let private encode (options: JsonSerializerOptions) = fun (evt: Event) ->
26+
match evt with
27+
| Snapshotted snapshotted -> "Snapshotted", JsonSerializer.SerializeToElement(snapshotted, options)
28+
| Favorited favorited -> "Favorited", JsonSerializer.SerializeToElement(favorited, options)
29+
| Unfavorited unfavorited -> "Unfavorited", JsonSerializer.SerializeToElement(unfavorited, options)
30+
31+
let private tryDecode (options: JsonSerializerOptions) = fun (eventType, data: JsonElement) ->
32+
match eventType with
33+
| "Snapshotted" -> Some (Snapshotted <| JsonSerializer.DeserializeElement<Snapshotted>(data, options))
34+
| "Favorited" -> Some (Favorited <| JsonSerializer.DeserializeElement<Favorited>(data, options))
35+
| "Unfavorited" -> Some (Unfavorited <| JsonSerializer.DeserializeElement<Unfavorited>(data, options))
36+
| _ -> None
37+
38+
let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)
1839

1940
module Fold =
2041

@@ -56,4 +77,4 @@ module Commands =
5677
yield Events.Favorited { date = date; skuId = skuId } ]
5778
| Unfavorite skuId ->
5879
if doesntHave skuId then [] else
59-
[ Events.Unfavorited { skuId = skuId } ]
80+
[ Events.Unfavorited { skuId = skuId } ]

Diff for: samples/Store/Domain/SavedForLater.fs

+27-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,32 @@ module Events =
2929
/// Addition of a collection of skus to the list
3030
| Added of Added
3131
interface TypeShape.UnionContract.IUnionContract
32-
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
32+
33+
module Utf8ArrayCodec =
34+
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
35+
36+
module JsonElementCodec =
37+
open FsCodec.SystemTextJson
38+
open System.Text.Json
39+
40+
let private encode (options: JsonSerializerOptions) =
41+
fun (evt: Event) ->
42+
match evt with
43+
| Compacted compacted -> Compaction.EventType, JsonSerializer.SerializeToElement(compacted, options)
44+
| Merged merged -> "Merged", JsonSerializer.SerializeToElement(merged, options)
45+
| Removed removed -> "Removed", JsonSerializer.SerializeToElement(removed, options)
46+
| Added added -> "Added", JsonSerializer.SerializeToElement(added, options)
47+
48+
let private tryDecode (options: JsonSerializerOptions) =
49+
fun (eventType, data: JsonElement) ->
50+
match eventType with
51+
| Compaction.EventType -> Some (Compacted <| JsonSerializer.DeserializeElement<Compaction.Compacted>(data, options))
52+
| "Merged" -> Some (Merged <| JsonSerializer.DeserializeElement<Merged>(data, options))
53+
| "Removed" -> Some (Removed <| JsonSerializer.DeserializeElement<Removed>(data, options))
54+
| "Added" -> Some (Added <| JsonSerializer.DeserializeElement<Added>(data, options))
55+
| _ -> None
56+
57+
let codec options = FsCodec.Codec.Create<Event, JsonElement>(encode options, tryDecode options)
3358

3459
module Fold =
3560
open Events
@@ -104,4 +129,4 @@ module Commands =
104129
let index = Index state
105130
let net = skus |> Array.filter (index.DoesNotAlreadyContainSameOrMoreRecent dateSaved)
106131
if Array.isEmpty net then true, []
107-
else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ]
132+
else validateAgainstInvariants [ Events.Added { skus = net ; dateSaved = dateSaved } ]

Diff for: samples/Store/Integration/CartIntegration.fs

+9-8
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ open Equinox
44
open Equinox.Cosmos.Integration
55
open Equinox.EventStore
66
open Equinox.MemoryStore
7+
open FsCodec.SystemTextJson.Serialization
78
open Swensen.Unquote
89

910
#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
@@ -15,19 +16,19 @@ let createMemoryStore () =
1516
// we want to validate that the JSON UTF8 is working happily
1617
VolatileStore<byte[]>()
1718
let createServiceMemory log store =
18-
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))
19-
20-
let codec = Domain.Cart.Events.codec
19+
Backend.Cart.Service(log, fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.Utf8ArrayCodec.codec, fold, initial).Resolve(id,?option=opt))
2120

21+
let eventStoreCodec = Domain.Cart.Events.Utf8ArrayCodec.codec
2222
let resolveGesStreamWithRollingSnapshots gateway =
23-
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
23+
fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
2424
let resolveGesStreamWithoutCustomAccessStrategy gateway =
25-
fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt)
25+
fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial).Resolve(id,?option=opt)
2626

27+
let cosmosCodec = Domain.Cart.Events.JsonElementCodec.codec JsonSerializer.defaultOptions
2728
let resolveCosmosStreamWithSnapshotStrategy gateway =
28-
fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
29+
fun (id,opt) -> Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
2930
let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
30-
fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
31+
fun (id,opt) -> Cosmos.Resolver(gateway, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
3132

3233
let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
3334
service.ExecuteManyAsync(cartId, false, seq {
@@ -82,4 +83,4 @@ type Tests(testOutputHelper) =
8283
let ``Can roundtrip against Cosmos, correctly folding the events with With Snapshotting`` args = Async.RunSynchronously <| async {
8384
let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveCosmosStreamWithSnapshotStrategy
8485
do! act service args
85-
}
86+
}

Diff for: samples/Store/Integration/ContactPreferencesIntegration.fs

+9-7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
open Equinox
44
open Equinox.Cosmos.Integration
5+
open FsCodec.SystemTextJson.Serialization
56
open Swensen.Unquote
67
open Xunit
78

@@ -14,19 +15,20 @@ let createMemoryStore () =
1415
let createServiceMemory log store =
1516
Backend.ContactPreferences.Service(log, MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)
1617

17-
let codec = Domain.ContactPreferences.Events.codec
18+
let eventStoreCodec = Domain.ContactPreferences.Events.Utf8ArrayCodec.codec
1819
let resolveStreamGesWithOptimizedStorageSemantics gateway =
19-
EventStore.Resolver(gateway 1, codec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve
20+
EventStore.Resolver(gateway 1, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve
2021
let resolveStreamGesWithoutAccessStrategy gateway =
21-
EventStore.Resolver(gateway defaultBatchSize, codec, fold, initial).Resolve
22+
EventStore.Resolver(gateway defaultBatchSize, eventStoreCodec, fold, initial).Resolve
2223

24+
let cosmosCodec = Domain.ContactPreferences.Events.JsonElementCodec.codec JsonSerializer.defaultOptions
2325
let resolveStreamCosmosWithLatestKnownEventSemantics gateway =
24-
Cosmos.Resolver(gateway 1, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve
26+
Cosmos.Resolver(gateway 1, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve
2527
let resolveStreamCosmosUnoptimized gateway =
26-
Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve
28+
Cosmos.Resolver(gateway defaultBatchSize, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve
2729
let resolveStreamCosmosRollingUnfolds gateway =
2830
let access = Cosmos.AccessStrategy.Custom(Domain.ContactPreferences.Fold.isOrigin, Domain.ContactPreferences.Fold.transmute)
29-
Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve
31+
Cosmos.Resolver(gateway defaultBatchSize, cosmosCodec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve
3032

3133
type Tests(testOutputHelper) =
3234
let testOutput = TestOutputAdapter testOutputHelper
@@ -79,4 +81,4 @@ type Tests(testOutputHelper) =
7981
let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async {
8082
let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds
8183
do! act service args
82-
}
84+
}

0 commit comments

Comments
 (0)