Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable servicebus batch trigger (cardinality=many) #73

Merged
merged 12 commits into from
Oct 13, 2020
Prev Previous commit
Next Next commit
Add service bus to handle multiple messages in batch
Hanzhang Zeng (Roger) committed Oct 10, 2020
commit 1b7085c7c051aaa679f6792063738b0bc2279030
337 changes: 302 additions & 35 deletions azure/functions/_servicebus.py
Original file line number Diff line number Diff line change
@@ -3,114 +3,381 @@

import abc
import datetime
import typing
from typing import Optional, Dict, Any


class ServiceBusMessage(abc.ABC):

@abc.abstractmethod
def get_body(self) -> bytes:
"""Return message body as bytes."""
"""Get the message body from ServiceBus
Returns:
--------
bytes
The ServiceBus message body in bytes form
"""
pass

@property
@abc.abstractmethod
def content_type(self) -> typing.Optional[str]:
"""Message content type."""
def content_type(self) -> Optional[str]:
"""Optionally describes the payload of the message,
with a descriptor following the format of RFC2045
Returns:
--------
Optional[str]
If content type is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def correlation_id(self) -> Optional[str]:
"""Enables an application to specify a context for the message for the
purposes of correlation
Returns:
--------
Optional[str]
If correlation id set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def correlation_id(self) -> typing.Optional[str]:
"""Message correlation identifier."""
def dead_letter_source(self) -> Optional[str]:
"""Only set in messages that have been dead-lettered and subsequently
auto-forwarded from the dead-letter queue to another entity.
Indicates the entity in which the message was dead-lettered.
This property is read-only.
Returns:
--------
Optional[str]
If dead letter source is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def delivery_count(self) -> typing.Optional[int]:
"""Number of times delivery has been attempted."""
def delivery_count(self) -> Optional[int]:
"""Number of deliveries that have been attempted for this message.
The count is incremented when a message lock expires,
or the message is explicitly abandoned by the receiver.
This property is read-only.
Returns:
--------
Optional[str]
If delivery count is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def enqueued_time_utc(self) -> typing.Optional[datetime.datetime]:
"""The date and time in UTC at which the message is enqueued"""
def enqueued_sequence_number(self) -> Optional[int]:
"""For messages that have been auto-forwarded,
this property reflects the sequence number that had first been
assigned to the message at its original point of submission.
This property is read-only.
Returns:
--------
Optional[int]
If enqueued sequence number is set, returns an int.
Otherwise,
returns None.
"""
pass

@property
@abc.abstractmethod
def expires_at_utc(self) -> typing.Optional[datetime.datetime]:
"""The date and time in UTC at which the message is set to expire."""
def enqueued_time_utc(self) -> Optional[datetime.datetime]:
"""The UTC instant at which the message has been accepted and stored
in the entity. This value can be used as an authoritative and neutral
arrival time indicator when the receiver does not want to trust the
sender's clock. This property is read-only.
Returns:
--------
Optional[datetime.datetime]
If enqueued time utc is set, returns a datetime.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def expiration_time(self) -> typing.Optional[datetime.datetime]:
"""The date and time in UTC at which the message is set to expire."""
def expires_at_utc(self) -> Optional[datetime.datetime]:
"""The UTC instant at which the message is marked for removal and no
longer available for retrieval from the entity due to its expiration.
Expiry is controlled by the TimeToLive property and this property is
computed from EnqueuedTimeUtc+TimeToLive. This property is read-only.
Returns:
--------
Optional[datetime.datetime]
If expires at utc is set, returns a datetime.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def label(self) -> typing.Optional[str]:
"""Application specific label."""
def expiration_time(self) -> Optional[datetime.datetime]:
"""(Deprecated, use expires_at_utc instead)"""
pass

@property
@abc.abstractmethod
def force_persistence(self) -> Optional[bool]:
"""For queues or topics that have the EnableExpress flag set,
this property can be set to indicate that the message must be
persisted to disk before it is acknowledged. This is the standard
behavior for all non-express entities.
Returns:
--------
Optional[bool]
If force persistence is set, returns a bool.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def label(self) -> Optional[str]:
"""This property enables the application to indicate the purpose of
the message to the receiver in a standardized fashion, similar to an
email subject line.
Returns:
--------
Optional[str]
If label is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def locked_until_utc(self) -> Optional[datetime.datetime]:
""" For messages retrieved under a lock (peek-lock receive mode,
not pre-settled) this property reflects the UTC instant until which
the message is held locked in the queue/subscription. When the lock
expires, the DeliveryCount is incremented and the message is again
available for retrieval. This property is read-only.
Returns:
--------
Optional[str]
If local until utc is set, returns a datetime.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def lock_token(self) -> Optional[str]:
""" The lock token is a reference to the lock that is being held by
the broker in peek-lock receive mode. The token can be used to pin the
lock permanently through the Deferral API and, with that, take the
message out of the regular delivery state flow.
This property is read-only.
Returns:
--------
Optional[str]
If local token is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def message_id(self) -> str:
"""Identifier used to identify the message."""
"""The message identifier is an application-defined value that
uniquely identifies the message and its payload.
The identifier is a free-form string and can reflect a GUID or an
identifier derived from the application context. If enabled, the
duplicate detection feature identifies and removes second and further
submissions of messages with the same MessageId.
Returns:
--------
str
The message identifier
"""
pass

@property
@abc.abstractmethod
def partition_key(self) -> Optional[str]:
""" For partitioned entities, setting this value enables assigning
related messages to the same internal partition, so that submission
sequence order is correctly recorded. The partition is chosen by a
hash function over this value and cannot be chosen directly. For
session-aware entities, the SessionId property overrides this value.
Returns:
--------
Optional[str]
If partition key is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def reply_to(self) -> Optional[str]:
"""This optional and application-defined value is a standard way to
express a reply path to the receiver of the message. When a sender
expects a reply, it sets the value to the absolute or relative path
of the queue or topic it expects the reply to be sent to.
Returns:
--------
Optional[str]
If reply to is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def partition_key(self) -> typing.Optional[str]:
"""Message partition key."""
def reply_to_session_id(self) -> Optional[str]:
"""This value augments the ReplyTo information and specifies which
SessionId should be set for the reply when sent to the reply entity.
Returns:
--------
Optional[str]
If reply to session id is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def reply_to(self) -> typing.Optional[str]:
"""The address of an entity to send replies to."""
def scheduled_enqueue_time(self) -> Optional[datetime.datetime]:
"""(Deprecated, use scheduled_enqueue_time_utc instead)"""
pass
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Think of a way to inform users through the logs (mainly for the in the runtime).


@property
@abc.abstractmethod
def reply_to_session_id(self) -> typing.Optional[str]:
"""A session identifier augmenting the reply_to address."""
def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]:
"""For messages that are only made available for retrieval after a
delay, this property defines the UTC instant at which the message
will be logically enqueued, sequenced, and therefore made available
for retrieval.
Returns:
--------
Optional[datetime.datetime]
If scheduled enqueue time utc is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def scheduled_enqueue_time(self) -> typing.Optional[datetime.datetime]:
"""The date and time in UTC at which the message will be enqueued."""
def session_id(self) -> Optional[str]:
"""For session-aware entities, this application-defined value
specifies the session affiliation of the message. Messages with the
same session identifier are subject to summary locking and enable
exact in-order processing and demultiplexing. For entities that are
not session-aware, this value is ignored.
Returns:
--------
Optional[str]
If session id is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def session_id(self) -> typing.Optional[str]:
"""The session identifier for a session-aware entity."""
def time_to_live(self) -> Optional[datetime.timedelta]:
""" This value is the relative duration after which the message
expires, starting from the instant the message has been accepted and
stored by the broker, as captured in EnqueueTimeUtc. When not set
explicitly, the assumed value is the DefaultTimeToLive for the
respective queue or topic. A message-level TimeToLive value cannot
be longer than the entity's DefaultTimeToLive setting.
If it is longer, it is silently adjusted.
Returns:
--------
Optional[datetime.timedelta]
If time to live is set, returns a timedelta.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def time_to_live(self) -> typing.Optional[datetime.timedelta]:
"""The TTL time interval."""
def to(self) -> Optional[str]:
""" This property is reserved for future use in routing scenarios and
currently ignored by the broker itself. Applications can use this
value in rule-driven auto-forward chaining scenarios to indicate the
intended logical destination of the message.
Returns:
--------
Optional[str]
If the recipient is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def to(self) -> typing.Optional[str]:
"""The address of an entity the message is addressed."""
def via_partition_key(self) -> Optional[str]:
"""If a message is sent via a transfer queue in the scope of a
transaction, this value selects the transfer queue partition.
Returns:
--------
Optional[str]
If via partition key is set, returns a string.
Otherwise, returns None.
"""
pass

@property
@abc.abstractmethod
def user_properties(self) -> typing.Dict[str, object]:
"""User-defined message metadata."""
def user_properties(self) -> Dict[str, Any]:
"""Contains user defined message properties.
Returns:
--------
Dict[str, Any]:
If user has set properties for the message, returns a dictionary.
If nothing is set, returns an empty dictionary.
"""
pass

@property
@abc.abstractmethod
def metadata(self) -> typing.Optional[typing.Mapping[str, typing.Any]]:
"""The serialized JSON string from trigger metadata"""
def metadata(self) -> Optional[Dict[str, Any]]:
"""Getting read-only trigger metadata in a Python dictionary.
Exposing the raw trigger_metadata to our customer. For cardinality=many
scenarios, each event points to the common metadata of all the events.
So when using metadata field when cardinality=many, it only needs to
take one of the events to get all the data (e.g. events[0].metadata).
Returns:
--------
Dict[str, object]
Return the Python dictionary of trigger metadata
"""
pass
233 changes: 177 additions & 56 deletions azure/functions/servicebus.py
Original file line number Diff line number Diff line change
@@ -19,43 +19,54 @@ def __init__(
trigger_metadata: Mapping[str, Any] = None,
content_type: Optional[str] = None,
correlation_id: Optional[str] = None,
delivery_count: Optional[int] = 0,
dead_letter_source: Optional[str] = None,
delivery_count: Optional[int] = None,
enqueued_sequence_number: Optional[int] = None,
enqueued_time_utc: Optional[datetime.datetime] = None,
expiration_time: Optional[datetime.datetime] = None,
expires_at_utc: Optional[datetime.datetime] = None,
force_persistence: Optional[bool] = None,
label: Optional[str] = None,
locked_until_utc: Optional[datetime.datetime] = None,
lock_token: Optional[str] = None,
message_id: str,
partition_key: Optional[str] = None,
reply_to: Optional[str] = None,
reply_to_session_id: Optional[str] = None,
scheduled_enqueue_time: Optional[datetime.datetime] = None,
scheduled_enqueue_time_utc: Optional[datetime.datetime] = None,
sequence_number: Optional[int] = 0,
session_id: Optional[str] = None,
time_to_live: Optional[datetime.timedelta] = None,
to: Optional[str] = None,
via_partition_key: Optional[str] = None,
user_properties: Dict[str, object]) -> None:

self.__body = body
self.__trigger_metadata = trigger_metadata
self.__content_type = content_type
self.__correlation_id = correlation_id
self.__dead_letter_source = dead_letter_source
self.__delivery_count = delivery_count
self.__enqueued_sequence_number = enqueued_sequence_number
self.__enqueued_time_utc = enqueued_time_utc
self.__expiration_time = expiration_time
self.__expires_at_utc = expires_at_utc
self.__force_persistence = force_persistence
self.__label = label
self.__locked_until_utc = locked_until_utc
self.__lock_token = lock_token
self.__message_id = message_id
self.__partition_key = partition_key
self.__reply_to = reply_to
self.__reply_to_session_id = reply_to_session_id
self.__scheduled_enqueue_time = scheduled_enqueue_time
self.__scheduled_enqueue_time_utc = scheduled_enqueue_time_utc
self.__sequence_number = sequence_number
self.__session_id = session_id
self.__time_to_live = time_to_live
self.__to = to
self.__via_partition_key = via_partition_key
self.__user_properties = user_properties

# Cache for trigger metadata after Python object conversion
self._trigger_metadata_pyobj: Optional[
Mapping[str, Any]] = None
self._trigger_metadata_pyobj: Optional[Mapping[str, Any]] = None

def get_body(self) -> bytes:
return self.__body
@@ -68,26 +79,47 @@ def content_type(self) -> Optional[str]:
def correlation_id(self) -> Optional[str]:
return self.__correlation_id

@property
def dead_letter_source(self) -> Optional[str]:
return self.__dead_letter_source

@property
def delivery_count(self) -> Optional[int]:
return self.__delivery_count

@property
def enqueued_sequence_number(self) -> Optional[int]:
return self.__enqueued_sequence_number

@property
def enqueued_time_utc(self) -> Optional[datetime.datetime]:
return self.__enqueued_time_utc

@property
def expiration_time(self) -> Optional[datetime.datetime]:
return self.__expiration_time
def expires_at_utc(self) -> Optional[datetime.datetime]:
return self.__expires_at_utc

@property
def expires_at_utc(self) -> Optional[datetime.datetime]:
def expiration_time(self) -> Optional[datetime.datetime]:
"""(Deprecated) Use expires_at_utc instead"""
return self.__expires_at_utc

@property
def force_persistence(self) -> Optional[bool]:
return self.__force_persistence

@property
def label(self) -> Optional[str]:
return self.__label

@property
def locked_until_utc(self) -> Optional[datetime.datetime]:
return self.__locked_until_utc

@property
def lock_token(self) -> Optional[str]:
return self.__lock_token

@property
def message_id(self) -> str:
return self.__message_id
@@ -106,7 +138,12 @@ def reply_to_session_id(self) -> Optional[str]:

@property
def scheduled_enqueue_time(self) -> Optional[datetime.datetime]:
return self.__scheduled_enqueue_time
"""(Deprecated) Use scheduled_enqueue_time_utc instead"""
return self.__scheduled_enqueue_time_utc

@property
def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]:
return self.__scheduled_enqueue_time_utc

@property
def session_id(self) -> Optional[str]:
@@ -120,25 +157,16 @@ def time_to_live(self) -> Optional[datetime.timedelta]:
def to(self) -> Optional[str]:
return self.__to

@property
def via_partition_key(self) -> Optional[str]:
return self.__via_partition_key

@property
def user_properties(self) -> Dict[str, object]:
return self.__user_properties

@property
def metadata(self) -> Optional[Mapping[str, Any]]:
"""Getting read-only trigger metadata in a Python dictionary.
Exposing the raw trigger_metadata to our customer. For cardinality=many
scenarios, each event points to the common metadata of all the events.
So when using metadata field when cardinality=many, it only needs to
take one of the events to get all the data (e.g. events[0].metadata).
Returns:
--------
Mapping[str, object]
Return the Python dictionary of trigger metadata
"""
def metadata(self) -> Optional[Dict[str, Any]]:
if self.__trigger_metadata is None:
return None

@@ -163,7 +191,9 @@ class ServiceBusMessageInConverter(meta.InConverter,

@classmethod
def check_input_type_annotation(cls, pytype: type) -> bool:
return issubclass(pytype, azf_sbus.ServiceBusMessage)
return issubclass(pytype, azf_sbus.ServiceBusMessage) or (
meta.is_iterable_type_annotation(
pytype, azf_sbus.ServiceBusMessage))

@classmethod
def decode(
@@ -188,11 +218,11 @@ def decode(
returns a list of ServiceBusMessage.
"""
if cls._is_cardinality_one(trigger_metadata):
raise Exception('DECODE SINGLE MESSAGE')
return cls.decode_single_message(data, trigger_metadata)
return cls.decode_single_message(data,
trigger_metadata=trigger_metadata)
elif cls._is_cardinality_many(trigger_metadata):
raise Exception('DECODE MULTIPLE MESSAGE')
return cls.decode_multiple_messages(data, trigger_metadata)
return cls.decode_multiple_messages(data,
trigger_metadata=trigger_metadata)
else:
raise NotImplementedError(
f'unsupported service bus data type: {data.type}')
@@ -226,16 +256,24 @@ def decode_single_message(cls, data: meta.Datum, *,
trigger_metadata, 'ContentType', python_type=str),
correlation_id=cls._decode_trigger_metadata_field(
trigger_metadata, 'CorrelationId', python_type=str),
dead_letter_source=cls._decode_trigger_metadata_field(
trigger_metadata, 'DeadLetterSource', python_type=str),
delivery_count=cls._decode_trigger_metadata_field(
trigger_metadata, 'DeliveryCount', python_type=int),
enqueued_sequence_number=cls._decode_trigger_metadata_field(
trigger_metadata, 'EnqueuedSequenceNumber', python_type=int),
enqueued_time_utc=cls._parse_datetime_metadata(
trigger_metadata, 'EnqueuedTimeUtc'),
expiration_time=cls._parse_datetime_metadata(
trigger_metadata, 'ExpirationTime'),
expires_at_utc=cls._parse_datetime_metadata(
trigger_metadata, 'ExpiresAtUtc'),
force_persistence=cls._decode_trigger_metadata_field(
trigger_metadata, 'ForcePersistence', python_type=bool),
label=cls._decode_trigger_metadata_field(
trigger_metadata, 'Label', python_type=str),
locked_until_utc=cls._parse_datetime_metadata(
trigger_metadata, 'LockedUntilUtc'),
lock_token=cls._decode_trigger_metadata_field(
trigger_metadata, 'LockToken', python_type=str),
message_id=cls._decode_trigger_metadata_field(
trigger_metadata, 'MessageId', python_type=str),
partition_key=cls._decode_trigger_metadata_field(
@@ -244,67 +282,150 @@ def decode_single_message(cls, data: meta.Datum, *,
trigger_metadata, 'ReplyTo', python_type=str),
reply_to_session_id=cls._decode_trigger_metadata_field(
trigger_metadata, 'ReplyToSessionId', python_type=str),
scheduled_enqueue_time=cls._parse_datetime_metadata(
trigger_metadata, 'ScheduledEnqueueTime'),
scheduled_enqueue_time_utc=cls._parse_datetime_metadata(
trigger_metadata, 'ScheduledEnqueueTimeUtc'),
session_id=cls._decode_trigger_metadata_field(
trigger_metadata, 'SessionId', python_type=str),
time_to_live=cls._parse_timedelta_metadata(
trigger_metadata, 'TimeToLive'),
to=cls._decode_trigger_metadata_field(
trigger_metadata, 'To', python_type=str),
via_partition_key=cls._decode_trigger_metadata_field(
trigger_metadata, 'ViaPartitionKey', python_type=str),
user_properties=cls._decode_trigger_metadata_field(
trigger_metadata, 'UserProperties', python_type=dict),
)

@classmethod
def decode_multiple_messages(cls, data: meta.Datum, *,
trigger_metadata: Mapping[str, meta.Datum]) -> List[ServiceBusMessage]:
"""Unlike EventHub, the trigger_metadata already contains a set of
arrays (e.g. 'ContentTypeArray', 'CorrelationidArray'...). We can
retrieve message properties directly from those array.
"""
if data.type == 'collection_bytes':
parsed_data = data.value.bytes

elif data.type == 'collection_string':
parsed_data = data.value.string

# Input Trigger IotHub Event
elif data.type == 'json':
parsed_data = json.loads(data.value)

sys_props = trigger_metadata.get('SystemPropertiesArray')

parsed_sys_props: List[Any] = []
if sys_props is not None:
parsed_sys_props = json.loads(sys_props.value)

messages = []
for i in range(len(parsed_data)):
enqueued_time = parsed_sys_props[i].get('EnqueuedTimeUtc')

message = ServiceBusMessage(
body=cls._marshall_message_body(parsed_data[i], data.type),
trigger_metadata=trigger_metadata,
)

messages.append(message)

return messages
return cls._extract_messages(parsed_data, data.type, trigger_metadata)

@classmethod
def _is_cardinality_many(cls, trigger_metadata) -> bool:
return 'SystemPropertiesArray' in trigger_metadata
return 'UserPropertiesArray' in trigger_metadata

@classmethod
def _is_cardinality_one(cls, trigger_metadata) -> bool:
return 'SystemProperties' in trigger_metadata
return 'UserProperties' in trigger_metadata

@classmethod
def _get_event_count(cls, trigger_metadata) -> int:
datum = trigger_metadata['UserPropertiesArray']
user_props = json.loads(datum.value)
return len(user_props)

@classmethod
def _marshall_message_body(cls, parsed_data, data_type) -> str:
if data_type == 'bytes':
def _marshall_message_body(cls, parsed_data, data_type) -> bytes:
if data_type == 'str':
return parsed_data.encode('utf-8')

if data_type == 'json':
return json.dumps(parsed_data).encode('utf-8')

return parsed_data

@classmethod
def _extract_messages(cls,
parsed_data: str,
data_type: type,
trigger_metadata: Mapping[str, meta.Datum]
) -> List[ServiceBusMessage]:

num_messages = cls._get_event_count(trigger_metadata)

messages = []
content_types: List[str] = (
trigger_metadata['ContentTypeArray'].value.string
)
correlation_ids: List[str] = (
trigger_metadata['CorrelationIdArray'].value.string
)
dead_letter_sources: List[str] = (
trigger_metadata['DeadLetterSourceArray'].value.string
)
delivery_counts: List[int] = json.loads(
trigger_metadata['DeliveryCountArray'].value
)
enqueued_time_utcs: List[str] = json.loads(
trigger_metadata['EnqueuedTimeUtcArray'].value
)
expires_at_utcs: List[str] = json.loads(
trigger_metadata['ExpiresAtUtcArray'].value
)
labels: List[str] = (
trigger_metadata['LabelArray'].value.string
)
lock_tokens: List[str] = (
trigger_metadata['LockTokenArray'].value.string
)
message_ids: List[str] = (
trigger_metadata['MessageIdArray'].value.string
)
sequence_numbers: List[int] = (
trigger_metadata['SequenceNumberArray'].value.sint64
)
tos: List[str] = (
trigger_metadata['ToArray'].value.string
)
reply_tos: List[str] = (
trigger_metadata['ReplyToArray'].value.string
)
user_properties_list: List[Dict[str, Any]] = json.loads(
trigger_metadata['UserPropertiesArray'].value
)

for i in range(num_messages):
messages.append(ServiceBusMessage(
body=cls._marshall_message_body(parsed_data[i], data_type),
trigger_metadata=trigger_metadata,
content_type=cls._get_or_none(content_types, i),
correlation_id=cls._get_or_none(correlation_ids, i),
dead_letter_source=cls._get_or_none(dead_letter_sources, i),
delivery_count=cls._get_or_none(delivery_counts, i),
enqueued_time_utc=cls._parse_datetime(
cls._get_or_none(enqueued_time_utcs, i)),
expires_at_utc=cls._parse_datetime(
cls._get_or_none(expires_at_utcs, i)),
label=cls._get_or_none(labels, i),
lock_token=cls._get_or_none(lock_tokens, i),
message_id=cls._get_or_none(message_ids, i),
sequence_number=cls._get_or_none(sequence_numbers, i),
to=cls._get_or_none(tos, i),
reply_to=cls._get_or_none(reply_tos, i),
user_properties=cls._get_or_none(user_properties_list, i)
))
return messages

@classmethod
def _get_or_none(cls, list_: List[Any], index: int) -> Any:
"""Some metadata array does not contain any values (e.g.
correlation_ids array may be empty [] when there's multiple messages).
This results in a IndexError when referencing the message. To avoid
this issue, when getting the value, we should return None is index is
out of bound.
"""
if index >= len(list_):
return None

return list_[index]


class ServiceBusMessageOutConverter(meta.OutConverter, binding='serviceBus'):

@classmethod