-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathrun.py
166 lines (137 loc) · 5.47 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import logging
import os
from concurrent.futures import ProcessPoolExecutor as Pool
from importlib import reload
from time import sleep
from netqasm.logging.glob import get_netqasm_logger
from netqasm.logging.output import (reset_struct_loggers,
save_all_struct_loggers)
from netqasm.runtime import env, process_logs
from netqasm.runtime.app_config import AppConfig
from netqasm.runtime.application import ApplicationInstance
from netqasm.runtime.settings import Formalism
from netqasm.sdk.classical_communication import reset_socket_hub
from netqasm.sdk.config import LogConfig
from netqasm.sdk.shared_memory import SharedMemoryManager
from netqasm.util.yaml import dump_yaml
from simulaqron.network import Network
from simulaqron.settings import SimBackend, simulaqron_settings
from simulaqron.toolbox import has_module
# Module-wide logger, obtained from netqasm's logging helpers.
logger = get_netqasm_logger()

# TODO: this module closely mirrors squidasm.run.run; consider a shared
# base class with one subclass per simulator.

# Lookup table from a netqasm formalism to the simulaqron backend that
# run_applications() selects for it.
_SIMULAQRON_BACKENDS = {
    Formalism.STAB: SimBackend.STABILIZER,
    Formalism.KET: SimBackend.PROJECTQ,
    Formalism.DM: SimBackend.QUTIP,
}
def as_completed(futures, names=None, sleep_time=0):
    """Yield futures as they finish, polling ``done()`` until none remain.

    Parameters
    ----------
    futures : iterable
        Objects exposing a ``done()`` method
        (e.g. ``concurrent.futures.Future``).
    names : iterable, optional
        Labels matched to ``futures`` by position. When given,
        ``(future, name)`` tuples are yielded instead of bare futures.
    sleep_time : float
        Seconds to sleep between polling passes while futures are still
        pending. With the default of 0 the loop polls without pausing.

    Yields
    ------
    future or (future, name)
        Each finished future exactly once. Futures found finished during the
        same polling pass are yielded in their input order.
    """
    pending = list(futures)
    labels = None if names is None else list(names)

    while pending:
        # Build the next round's lists instead of popping from the list that
        # is being iterated: popping mid-iteration makes the loop skip the
        # element that slides into the freed slot.
        unfinished = []
        unfinished_labels = None if labels is None else []
        for index, future in enumerate(pending):
            if future.done():
                if labels is None:
                    yield future
                else:
                    yield future, labels[index]
            else:
                unfinished.append(future)
                if labels is not None:
                    unfinished_labels.append(labels[index])
        pending, labels = unfinished, unfinished_labels

        # Only pause when there is still something left to wait for.
        if pending and sleep_time > 0:
            sleep(sleep_time)
def reset(save_loggers=False):
    """Reset shared memories, the socket hub, struct loggers and logging.

    Parameters
    ----------
    save_loggers : bool
        When truthy, ``save_all_struct_loggers()`` is called first, before
        anything is reset.
    """
    if save_loggers:
        save_all_struct_loggers()

    SharedMemoryManager.reset_memories()
    reset_socket_hub()
    reset_struct_loggers()

    # Reset logging: shut the module down, then re-import it.
    logging.shutdown()
    reload(logging)
def check_sim_backend(sim_backend):
    """Verify that the package needed by the chosen backend is available.

    Only the PROJECTQ, PYQRACK and QUTIP backends are checked; any other
    backend passes without a check.

    Parameters
    ----------
    sim_backend : SimBackend
        Backend whose ``value`` is passed to ``has_module.main``.

    Raises
    ------
    AssertionError
        If ``has_module.main`` returns a falsy value for the backend. The
        exception is raised explicitly (not via ``assert``) so the check is
        not stripped when Python runs with ``-O``.
    """
    backends_needing_package = (
        SimBackend.PROJECTQ,
        SimBackend.PYQRACK,
        SimBackend.QUTIP,
    )
    if sim_backend not in backends_needing_package:
        return
    if not has_module.main(sim_backend.value):
        raise AssertionError(
            f"To use {sim_backend} as backend you need to install the package"
        )
def run_sim_backend(node_names, sim_backend):
    """Create and start a simulaqron ``Network`` for the given nodes.

    Parameters
    ----------
    node_names : list
        Node names handed to ``Network`` as its ``nodes`` argument.
    sim_backend : SimBackend
        Backend to use; validated with ``check_sim_backend`` and its ``value``
        is stored in ``simulaqron_settings.sim_backend``.

    Returns
    -------
    Network
        The network, after ``start()`` has been called on it.
    """
    logger.debug(f"Starting simulaqron sim_backend process with nodes {node_names}")
    check_sim_backend(sim_backend=sim_backend)

    # Record the backend in simulaqron's settings before building the network.
    simulaqron_settings.sim_backend = sim_backend.value

    started_network = Network(name="default", nodes=node_names, force=True, new=True)
    started_network.start()
    return started_network
def run_applications(
    app_instance: ApplicationInstance,
    num_rounds=1,
    network_cfg=None,
    log_cfg=None,
    results_file=None,
    formalism=Formalism.KET,
    post_function=None,
    flavour=None,
    enable_logging=True,
    hardware=None,
    use_app_config=True,  # whether to give app_config as argument to app's main()
):
    """Run every program of an application instance on a simulaqron network.

    Each program's entry function is submitted to a process pool (one worker
    per program), the return values are collected and, when logging is
    enabled, written to ``results.yaml`` in the directory returned by
    ``env.get_timed_log_dir``.

    Parameters
    ----------
    app_instance : ApplicationInstance
        Supplies the programs (``app_instance.app.programs``) and their
        inputs (``app_instance.program_inputs``, keyed by ``program.party``).
        When logging is enabled its ``logging_cfg`` attribute is replaced.
    num_rounds, network_cfg, results_file, post_function, flavour, hardware
        Accepted but not used by this function.
    log_cfg : LogConfig, optional
        Logging configuration; a default ``LogConfig()`` is created when it is
        ``None``. Only consulted when ``enable_logging`` is true.
    formalism : Formalism
        Selects the simulaqron backend through ``_SIMULAQRON_BACKENDS``.
    enable_logging : bool
        Whether to set up log directories, dump the results and call
        ``process_logs.make_last_log``.
    use_app_config : bool
        Whether an ``AppConfig`` is passed to each entry function as the
        ``app_config`` keyword argument.

    Returns
    -------
    list
        A single-element list holding a dict that maps ``"app_<party>"`` to
        the value returned by that program's entry function.
    """
    programs = app_instance.app.programs
    app_names = [program.party for program in programs]
    sim_backend = _SIMULAQRON_BACKENDS[formalism]

    timed_log_dir = None
    if enable_logging:
        log_cfg = LogConfig() if log_cfg is None else log_cfg
        app_instance.logging_cfg = log_cfg

        log_dir = (
            os.path.abspath("./log") if log_cfg.log_dir is None else log_cfg.log_dir
        )
        # makedirs(exist_ok=True) avoids the check-then-create race and also
        # creates missing parent directories.
        os.makedirs(log_dir, exist_ok=True)

        timed_log_dir = env.get_timed_log_dir(log_dir)
        app_instance.logging_cfg.log_subroutines_dir = timed_log_dir
        app_instance.logging_cfg.comm_log_dir = timed_log_dir

    with Pool(len(app_names)) as executor:
        # Start the backend process
        network = run_sim_backend(app_names, sim_backend=sim_backend)
        try:
            # Start the application processes
            app_futures = []
            for program in programs:
                inputs = app_instance.program_inputs[program.party]
                # Work on a shallow copy so the caller's program_inputs is not
                # modified by adding the app_config entry.
                call_kwargs = dict(inputs)
                if use_app_config:
                    call_kwargs["app_config"] = AppConfig(
                        app_name=program.party,
                        node_name=program.party,  # node name should be same as app name
                        main_func=program.entry,
                        log_config=app_instance.logging_cfg,
                        inputs=inputs,
                    )
                app_futures.append(executor.submit(program.entry, **call_kwargs))

            # Join the application processes
            names = [f"app_{app_name}" for app_name in app_names]
            results = {}
            for future, name in as_completed(app_futures, names=names):
                # result() re-raises any exception raised by the program
                results[name] = future.result()

            if enable_logging:
                assert timed_log_dir is not None
                path = os.path.join(timed_log_dir, "results.yaml")
                dump_yaml(data=results, file_path=path)
        finally:
            # Always stop the backend, even when a program failed, so the
            # network is not left running.
            network.stop()

    if enable_logging:
        process_logs.make_last_log(log_dir=timed_log_dir)

    reset(save_loggers=True)
    return [results]
def save_results(results, results_file):
    """Write ``results`` to ``results_file`` using ``dump_yaml``.

    Parameters
    ----------
    results
        Data passed to ``dump_yaml`` as ``data``.
    results_file
        Destination passed to ``dump_yaml`` as ``file_path``.
    """
    dump_yaml(
        data=results,
        file_path=results_file,
    )