
How to add batchsize in blob triggered Azure Function #10624

Closed
anime-shed opened this issue Nov 13, 2024 · 8 comments
@anime-shed
To prevent race conditions and avoid exceeding the 1.5 GB memory limit, I would like the BlobTrigger to process only one file at a time.

host.json

{
  "version": "2.0",
  "concurrency": {
    "dynamicConcurrencyEnabled": true,
    "snapshotPersistenceEnabled": true
  },
  "logging": {
    "logLevel": {
      "default": "Error"
    }
  },
  "extensions": {
    "blobs": {
      "maxDegreeOfParallelism": 2,
      "poisonBlobThreshold": 1
    },
    "queues": {
            "maxPollingInterval": "00:00:02",
            "visibilityTimeout" : "00:00:30",
            "batchSize": 1,
            "maxDequeueCount": 5,
            "newBatchThreshold": 8
        }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

The deployment for the above is in AKS.

I tried "How to ensure only one Azure Function BlobTrigger runs at a time?", but that also does not seem to be working.

@liliankasem (Member)

Can you share more about the function? Which language/stack? Extension version? Host version? The process may vary depending on this information, but in general, you need to set batchSize to 1 and newBatchThreshold to 0.

"The maximum number of concurrent messages being processed per function is batchSize plus newBatchThreshold. This limit applies separately to each queue-triggered function." (docs)
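Concretely, that guidance maps onto the queues section of host.json. A minimal sketch (only batchSize and newBatchThreshold are the point here; the other values are illustrative):

```json
{
  "version": "2.0",
  "extensions": {
    "queues": {
      "batchSize": 1,
      "newBatchThreshold": 0,
      "maxDequeueCount": 5
    }
  }
}
```

With batchSize = 1 and newBatchThreshold = 0, the maximum concurrency per function is 1 + 0 = 1 message at a time per instance.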

It's also worth noting that there is a newer eventGrid based approach to blob triggers that is better to use:

https://learn.microsoft.com/en-us/azure/azure-functions/functions-event-grid-blob-trigger?pivots=programming-language-csharp

@anime-shed (Author)

anime-shed commented Nov 14, 2024

Which language/stack? Extension version? Host version?

host: 2.0
"FUNCTIONS_WORKER_RUNTIME": "python",
"AzureWebJobsFeatureFlags": "EnableWorkerIndex",
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1",

import logging

import azure.functions as func
from blob_helper import initialize_blob_service_client, upload_dataframe_to_blob

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.function_name(name="PythonFunction")
@app.blob_trigger(
    arg_name="myblob",
    path="sheets/input/{name}",  # Blob path for trigger
    connection="DataLakeConnectionString"
)
def blob_trigger(myblob: func.InputStream):
    ...  # function body omitted by the author
I can share any other specific information that may be required but I may not be able to share the entire code.

@liliankasem (Member)

Thanks, and did the guidance I shared not work for you?

@anime-shed (Author)

@liliankasem I tried what you suggested:

(screenshots of the updated configuration and the resulting logs)

but as the screenshots show, all the files are still read at once.

@liliankasem (Member)

Okay, I was able to get this working with the following setup. Can you give it a shot?

host.json

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensions": {
    "blobs": {
      "maxDegreeOfParallelism": 1
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

function_app.py

import azure.functions as func
import datetime
import json
import logging
from time import sleep

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)


@app.blob_trigger(arg_name="myblob", path="samples-workitems/{name}", connection="BlobStorageConnectionString")
def BlobTrigger(myblob: func.InputStream):
    logging.info("Python blob trigger function processed blob. Now sleeping for 10 seconds")

    sleep(10)

    logging.warning(f"Name: {myblob.name} | Blob Size: {myblob.length} bytes")
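Note that maxDegreeOfParallelism only limits concurrency within a single host instance. To illustrate what per-instance serialization looks like, here is a small sketch (not part of the Functions API; process_blob and the lock are hypothetical) showing a process-wide lock forcing overlapping invocations to run one at a time inside one worker:

```python
import threading
import time

# Hypothetical worker-level guard: a process-wide lock that serializes
# blob processing within a single instance. It does NOT coordinate across
# instances; a scale-out limit is still needed for that.
_processing_lock = threading.Lock()

log: list = []

def process_blob(name: str) -> None:
    with _processing_lock:
        log.append(f"start {name}")
        time.sleep(0.05)  # simulate blob processing work
        log.append(f"end {name}")

# Simulate three concurrent invocations; entries never interleave.
threads = [threading.Thread(target=process_blob, args=(f"blob{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Each `start blobN` is immediately followed by its matching `end blobN` in `log`, regardless of thread scheduling.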

Portal

App Settings

All the usual settings with the addition of:

  • WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1

Scale Out

Set scale out to 1:

(screenshot: scale-out limit set to 1)

Results

I uploaded a bunch of files at the same time:

(screenshot: uploaded files)

And I can see the blob trigger processing them one at a time:

(screenshot: logs showing the blobs processed one at a time)

@anime-shed (Author)

anime-shed commented Nov 18, 2024

I will try making the changes in the function app and let you know the results. But my deployment is on Azure Kubernetes Service; there I can set the app setting WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1, but how is the dynamic scale-out limit enforced? Is it equal to the number of pods in the deployment?

Edit 1:
Also, is the above not possible with blobClient?

import logging

import azure.functions as func
# BlobClient bindings require the azurefunctions-extensions-bindings-blob package
import azurefunctions.extensions.bindings.blob as blob

app = func.FunctionApp(http_auth_level=func.AuthLevel.ANONYMOUS)

@app.function_name(name="PythonFunction")
@app.blob_trigger(
    arg_name="myblob",
    path="sheets/input/{name}",  # Blob path for trigger
    connection="DataLakeConnectionString"
)
def blob_trigger(myblob: blob.BlobClient):
    # Read the blob content once
    blob_content = myblob.download_blob().readall()
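On the AKS question above: with KEDA-based function deployments, the pod count is capped by the ScaledObject rather than by the app setting, so maxReplicaCount is the AKS analogue of WEBSITE_MAX_DYNAMIC_APPLICATION_SCALE_OUT = 1. A hypothetical sketch, assuming the azure-blob scaler (all names and the container are placeholders):

```yaml
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: blob-func-scaler
spec:
  scaleTargetRef:
    name: blob-func-deployment   # the function app Deployment
  minReplicaCount: 0
  maxReplicaCount: 1             # never more than one pod processing blobs
  triggers:
    - type: azure-blob
      metadata:
        blobContainerName: sheets
        connectionFromEnv: DataLakeConnectionString
```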

@prashantguleria

To avoid this, you need to set newBatchThreshold to 0, so that only one file is processed at a time.

@anime-shed (Author)

anime-shed commented Dec 11, 2024

@prashantguleria, it seems to be working fine on a single instance deployment with host.json:

{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensions": {
    "blobs": {
      "maxDegreeOfParallelism": 1
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  }
}

However, I am still facing issues when using it as is on a multi-instance Kubernetes deployment:
Azure/azure-functions-python-worker#1613.
