Skip to content

Commit b6a14a2

Browse files
committed
asyncio: stop using get_event_loop(). introduce ~singleton loop.
asyncio.get_event_loop() became deprecated in python3.10. (see python/cpython#83710) ``` .../electrum/electrum/daemon.py:470: DeprecationWarning: There is no current event loop self.asyncio_loop = asyncio.get_event_loop() .../electrum/electrum/network.py:276: DeprecationWarning: There is no current event loop self.asyncio_loop = asyncio.get_event_loop() ``` Also, according to that thread, "set_event_loop() [... is] not deprecated by oversight". So, we stop using get_event_loop() and set_event_loop() in our own code. Note that libraries we use (such as the stdlib for python <3.10), might call get_event_loop, which then relies on us having called set_event_loop e.g. for the GUI thread. To work around this, a custom event loop policy providing a get_event_loop implementation is used. Previously, we have been using a single asyncio event loop, created with util.create_and_start_event_loop, and code in many places got a reference to this loop using asyncio.get_event_loop(). Now, we still use a single asyncio event loop, but it is now stored as a global in util._asyncio_event_loop (access with util.get_asyncio_loop()). I believe these changes also fix spesmilo#5376
1 parent 334da24 commit b6a14a2

13 files changed

+111
-58
lines changed

electrum/commands.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ def _run(self, method, args, password_getter=None, **kwargs):
187187
kwargs.pop('wallet')
188188

189189
coro = f(*args, **kwargs)
190-
fut = asyncio.run_coroutine_threadsafe(coro, asyncio.get_event_loop())
190+
fut = asyncio.run_coroutine_threadsafe(coro, util.get_asyncio_loop())
191191
result = fut.result()
192192

193193
if self._callback:

electrum/daemon.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def request(config: SimpleConfig, endpoint, args=(), timeout=60):
124124
rpc_user, rpc_password = get_rpc_credentials(config)
125125
server_url = 'http://%s:%d' % (host, port)
126126
auth = aiohttp.BasicAuth(login=rpc_user, password=rpc_password)
127-
loop = asyncio.get_event_loop()
127+
loop = util.get_asyncio_loop()
128128
async def request_coroutine():
129129
if socktype == 'unix':
130130
connector = aiohttp.UnixConnector(path=path)
@@ -467,7 +467,7 @@ def __init__(self, config: SimpleConfig, fd=None, *, listen_jsonrpc=True):
467467
if 'wallet_path' in config.cmdline_options:
468468
self.logger.warning("Ignoring parameter 'wallet_path' for daemon. "
469469
"Use the load_wallet command instead.")
470-
self.asyncio_loop = asyncio.get_event_loop()
470+
self.asyncio_loop = util.get_asyncio_loop()
471471
self.network = None
472472
if not config.get('offline'):
473473
self.network = Network(config, daemon=self)

electrum/exchange_rate.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def get_historical_rates(self, ccy: str, cache_dir: str) -> None:
148148
if h is None:
149149
h = self.read_historical_rates(ccy, cache_dir)
150150
if h is None or h['timestamp'] < time.time() - 24*3600:
151-
asyncio.get_event_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
151+
util.get_asyncio_loop().create_task(self.get_historical_rates_safe(ccy, cache_dir))
152152

153153
def history_ccys(self) -> Sequence[str]:
154154
return []
@@ -471,7 +471,7 @@ async def query_all_exchanges_for_their_ccys_over_network():
471471
for name, klass in exchanges.items():
472472
exchange = klass(None, None)
473473
await group.spawn(get_currencies_safe(name, exchange))
474-
loop = asyncio.get_event_loop()
474+
loop = util.get_asyncio_loop()
475475
try:
476476
loop.run_until_complete(query_all_exchanges_for_their_ccys_over_network())
477477
except Exception as e:

electrum/gui/kivy/main_window.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ def __init__(self, **kwargs):
394394
self.is_exit = False
395395
self.wallet = None # type: Optional[Abstract_Wallet]
396396
self.pause_time = 0
397-
self.asyncio_loop = asyncio.get_event_loop()
397+
self.asyncio_loop = util.get_asyncio_loop()
398398
self.password = None
399399
self._use_single_password = False
400400
self.resume_dialog = None

electrum/lnworker.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,7 @@ async def add_peer(self, connect_str: str) -> Peer:
496496
# Try DNS-resolving the host (if needed). This is simply so that
497497
# the caller gets a nice exception if it cannot be resolved.
498498
try:
499-
await asyncio.get_event_loop().getaddrinfo(host, port)
499+
await asyncio.get_running_loop().getaddrinfo(host, port)
500500
except socket.gaierror:
501501
raise ConnStringFormatError(_('Hostname does not resolve (getaddrinfo failed)'))
502502
# add peer

electrum/network.py

+7-5
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ def __init__(self, config: SimpleConfig, *, daemon: 'Daemon' = None):
273273
init_retry_delay_urgent=1,
274274
)
275275

276-
self.asyncio_loop = asyncio.get_event_loop()
276+
self.asyncio_loop = util.get_asyncio_loop()
277277
assert self.asyncio_loop.is_running(), "event loop not running"
278278

279279
assert isinstance(config, SimpleConfig), f"config should be a SimpleConfig instead of {type(config)}"
@@ -381,9 +381,11 @@ async def stop_gossip(self, *, full_shutdown: bool = False):
381381
self.channel_db = None
382382
self.path_finder = None
383383

384-
def run_from_another_thread(self, coro, *, timeout=None):
385-
assert util.get_running_loop() != self.asyncio_loop, 'must not be called from network thread'
386-
fut = asyncio.run_coroutine_threadsafe(coro, self.asyncio_loop)
384+
@classmethod
385+
def run_from_another_thread(cls, coro, *, timeout=None):
386+
loop = util.get_asyncio_loop()
387+
assert util.get_running_loop() != loop, 'must not be called from asyncio thread'
388+
fut = asyncio.run_coroutine_threadsafe(coro, loop)
387389
return fut.result(timeout)
388390

389391
@staticmethod
@@ -1321,7 +1323,7 @@ def send_http_on_proxy(cls, method, url, **kwargs):
13211323
assert util.get_running_loop() != network.asyncio_loop
13221324
loop = network.asyncio_loop
13231325
else:
1324-
loop = asyncio.get_event_loop()
1326+
loop = util.get_asyncio_loop()
13251327
coro = asyncio.run_coroutine_threadsafe(cls._send_http_on_proxy(method, url, **kwargs), loop)
13261328
# note: _send_http_on_proxy has its own timeout, so no timeout here:
13271329
return coro.result()

electrum/sql_db.py

-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ class SqlDB(Logger):
2626
def __init__(self, asyncio_loop: asyncio.BaseEventLoop, path, commit_interval=None):
2727
Logger.__init__(self)
2828
self.asyncio_loop = asyncio_loop
29-
asyncio.set_event_loop(asyncio_loop)
3029
self.stopping = False
3130
self.stopped_event = asyncio.Event()
3231
self.path = path

electrum/tests/test_lnpeer.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import electrum
1616
import electrum.trampoline
1717
from electrum import bitcoin
18+
from electrum import util
1819
from electrum import constants
1920
from electrum.network import Network
2021
from electrum.ecc import ECPrivkey
@@ -62,7 +63,7 @@ def __init__(self, tx_queue):
6263
user_config = {}
6364
user_dir = tempfile.mkdtemp(prefix="electrum-lnpeer-test-")
6465
self.config = simple_config.SimpleConfig(user_config, read_user_dir_function=lambda: user_dir)
65-
self.asyncio_loop = asyncio.get_event_loop()
66+
self.asyncio_loop = util.get_asyncio_loop()
6667
self.channel_db = ChannelDB(self)
6768
self.channel_db.data_loaded.set()
6869
self.path_finder = LNPathFinder(self.channel_db)
@@ -1361,4 +1362,4 @@ async def f():
13611362

13621363

13631364
def run(coro):
1364-
return asyncio.run_coroutine_threadsafe(coro, loop=asyncio.get_event_loop()).result()
1365+
return asyncio.run_coroutine_threadsafe(coro, loop=util.get_asyncio_loop()).result()

electrum/tests/test_lnrouter.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import shutil
55
import asyncio
66

7+
from electrum import util
78
from electrum.util import bh2u, bfh, create_and_start_event_loop
89
from electrum.lnutil import ShortChannelID
910
from electrum.lnonion import (OnionHopsDataSingle, new_onion_packet,
@@ -64,7 +65,7 @@ def prepare_graph(self):
6465
"""
6566
class fake_network:
6667
config = self.config
67-
asyncio_loop = asyncio.get_event_loop()
68+
asyncio_loop = util.get_asyncio_loop()
6869
trigger_callback = lambda *args: None
6970
register_callback = lambda *args: None
7071
interface = None

electrum/tests/test_lntransport.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22

3+
from electrum import util
34
from electrum.ecc import ECPrivkey
45
from electrum.lnutil import LNPeerAddr
56
from electrum.lntransport import LNResponderTransport, LNTransport
@@ -11,6 +12,15 @@
1112

1213
class TestLNTransport(ElectrumTestCase):
1314

15+
def setUp(self):
16+
super().setUp()
17+
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
18+
19+
def tearDown(self):
20+
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
21+
self._loop_thread.join(timeout=1)
22+
super().tearDown()
23+
1424
@needs_test_with_all_chacha20_implementations
1525
def test_responder(self):
1626
# local static
@@ -38,11 +48,11 @@ async def read(self, num_bytes):
3848
assert num_bytes == 66
3949
return bytes.fromhex('00b9e3a702e93e3a9948c2ed6e5fd7590a6e1c3a0344cfc9d5b57357049aa22355361aa02e55a8fc28fef5bd6d71ad0c38228dc68b1c466263b47fdf31e560e139ba')
4050
transport = LNResponderTransport(ls_priv, Reader(), Writer())
41-
asyncio.get_event_loop().run_until_complete(transport.handshake(epriv=e_priv))
51+
asyncio.run_coroutine_threadsafe(
52+
transport.handshake(epriv=e_priv), self.asyncio_loop).result()
4253

4354
@needs_test_with_all_chacha20_implementations
4455
def test_loop(self):
45-
loop = asyncio.get_event_loop()
4656
responder_shaked = asyncio.Event()
4757
server_shaked = asyncio.Event()
4858
responder_key = ECPrivkey.generate_random_key()
@@ -96,4 +106,4 @@ async def f():
96106
server.close()
97107
await server.wait_closed()
98108

99-
loop.run_until_complete(f())
109+
asyncio.run_coroutine_threadsafe(f(), self.asyncio_loop).result()

electrum/tests/test_network.py

+18-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from electrum.interface import Interface, ServerAddr
99
from electrum.crypto import sha256
1010
from electrum.util import bh2u
11+
from electrum import util
1112

1213
from . import ElectrumTestCase
1314

@@ -17,7 +18,9 @@ async def spawn(self, x): return
1718

1819
class MockNetwork:
1920
taskgroup = MockTaskGroup()
20-
asyncio_loop = asyncio.get_event_loop()
21+
22+
def __init__(self):
23+
self.asyncio_loop = util.get_asyncio_loop()
2124

2225
class MockInterface(Interface):
2326
def __init__(self, config):
@@ -52,9 +55,15 @@ def tearDownClass(cls):
5255

5356
def setUp(self):
5457
super().setUp()
58+
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
5559
self.config = SimpleConfig({'electrum_path': self.electrum_path})
5660
self.interface = MockInterface(self.config)
5761

62+
def tearDown(self):
63+
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
64+
self._loop_thread.join(timeout=1)
65+
super().tearDown()
66+
5867
def test_fork_noconflict(self):
5968
blockchain.blockchains = {}
6069
self.interface.q.put_nowait({'block_height': 8, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: False}})
@@ -66,7 +75,8 @@ def mock_connect(height):
6675
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
6776
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
6877
ifa = self.interface
69-
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
78+
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
79+
self.assertEqual(('fork', 8), fut.result())
7080
self.assertEqual(self.interface.q.qsize(), 0)
7181

7282
def test_fork_conflict(self):
@@ -80,7 +90,8 @@ def mock_connect(height):
8090
self.interface.q.put_nowait({'block_height': 5, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
8191
self.interface.q.put_nowait({'block_height': 6, 'mock': {'binary':1,'check':lambda x: True, 'connect': lambda x: True}})
8292
ifa = self.interface
83-
self.assertEqual(('fork', 8), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=7)))
93+
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=7), util.get_asyncio_loop())
94+
self.assertEqual(('fork', 8), fut.result())
8495
self.assertEqual(self.interface.q.qsize(), 0)
8596

8697
def test_can_connect_during_backward(self):
@@ -93,7 +104,8 @@ def mock_connect(height):
93104
self.interface.q.put_nowait({'block_height': 3, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
94105
self.interface.q.put_nowait({'block_height': 4, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
95106
ifa = self.interface
96-
self.assertEqual(('catchup', 5), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=4)))
107+
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=4), util.get_asyncio_loop())
108+
self.assertEqual(('catchup', 5), fut.result())
97109
self.assertEqual(self.interface.q.qsize(), 0)
98110

99111
def mock_fork(self, bad_header):
@@ -113,7 +125,8 @@ def test_chain_false_during_binary(self):
113125
self.interface.q.put_nowait({'block_height': 5, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
114126
self.interface.q.put_nowait({'block_height': 6, 'mock': {'catchup':1, 'check': lambda x: False, 'connect': lambda x: True}})
115127
ifa = self.interface
116-
self.assertEqual(('catchup', 7), asyncio.get_event_loop().run_until_complete(ifa.sync_until(8, next_height=6)))
128+
fut = asyncio.run_coroutine_threadsafe(ifa.sync_until(8, next_height=6), util.get_asyncio_loop())
129+
self.assertEqual(('catchup', 7), fut.result())
117130
self.assertEqual(self.interface.q.qsize(), 0)
118131

119132

electrum/tests/test_wallet_vertical.py

+13-26
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from electrum import storage, bitcoin, keystore, bip32, slip39, wallet
1010
from electrum import Transaction
1111
from electrum import SimpleConfig
12+
from electrum import util
1213
from electrum.address_synchronizer import TX_HEIGHT_UNCONFIRMED, TX_HEIGHT_UNCONF_PARENT
1314
from electrum.wallet import (sweep, Multisig_Wallet, Standard_Wallet, Imported_Wallet,
1415
restore_wallet_from_text, Abstract_Wallet, BumpFeeStrategy)
@@ -18,6 +19,7 @@
1819
from electrum.transaction import (TxOutput, Transaction, PartialTransaction, PartialTxOutput,
1920
PartialTxInput, tx_from_any, TxOutpoint)
2021
from electrum.mnemonic import seed_type
22+
from electrum.network import Network
2123

2224
from electrum.plugins.trustedcoin import trustedcoin
2325

@@ -699,8 +701,14 @@ class TestWalletSending(TestCaseForTestnet):
699701

700702
def setUp(self):
701703
super().setUp()
704+
self.asyncio_loop, self._stop_loop, self._loop_thread = util.create_and_start_event_loop()
702705
self.config = SimpleConfig({'electrum_path': self.electrum_path})
703706

707+
def tearDown(self):
708+
self.asyncio_loop.call_soon_threadsafe(self._stop_loop.set_result, 1)
709+
self._loop_thread.join(timeout=1)
710+
super().tearDown()
711+
704712
def create_standard_wallet_from_seed(self, seed_words, *, config=None, gap_limit=2):
705713
if config is None:
706714
config = self.config
@@ -1369,14 +1377,7 @@ async def get_transaction(self, txid, timeout=None):
13691377
raise Exception("unexpected txid")
13701378
def has_internet_connection(self):
13711379
return True
1372-
def run_from_another_thread(self, coro, *, timeout=None):
1373-
loop, stop_loop, loop_thread = create_and_start_event_loop()
1374-
fut = asyncio.run_coroutine_threadsafe(coro, loop)
1375-
try:
1376-
return fut.result(timeout)
1377-
finally:
1378-
loop.call_soon_threadsafe(stop_loop.set_result, 1)
1379-
loop_thread.join(timeout=1)
1380+
run_from_another_thread = Network.run_from_another_thread
13801381
def get_local_height(self):
13811382
return 0
13821383
def blockchain(self):
@@ -1429,14 +1430,7 @@ async def get_transaction(self, txid, timeout=None):
14291430
raise Exception("unexpected txid")
14301431
def has_internet_connection(self):
14311432
return True
1432-
def run_from_another_thread(self, coro, *, timeout=None):
1433-
loop, stop_loop, loop_thread = create_and_start_event_loop()
1434-
fut = asyncio.run_coroutine_threadsafe(coro, loop)
1435-
try:
1436-
return fut.result(timeout)
1437-
finally:
1438-
loop.call_soon_threadsafe(stop_loop.set_result, 1)
1439-
loop_thread.join(timeout=1)
1433+
run_from_another_thread = Network.run_from_another_thread
14401434
def get_local_height(self):
14411435
return 0
14421436
def blockchain(self):
@@ -1844,8 +1838,8 @@ async def get_transaction(self, txid):
18441838
network = NetworkMock()
18451839
dest_addr = 'tb1q3ws2p0qjk5vrravv065xqlnkckvzcpclk79eu2'
18461840
sweep_coro = sweep(privkeys, network=network, config=self.config, to_address=dest_addr, fee=5000, locktime=1325785, tx_version=1)
1847-
loop = asyncio.get_event_loop()
1848-
tx = loop.run_until_complete(sweep_coro)
1841+
loop = util.get_asyncio_loop()
1842+
tx = asyncio.run_coroutine_threadsafe(sweep_coro, loop).result()
18491843

18501844
tx_copy = tx_from_any(tx.serialize())
18511845
self.assertEqual('010000000129349e5641d79915e9d0282fdbaee8c3df0b6731bab9d70bf626e8588bde24ac010000004847304402206bf0d0a93abae0d5873a62ebf277a5dd2f33837821e8b93e74d04e19d71b578002201a6d729bc159941ef5c4c9e5fe13ece9fc544351ba531b00f68ba549c8b38a9a01fdffffff01b82e0f00000000001600148ba0a0bc12b51831f58c7ea8607e76c5982c071fd93a1400',
@@ -2199,14 +2193,7 @@ async def get_transaction(self, txid, timeout=None):
21992193
raise Exception("unexpected txid")
22002194
def has_internet_connection(self):
22012195
return True
2202-
def run_from_another_thread(self, coro, *, timeout=None):
2203-
loop, stop_loop, loop_thread = create_and_start_event_loop()
2204-
fut = asyncio.run_coroutine_threadsafe(coro, loop)
2205-
try:
2206-
return fut.result(timeout)
2207-
finally:
2208-
loop.call_soon_threadsafe(stop_loop.set_result, 1)
2209-
loop_thread.join(timeout=1)
2196+
run_from_another_thread = Network.run_from_another_thread
22102197
def get_local_height(self):
22112198
return 0
22122199
def blockchain(self):

0 commit comments

Comments
 (0)