Fluent Bit #2 - Data Replay

Apr 20, 2025

In my previous post, I demonstrated how to set up basic aggregated logging for firewall events using Fluent Bit, effectively reducing log ingestion costs in a way similar to Sentinel’s Summary rules. In this follow-up, I will walk you through a practical solution for storing these logs cost-effectively in an Azure Storage Account and show how you can later bring them back into Sentinel for analysis.

This setup serves as a simple proof of concept, highlighting how you can utilize affordable storage to retain logs for detection without sacrificing access to detailed information when needed.


Recap: What We’ve Built So Far

In part 1, we set up the foundation for our logging solution. The initial configuration could:

  1. Receive traffic using syslog on port 5555
  2. Extract the source and destination IPs from sample log entries
  3. Aggregate log data through stream processing
  4. Forward aggregated logs directly to Sentinel

Initial Fluent Bit pipeline

What’s next?

Now, we are going to enhance our setup with new features and make a few tweaks to the existing code. Here are the steps of the final solution:

  1. Receive traffic using syslog on port 5555 (no change)
  2. Extract the source and destination IPs from sample log entries (no change)
  3. Attach an extra timestamp as a tag to each event (new step)
  4. Aggregate log data through stream processing (slight tagging modification)
  5. Store firewall ‘raw’ events in an Azure Storage Account (new step)
  6. Forward aggregated logs directly to Sentinel (no change)
  7. Create a Function App to retrieve stored logs when needed (new step)

Enhanced pipeline - restoring only specific blobs (red)

Diving Deeper

3. Adding Timestamps as Tags

In Fluent Bit, tags play a crucial role: they drive routing and matching throughout the pipeline. In our case, they are used to filter events and to determine the naming of the Storage Blobs.

We want the storage blob name to include timestamp information so we can easily identify the correct blob during retrieval. Since the blob name will be set to the event’s tag, we need to update our solution to modify the event tags from ‘syslog.5555’ to a value that includes the timestamp.

Retagging pipeline. See the explanation and code below.

First, we use a Lua script to attach a new datetime field to each event, grouping them into 15-minute intervals (bins). Next, we rewrite the tag so it looks like this: firewall.[datetime tag].

  1. ‘firewall’: The static ‘firewall’ part helps us identify all the firewall logs later in the pipeline.
  2. datetime tag: The dynamic datetime segment groups all logs from the same 15-minute window together. This way, all logs in the same time bin are stored in a single Azure Blob, making it simple to restore just the relevant blob(s) when needed.

fluent-bit.conf file:

[FILTER]
    Name        lua
    Match       syslog.5555
    Script      /etc/fluent-bit/timestamp_tag.lua
    Call        add_datetime


[FILTER]
    Name          rewrite_tag
    Match         syslog.5555
    Rule          $datetime .* firewall.$datetime false
    Emitter_Name  re_emitted

timestamp_tag.lua file:

function add_datetime(tag, timestamp, record)
    -- Parse timestamp
    local ts = timestamp
    local year = os.date("%Y", ts)
    local month = os.date("%m", ts)
    local day = os.date("%d", ts)
    local hour = os.date("%H", ts)
    local minute = tonumber(os.date("%M", ts))

    -- Round minute to 15-minute bins
    local minute_bin = math.floor(minute / 15) * 15

    -- Format the datetime string with the rounded minute
    local datetime_str = string.format("%s-%s-%s-%s-%02d-00", year, month, day, hour, minute_bin)

    -- Attach the bin to the record; return code 1 tells Fluent Bit the record was modified
    local new_record = record
    new_record["datetime"] = datetime_str
    return 1, timestamp, new_record
end

5. Store firewall events in an Azure Storage Account

Fluent Bit provides a built-in module for sending logs directly to Azure Blob Storage. By configuring the azure_blob output plugin, you can specify which logs to forward - in our case, all firewall logs - so they are written straight to your chosen Azure Storage container.

Thanks to the static ‘firewall’ part of the tagging step we implemented earlier, it is straightforward to select these logs using a match rule like ‘Match firewall.*’ in the configuration.

Storage Blob naming convention.

The name of each blob created in Azure will correspond to the tag of the event. For example, if a log has a timestamp of ‘2025-04-17 12:13:10’, its tag would be ‘firewall.2025-04-17-12-00-00’, and the resulting blob will use this as its filename. This approach ensures that logs are neatly organized in storage by their time bins, making retrieval and management much easier.

fluent-bit.conf file:

[OUTPUT]
    Name                  azure_blob
    Match                 firewall.*
    account_name          <storage_account_name>
    auth_type             key
    shared_key            <shared_key>
    container_name        <container_name>
    blob_type             appendblob
    auto_create_container on
    tls                   on

At this stage, with a straightforward configuration, we’ve built a solution that processes incoming firewall logs and aggregates them by source and destination IP. The aggregated data is forwarded to Sentinel, while all raw firewall logs are stored in an Azure Storage Account, organized into 15-minute blobs.
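
To quickly confirm that the 15-minute blobs are being created as expected, you can list them with the Az.Storage PowerShell module. This is only a verification sketch; the storage account name, key, and container name are placeholders matching the configuration above:

# Quick check: list the firewall blobs created by the azure_blob output
# <storage_account_name>, <shared_key> and <container_name> are placeholders
$context = New-AzStorageContext -StorageAccountName "<storage_account_name>" -StorageAccountKey "<shared_key>"

Get-AzStorageBlob -Container "<container_name>" -Context $context |
    Where-Object { $_.Name -like "firewall.*" } |
    Sort-Object Name |
    Select-Object Name, Length, LastModified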

7. Create a Function App to retrieve stored logs when needed

All the aggregated logs are in Sentinel, and all the raw data is in the Storage Account. However, the aggregated data is not always enough for a proper investigation, so SOC analysts have a legitimate need to access the full data set. We can meet that need with a tool that reintroduces data from the Storage Account into Sentinel.

This section presents a PowerShell script to retrieve a blob by timestamp and fetch additional blobs from before and after that time. Analysts can specify a timestamp and the number of extra blobs to restore for deeper investigations. For example, selecting 2025-05-17 14:00:00 and 3 additional blobs retrieves:

  • firewall.2025-05-17-13-15-00 (3)
  • firewall.2025-05-17-13-30-00 (2)
  • firewall.2025-05-17-13-45-00 (1)
  • firewall.2025-05-17-14-00-00 (selected blob)
  • firewall.2025-05-17-14-15-00 (1)
  • firewall.2025-05-17-14-30-00 (2)
  • firewall.2025-05-17-14-45-00 (3)
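
For reference, restoring this example set with the PowerShell presented below could be as simple as the following call (restore-blobs.ps1 is just an assumed file name for the combined script):

.\restore-blobs.ps1 -initialTimestamp "2025-05-17 14:00:00" -numberOfFiles 3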

You can, of course, implement more advanced storage and restoration features. The solution shared here restores entire blobs rather than specific events, and it may not always pinpoint the exact blob - for example, when the supplied timestamp does not fall exactly on a 15-minute boundary.

The PowerShell code does not include the Sentinel ingestion logic; it only demonstrates reading logs from blobs. You can add either the legacy (HTTP Data Collector) or a DCR-based (Logs Ingestion) method yourself - a sketch of the DCR-based option is shown after the script below.

First, we set up the essential variables and parameters. Authentication happens via a storage account connection string:

param (
    [Parameter(Mandatory)]
    [string]$initialTimestamp,  # Format: "yyyy-MM-dd HH:mm:ss"

    [Parameter(Mandatory)]
    [int]$numberOfFiles
)


# Define variables
$storageConnectionString = "CONNECTIONSTRING"
$containerName = "CONTAINERNAME"

# Create a list to store the filenames
$fileList = @()

The necessary file names are generated from the supplied timestamp in 15-minute increments, matching the naming convention of the blobs.

# Parse the timestamp for processing
$parsedTimestamp = [datetime]::ParseExact($initialTimestamp, "yyyy-MM-dd HH:mm:ss", $null)

# Generate filenames for the selected timestamp and the 15-minute bins before it
for ($i = -($numberOfFiles); $i -le 0; $i++) {
    $timestamp = $parsedTimestamp.AddMinutes(15 * $i)
    $filename = "firewall." + $timestamp.ToString("yyyy-MM-dd-HH-mm-ss")
    $fileList += $filename
}

# Generate filenames with 15-minute increments after the initial timestamp
for ($i = 1; $i -le $numberOfFiles; $i++) {
    $timestamp = $parsedTimestamp.AddMinutes(15 * $i)
    $filename = "firewall." + $timestamp.ToString("yyyy-MM-dd-HH-mm-ss")
    $fileList += $filename
}

Get all the blobs:

# Create a storage context using the connection string
$context = New-AzStorageContext -ConnectionString $storageConnectionString

# Get all blobs in the container
$blobs = Get-AzStorageBlob -Container $containerName -Context $context

Next, locate the blobs previously identified and added to the fileList, retrieve their log contents, and process them in smaller batches to stay within the limits of the Log Analytics (legacy) or Logs Ingestion (DCR-based) API.

foreach ($fileName in $fileList) {
    # Find the blob with the matching name
    $blob = $blobs | Where-Object { $_.Name -eq $fileName }

    if ($blob) {
        # Retrieve the content of the blob
        $cloudBlob = Get-AzStorageBlob -Blob $blob.Name -Container $containerName -Context $context
        $blobContent = $cloudBlob.ICloudBlob.DownloadText()

        # Split the content into entries (wrapped in @() so a single entry still behaves as an array)
        $entries = @($blobContent.Split([Environment]::NewLine) | Where-Object { $_ -ne "" })

        # Process entries in batches of 100
        $batchSize = 100
        for ($i = 0; $i -lt $entries.Count; $i += $batchSize) {
            $batch = $entries[$i..($i + $batchSize - 1)]

            # Process the batch of entries
            Write-Host "Processing batch of $($batch.Count) entries"
            $batchJson = $batch | ConvertFrom-Json | ConvertTo-Json
            ############################
            ## SEND DATA TO SENTINEL ###
            ############################
        }
    } else {
        Write-Host "Blob with name $fileName does not exist."
    }
}
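
The ‘SEND DATA TO SENTINEL’ placeholder is where the ingestion call belongs. As a rough sketch of the DCR-based option, each batch could be posted to the Azure Monitor Logs Ingestion API; the Data Collection Endpoint, DCR immutable ID, stream name, and app registration values below are placeholders you would replace with your own:

# Sketch only: send one batch to Sentinel via the DCR-based Logs Ingestion API.
# All values below are placeholders; the app registration needs the
# 'Monitoring Metrics Publisher' role on the Data Collection Rule.
$dceEndpoint    = "https://<dce-name>.<region>.ingest.monitor.azure.com"
$dcrImmutableId = "<dcr-immutable-id>"
$streamName     = "Custom-FirewallRaw_CL"

# Acquire a token for Azure Monitor (client secret flow; a managed identity also works)
$tokenBody = @{
    client_id     = "<app_id>"
    client_secret = "<app_secret>"
    scope         = "https://monitor.azure.com//.default"
    grant_type    = "client_credentials"
}
$token = (Invoke-RestMethod -Method Post `
    -Uri "https://login.microsoftonline.com/<tenant_id>/oauth2/v2.0/token" `
    -Body $tokenBody).access_token

# Post the JSON batch built in the loop above (the API expects a JSON array of records)
$uri = "$dceEndpoint/dataCollectionRules/$dcrImmutableId/streams/$($streamName)?api-version=2023-01-01"
Invoke-RestMethod -Method Post -Uri $uri `
    -Headers @{ Authorization = "Bearer $token" } `
    -ContentType "application/json" `
    -Body $batchJson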

The provided PowerShell code can be deployed as a Function App and triggered manually by SOC analysts through a Sentinel Workbook, or automatically via an Automation Rule and Playbook when an incident occurs.
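
For the Function App itself, an HTTP-triggered PowerShell function can act as a thin wrapper that forwards the two parameters to the script. A minimal sketch of such a run.ps1, assuming the restore logic above is saved as restore-blobs.ps1 in the same function folder:

using namespace System.Net

# Hypothetical HTTP-triggered wrapper (run.ps1) around the restore script above
param($Request, $TriggerMetadata)

# Read the parameters from the query string (e.g. passed by a Workbook action or Playbook)
$initialTimestamp = $Request.Query.initialTimestamp
$numberOfFiles    = [int]$Request.Query.numberOfFiles

# Run the restore logic shown earlier
& "$PSScriptRoot/restore-blobs.ps1" -initialTimestamp $initialTimestamp -numberOfFiles $numberOfFiles

# Acknowledge the request
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body       = "Restore started for $initialTimestamp with $numberOfFiles blobs before and after."
})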

Final Solution

The goal was to make firewall logging more cost-effective while still supporting TI-based alerting and enabling full investigation capabilities. To achieve this, we used aggregated logging - sending only event counts by source and destination IP pairs to Sentinel in 15-minute intervals. Additionally, we leveraged low-cost storage accounts and a replay functionality, allowing us to retain and reintroduce all raw logs for detailed investigations when needed.

This is just a proof of concept designed to showcase the capabilities of Fluent Bit and similar tools. You are encouraged to use it as a starting point and adapt and expand it to fit your own requirements.

With the growing volume of logs being generated in modern enterprise environments, data storage costs are steadily increasing. It’s becoming more important than ever to manage your log data efficiently. Instead of sending all logs to expensive storage solutions, organizations should consider filtering, aggregating, or routing only the most valuable data. This approach can help control costs while still ensuring that critical information is available for analysis and alerting. Proper data management strategies will be essential as log volumes and associated expenses continue to rise.