Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit c3c62dd

Browse files
avaloncheRuteri
andauthored
Fix getting validators map for local relay (#20)
* Fix getting validators map for local relay * pr comments * add timer for updating known validators * improvement to local validator map fetching * lock for map updating * properly lock updates * get current slot if the mapping is empty * remove onForkchoiceUpdate * graceful shutdown * Split initial proposer sync from the proposer fetch loop (#28) Co-authored-by: Mateusz Morusiewicz <[email protected]>
1 parent 6b30dbb commit c3c62dd

12 files changed

+169
-135
lines changed

builder/beacon_client.go

+92-35
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http"
99
"strconv"
1010
"sync"
11+
"time"
1112

1213
"github.com/ethereum/go-ethereum/common"
1314
"github.com/ethereum/go-ethereum/common/hexutil"
@@ -19,82 +20,138 @@ type testBeaconClient struct {
1920
slot uint64
2021
}
2122

23+
func (b *testBeaconClient) Stop() {
24+
return
25+
}
26+
2227
func (b *testBeaconClient) isValidator(pubkey PubkeyHex) bool {
2328
return true
2429
}
2530
func (b *testBeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
2631
return PubkeyHex(hexutil.Encode(b.validator.Pk)), nil
2732
}
28-
func (b *testBeaconClient) onForkchoiceUpdate() (uint64, error) {
29-
return b.slot, nil
33+
func (b *testBeaconClient) Start() error {
34+
return nil
3035
}
3136

3237
type BeaconClient struct {
33-
endpoint string
38+
endpoint string
39+
slotsInEpoch uint64
40+
secondsInSlot uint64
41+
42+
mu sync.Mutex
43+
slotProposerMap map[uint64]PubkeyHex
3444

35-
mu sync.Mutex
36-
currentEpoch uint64
37-
currentSlot uint64
38-
nextSlotProposer PubkeyHex
39-
slotProposerMap map[uint64]PubkeyHex
45+
closeCh chan struct{}
4046
}
4147

42-
func NewBeaconClient(endpoint string) *BeaconClient {
48+
func NewBeaconClient(endpoint string, slotsInEpoch uint64, secondsInSlot uint64) *BeaconClient {
4349
return &BeaconClient{
4450
endpoint: endpoint,
51+
slotsInEpoch: slotsInEpoch,
52+
secondsInSlot: secondsInSlot,
4553
slotProposerMap: make(map[uint64]PubkeyHex),
54+
closeCh: make(chan struct{}),
4655
}
4756
}
4857

58+
func (b *BeaconClient) Stop() {
59+
close(b.closeCh)
60+
}
61+
4962
func (b *BeaconClient) isValidator(pubkey PubkeyHex) bool {
5063
return true
5164
}
5265

5366
func (b *BeaconClient) getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error) {
54-
/* Only returns proposer if requestedSlot is currentSlot + 1, would be a race otherwise */
5567
b.mu.Lock()
5668
defer b.mu.Unlock()
5769

58-
if b.currentSlot+1 != requestedSlot {
59-
return PubkeyHex(""), errors.New("slot out of sync")
70+
nextSlotProposer, found := b.slotProposerMap[requestedSlot]
71+
if !found {
72+
log.Error("inconsistent proposer mapping", "requestSlot", requestedSlot, "slotProposerMap", b.slotProposerMap)
73+
return PubkeyHex(""), errors.New("inconsistent proposer mapping")
6074
}
61-
return b.nextSlotProposer, nil
75+
return nextSlotProposer, nil
6276
}
6377

64-
/* Returns next slot's proposer pubkey */
65-
// TODO: what happens if no block for previous slot - should still get next slot
66-
func (b *BeaconClient) onForkchoiceUpdate() (uint64, error) {
67-
b.mu.Lock()
68-
defer b.mu.Unlock()
78+
func (b *BeaconClient) Start() error {
79+
go b.UpdateValidatorMapForever()
80+
return nil
81+
}
6982

83+
func (b *BeaconClient) UpdateValidatorMapForever() {
84+
durationPerSlot := time.Duration(b.secondsInSlot) * time.Second
85+
86+
prevFetchSlot := uint64(0)
87+
88+
// fetch current epoch if beacon is online
7089
currentSlot, err := fetchCurrentSlot(b.endpoint)
7190
if err != nil {
72-
return 0, err
91+
log.Error("could not get current slot", "err", err)
92+
} else {
93+
currentEpoch := currentSlot / b.slotsInEpoch
94+
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch)
95+
if err != nil {
96+
log.Error("could not fetch validators map", "epoch", currentEpoch, "err", err)
97+
} else {
98+
b.mu.Lock()
99+
b.slotProposerMap = slotProposerMap
100+
b.mu.Unlock()
101+
}
73102
}
74103

75-
nextSlot := currentSlot + 1
104+
retryDelay := time.Second
76105

77-
b.currentSlot = currentSlot
78-
nextSlotEpoch := nextSlot / 32
106+
// Every half epoch request validators map, polling for the slot
107+
// more frequently to avoid missing updates on errors
108+
timer := time.NewTimer(retryDelay)
109+
defer timer.Stop()
110+
for true {
111+
select {
112+
case <-b.closeCh:
113+
return
114+
case <-timer.C:
115+
}
79116

80-
if nextSlotEpoch != b.currentEpoch {
81-
// TODO: this should be prepared in advance, possibly just fetch for next epoch in advance
82-
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, nextSlotEpoch)
117+
currentSlot, err := fetchCurrentSlot(b.endpoint)
83118
if err != nil {
84-
return 0, err
119+
log.Error("could not get current slot", "err", err)
120+
timer.Reset(retryDelay)
121+
continue
85122
}
86123

87-
b.currentEpoch = nextSlotEpoch
88-
b.slotProposerMap = slotProposerMap
89-
}
124+
// TODO: should poll after consistent slot within the epoch (slot % slotsInEpoch/2 == 0)
125+
nextFetchSlot := prevFetchSlot + b.slotsInEpoch/2
126+
if currentSlot < nextFetchSlot {
127+
timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
128+
continue
129+
}
90130

91-
nextSlotProposer, found := b.slotProposerMap[nextSlot]
92-
if !found {
93-
log.Error("inconsistent proposer mapping", "currentSlot", currentSlot, "slotProposerMap", b.slotProposerMap)
94-
return 0, errors.New("inconsistent proposer mapping")
131+
currentEpoch := currentSlot / b.slotsInEpoch
132+
slotProposerMap, err := fetchEpochProposersMap(b.endpoint, currentEpoch+1)
133+
if err != nil {
134+
log.Error("could not fetch validators map", "epoch", currentEpoch+1, "err", err)
135+
timer.Reset(retryDelay)
136+
continue
137+
}
138+
139+
prevFetchSlot = currentSlot
140+
b.mu.Lock()
141+
// remove previous epoch slots
142+
for k := range b.slotProposerMap {
143+
if k < currentEpoch*b.slotsInEpoch {
144+
delete(b.slotProposerMap, k)
145+
}
146+
}
147+
// update the slot proposer map for next epoch
148+
for k, v := range slotProposerMap {
149+
b.slotProposerMap[k] = v
150+
}
151+
b.mu.Unlock()
152+
153+
timer.Reset(time.Duration(nextFetchSlot-currentSlot) * durationPerSlot)
95154
}
96-
b.nextSlotProposer = nextSlotProposer
97-
return nextSlot, nil
98155
}
99156

100157
func fetchCurrentSlot(endpoint string) (uint64, error) {

builder/beacon_client_test.go

-92
Original file line numberDiff line numberDiff line change
@@ -174,95 +174,3 @@ func TestFetchEpochProposersMap(t *testing.T) {
174174
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a"), proposersMap[1])
175175
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), proposersMap[2])
176176
}
177-
178-
func TestOnForkchoiceUpdate(t *testing.T) {
179-
mbn := newMockBeaconNode()
180-
defer mbn.srv.Close()
181-
182-
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`)
183-
184-
mbn.proposerDuties[1] = []byte(`{
185-
"dependent_root": "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
186-
"execution_optimistic": false,
187-
"data": [
188-
{
189-
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74a",
190-
"validator_index": "1",
191-
"slot": "31"
192-
},
193-
{
194-
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b",
195-
"validator_index": "2",
196-
"slot": "32"
197-
},
198-
{
199-
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74c",
200-
"validator_index": "3",
201-
"slot": "33"
202-
}
203-
]
204-
}`)
205-
206-
bc := NewBeaconClient(mbn.srv.URL)
207-
slot, err := bc.onForkchoiceUpdate()
208-
require.NoError(t, err)
209-
require.Equal(t, slot, uint64(32))
210-
211-
pubkeyHex, err := bc.getProposerForNextSlot(32)
212-
require.NoError(t, err)
213-
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74b"), pubkeyHex)
214-
215-
_, err = bc.getProposerForNextSlot(31)
216-
require.EqualError(t, err, "slot out of sync")
217-
218-
_, err = bc.getProposerForNextSlot(33)
219-
require.EqualError(t, err, "slot out of sync")
220-
221-
mbn.headersCode = 404
222-
mbn.headersResp = []byte(`{ "code": 404, "message": "State not found" }`)
223-
224-
slot, err = NewBeaconClient(mbn.srv.URL).onForkchoiceUpdate()
225-
require.EqualError(t, err, "State not found")
226-
require.Equal(t, slot, uint64(0))
227-
228-
// Check that client does not fetch new proposers if epoch did not change
229-
mbn.headersCode = 200
230-
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "31", "proposer_index": "1" } } } ] }`)
231-
mbn.proposerDuties[1] = []byte(`{
232-
"data": [
233-
{
234-
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d",
235-
"validator_index": "4",
236-
"slot": "32"
237-
}
238-
]
239-
}`)
240-
241-
slot, err = bc.onForkchoiceUpdate()
242-
require.NoError(t, err, "")
243-
require.Equal(t, slot, uint64(32))
244-
245-
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "63", "proposer_index": "1" } } } ] }`)
246-
mbn.proposerDuties[2] = []byte(`{
247-
"data": [
248-
{
249-
"pubkey": "0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d",
250-
"validator_index": "4",
251-
"slot": "64"
252-
}
253-
]
254-
}`)
255-
256-
slot, err = bc.onForkchoiceUpdate()
257-
require.NoError(t, err, "")
258-
require.Equal(t, slot, uint64(64))
259-
260-
pubkeyHex, err = bc.getProposerForNextSlot(64)
261-
require.NoError(t, err)
262-
require.Equal(t, PubkeyHex("0x93247f2209abcacf57b75a51dafae777f9dd38bc7053d1af526f220a7489a6d3a2753e5f3e8b1cfe39b56f43611df74d"), pubkeyHex)
263-
264-
// Check proposers map error is routed out
265-
mbn.headersResp = []byte(`{ "data": [ { "header": { "message": { "slot": "65", "proposer_index": "1" } } } ] }`)
266-
_, err = bc.onForkchoiceUpdate()
267-
require.EqualError(t, err, "inconsistent proposer mapping")
268-
}

builder/builder.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,15 @@ type ValidatorData struct {
3232
type IBeaconClient interface {
3333
isValidator(pubkey PubkeyHex) bool
3434
getProposerForNextSlot(requestedSlot uint64) (PubkeyHex, error)
35-
onForkchoiceUpdate() (uint64, error)
35+
Start() error
36+
Stop()
3637
}
3738

3839
type IRelay interface {
3940
SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, vd ValidatorData) error
4041
GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
42+
Start() error
43+
Stop()
4144
}
4245

4346
type IBuilder interface {
@@ -89,7 +92,7 @@ func NewBuilder(sk *bls.SecretKey, ds flashbotsextra.IDatabaseService, relay IRe
8992
}
9093

9194
func (b *Builder) Start() error {
92-
return nil
95+
return b.relay.Start()
9396
}
9497

9598
func (b *Builder) Stop() error {

builder/config.go

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ type Config struct {
44
Enabled bool `toml:",omitempty"`
55
EnableValidatorChecks bool `toml:",omitempty"`
66
EnableLocalRelay bool `toml:",omitempty"`
7+
SlotsInEpoch uint64 `toml:",omitempty"`
8+
SecondsInSlot uint64 `toml:",omitempty"`
79
DisableBundleFetcher bool `toml:",omitempty"`
810
DryRun bool `toml:",omitempty"`
911
BuilderSecretKey string `toml:",omitempty"`
@@ -23,6 +25,8 @@ var DefaultConfig = Config{
2325
Enabled: false,
2426
EnableValidatorChecks: false,
2527
EnableLocalRelay: false,
28+
SlotsInEpoch: 32,
29+
SecondsInSlot: 12,
2630
DisableBundleFetcher: false,
2731
DryRun: false,
2832
BuilderSecretKey: "0x2fc12ae741f29701f8e30f5de6350766c020cb80768a0ff01e6838ffd2431e11",

builder/local_relay.go

+9
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,15 @@ func NewLocalRelay(sk *bls.SecretKey, beaconClient IBeaconClient, builderSigning
8484
}
8585
}
8686

87+
func (r *LocalRelay) Start() error {
88+
r.beaconClient.Start()
89+
return nil
90+
}
91+
92+
func (r *LocalRelay) Stop() {
93+
r.beaconClient.Stop()
94+
}
95+
8796
func (r *LocalRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error {
8897
log.Info("submitting block to local relay", "block", msg.ExecutionPayload.BlockHash.String())
8998
return r.submitBlock(msg)

builder/relay.go

+6
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ func (r *RemoteRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error
125125
return ValidatorData{}, ErrValidatorNotFound
126126
}
127127

128+
func (r *RemoteRelay) Start() error {
129+
return nil
130+
}
131+
132+
func (r *RemoteRelay) Stop() {}
133+
128134
func (r *RemoteRelay) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, _ ValidatorData) error {
129135
log.Info("submitting block to remote relay", "endpoint", r.endpoint)
130136
code, err := server.SendHTTPRequest(context.TODO(), *http.DefaultClient, http.MethodPost, r.endpoint+"/relay/v1/builder/blocks", msg, nil)

builder/relay_aggregator.go

+16
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,22 @@ func NewRemoteRelayAggregator(primary IRelay, secondary []IRelay) *RemoteRelayAg
2424
}
2525
}
2626

27+
func (r *RemoteRelayAggregator) Start() error {
28+
for _, relay := range r.relays {
29+
err := relay.Start()
30+
if err != nil {
31+
return err
32+
}
33+
}
34+
return nil
35+
}
36+
37+
func (r *RemoteRelayAggregator) Stop() {
38+
for _, relay := range r.relays {
39+
relay.Stop()
40+
}
41+
}
42+
2743
func (r *RemoteRelayAggregator) SubmitBlock(msg *boostTypes.BuilderSubmitBlockRequest, registration ValidatorData) error {
2844
r.registrationsCacheLock.RLock()
2945
defer r.registrationsCacheLock.RUnlock()

builder/relay_aggregator_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@ func (r *testRelay) GetValidatorForSlot(nextSlot uint64) (ValidatorData, error)
5858
return r.gvsVd, r.gvsErr
5959
}
6060

61+
func (r *testRelay) Start() error {
62+
return nil
63+
}
64+
65+
func (r *testRelay) Stop() {}
66+
6167
func TestRemoteRelayAggregator(t *testing.T) {
6268
t.Run("should return error if no relays return validator data", func(t *testing.T) {
6369
backend := newTestRelayAggBackend(3)

builder/service.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func Register(stack *node.Node, backend *eth.Ethereum, cfg *Config) error {
144144
copy(bellatrixForkVersion[:], bellatrixForkVersionBytes[:4])
145145
proposerSigningDomain := boostTypes.ComputeDomain(boostTypes.DomainTypeBeaconProposer, bellatrixForkVersion, genesisValidatorsRoot)
146146

147-
beaconClient := NewBeaconClient(cfg.BeaconEndpoint)
147+
beaconClient := NewBeaconClient(cfg.BeaconEndpoint, cfg.SlotsInEpoch, cfg.SecondsInSlot)
148148

149149
var localRelay *LocalRelay
150150
if cfg.EnableLocalRelay {

0 commit comments

Comments
 (0)