This repository was archived by the owner on Jul 19, 2024. It is now read-only.

Service Bus Batch Trigger #15

Closed
BrianVallelunga opened this issue Feb 15, 2017 · 73 comments

@BrianVallelunga

I'd like to start a conversation around supporting batches of messages from an Azure Service Bus. A similar request was made for Storage Queues in Azure/azure-webjobs-sdk#625

Ideally, I'd be able to specify something like:

public void Handle([ServiceBusBatchTrigger("myqueue", 25)] BrokeredMessage[] messages)
{
  // Process the batch of messages  
}

There are a lot of scenarios where batching message processing is far more efficient than working with an individual message. In my specific case, I'm doing database inserts and those could be done much more efficiently in a batch.
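
To illustrate the database-insert case, here is a minimal sketch (not from the thread) that contrasts per-message inserts with a single batched insert. It uses Python's built-in sqlite3 module with an in-memory database; the `events` table and the helper names are hypothetical, and a real function would write to its own database.

```python
import sqlite3


def make_db():
    # In-memory database with a hypothetical "events" table, for illustration only.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (id INTEGER PRIMARY KEY, body TEXT)")
    return conn


def insert_one_by_one(conn, messages):
    # One INSERT and one commit per message, as a single-message trigger would do.
    for body in messages:
        conn.execute("INSERT INTO events (body) VALUES (?)", (body,))
        conn.commit()


def insert_batch(conn, messages):
    # One executemany call and a single commit for the whole batch.
    conn.executemany(
        "INSERT INTO events (body) VALUES (?)",
        [(body,) for body in messages],
    )
    conn.commit()


if __name__ == "__main__":
    batch = [f"message-{i}" for i in range(25)]
    conn = make_db()
    insert_batch(conn, batch)
    print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])
```

Both helpers store the same rows; the batched version simply does it in one round trip and one transaction.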

I'm going to look into a similar project that supports Storage Queues: https://github.com/ealsur/WebJobs.Extensions.GroupQueueTrigger

Is this something others would be interested in?

@tcsatheesh
Member

tcsatheesh commented Feb 26, 2017

I think this would be extremely useful. I ran a simple test to use the ServiceBusTrigger in a WebJob S2 instance and the performance I got was about 22 messages a second. The current model is just not sufficient for higher throughput.

The sample I am using is here https://github.com/tcsatheesh/samples/tree/master/ServiceBusTrigger

If you change PrefetchCount and MaxConcurrentCalls to, say, 100 and 100, I still only see a throughput of 21 msg/sec.

@christopheranderson

Related to Azure/azure-webjobs-sdk#570, but keeping this one open due to the completeness of the request.

@roband915

roband915 commented Oct 11, 2017

Is there any work in progress on this issue?

@mishakogan

mishakogan commented Dec 4, 2017

I have the same exact issue but for azure storage queues. I need to subscribe to a "batch" of queue messages instead of just one using Azure Functions.
What is the status of this issue? ref: ealsur/WebJobs.Extensions.GroupQueueTrigger#3

@sergey-netdev

Any update on this? The performance of ServiceBusTrigger is just unacceptable (around 5 msg/sec in my case, at 10-15% CPU load max).

@mattmcdEcova

Our organization would really love to leverage this functionality when it becomes available.

@bryceg

bryceg commented Mar 1, 2018

This would be great for us as well. We use JavaScript for our functions, so perhaps the binding configuration could take another parameter that specifies the batch size and the max wait time for filling that batch.
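
As a rough sketch of the "batch size plus max wait time" idea (not how the Functions host implements it), the loop below collects items from a standard-library `queue.Queue` until either limit is reached. The function name `receive_batch` and its parameters are hypothetical.

```python
import queue
import time


def receive_batch(source, max_batch_size, max_wait_seconds):
    """Collect up to max_batch_size items, waiting at most max_wait_seconds in total."""
    batch = []
    deadline = time.monotonic() + max_wait_seconds
    while len(batch) < max_batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # the wait budget is used up; return what we have
        try:
            batch.append(source.get(timeout=remaining))
        except queue.Empty:
            break  # nothing else arrived before the deadline
    return batch


if __name__ == "__main__":
    q = queue.Queue()
    for i in range(7):
        q.put(i)
    print(receive_batch(q, max_batch_size=5, max_wait_seconds=0.1))
    print(receive_batch(q, max_batch_size=5, max_wait_seconds=0.1))
```

A full batch is returned immediately; a partial batch is returned once the wait time has elapsed.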

@syedhassaanahmed

+1

@nzthiago
Member

nzthiago commented Mar 6, 2018

We also have customers asking for this one

@MayankSri

ServiceBus batch trigger would be a great option for multiple scenarios.

@leszekkoc

+1

@gorillapower

+1

@ThomasArdal

Can people please thumbs up on the issue rather than +1? +1 causes an email, which I don't really want in this scenario (and now I'm causing an email with this! 😃)

@batizar

batizar commented Apr 12, 2019

@christopheranderson any updates on this?! Thanks.

@alrod self-assigned this May 15, 2019

@M05pr1mty

Is there any update on when to expect this? Would be critical for a current project volume. Thanks!

@alrod
Member

alrod commented May 30, 2019

The Service Bus SDK does not support batched receive. I marked the issue as blocked until batched receive is implemented in the Service Bus SDK.
Service Bus SDK is open source, so community contributions are welcome.

@SeanFeldman

SeanFeldman commented May 30, 2019

Service Bus supports Task<IList<Message>> ReceiveAsync(int maxMessageCount) which is intended to receive more than a single message. While there's an issue with returning maxMessageCount or less, this would still be considered a batched receive.
@alrod could you please elaborate on your comment?

@jeffhollan

Great. @fabiocav @alrod, it would be good to consider this as one of the items as we get ready to “GA” sessions shortly. It also requires that we pass a batch in to the execution, or else the order would get all fuddled.

@alrod
Member

alrod commented Jun 13, 2019

PR: Azure/azure-webjobs-sdk#2224

@alrod
Member

alrod commented Aug 5, 2019

The feature is in beta state (Microsoft.Azure.WebJobs.Extensions.ServiceBus 3.1.0-beta4-batch). If you want to try please reference the package from here:
https://www.myget.org/feed/azure-appservice/package/nuget/Microsoft.Azure.WebJobs.Extensions.ServiceBus

I would be grateful for any feedback.

@alrod transferred this issue from Azure/azure-webjobs-sdk Aug 5, 2019

@SeanFeldman

@alrod could you please share a link to the documentation (or PR) for this new addition? Thank you.

@alrod
Member

alrod commented Aug 6, 2019

There is no documentation at the moment.
Usage:

        [FunctionName("Function1")]
        public static void Run([ServiceBusTrigger("myqueue", Connection = "AzureWebJobsServiceBus")]string[] mySbMsg, ILogger log)
        {..}

Also you can specify Message[], byte[][], PocoClass[] as the input trigger param.

Note: ServiceBusOptions defined in host.json don't apply to batch processing.

PR:
#11

@nzthiago
Member

@alrod does that batching work with sessions enabled? I.e., can we batch read and have the "isSessionsEnabled" attribute?

@alrod
Member

alrod commented Aug 28, 2019

> @alrod does that batching work with sessions enabled? I.e., can we batch read and have the "isSessionsEnabled" attribute?

Unfortunately Task<IList<Message>> ReceiveAsync(int maxMessageCount) does not support sessions. So there is no session support for batch trigger.

@nzthiago
Member

@alrod I think when you use ISessionClient (which inherits and expands on IMessageReceiver) it will still be able to ReceiveAsync with a batch for a specific SessionId (gotta call AcceptMessageSessionAsync first to get an IMessageSession).

I created a simple sample netcoreapp2.2 here: https://github.com/nzthiago/ServiceBusBatchReceiveWithSessions

As you can see it sends 10 messages to 2 sessions ("1", and "2"). Then it tries to read from only session "1" in batches of 5 and it's successful:
[image: screenshot of the sample's output, not preserved]

@rasavant-ms

rasavant-ms commented Aug 29, 2019

VERIFIED THAT THIS WORKS WITH THE BETA PACKAGE. IGNORE MESSAGE!

Would be nice to have the metadata properties available for the earlier versions supported as valid parameter Types in the new package version - Microsoft.Azure.WebJobs.Extensions.ServiceBus 3.1.0-beta4-batch

I got an error while executing a function with the 'EnqueuedTimeUtc' property -
[8/29/2019 3:39:35 AM] Function1: Microsoft.Azure.WebJobs.Host: Error indexing method 'Function1'. Microsoft.Azure.WebJobs.Host: Cannot bind parameter 'enqueuedTimeUtc' to type DateTime. Make sure the parameter Type is supported by the binding. If you're using binding extensions (e.g. Azure Storage, ServiceBus, Timers, etc.) make sure you've called the registration method for the extension(s) in your startup code (e.g. builder.AddAzureStorage(), builder.AddServiceBus(), builder.AddTimers(), etc.).

@JoeyRobles

@alrod Microsoft.Azure.WebJobs.Extensions.ServiceBus was updated to 3.1.0 a couple of days ago.
Was batching in Service Bus Queue Triggers included? Thanks in advance.

@remartens

@JoeyRobles it looks like it doesn't. It looks like the 3.1.0 is based on the 3.1.0-beta4 and not on the 3.1.0-beta4-batch. :-(

@batizar

batizar commented Sep 24, 2020

Also, how would the batch size work in relation to the prefetch count? Thanks.

@RobertPinson

@alrod Does the Service Bus trigger batching work for node.js functions?

@alrod
Member

alrod commented Oct 1, 2020

@RobertPinson Yes, it works. You need to add "cardinality": "many" in function.json.
#65

@RobertPinson

@nzthiago
Member

nzthiago commented Oct 1, 2020

Addressing this documentation feedback would help with all these later questions in this thread: https://github.com/MicrosoftDocs/azure-docs/issues/52706

@alrod
Member

alrod commented Oct 1, 2020

@RobertPinson, @batizar
This is not documented yet. You need to use host.json.

Example:

"ServiceBusOptions" : {
    "BatchOptions": {
        "MaxMessageCount": 1000,
        "OperationTimeout": "00:01:00",
        "AutoComplete": "true"
    }
}

@jeffhollan

Created a task to track this (internal doc tracking site with our team) https://dev.azure.com/mseng/TechnicalContent/_workitems/edit/1778524

@alrod other piece here would be what the function.json would look like for non-C# when using batch. Does this support cardinality: many or is that not needed? In short what would my JavaScript function look like that receives a batch instead of a single message?

@shaeney

shaeney commented Oct 2, 2020

Assuming you have Batching via the array of Messages, as per:

[ServiceBusTrigger("%ActivitiesSource%", "%ActivitiesSourceSubscription%", Connection = "ActivitiesConnectionString")] Message[] messages, MessageReceiver messageReceiver, ILogger log)

Assuming AutoComplete is off, an unhandled exception is thrown "Operation is not valid due to the current state of the object." when using the MessageReceiver to Complete or DeadLetter the message.

messageReceiver.CompleteAsync(message.SystemProperties.LockToken);

If you remove batching, by removing the array, the same operations no longer generate the exception.

Does batching, via the array, somehow interfere with the MessageReceiver and cause the exception?

@batizar

batizar commented Oct 6, 2020

> @RobertPinson, @batizar
> This is not documented yet. You need to use host.json.
>
> Example:
>
> "ServiceBusOptions" : {
>     "BatchOptions": {
>         "MaxMessageCount": 1000,
>         "OperationTimeout": "00:01:00",
>         "AutoComplete": "true"
>     }
> }

@alrod thanks for your reply. How is the AutoComplete setting there different from what we set in MessageHandlerOptions in the WebJobs appSettings, like this? Does it mean that the whole batch gets completed as one, or can each message be completed when its processing is finished?

"AzureWebJobs": {
    "Extensions": {
      "ServiceBus": {
        "PrefetchCount": 50,
        "MessageHandlerOptions": {
          "AutoComplete": false,
          "MaxConcurrentCalls": 16,
          "MaxAutoRenewDuration": "00:05:00"
        }
      }
    }
  }

@shaeney

shaeney commented Oct 6, 2020

I have a Function with a ServiceBusTrigger, such as below
public Task SingleMessage( [ServiceBusTrigger("%ActivitiesSource%", "%ActivitiesSourceSubscription%", Connection = "ActivitiesConnectionString")] Message message, ILogger log)
This generates an exception:

Microsoft.Azure.WebJobs.Script.WebHost: Scope disposed{no name} is disposed and scoped instances are disposed and no longer available.

However, if I simply swap "Message message" to "string message", it works?

I should add, I want to use Batch Messages, with the message receiver, with autocomplete disabled, such that I can manage failures of each message in the batch independently.
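
The per-message handling described here can be sketched roughly as follows. This is only an illustration of the control flow, not Service Bus code: `settle_batch` and the `handler` callback are hypothetical names, and a real function would complete or dead-letter each message through the Service Bus receiver instead of collecting them in lists.

```python
def settle_batch(messages, handler):
    """Run handler on every message and sort the batch into completed and failed."""
    completed = []
    dead_lettered = []
    for message in messages:
        try:
            handler(message)
        except Exception as error:
            # One failing message must not fail the rest of the batch.
            dead_lettered.append((message, str(error)))
        else:
            completed.append(message)
    return completed, dead_lettered


if __name__ == "__main__":
    def handler(message):
        # Hypothetical processing step that rejects empty messages.
        if not message:
            raise ValueError("empty message")

    done, failed = settle_batch(["a", "", "c"], handler)
    print(done)
    print(failed)
```

With auto-complete disabled, the `completed` list is what would be completed explicitly and the `dead_lettered` list is what would be dead-lettered or abandoned.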

@tripodsan

tripodsan commented Oct 7, 2020

> thanks @alrod the cardinality: many worked in my node function, can you set maxMessageCount in a node function?

Hi @RobertPinson, when I try this, I get an error:

Binding parameters to complex objects (such as 'Object[]') uses Json.NET serialization or XML object serialization. 1. If ContentType is 'application/json' deserialize as JSON 2. If ContentType is not 'application/json' attempt to deserialize using Message.GetBody, which will handle cases like XML object serialization .....

similar to this: Azure/azure-functions-templates#905

but I'm not able to solve this...

Nevermind, I found my problem, I needed to ensure I get the 2.x extensions:

  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }

However, the batching still doesn't work....

@RobertPinson

RobertPinson commented Oct 7, 2020

> > thanks @alrod the cardinality: many worked in my node function, can you set maxMessageCount in a node function?
>
> Hi @RobertPinson, when I try this, I get an error:
>
> Binding parameters to complex objects (such as 'Object[]') uses Json.NET serialization or XML object serialization. 1. If ContentType is 'application/json' deserialize as JSON 2. If ContentType is not 'application/json' attempt to deserialize using Message.GetBody, which will handle cases like XML object serialization .....
>
> similar to this: Azure/azure-functions-templates#905
>
> but I'm not able to solve this...
>
> Nevermind, I found my problem, I needed to ensure I get the 2.x extensions:
>
>   "extensionBundle": {
>     "id": "Microsoft.Azure.Functions.ExtensionBundle",
>     "version": "[2.*, 3.0.0)"
>   }
>
> However, the batching still doesn't work....

Hi @tripodsan, the Service Bus batch read feature is in the latest version of Microsoft.Azure.WebJobs.Extensions.ServiceBus, 4.2.0, which is not in the latest extension bundle. The version of the Service Bus extension in the latest bundle is 3.2.0.

To explicitly install the extensions, you need to remove the extensionBundle section from host.json and run func extensions install.

Instructions are here: https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-register

@nicklasjepsen

Really appreciate the support for a batch trigger, however, I can't find it documented anywhere. Is there any official documentation of this functionality?

@jashuRc

jashuRc commented May 6, 2021

> Assuming you have Batching via the array of Messages, as per:
>
> [ServiceBusTrigger("%ActivitiesSource%", "%ActivitiesSourceSubscription%", Connection = "ActivitiesConnectionString")] Message[] messages, MessageReceiver messageReceiver, ILogger log)
>
> Assuming AutoComplete is off, an unhandled exception is thrown "Operation is not valid due to the current state of the object." when using the MessageReceiver to Complete or DeadLetter the message.
>
> messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
>
> If you remove batching, by removing the array, the same operations no longer generate the exception.
>
> Does batching, via the array, somehow interfere with the MessageReceiver and cause the exception?

https://github.com/Azure/azure-sdk-for-net/blob/Microsoft.Azure.WebJobs.Extensions.ServiceBus_5.0.0-beta.2/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md#message-settlement

@jashuRc

jashuRc commented May 6, 2021

> Really appreciate the support for a batch trigger, however, I can't find it documented anywhere. Is there any official documentation of this functionality?

Here: https://github.com/Azure/azure-sdk-for-net/blob/Microsoft.Azure.WebJobs.Extensions.ServiceBus_5.0.0-beta.2/sdk/servicebus/Microsoft.Azure.WebJobs.Extensions.ServiceBus/README.md#azure-webjobs-service-bus-client-library-for-net

@nzthiago
Member

nzthiago commented May 6, 2021

I would recommend that to be in the official docs on https://docs.microsoft.com/en-us/azure/azure-functions/functions-bindings-service-bus for better discoverability given @nicklasjepsen couldn't find it (including non .NET languages)

@jothyb

jothyb commented May 24, 2021

@alrod Can we use Service Bus batching in Java? Should we add anything to the @ServiceBusQueueTrigger annotation to enable this?

@nzthiago
Member

@jothyb it doesn't seem like there's documentation for the batch trigger in Java yet; but it's certainly possible, with the Cardinality set to MANY and an array of string in the trigger declaration, see here for an example: https://github.com/Azure/azure-functions-java-worker/blob/976635d81a31f2464f8b5ca1925d91b6e1b139a2/endtoendtests/src/main/java/com/microsoft/azure/functions/endtoend/ServiceBusQueueTriggerTests.java#L24

@jothyb

jothyb commented May 25, 2021

@nzthiago Thank you for the response. I was able to receive batch messages, but now the problem is that I need to get the message properties (custom properties) that are set on each message. For a single message we achieved this using bindingName, but is there a way to get them for a batch in Java? @alrod

@nzthiago
Member

@jothyb yes you'll need to update the extension bundle to v2, it seems the current VS Code template still points to v1.

I did some digging, it seems that the User Properties are exposed by the Service Bus extension, so I played with it a little and was able to get the user properties by using @BindingName("UserPropertiesArray"). Here's my example:

package com.function;
import java.util.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.*;

public class Sbjavatesttempfunc {
    @FunctionName("Sbjavatesttempfunc")
    public void Sbjavatesttempfunc(
            @ServiceBusQueueTrigger(name = "messages", queueName = "sbjavatesttempqueue", connection = "ServiceBusConnString", 
                                    cardinality = Cardinality.MANY, dataType = "string") List<String> messages,
            @BindingName("UserPropertiesArray") List<String> userPropertiesArray,
            final ExecutionContext context) {
        context.getLogger().info("Java Service Bus trigger processed a request. Total message count:" + messages.size());

        for(int i = 0; i < messages.size(); i++){
            context.getLogger().info("Message number : " + i);
            context.getLogger().info(messages.get(i));
            context.getLogger().info("User properties for message: " + i);
            context.getLogger().info(userPropertiesArray.get(i));
        }
    }
}

When it reads a batch of messages, it will print out as follows:
[image: screenshot of the function's log output, not preserved]

You can probably convert the userPropertiesArray to a Map, but I kept it simple, let me know if this works for you.
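
The conversion to a map could look roughly like the sketch below, written in Python rather than Java to keep it short. It assumes that each entry of the user properties array is a JSON object serialized as a string, which the thread does not confirm; `pair_with_properties` is a hypothetical helper name.

```python
import json
from typing import Any, Dict, List, Tuple


def pair_with_properties(
    messages: List[str], user_properties_array: List[str]
) -> List[Tuple[str, Dict[str, Any]]]:
    """Pair each message body with its user properties, parsed into a dict."""
    if len(messages) != len(user_properties_array):
        raise ValueError("expected one user-properties entry per message")
    # Assumption: every entry is a JSON object encoded as a string.
    return [
        (body, json.loads(raw))
        for body, raw in zip(messages, user_properties_array)
    ]


if __name__ == "__main__":
    bodies = ["hello", "world"]
    properties = ['{"tenant": "a"}', '{"tenant": "b"}']
    for body, props in pair_with_properties(bodies, properties):
        print(body, props["tenant"])
```

The two lists are matched by index, the same way the Java loop reads `messages.get(i)` and `userPropertiesArray.get(i)`.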

@jothyb

jothyb commented May 26, 2021

Thank you @nzthiago, it worked 👍.

@Nehmiabm

Hello there,

Is batching currently supported only for in-process functions? I tried the host batch configuration for an isolated function, with a string[] ServiceBusTrigger parameter, but it's not working. Any update on whether this is supported for isolated functions?

@anirudhgarg
Member

> @nzthiago Thank you for the response. I was able to receive batch messages, but now the problem is that I need to get the message properties (custom properties) that are set on each message. For a single message we achieved this using bindingName, but is there a way to get them for a batch in Java? @alrod

Here are some examples: https://github.com/Azure-Samples/azure-functions-samples-java/blob/master/src/main/java/com/functions/ServiceBusQueueTriggerFunction.java#L26

@nzthiago
Member

> Hello there,
>
> Is batching currently supported only for in-process functions? I tried the host batch configuration for an isolated function, with a string[] ServiceBusTrigger parameter, but it's not working. Any update on whether this is supported for isolated functions?

The Service Bus batch trigger should work in the isolated worker, so the team will investigate as part of Azure/azure-functions-dotnet-worker#906 to see if there are any issues. Please follow that issue, given this one is closed.

@kshyju
Member

kshyju commented May 24, 2022

@Nehmiabm Batching of messages works as expected, for service bus trigger in dotnet isolated model. See working sample code & host JSON here. Let us know if you still have trouble with this.

@Niladri24dutta

Niladri24dutta commented Jan 24, 2023

@nzthiago I am trying to use batch messages in a Service Bus trigger for Azure Functions. I am using the Azure Functions Python library. As per the URLs below, I can see that batching was enabled in the Python library from version 1.5.0 onwards:
Azure/azure-functions-python-library#73
https://github.com/Azure/azure-functions-python-library/releases/tag/1.5.0
I am using the below URL as a reference for extensions under bundle version 3x.
https://github.com/Azure/azure-functions-extension-bundles/blob/4f5934a18989353e36d771d0a964f14e6cd17ac3/src/Microsoft.Azure.Functions.ExtensionBundle/extensions.json
But when I try to use batch messages in my function app, I get a Json.NET deserialization error in the binding parameter. Here are the details:

Azure function version : 4
Azure function python package version : 1.8.0
Extension bundle version : 3x

host.json

{

  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.3.0, 4.0.0)"
  },
  "extensions":   {
    "serviceBus": {
      "prefetchCount": 20,
      "messageHandlerOptions": {
        "autoComplete": true,
        "maxConcurrentCalls": 1,
        "maxAutoRenewDuration": "00:05:00"
      }
    }
  }
}

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "serviceBusTrigger",
      "direction": "in",
      "cardinality": "many",
      "queueName": "queue-many",
      "connection": "SERVICE_BUS_CONNECTION"
    }
  ]
}

sample code in function app

import logging

import azure.functions as func
from typing import List


def main(msg: List[func.ServiceBusMessage]):
    message_length = len(msg)
    if message_length > 1:
        logging.warning('Handling multiple requests')

Am I missing anything here?
