Skip to content

Commit 1aef420

Browse files
committed
problem: upstreams may produce blocks with a heavy delay and replace existing head with an older block
solution: order conflicting head blocks by difficulty on PoW and priority on PoS
1 parent e8a9ffb commit 1aef420

File tree

11 files changed

+539
-264
lines changed

11 files changed

+539
-264
lines changed

src/main/kotlin/io/emeraldpay/dshackle/startup/ConfiguredUpstreams.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import io.emeraldpay.dshackle.reader.StandardRpcReader
2828
import io.emeraldpay.dshackle.upstream.CurrentMultistreamHolder
2929
import io.emeraldpay.dshackle.upstream.ForkWatchFactory
3030
import io.emeraldpay.dshackle.upstream.Head
31-
import io.emeraldpay.dshackle.upstream.MergedHead
31+
import io.emeraldpay.dshackle.upstream.MergedPowHead
3232
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinCacheUpdate
3333
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcHead
3434
import io.emeraldpay.dshackle.upstream.bitcoin.BitcoinRpcUpstream
@@ -188,7 +188,7 @@ open class ConfiguredUpstreams(
188188
val head: Head = conn.zeroMq?.let { zeroMq ->
189189
val server = ZMQServer(zeroMq.host, zeroMq.port, "hashblock")
190190
val zeroMqHead = BitcoinZMQHead(server, directApi, extractBlock)
191-
MergedHead(listOf(rpcHead, zeroMqHead))
191+
MergedPowHead(listOf(rpcHead, zeroMqHead))
192192
} ?: rpcHead
193193

194194
val subscriptions = conn.zeroMq?.let { zeroMq ->

src/main/kotlin/io/emeraldpay/dshackle/upstream/MergedHead.kt

-62
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package io.emeraldpay.dshackle.upstream
2+
3+
import io.emeraldpay.dshackle.cache.Caches
4+
import io.emeraldpay.dshackle.cache.CachesEnabled
5+
import io.emeraldpay.dshackle.data.BlockContainer
6+
import org.slf4j.LoggerFactory
7+
import org.springframework.context.Lifecycle
8+
import reactor.core.Disposable
9+
import reactor.core.publisher.Flux
10+
import java.util.concurrent.locks.ReentrantReadWriteLock
11+
import java.util.function.Function
12+
import kotlin.concurrent.read
13+
import kotlin.concurrent.write
14+
15+
class MergedPosHead(
16+
private val sources: Iterable<Pair<Int, Head>>
17+
) : AbstractHead(), Lifecycle, CachesEnabled {
18+
19+
companion object {
20+
private val log = LoggerFactory.getLogger(MergedPosHead::class.java)
21+
}
22+
private var subscription: Disposable? = null
23+
24+
private val lock = ReentrantReadWriteLock()
25+
private val headLimit = 16
26+
private var head: List<Pair<Int, BlockContainer>> = emptyList()
27+
28+
override fun isRunning(): Boolean {
29+
return subscription != null
30+
}
31+
32+
override fun start() {
33+
sources.forEach {
34+
val head = it.second
35+
if (head is Lifecycle && !head.isRunning) {
36+
head.start()
37+
}
38+
}
39+
subscription?.dispose()
40+
subscription = super.follow(merge(sources.map { Pair(it.first, it.second.getFlux()) }))
41+
}
42+
43+
fun merge(sources: Iterable<Pair<Int, Flux<BlockContainer>>>): Flux<BlockContainer> {
44+
return Flux.merge(
45+
sources.map {
46+
it.second.transform(process(it.first))
47+
}
48+
).distinctUntilChanged {
49+
it.hash
50+
}
51+
}
52+
53+
fun process(priority: Int): Function<Flux<BlockContainer>, Flux<BlockContainer>> {
54+
return Function { source ->
55+
source.handle { block, sink ->
56+
if (onNext(priority, block)) {
57+
val top = lock.read {
58+
head.lastOrNull()
59+
}
60+
if (top != null) {
61+
sink.next(top.second)
62+
}
63+
}
64+
}
65+
}
66+
}
67+
68+
private fun onNext(priority: Int, block: BlockContainer): Boolean {
69+
val prev = lock.read {
70+
head.find { it.second.height == block.height }
71+
}
72+
if (prev != null && prev.first > priority) {
73+
return false
74+
}
75+
lock.write {
76+
// first, check if existing data for the height is better
77+
val prev = head.find { it.second.height == block.height }
78+
if (prev != null && prev.first > priority) {
79+
return false
80+
}
81+
82+
// otherwise add it to the list
83+
val fresh = if (head.isEmpty()) {
84+
// just the first run, so nothing to do yet
85+
listOf(Pair(priority, block))
86+
} else if (head.last().second.height < block.height) {
87+
// new block, just add it on top
88+
head + Pair(priority, block)
89+
} else if (head.all { it.first < priority }) {
90+
// filled with low priority upstream that may be invalid, so replace the whole list
91+
listOf(Pair(priority, block))
92+
} else {
93+
// situation when we have that block in the list and since we did the checks above it can have only a lower priority
94+
// now there are two options: the same block or different block.
95+
// if it's in the middle keep the rest anyway b/c a higher priority upstream would fix it with the following updates
96+
head.map {
97+
if (it.second.height == block.height) {
98+
Pair(priority, block)
99+
} else {
100+
it
101+
}
102+
}
103+
}
104+
head = fresh.takeLast(headLimit)
105+
return true
106+
}
107+
}
108+
109+
override fun stop() {
110+
sources.forEach {
111+
val head = it.second
112+
if (head is Lifecycle && head.isRunning) {
113+
head.stop()
114+
}
115+
}
116+
subscription?.dispose()
117+
subscription = null
118+
}
119+
120+
override fun setCaches(caches: Caches) {
121+
sources.forEach {
122+
val head = it.second
123+
if (head is CachesEnabled) {
124+
head.setCaches(caches)
125+
}
126+
}
127+
}
128+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/**
2+
* Copyright (c) 2020 EmeraldPay, Inc
3+
* Copyright (c) 2019 ETCDEV GmbH
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package io.emeraldpay.dshackle.upstream
18+
19+
import io.emeraldpay.dshackle.cache.Caches
20+
import io.emeraldpay.dshackle.cache.CachesEnabled
21+
import io.emeraldpay.dshackle.data.BlockContainer
22+
import org.springframework.context.Lifecycle
23+
import reactor.core.Disposable
24+
import reactor.core.publisher.Flux
25+
import java.util.concurrent.locks.ReentrantReadWriteLock
26+
import java.util.function.Function
27+
import kotlin.concurrent.read
28+
import kotlin.concurrent.write
29+
30+
class MergedPowHead(
31+
private val sources: Iterable<Head>
32+
) : AbstractHead(), Lifecycle, CachesEnabled {
33+
34+
private var subscription: Disposable? = null
35+
36+
private val lock = ReentrantReadWriteLock()
37+
private val headLimit = 16
38+
private var head: List<BlockContainer> = emptyList()
39+
40+
override fun isRunning(): Boolean {
41+
return subscription != null
42+
}
43+
44+
override fun start() {
45+
sources.forEach { head ->
46+
if (head is Lifecycle && !head.isRunning) {
47+
head.start()
48+
}
49+
}
50+
subscription?.dispose()
51+
subscription = super.follow(merge(sources.map { it.getFlux() }))
52+
}
53+
54+
fun merge(sources: Iterable<Flux<BlockContainer>>): Flux<BlockContainer> {
55+
return Flux.merge(
56+
sources.map {
57+
it.transform(process())
58+
}
59+
).distinctUntilChanged {
60+
it.hash
61+
}
62+
}
63+
64+
fun process(): Function<Flux<BlockContainer>, Flux<BlockContainer>> {
65+
return Function { source ->
66+
source.handle { block, sink ->
67+
if (onNext(block)) {
68+
val top = lock.read {
69+
head.lastOrNull()
70+
}
71+
if (top != null) {
72+
sink.next(top)
73+
}
74+
}
75+
}
76+
}
77+
}
78+
79+
private fun onNext(block: BlockContainer): Boolean {
80+
val prev = lock.read {
81+
head.find { it.height == block.height }
82+
}
83+
if (prev != null && prev.difficulty > block.difficulty) {
84+
return false
85+
}
86+
lock.write {
87+
// first, check if existing data for the height is better
88+
val prev = head.find { it.height == block.height }
89+
if (prev != null && prev.difficulty > block.difficulty) {
90+
return false
91+
}
92+
93+
// otherwise add it to the list
94+
val fresh = if (head.isEmpty()) {
95+
// just the first run, so nothing to do yet
96+
listOf(block)
97+
} else if (head.last().height < block.height) {
98+
// new block, just add it on top
99+
head + block
100+
} else {
101+
// situation when we have that block in the list and since we checked it above it has a lower priority
102+
// now there are two options: the same block or different block.
103+
// if it's in the middle keep the rest anyway b/c a higher priority upstream would fix it with the following updates
104+
head.map {
105+
if (it.height == block.height) {
106+
block
107+
} else {
108+
it
109+
}
110+
}
111+
}
112+
head = fresh
113+
// drop all blocks on top of this one if their difficulty is lower
114+
.filterNot { it.height > block.height && it.difficulty < block.difficulty }
115+
.takeLast(headLimit)
116+
return true
117+
}
118+
}
119+
120+
override fun stop() {
121+
sources.forEach { head ->
122+
if (head is Lifecycle && head.isRunning) {
123+
head.stop()
124+
}
125+
}
126+
subscription?.dispose()
127+
subscription = null
128+
}
129+
130+
override fun setCaches(caches: Caches) {
131+
sources.forEach {
132+
if (it is CachesEnabled) {
133+
it.setCaches(caches)
134+
}
135+
}
136+
}
137+
}

src/main/kotlin/io/emeraldpay/dshackle/upstream/bitcoin/BitcoinMultistream.kt

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import io.emeraldpay.dshackle.upstream.EmptyHead
2626
import io.emeraldpay.dshackle.upstream.HardcodedReader
2727
import io.emeraldpay.dshackle.upstream.Head
2828
import io.emeraldpay.dshackle.upstream.IntegralRpcReader
29-
import io.emeraldpay.dshackle.upstream.MergedHead
29+
import io.emeraldpay.dshackle.upstream.MergedPowHead
3030
import io.emeraldpay.dshackle.upstream.Multistream
3131
import io.emeraldpay.dshackle.upstream.Upstream
3232
import io.emeraldpay.dshackle.upstream.VerifyingReader
@@ -104,7 +104,7 @@ open class BitcoinMultistream(
104104
}
105105
}
106106
} else {
107-
val newHead = MergedHead(sourceUpstreams.map { it.getHead() }).apply {
107+
val newHead = MergedPowHead(sourceUpstreams.map { it.getHead() }).apply {
108108
this.start()
109109
}
110110
val lagObserver = BitcoinHeadLagObserver(newHead, sourceUpstreams)

0 commit comments

Comments
 (0)