The repository contains Python code for a data pipeline that ingests files from Azure Blob Storage, extracts and chunks text, redacts sensitive information, and generates embeddings. The embeddings and redacted chunks are stored in a SQL database in Fabric.

RAG Pipeline using SQL Database in Fabric

This project provides step-by-step guidance for building a Retrieval Augmented Generation (RAG) pipeline to make your data AI-ready using Microsoft Fabric. The RAG pipeline is triggered every time a file is uploaded to Azure Blob Storage. The pipeline processes the file by extracting the textual content, chunking the text, redacting any PII data, generating embeddings, and finally storing the embeddings in a SQL Database in Fabric as a vector store. The stored content can later be used to build AI applications such as recommendation engines, chatbots, smart search, etc.

Data Pipeline Activity Workflow

Prerequisite

This project requires users to bring their own key (BYOK) for AI services, which also means creating these services outside of the Microsoft Fabric platform.

Dataset

Considering the file formats supported by the Document Intelligence service, we will use PDF files from the Resume Dataset on Kaggle.

Steps

  1. Create a workspace named IntelligentApp.

  2. Create a Lakehouse named blob_filestorage.

  3. Create SQL Database in Fabric named datamart.

  4. Navigate to the workspace IntelligentApp, click Import - Notebook - From this computer and then import the notebook cp_azblob_lakehouse.ipynb from the cloned repository’s notebook folder. Import Notebook

  5. Attach the Lakehouse blob_filestorage to the cp_azblob_lakehouse notebook.

    • Open the notebook; in the Explorer, click Add sources.
    • Select Existing data sources.
    • Select blob_filestorage from the OneLake catalog and then click Connect.

    Attach Lakehouse

  6. Create a User Data Function

    • Navigate to the IntelligentApp workspace and click +New item
    • Search for Function and select "User data function(s)".
    • Provide the name file_processor.
    • Click "New Function".
    • Add Lakehouse and SQL Database as managed connection(s).
      • In the Home menu, click Manage Connections and then click "+ Add data connection".
      • From the OneLake catalog select datamart (SQL Database) and then click "Connect".
      • Repeat the previous step to add blob_filestorage (Lakehouse) as a managed connection.
    • Click Library management, and add the following dependencies (click "+ Add from PyPI" to add each dependency). The dependencies are also listed in the /functions/requirements.txt file of the cloned repository. Ensure you are using fabric-user-data-functions version 0.2.28rc0 or higher. Add Managed Connection & Dependencies
    • In the function editor, replace the existing content with the contents of function\function_app.py from the cloned repository (a minimal sketch of the function shape follows this step).
    • Click "Publish" (on the top right) to deploy the functions. Once the functions are deployed, click "Refresh".
  7. Create a Data Pipeline by navigating to the workspace and then clicking on "+ New Item"

    • Search and select "Data Pipeline".
    • Provide the name blob_ingest_pipeline.
  8. Create a Data Pipeline storage trigger by clicking the "Add trigger (preview)" button and provide the following configuration:

    • Source: Select Azure Blob Storage events.

    • Storage account: Connect to an existing Azure Blob Storage account.

    • Subscription: Select your Azure subscription.

    • Azure Blob Storage account: Select the blob storage account under your subscription.

    • Eventstream name: blob_ingest_stream.

      Create Trigger

    Click "Next", to configure the event type and source

    • Event Type(s): Select only the Microsoft.Storage.BlobCreated event. This will ensure that an event is generated each time a new blob object is uploaded.

    Click "Next" to review the configuration. Then, click "Connect" to connect to the blob storage. A successful connection will be indicated by the status "Successful". Finally, click "Save".

    On the Set alert screen, under Save location, configure the following settings,

    • Select Create a new item.
    • New item name: blob_activator

    Click "Create" to create and save the alert.

    Save alert

    Now that we have set up the stream, it's time to define the blob_ingest_pipeline.

    Pipeline Definition

    The pipeline can be defined in two distinct ways, as outlined below:

    Import Template

    Templates offer a quick way to begin building data pipelines. Importing a template brings in all required activities for orchestrating a pipeline.

    To import a template:

    • Navigate to the Home menu of the data pipeline.
    • Click Use a template.
    • From the Pipeline templates page, click Import template.
    • Import the file template/AI-Develop RAG pipeline using SQL database in Fabric.zip from the cloned repository.

    The imported data pipeline is preloaded with all necessary activities, variables, and connectors required for end-to-end orchestration. Consequently, there is no need to manually add a variable or an activity. Instead, you can proceed directly to configuring values for the variables and each activity parameter in the pipeline, as detailed in the Blank Canvas section.

    Blank Canvas

    1. Establish pipeline variables. Click on the pipeline canvas, select the Variables menu, and then click "+ New" to add and configure values for the following variables:

      Configure Pipeline Parameters

      | Name | Type | Value | Comments |
      |------|------|-------|----------|
      | fileName | String | @pipeline()?.TriggerEvent?.FileName | |
      | container | String | @pipeline()?.TriggerEvent?.FolderPath | |
      | source | String | @pipeline()?.TriggerEvent?.Source | |
      | cognitiveServiceEndpoint | String | https://YOUR-MULTI-SERVICE-ACCOUNT-NAME.cognitiveservices.azure.com/ | Replace YOUR-MULTI-SERVICE-ACCOUNT-NAME with the name of your multi-service account |
      | apiKey | String | YOUR-MULTI-SERVICE-ACCOUNT-APIKEY | Replace YOUR-MULTI-SERVICE-ACCOUNT-APIKEY with the API key of your multi-service account |
      | openAIEndpoint | String | https://YOUR-OPENAI-ACCOUNT-NAME.openai.azure.com/ | Replace YOUR-OPENAI-ACCOUNT-NAME with the name of your Azure OpenAI account |
      | openAIKey | String | YOUR-OPENAI-APIKEY | Replace YOUR-OPENAI-APIKEY with the API key of your Azure OpenAI account |
      | embeddingModel | String | text-embedding-3-small | |
      | recepientEmailAddress | String | to-email-address | Recipient email address |
      | senderEmailAddress | String | from-email-address | Sender's email address |

      Configure Pipeline Parameters

    2. Add a Notebook activity. The notebook associated with this activity uses NotebookUtils to manage the file system: during execution, a folder named after the container is created if it does not exist, and the file is then copied from Azure Blob Storage into that Lakehouse folder (a sketch of this logic follows the configuration below). Configure this activity as outlined below:

      • General Tab,

        • Name: azureblob_to_lakehouse
      • Settings Tab,

        • Notebook: cp_azblob_lakehouse

        Base parameters

        Click "+New" to add the following parameters,

        • Name: fileName
          • Type: String
          • Value: @variables('fileName')
        • Name: container
          • Type: String
          • Value: @variables('container')
        • Name: source
          • Type: String
          • Value: @variables('source')

      Use the On Success connector of the activity to link to the subsequent Function (Extract Text) activity.
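
      For reference, the notebook's copy logic can be sketched with NotebookUtils roughly as follows. This is an illustration, not the exact contents of cp_azblob_lakehouse.ipynb; in particular, the way the blob source path is assembled from the source variable is an assumption.

```python
# Sketch of the copy logic in cp_azblob_lakehouse (illustrative only).
# fileName, container and source are supplied by the pipeline base parameters.
import notebookutils  # available by default in Fabric notebooks

fileName = "example.pdf"   # @variables('fileName')  -- example value
container = "resume"       # @variables('container') -- example value
source = "https://<account>.blob.core.windows.net"  # @variables('source') -- assumption

lakehouse_dir = f"Files/{container}"
notebookutils.fs.mkdirs(lakehouse_dir)  # create the folder if it does not exist

# Assumption: source is the storage account URL, so the blob path is source/container/fileName
notebookutils.fs.cp(f"{source}/{container}/{fileName}", f"{lakehouse_dir}/{fileName}")

# Return the Lakehouse path; downstream activities read it via
# @activity('azureblob_to_lakehouse').output.result.exitValue
notebookutils.notebook.exit(f"/lakehouse/default/{lakehouse_dir}/{fileName}")
```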

    3. Add a Functions activity. The function extract_text associated with this activity uses the Azure AI Document Intelligence service to extract the text content from the file copied into the Lakehouse by the previous activity (an illustrative sketch follows this step's configuration). Configure this activity as outlined below:

      • General Tab,

        • Name: Extract Text
      • Settings Tab,

        • Type: Fabric user data functions
        • Connection: Sign in (if not already) using your workspace credentials.
        • Workspace: IntelligentApp (selected by default)
        • User data functions: file_processor
        • Function: extract_text

        Parameters:

        • Name: filePath
          • Type: str
          • Value: @activity('azureblob_to_lakehouse').output.result.exitValue
        • Name: cognitiveServicesEndpoint
          • Type: str
          • Value: @variables('cognitiveServiceEndpoint')
        • Name: apiKey
          • Type: str
          • Value: @variables('apiKey')

      Use the On Completion connector of the activity to link to the subsequent If Condition (Text Extraction Results) activity.
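
      Conceptually, extract_text reads the copied file and sends it to the Document Intelligence service. A minimal sketch is shown below using the azure-ai-formrecognizer client and the prebuilt read model; the repository's implementation may use a different package or model.

```python
# Illustrative text extraction with Azure AI Document Intelligence, shown here
# via the azure-ai-formrecognizer client and the prebuilt "read" model.
from azure.ai.formrecognizer import DocumentAnalysisClient
from azure.core.credentials import AzureKeyCredential

def extract_text(filePath: str, cognitiveServicesEndpoint: str, apiKey: str) -> str:
    client = DocumentAnalysisClient(
        endpoint=cognitiveServicesEndpoint,
        credential=AzureKeyCredential(apiKey),
    )
    with open(filePath, "rb") as f:
        poller = client.begin_analyze_document("prebuilt-read", document=f)
    result = poller.result()
    return result.content  # full extracted text of the document
```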

    4. Add an If Condition activity to verify the success of the text extraction in the previous step. If the extraction was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:

      • General Tab,
        • Name: Text Extraction Results
      • Activities Tab,
        • Expression: @empty(activity('Extract Text').error)

        • Case: False. Edit the false condition using the edit (pencil) icon, and add the following activities,

          Office 365 Outlook activity: To send alert emails.

          • General Tab,

            • Name: Text Extraction Failure Email Alert
          • Settings Tab,

            • Signed in as: Sign in (if not already) using your workspace credentials.
            • To: @variables('recepientEmailAddress')
            • Subject: Text Extraction Error
            • Body: <pre>@{replace(string(activity('Extract Text').error.message), '\','')}</pre>

            Advanced,

            • From: @variables('senderEmailAddress')
            • Importance: High

            Use the On Success connector of the activity to link to the subsequent Fail activity.

          Fail activity: To terminate the pipeline

          • General Tab,
            • Name: Text Extraction Process Failure
          • Settings Tab,
            • Fail message: @{replace(string(activity('Extract Text').error), '\','')}
            • Error code: @{activity('Extract Text').statuscode}

      Return to the main canvas by clicking the pipeline name blob_ingest_pipeline and use the On Success connector of the If Condition activity to link to the subsequent Function (Generate Chunks) activity.

    5. Add a Functions activity. The function chunk_text associated with this activity uses the tiktoken tokenizer to generate chunks from the text extracted by the previous activity (a chunking sketch follows this step's configuration). Configure this activity as outlined below:

      • General Tab,

        • Name: Generate Chunks
      • Settings Tab,

        • Type: Fabric user data functions
        • Connection: Sign in (if not already) using your workspace credentials.
        • Workspace: IntelligentApp (selected by default)
        • User data functions: file_processor
        • Function: chunk_text

        Parameters:

        • Name: text_
          • Type: str
          • Value: @activity('Extract Text').output.output
        • Name: maxToken
          • Type: int
          • Value: 500
        • Name: encoding
          • Type: str
          • Value: cl100k_base

      Use the On Success connector of the activity to link to the subsequent Function (Redact PII Data) activity.
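
      The chunking logic can be sketched with tiktoken as follows; the actual chunk_text function may handle details such as chunk overlap differently.

```python
# Illustrative token-based chunking with tiktoken.
import tiktoken

def chunk_text(text_: str, maxToken: int = 500, encoding: str = "cl100k_base") -> list:
    enc = tiktoken.get_encoding(encoding)
    tokens = enc.encode(text_)
    # Split the token stream into windows of at most maxToken tokens and
    # decode each window back into text.
    return [enc.decode(tokens[i:i + maxToken]) for i in range(0, len(tokens), maxToken)]
```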

    6. Add a Functions activity. The function redact_text associated with this activity uses the Azure AI Language service to redact PII data from the chunks generated by the preceding activity. The text is chunked before redaction to stay within the service limits of the PII detection feature (a redaction sketch follows this step's configuration). Configure this activity as outlined below:

      • General Tab,

        • Name: Redact PII Data
      • Settings Tab,

        • Type: Fabric user data functions
        • Connection: Sign in (if not already) using your workspace credentials.
        • Workspace: IntelligentApp (selected by default)
        • User data functions: file_processor
        • Function: redact_text

        Parameters:

        • Name: text
          • Type: list
          • Value: @activity('Generate Chunks').output.output
        • Name: cognitiveServicesEndpoint
          • Type: str
          • Value: @variables('cognitiveServiceEndpoint')
        • Name: apiKey
          • Type: str
          • Value: @variables('apiKey')

      Redact PII Data

      Use the On Completion connector of the activity to link to the subsequent If Condition (PII Redaction Results) activity.
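
      A minimal sketch of the redaction logic with the azure-ai-textanalytics client is shown below; the batch size of 5 documents per request is an assumption based on the PII detection service limits, and the repository's function may batch differently.

```python
# Illustrative PII redaction with the Azure AI Language service
# (azure-ai-textanalytics), sending the chunks in small batches.
from azure.ai.textanalytics import TextAnalyticsClient
from azure.core.credentials import AzureKeyCredential

def redact_text(text: list, cognitiveServicesEndpoint: str, apiKey: str) -> list:
    client = TextAnalyticsClient(
        endpoint=cognitiveServicesEndpoint,
        credential=AzureKeyCredential(apiKey),
    )
    redacted = []
    batch_size = 5  # assumed per-request document limit for PII detection
    for i in range(0, len(text), batch_size):
        results = client.recognize_pii_entities(documents=text[i:i + batch_size])
        redacted.extend(doc.redacted_text for doc in results if not doc.is_error)
    return redacted
```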

    7. Add an If Condition activity to verify the success of the PII redaction in the previous step. If the redaction was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:

      • General Tab,
        • Name: PII Redaction Results
      • Activities Tab,
        • Expression: @empty(activity('Redact PII Data').error)

        • Case: False. Edit the false condition using the edit (pencil) icon, and add the following activities,

          Office 365 Outlook activity: To send alert emails.

          • General Tab,

            • Name: Redaction Failure Email Alert
          • Settings Tab,

            • Signed in as: Sign in (if not already) using your workspace credentials.
            • To: @variables('recepientEmailAddress')
            • Subject: PII Redaction Error
            • Body: <pre>@{replace(string(activity('Redact PII Data').error.message), '\','')}</pre>

            Advanced,

            • From: @variables('senderEmailAddress')
            • Importance: High

            Use the On Success connector of the activity to link to the subsequent Fail activity.

          Fail activity: To terminate the pipeline

          • General Tab,
            • Name: PII Redaction Process Failure
          • Settings Tab,
            • Fail message: @{replace(string(activity('Redact PII Data').error), '\','')}
            • Error code: @{activity('Redact PII Data').statuscode}

      Return to the main canvas by clicking the pipeline name blob_ingest_pipeline and use the On Success connector of the If Condition activity to link to the subsequent Function (Generate Embeddings) activity.

    8. Add a Functions activity. The function generate_embeddings associated with this activity uses an Azure OpenAI Service embedding model to convert the redacted chunks into embeddings (an embedding sketch follows this step's configuration). Configure this activity as outlined below:

      • General Tab,

        • Name: Generate Embeddings
      • Settings Tab,

        • Type: Fabric user data functions
        • Connection: Sign in (if not already) using your workspace credentials.
        • Workspace: IntelligentApp (selected by default)
        • User data functions: file_processor
        • Function: generate_embeddings

        Parameters:

        • Name: text
          • Type: list
          • Value: @activity('Redact PII Data').output.output
        • Name: openAIServiceEndpoint
          • Type: str
          • Value: @variables('openAIEndpoint')
        • Name: embeddingModel
          • Type: str
          • Value: @variables('embeddingModel')
        • Name: openAIKey
          • Type: str
          • Value: @variables('openAIKey')
        • Name: fileName
          • Type: str
          • Value: @variables('fileName')

      Use the On Completion connector of the activity to link to the subsequent If Condition (Generate Embeddings Results) activity.
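
      The embedding step can be sketched with the Azure OpenAI Python client as follows; the api_version and the shape of the returned rows are assumptions, since the repository's generate_embeddings function defines the actual payload consumed later by the Save Data activity.

```python
# Illustrative embedding generation with the Azure OpenAI Python client (openai >= 1.x).
from openai import AzureOpenAI

def generate_embeddings(text: list, openAIServiceEndpoint: str,
                        embeddingModel: str, openAIKey: str, fileName: str) -> list:
    client = AzureOpenAI(
        azure_endpoint=openAIServiceEndpoint,
        api_key=openAIKey,
        api_version="2024-02-01",  # assumption
    )
    response = client.embeddings.create(model=embeddingModel, input=text)
    # Pair each redacted chunk with its embedding vector and the source file name.
    return [
        {"fileName": fileName, "chunk": chunk, "embedding": item.embedding}
        for chunk, item in zip(text, response.data)
    ]
```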

    9. Add an If Condition activity to verify the success of the embeddings generation in the previous step. If the embeddings generation was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:

      • General Tab,
        • Name: Generate Embeddings Results
      • Activities Tab,
        • Expression: @empty(activity('Generate Embeddings').error)

        • Case: False. Edit the false condition using the edit (pencil) icon, and add the following activities,

          Office 365 Outlook activity: To send alert emails.

          • General Tab,

            • Name: Generate Embeddings Failure Email Alert
          • Settings Tab,

            • Signed in as: Sign in (if not already) using your workspace credentials.
            • To: @variables('recepientEmailAddress')
            • Subject: Generate Embeddings Error
            • Body: <pre>@{replace(string(activity('Generate Embeddings').error.message), '\','')}</pre>

            Advanced,

            • From: @variables('senderEmailAddress')
            • Importance: High

            Use the On Success connector of the activity to link to the subsequent Fail activity.

          Fail activity: To terminate the pipeline

          • General Tab,
            • Name: Generate Embeddings Process Failure
          • Settings Tab,
            • Fail message: @{replace(string(activity('Generate Embeddings').error), '\','')}
            • Error code: @{activity('Generate Embeddings').statuscode}

      Return to the main canvas by clicking the pipeline name blob_ingest_pipeline and use the On Success connector of the If Condition activity to link to the subsequent Function (Create Database Objects) activity.

    10. Add a Functions activity. The function create_table associated with this activity executes a SQL command to create a documents table within the previously created datamart SQL database (a sketch follows this step's configuration). Configure this activity as outlined below:

      • General Tab,
        • Name: Create Database Objects
      • Settings Tab,
        • Type: Fabric user data functions
        • Connection: Sign in (if not already) using your workspace credentials.
        • Workspace: IntelligentApp (selected by default)
        • User data functions: file_processor
        • Function: create_table

      Use the On Success connector of the activity to link to the subsequent Function (Save Data) activity.
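
      A rough sketch of create_table using the managed SQL Database connection is shown below; the column layout, and storing the embedding as JSON text, are assumptions rather than the repository's actual schema.

```python
# Illustrative create_table sketch using the managed SQL Database connection.
import fabric.functions as fn

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB", alias="datamart")
@udf.function()
def create_table(sqlDB: fn.FabricSqlConnection) -> str:
    conn = sqlDB.connect()
    cursor = conn.cursor()
    cursor.execute("""
        IF OBJECT_ID('dbo.documents', 'U') IS NULL
        CREATE TABLE dbo.documents (
            id INT IDENTITY(1,1) PRIMARY KEY,
            fileName NVARCHAR(255),
            chunk NVARCHAR(MAX),
            embedding NVARCHAR(MAX)  -- embedding vector serialized as JSON (assumption)
        )
    """)
    conn.commit()
    cursor.close()
    conn.close()
    return "documents table ready"
```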

    11. Add a Functions activity. The function insert_data associated with this activity executes a SQL command to bulk insert rows into the documents table created in the previous activity (a sketch follows this step's configuration). Configure this activity as outlined below:

      • General Tab,

        • Name: Save Data
      • Settings Tab,

        • Type: Fabric user data functions
        • Connection: Sign in (if not already) using your workspace credentials.
        • Workspace: IntelligentApp (selected by default)
        • User data functions: file_processor
        • Function: insert_data

        Parameters:

        • Name: data
          • Type: list
          • Value: @activity('Generate Embeddings').output.output
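
      Finally, insert_data can be sketched as a bulk insert over the managed connection; this assumes the row shape and table schema sketched above, and the repository's implementation may differ.

```python
# Illustrative bulk insert of the rows produced by Generate Embeddings.
import json
import fabric.functions as fn

udf = fn.UserDataFunctions()

@udf.connection(argName="sqlDB", alias="datamart")
@udf.function()
def insert_data(sqlDB: fn.FabricSqlConnection, data: list) -> int:
    conn = sqlDB.connect()
    cursor = conn.cursor()
    rows = [(r["fileName"], r["chunk"], json.dumps(r["embedding"])) for r in data]
    cursor.executemany(
        "INSERT INTO dbo.documents (fileName, chunk, embedding) VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()
    cursor.close()
    conn.close()
    return len(rows)  # number of rows inserted
```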

Troubleshooting

  • When adding a Python library from PyPI to User Data Functions, you might see an error indicator, such as a squiggly line under the library name (e.g., azure-ai-textanalytics), as if it were a spelling mistake. Ensure the library name is spelled correctly, then ignore the indicator by tabbing to the Version dropdown and selecting the correct version; this transient error should resolve itself.

  • The imported pipeline may not be preloaded with the parameter values. For each activity in the pipeline, ensure that the parameter values are provided and correct. Refer to the Blank Canvas section for the required parameters and their values.

Execute Pipeline (Pipeline in Action)

Let's put everything we have done so far into perspective and see the pipeline in action.

  • Upload a PDF file,

    • Use the Azure Storage Explorer or, alternatively, the Azure Portal to create a Blob container named resume.
    • Upload a PDF file from the Kaggle dataset.

    Create a Container and Upload a PDF File

  • Pipeline execution review,

    • From the pipeline’s “Run” menu, select View run history and select the recent pipeline run.
    • In the details view, check that the status is Succeeded.
    • In case of a failure, rerun the pipeline using the Rerun option.

    Pipeline Review

  • Review Lakehouse,

    • A folder with the same name as that of the container (resume) is created.
    • The PDF file is copied from Azure Blob Storage to the Lakehouse files.

    Lakehouse Review

  • Review Database

    • The documents table should be automatically created by the pipeline.
    • The redacted chunk data and the embeddings are stored in the documents table.

    Database Review
