@@ -1169,6 +1169,137 @@ def get_signals(signal_array, frame, root_or_cache, ns, multiplex_id, float_fact
1169
1169
new_signal .add_attribute ("LongName" , signal_name )
1170
1170
frame .add_signal (new_signal )
1171
1171
1172
+ def get_frame_from_multiplexed_ipdu (pdu , target_frame , multiplex_translation , root_or_cache , ns , float_factory ):
1173
+ selector_byte_order = get_child (pdu , "SELECTOR-FIELD-BYTE-ORDER" , root_or_cache , ns )
1174
+ selector_len = get_child (pdu , "SELECTOR-FIELD-LENGTH" , root_or_cache , ns )
1175
+ selector_start = get_child (pdu , "SELECTOR-FIELD-START-POSITION" , root_or_cache , ns )
1176
+ is_little_endian = False
1177
+ if selector_byte_order .text == 'MOST-SIGNIFICANT-BYTE-LAST' :
1178
+ is_little_endian = True
1179
+ is_signed = False # unsigned
1180
+ multiplexor = canmatrix .Signal (
1181
+ "Multiplexor" ,
1182
+ start_bit = int (selector_start .text ),
1183
+ size = int (selector_len .text ),
1184
+ is_little_endian = is_little_endian ,
1185
+ multiplex = "Multiplexor" )
1186
+
1187
+ multiplexor ._initValue = 0
1188
+ target_frame .add_signal (multiplexor )
1189
+ static_part = get_child (pdu , "STATIC-PART" , root_or_cache , ns )
1190
+ ipdu = get_child (static_part , "I-PDU" , root_or_cache , ns )
1191
+ if ipdu is not None :
1192
+ pdu_sig_mappings = get_child (ipdu , "SIGNAL-TO-PDU-MAPPINGS" , root_or_cache , ns )
1193
+ pdu_sig_mapping = get_children (pdu_sig_mappings , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
1194
+ get_signals (pdu_sig_mapping , target_frame , root_or_cache , ns , None , float_factory )
1195
+ multiplex_translation [get_element_name (ipdu , ns )] = get_element_name (pdu , ns )
1196
+
1197
+ dynamic_part = get_child (pdu , "DYNAMIC-PART" , root_or_cache , ns )
1198
+ # segmentPositions = arGetChild(dynamic_part, "SEGMENT-POSITIONS", arDict, ns)
1199
+ # segmentPosition = arGetChild(segmentPositions, "SEGMENT-POSITION", arDict, ns)
1200
+ # byteOrder = arGetChild(segmentPosition, "SEGMENT-BYTE-ORDER", arDict, ns)
1201
+ # segLength = arGetChild(segmentPosition, "SEGMENT-LENGTH", arDict, ns)
1202
+ # segPos = arGetChild(segmentPosition, "SEGMENT-POSITION", arDict, ns)
1203
+ dynamic_part_alternatives = get_child (dynamic_part , "DYNAMIC-PART-ALTERNATIVES" , root_or_cache , ns )
1204
+ dynamic_part_alternative_list = get_children (dynamic_part_alternatives , "DYNAMIC-PART-ALTERNATIVE" ,
1205
+ root_or_cache , ns )
1206
+ for alternative in dynamic_part_alternative_list :
1207
+ selector_id = get_child (alternative , "SELECTOR-FIELD-CODE" , root_or_cache , ns )
1208
+ ipdu = get_child (alternative , "I-PDU" , root_or_cache , ns )
1209
+ multiplex_translation [get_element_name (ipdu , ns )] = get_element_name (pdu , ns )
1210
+ if ipdu is not None :
1211
+ pdu_sig_mappings = get_child (ipdu , "SIGNAL-TO-PDU-MAPPINGS" , root_or_cache , ns )
1212
+ pdu_sig_mapping = get_children (pdu_sig_mappings , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
1213
+ get_signals (pdu_sig_mapping , target_frame , root_or_cache , ns , selector_id .text , float_factory )
1214
+
1215
+
1216
+
1217
+ def get_frame_from_container_ipdu (pdu , target_frame , root_or_cache , ns , float_factory ):
1218
+ target_frame .is_fd = True
1219
+ pdus = get_children (pdu , "CONTAINED-PDU-TRIGGERING" , root_or_cache , ns )
1220
+ signal_group_id = 1
1221
+ singnals_grouped = []
1222
+ header_type = get_child (pdu , "HEADER-TYPE" , root_or_cache , ns ).text
1223
+ if header_type == "SHORT-HEADER" :
1224
+ header_length = 32
1225
+ target_frame .add_signal (canmatrix .Signal (start_bit = 0 , size = 24 , name = "Header_ID" , multiplex = "Multiplexor" , is_little_endian = True ))
1226
+ target_frame .add_signal (canmatrix .Signal (start_bit = 24 , size = 8 , name = "Header_DLC" , is_little_endian = True ))
1227
+ elif header_type == "LONG-HEADER" :
1228
+ header_length = 64
1229
+ target_frame .add_signal (canmatrix .Signal (start_bit = 0 , size = 32 , name = "Header_ID" , multiplex = "Multiplexor" ,
1230
+ is_little_endian = True ))
1231
+ target_frame .add_signal (canmatrix .Signal (start_bit = 32 , size = 32 , name = "Header_DLC" , is_little_endian = True ))
1232
+ else :
1233
+ raise ("header " + header_type + " not supported for containers yet" )
1234
+ # none type
1235
+ #TODO
1236
+
1237
+ for cpdu in pdus :
1238
+ ipdu = get_child (cpdu , "I-PDU" , root_or_cache , ns )
1239
+ try :
1240
+ if header_type == "SHORT-HEADER" :
1241
+ header_id = get_child (ipdu , "HEADER-ID-SHORT-HEADER" , root_or_cache , ns ).text
1242
+ elif header_type == "LONG-HEADER" :
1243
+ header_id = get_child (ipdu , "HEADER-ID-LONG-HEADER" , root_or_cache , ns ).text
1244
+ else :
1245
+ #none type
1246
+ pass
1247
+ except AttributeError :
1248
+ header_id = "0"
1249
+ if header_id .startswith ("0x" ):
1250
+ header_id = int (header_id , 16 )
1251
+ else :
1252
+ header_id = int (header_id )
1253
+
1254
+ # pdu_sig_mapping = get_children(ipdu, "I-SIGNAL-IN-I-PDU", root_or_cache, ns)
1255
+ pdu_sig_mapping = get_children (ipdu , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
1256
+ # TODO
1257
+ if pdu_sig_mapping :
1258
+ get_signals (pdu_sig_mapping , target_frame , root_or_cache , ns , header_id , float_factory , bit_offset = header_length )
1259
+ new_signals = []
1260
+ for signal in target_frame :
1261
+ if signal .name not in singnals_grouped and signal .name is not "Header_ID" and signal .name is not "Header_DLC" :
1262
+ new_signals .append (signal .name )
1263
+ target_frame .add_signal_group ("HEARDER_ID_" + str (header_id ), signal_group_id , new_signals )
1264
+ singnals_grouped += new_signals
1265
+ signal_group_id += 1
1266
+
1267
+ def store_frame_timings (target_frame , cyclic_timing , event_timing , minimum_delay , repeats , starting_time , time_offset , repeating_time , root_or_cache , time_period , ns , float_factory ):
1268
+ if cyclic_timing is not None and event_timing is not None :
1269
+ target_frame .add_attribute ("GenMsgSendType" , "cyclicAndSpontanX" ) # CycleAndSpontan
1270
+ if minimum_delay is not None :
1271
+ target_frame .add_attribute ("GenMsgDelayTime" , str (int (float_factory (minimum_delay .text ) * 1000 )))
1272
+ if repeats is not None :
1273
+ target_frame .add_attribute ("GenMsgNrOfRepetitions" , repeats .text )
1274
+ elif cyclic_timing is not None :
1275
+ target_frame .add_attribute ("GenMsgSendType" , "cyclicX" ) # CycleX
1276
+ if minimum_delay is not None :
1277
+ target_frame .add_attribute ("GenMsgDelayTime" , str (int (float_factory (minimum_delay .text ) * 1000 )))
1278
+ if repeats is not None :
1279
+ target_frame .add_attribute ("GenMsgNrOfRepetitions" , repeats .text )
1280
+ else :
1281
+ target_frame .add_attribute ("GenMsgSendType" , "spontanX" ) # Spontan
1282
+ if minimum_delay is not None :
1283
+ target_frame .add_attribute ("GenMsgDelayTime" , str (int (float_factory (minimum_delay .text ) * 1000 )))
1284
+ if repeats is not None :
1285
+ target_frame .add_attribute ("GenMsgNrOfRepetitions" , repeats .text )
1286
+
1287
+ if starting_time is not None :
1288
+ value = get_child (starting_time , "VALUE" , root_or_cache , ns )
1289
+ target_frame .add_attribute ("GenMsgStartDelayTime" , str (int (float_factory (value .text ) * 1000 )))
1290
+ elif cyclic_timing is not None :
1291
+ value = get_child (time_offset , "VALUE" , root_or_cache , ns )
1292
+ if value is not None :
1293
+ target_frame .add_attribute ("GenMsgStartDelayTime" , str (int (float_factory (value .text ) * 1000 )))
1294
+
1295
+ value = get_child (repeating_time , "VALUE" , root_or_cache , ns )
1296
+ if value is not None :
1297
+ target_frame .add_attribute ("GenMsgCycleTime" , str (int (float_factory (value .text ) * 1000 )))
1298
+ elif cyclic_timing is not None :
1299
+ value = get_child (time_period , "VALUE" , root_or_cache , ns )
1300
+ if value is not None :
1301
+ target_frame .add_attribute ("GenMsgCycleTime" , str (int (float_factory (value .text ) * 1000 )))
1302
+
1172
1303
1173
1304
def get_frame (frame_triggering , root_or_cache , multiplex_translation , ns , float_factory ):
1174
1305
# type: (_Element, _DocRoot, dict, str, typing.Callable) -> typing.Union[canmatrix.Frame, None]
@@ -1218,46 +1349,7 @@ def get_frame(frame_triggering, root_or_cache, multiplex_translation, ns, float_
1218
1349
logger .debug (get_element_name (pdu , ns ))
1219
1350
1220
1351
if pdu is not None and "MULTIPLEXED-I-PDU" in pdu .tag :
1221
- selector_byte_order = get_child (pdu , "SELECTOR-FIELD-BYTE-ORDER" , root_or_cache , ns )
1222
- selector_len = get_child (pdu , "SELECTOR-FIELD-LENGTH" , root_or_cache , ns )
1223
- selector_start = get_child (pdu , "SELECTOR-FIELD-START-POSITION" , root_or_cache , ns )
1224
- is_little_endian = False
1225
- if selector_byte_order .text == 'MOST-SIGNIFICANT-BYTE-LAST' :
1226
- is_little_endian = True
1227
- is_signed = False # unsigned
1228
- multiplexor = canmatrix .Signal (
1229
- "Multiplexor" ,
1230
- start_bit = int (selector_start .text ),
1231
- size = int (selector_len .text ),
1232
- is_little_endian = is_little_endian ,
1233
- multiplex = "Multiplexor" )
1234
-
1235
- multiplexor ._initValue = 0
1236
- new_frame .add_signal (multiplexor )
1237
- static_part = get_child (pdu , "STATIC-PART" , root_or_cache , ns )
1238
- ipdu = get_child (static_part , "I-PDU" , root_or_cache , ns )
1239
- if ipdu is not None :
1240
- pdu_sig_mappings = get_child (ipdu , "SIGNAL-TO-PDU-MAPPINGS" , root_or_cache , ns )
1241
- pdu_sig_mapping = get_children (pdu_sig_mappings , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
1242
- get_signals (pdu_sig_mapping , new_frame , root_or_cache , ns , None , float_factory )
1243
- multiplex_translation [get_element_name (ipdu , ns )] = get_element_name (pdu , ns )
1244
-
1245
- dynamic_part = get_child (pdu , "DYNAMIC-PART" , root_or_cache , ns )
1246
- # segmentPositions = arGetChild(dynamic_part, "SEGMENT-POSITIONS", arDict, ns)
1247
- # segmentPosition = arGetChild(segmentPositions, "SEGMENT-POSITION", arDict, ns)
1248
- # byteOrder = arGetChild(segmentPosition, "SEGMENT-BYTE-ORDER", arDict, ns)
1249
- # segLength = arGetChild(segmentPosition, "SEGMENT-LENGTH", arDict, ns)
1250
- # segPos = arGetChild(segmentPosition, "SEGMENT-POSITION", arDict, ns)
1251
- dynamic_part_alternatives = get_child (dynamic_part , "DYNAMIC-PART-ALTERNATIVES" , root_or_cache , ns )
1252
- dynamic_part_alternative_list = get_children (dynamic_part_alternatives , "DYNAMIC-PART-ALTERNATIVE" , root_or_cache , ns )
1253
- for alternative in dynamic_part_alternative_list :
1254
- selector_id = get_child (alternative , "SELECTOR-FIELD-CODE" , root_or_cache , ns )
1255
- ipdu = get_child (alternative , "I-PDU" , root_or_cache , ns )
1256
- multiplex_translation [get_element_name (ipdu , ns )] = get_element_name (pdu , ns )
1257
- if ipdu is not None :
1258
- pdu_sig_mappings = get_child (ipdu , "SIGNAL-TO-PDU-MAPPINGS" , root_or_cache , ns )
1259
- pdu_sig_mapping = get_children (pdu_sig_mappings , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
1260
- get_signals (pdu_sig_mapping , new_frame , root_or_cache , ns , selector_id .text , float_factory )
1352
+ get_frame_from_multiplexed_ipdu (pdu , new_frame , multiplex_translation , root_or_cache , ns , float_factory )
1261
1353
1262
1354
if new_frame .comment is None :
1263
1355
new_frame .add_comment (get_element_desc (pdu , root_or_cache , ns ))
@@ -1281,75 +1373,10 @@ def get_frame(frame_triggering, root_or_cache, multiplex_translation, ns, float_
1281
1373
time_offset = get_child (cyclic_timing , "TIME-OFFSET" , root_or_cache , ns )
1282
1374
time_period = get_child (cyclic_timing , "TIME-PERIOD" , root_or_cache , ns )
1283
1375
1284
- if cyclic_timing is not None and event_timing is not None :
1285
- new_frame .add_attribute ("GenMsgSendType" , "cyclicAndSpontanX" ) # CycleAndSpontan
1286
- if minimum_delay is not None :
1287
- new_frame .add_attribute ("GenMsgDelayTime" , str (int (float_factory (minimum_delay .text ) * 1000 )))
1288
- if repeats is not None :
1289
- new_frame .add_attribute ("GenMsgNrOfRepetitions" , repeats .text )
1290
- elif cyclic_timing is not None :
1291
- new_frame .add_attribute ("GenMsgSendType" , "cyclicX" ) # CycleX
1292
- if minimum_delay is not None :
1293
- new_frame .add_attribute ("GenMsgDelayTime" , str (int (float_factory (minimum_delay .text ) * 1000 )))
1294
- if repeats is not None :
1295
- new_frame .add_attribute ("GenMsgNrOfRepetitions" , repeats .text )
1296
- else :
1297
- new_frame .add_attribute ("GenMsgSendType" , "spontanX" ) # Spontan
1298
- if minimum_delay is not None :
1299
- new_frame .add_attribute ("GenMsgDelayTime" , str (int (float_factory (minimum_delay .text ) * 1000 )))
1300
- if repeats is not None :
1301
- new_frame .add_attribute ("GenMsgNrOfRepetitions" , repeats .text )
1302
-
1303
- if starting_time is not None :
1304
- value = get_child (starting_time , "VALUE" , root_or_cache , ns )
1305
- new_frame .add_attribute ("GenMsgStartDelayTime" , str (int (float_factory (value .text ) * 1000 )))
1306
- elif cyclic_timing is not None :
1307
- value = get_child (time_offset , "VALUE" , root_or_cache , ns )
1308
- if value is not None :
1309
- new_frame .add_attribute ("GenMsgStartDelayTime" , str (int (float_factory (value .text ) * 1000 )))
1310
-
1311
- value = get_child (repeating_time , "VALUE" , root_or_cache , ns )
1312
- if value is not None :
1313
- new_frame .add_attribute ("GenMsgCycleTime" , str (int (float_factory (value .text ) * 1000 )))
1314
- elif cyclic_timing is not None :
1315
- value = get_child (time_period , "VALUE" , root_or_cache , ns )
1316
- if value is not None :
1317
- new_frame .add_attribute ("GenMsgCycleTime" , str (int (float_factory (value .text ) * 1000 )))
1318
-
1376
+ store_frame_timings (new_frame , cyclic_timing , event_timing , minimum_delay , repeats , starting_time , time_offset , repeating_time , root_or_cache , time_period , ns , float_factory )
1319
1377
1320
1378
if pdu .tag == ns + "CONTAINER-I-PDU" :
1321
- pdus = get_children (pdu , "CONTAINED-PDU-TRIGGERING" , root_or_cache , ns )
1322
- signal_group_id = 1
1323
- singnals_grouped = []
1324
- header_type = get_child (pdu , "HEADER-TYPE" , root_or_cache , ns ).text
1325
- if header_type == "SHORT-HEADER" :
1326
- header_length = 32
1327
- new_frame .add_signal (canmatrix .Signal (start_bit = 0 , size = 24 , name = "Header_ID" , multiplex = "Multiplexor" , is_little_endian = True ))
1328
- new_frame .add_signal (canmatrix .Signal (start_bit = 24 , size = 8 , name = "Header_DLC" , is_little_endian = True ))
1329
- else :
1330
- pass
1331
- #TODO
1332
-
1333
-
1334
- for cpdu in pdus :
1335
- ipdu = get_child (cpdu , "I-PDU" , root_or_cache , ns )
1336
- try :
1337
- #TODO
1338
- header_id = get_child (ipdu , "HEADER-ID-SHORT-HEADER" , root_or_cache , ns ).text
1339
- except AttributeError :
1340
- header_id = "0"
1341
- # pdu_sig_mapping = get_children(ipdu, "I-SIGNAL-IN-I-PDU", root_or_cache, ns)
1342
- pdu_sig_mapping = get_children (ipdu , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
1343
- # TODO
1344
- if pdu_sig_mapping :
1345
- get_signals (pdu_sig_mapping , new_frame , root_or_cache , ns , int (header_id ), float_factory , bit_offset = header_length )
1346
- new_signals = []
1347
- for signal in new_frame :
1348
- if signal .name not in singnals_grouped and signal .name is not "Header_ID" and signal .name is not "Header_DLC" :
1349
- new_signals .append (signal .name )
1350
- new_frame .add_signal_group ("HEARDER_ID_" + header_id , signal_group_id , new_signals )
1351
- singnals_grouped += new_signals
1352
- signal_group_id += 1
1379
+ get_frame_from_container_ipdu (pdu , new_frame , root_or_cache , ns , float_factory )
1353
1380
1354
1381
else :
1355
1382
pdu_sig_mapping = get_children (pdu , "I-SIGNAL-TO-I-PDU-MAPPING" , root_or_cache , ns )
@@ -1375,6 +1402,7 @@ def get_frame(frame_triggering, root_or_cache, multiplex_translation, ns, float_
1375
1402
get_signals (signal_to_pdu_maps , new_frame , root_or_cache , ns , None , float_factory ) # todo BUG expects list, not item
1376
1403
else :
1377
1404
logger .debug ("Frame %s (assuming AR4.2) no PDU-TRIGGERINGS found" , new_frame .name )
1405
+ new_frame .fit_dlc ()
1378
1406
return new_frame
1379
1407
1380
1408
0 commit comments