@@ -68,7 +68,7 @@ pub use kyoto::{
68
68
69
69
use kyoto:: Receiver ;
70
70
use kyoto:: UnboundedReceiver ;
71
- use kyoto:: { Event , IndexedBlock } ;
71
+ use kyoto:: { BlockHash , Event , IndexedBlock } ;
72
72
73
73
pub mod builder;
74
74
@@ -97,6 +97,8 @@ pub struct UpdateSubscriber {
97
97
chain : local_chain:: LocalChain ,
98
98
// receive graph
99
99
graph : IndexedTxGraph < ConfirmationBlockTime , KeychainTxOutIndex < KeychainKind > > ,
100
+ // staged changes for the chain
101
+ chain_changeset : BTreeMap < u32 , Option < BlockHash > > ,
100
102
}
101
103
102
104
impl UpdateSubscriber {
@@ -110,6 +112,7 @@ impl UpdateSubscriber {
110
112
receiver,
111
113
chain : LocalChain :: from_tip ( cp) ?,
112
114
graph : IndexedTxGraph :: new ( index. clone ( ) ) ,
115
+ chain_changeset : BTreeMap :: new ( ) ,
113
116
} )
114
117
}
115
118
@@ -121,47 +124,48 @@ impl UpdateSubscriber {
121
124
/// running node. Production applications should define how the application handles
122
125
/// these events and displays them to end users.
123
126
pub async fn update ( & mut self ) -> Option < Update > {
124
- let mut chain_changeset = BTreeMap :: new ( ) ;
125
127
while let Some ( message) = self . receiver . recv ( ) . await {
126
128
match message {
127
129
Event :: Block ( IndexedBlock { height, block } ) => {
128
130
let hash = block. header . block_hash ( ) ;
129
- chain_changeset. insert ( height, Some ( hash) ) ;
131
+ self . chain_changeset . insert ( height, Some ( hash) ) ;
130
132
let _ = self . graph . apply_block_relevant ( & block, height) ;
131
133
}
132
134
Event :: BlocksDisconnected ( headers) => {
133
135
for header in headers {
134
136
let height = header. height ;
135
- chain_changeset. insert ( height, None ) ;
137
+ self . chain_changeset . insert ( height, None ) ;
136
138
}
137
139
}
138
140
Event :: Synced ( SyncUpdate {
139
141
tip,
140
142
recent_history,
141
143
} ) => {
142
- if chain_changeset. is_empty ( )
144
+ if self . chain_changeset . is_empty ( )
143
145
&& self . chain . tip ( ) . height ( ) == tip. height
144
146
&& self . chain . tip ( ) . hash ( ) == tip. hash
145
147
{
146
148
// return early if we're already synced
147
149
return None ;
148
150
}
149
151
recent_history. into_iter ( ) . for_each ( |( height, header) | {
150
- chain_changeset. insert ( height, Some ( header. block_hash ( ) ) ) ;
152
+ self . chain_changeset
153
+ . insert ( height, Some ( header. block_hash ( ) ) ) ;
151
154
} ) ;
152
155
break ;
153
156
}
154
157
}
155
158
}
156
- self . chain
157
- . apply_changeset ( & local_chain:: ChangeSet :: from ( chain_changeset) )
158
- . expect ( "chain was initialized with genesis" ) ;
159
159
Some ( self . get_scan_response ( ) )
160
160
}
161
161
162
162
// When the client is believed to have synced to the chain tip of most work,
163
163
// we can return a wallet update.
164
164
fn get_scan_response ( & mut self ) -> Update {
165
+ let chain_changeset = core:: mem:: take ( & mut self . chain_changeset ) ;
166
+ self . chain
167
+ . apply_changeset ( & local_chain:: ChangeSet :: from ( chain_changeset) )
168
+ . expect ( "chain was initialized with genesis" ) ;
165
169
let tx_update = TxUpdate :: from ( self . graph . graph ( ) . clone ( ) ) ;
166
170
let graph = core:: mem:: take ( & mut self . graph ) ;
167
171
let last_active_indices = graph. index . last_used_indices ( ) ;
0 commit comments