Skip to content

Automating Detection Monitoring with Detection-as-Code

Original URL: https://blog.nviso.eu/2025/10/07/detection-engineering-practicing-detection-as-code-monitoring-part-7/?amp

Introduction

This article, part 7 of a series on Detection Engineering, focuses on automating the monitoring of deployed detections. Effective maintenance is crucial in detection engineering to prevent alert fatigue and ensure rules remain relevant. The article addresses the limitations of built-in SIEM/EDR features for rule monitoring and tuning, advocating for a proactive approach through automation to catch issues before they impact security operations.

Monitoring Checks

The article outlines four key types of monitoring checks to automate:

  • Trigger Rate of Detections: Monitoring the average rate at which detections are triggered.
  • Entities Triggering Rate: Identifying the frequency of entity appearances in detections.
  • Detection Tampering: Tracking and reporting when detections are disabled or deleted.
  • Detection Health: Monitoring daily and monthly failure rates of detections.

The goal is to automate these checks and generate Azure DevOps work items for identified issues.

App Registration

To access the Log Analytics API for querying detection information, an App registration in Azure is required. This app needs the "Log Analytics Reader" role. This role is assigned in the Log Analytics workspace under Access Control (IAM) -> Role Assignments.

Monitoring Detection Trigger Rate and Entities Appearances

The article provides a Kusto Query Language (KQL) query to analyze security incidents in Sentinel. This query calculates various metrics:

  • Incident Title and Related Analytic Rule ID
  • Incident Count within the lookback period
  • First and last occurrence timestamps
  • Daily Occurrence Average and Standard Deviation
  • Search Duration and Overall Daily Average/StdDev
  • Entities Appearances (entities and their counts)

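The query filters its results with incident_count_threshold and incident_avg_threshold, so that only detections whose incident count or daily average exceeds the configured thresholds are flagged. Setting a threshold to -1 disables that check; with both set to -1, all results are returned.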

// Tunable parameters: the lookback window and the two reporting thresholds.
let lookback_time = 30d;
// A value of -1 disables the corresponding threshold check in the final filter.
let incident_count_threshold = -1;
let incident_avg_threshold = -1;
// One row per (incident record, related analytic rule ID) inside the lookback window,
// tagged with the calendar day on which the record was generated.
let incidents = SecurityIncident
| where TimeGenerated > ago(lookback_time)
| where ProviderName == "Microsoft XDR"
| mv-expand RelatedAnalyticRuleId = RelatedAnalyticRuleIds
| extend RelatedAnalyticRuleId = tostring(RelatedAnalyticRuleId)
| project Title, TimeGenerated, Day = startofday(TimeGenerated), RelatedAnalyticRuleId;
// Number of incident rows per title, rule ID and day.
let daily_counts = incidents
| summarize DailyCount = count() by Title, RelatedAnalyticRuleId, Day;
// Main pipeline: per title/rule statistics, threshold filtering, then entity appearances.
daily_counts
| join kind=inner (
    incidents
    | summarize IncidentCount = count(), MinDateTime = min(TimeGenerated), MaxDateTime = max(TimeGenerated) by Title, RelatedAnalyticRuleId
) on Title and RelatedAnalyticRuleId
// MinDateTime/MaxDateTime are constant within each group after the join; min()/max() just carry them through.
// Occurences = number of days with at least one incident for this title/rule.
| summarize IncidentCount = sum(DailyCount), MinDateTime = min(MinDateTime), MaxDateTime = max(MaxDateTime), DailyOccurenceAvg = round(avg(DailyCount), 2), Occurences = count(), DailyOccurenceStdDev = round(stdev(DailyCount), 2) by Title, RelatedAnalyticRuleId
| extend DaysBetweenMinMax = datetime_diff("day", MaxDateTime, MinDateTime) + 1
// Derived from lookback_time (not a hard-coded 30d) so the full-search figures follow the configured window.
| extend SearchDurationDays = datetime_diff("day", now(), ago(lookback_time)) + 1
| extend FullSearchDailyAvg = round(todouble(IncidentCount) / todouble(SearchDurationDays), 2)
| extend FullSearchDailyStdDev = round(DailyOccurenceStdDev * sqrt(todouble(Occurences) / todouble(SearchDurationDays)), 2)
// Threshold filter: a threshold of -1 is disabled; with both disabled every row is kept.
| where (incident_count_threshold != -1 and incident_avg_threshold != -1 and (IncidentCount > incident_count_threshold or FullSearchDailyAvg > incident_avg_threshold))
    or (incident_count_threshold != -1 and incident_avg_threshold == -1 and IncidentCount > incident_count_threshold)
    or (incident_count_threshold == -1 and incident_avg_threshold != -1 and FullSearchDailyAvg > incident_avg_threshold)
    or (incident_count_threshold == -1 and incident_avg_threshold == -1)
| project-reorder Title, RelatedAnalyticRuleId, IncidentCount, MinDateTime, MaxDateTime, DaysBetweenMinMax, Occurences, DailyOccurenceAvg, DailyOccurenceStdDev, SearchDurationDays, FullSearchDailyAvg, FullSearchDailyStdDev
// Left outer join: rows without matching entity data are kept, with empty entity columns.
| join kind = leftouter (
    SecurityIncident
    | where TimeGenerated > ago(lookback_time)
    | where ProviderName == "Microsoft XDR"
    | project IncidentNumber, IncidentName, Title, RelatedAnalyticRuleIds, AlertIds
    | mv-expand AlertId = AlertIds
    | extend AlertId = tostring(AlertId)
    | join kind = inner (
        SecurityAlert
        | where TimeGenerated > ago(lookback_time)
        | where ProductName == "Azure Sentinel" and ProductComponentName == "Scheduled Alerts"
        | project SystemAlertId, AlertName, Entities, AlertType
        // The rule ID is taken as the last "_"-separated segment of AlertType.
        | extend AnalyticRuleId = split(AlertType, "_")[-1]
        | mv-expand Entities = todynamic(Entities)
        | mv-expand Entities = todynamic(Entities)
        | mv-expand kind=array key = bag_keys(Entities)
        | extend PropertyName = tostring(key), PropertyValue = tostring(Entities[tostring(key)])
        // Drop "$"-prefixed and empty property names, and the entity "Type" property.
        | where not(PropertyName startswith "$") and PropertyName != ""
        | where PropertyName != "Type"
    ) on $left.AlertId == $right.SystemAlertId
    | extend AnalyticRuleId = tostring(AnalyticRuleId)
    | summarize EntitiesIncidentCount = count_distinct(IncidentNumber), EntitiesAlertCount = count_distinct(SystemAlertId), IncidentNumbers=make_set(IncidentNumber), SystemAlertIds=make_set(SystemAlertId), AlertNames = make_set(AlertName) by Title, AnalyticRuleId, PropertyName, PropertyValue
    | project-reorder Title, AnalyticRuleId, PropertyName, PropertyValue, EntitiesIncidentCount, EntitiesAlertCount, AlertNames, IncidentNumbers, SystemAlertIds
    // Two representations of each entity appearance: an HTML-friendly string and a property bag.
    | extend EntitiesAppearancesFormatted = strcat(PropertyName, ": ", PropertyValue, ", ", "IncidentCount: ", EntitiesIncidentCount, "<br />")
    | extend EntitiesAppearances = pack(PropertyName, PropertyValue, "IncidentCount", EntitiesIncidentCount)
    | order by toint(EntitiesAppearances.IncidentCount) desc
    | summarize EntitiesAppearancesFormatted = make_list(EntitiesAppearancesFormatted), EntitiesAppearances = make_list(EntitiesAppearances) by AnalyticRuleId, Title
) on $left.Title == $right.Title and $left.RelatedAnalyticRuleId == $right.AnalyticRuleId
| project-away Title1, AnalyticRuleId

Monitoring Detection Tampering and Health

This section focuses on detecting tampering (rule deletion/disabling) and monitoring the health of detection rules. Enabling health and audit logging in Microsoft Sentinel is a prerequisite. The article provides KQL queries for:

  • Identifying unauthorized deletions of analytic rules.
  • Detecting instances where analytic rules were disabled.
  • Identifying rules automatically disabled by Sentinel due to consecutive failures.
  • Calculating daily and monthly execution success/failure ratios.

The queries use variables like monthly_failures_ratio_threshold and daily_failures_ratio_threshold to define acceptable failure rates. The arrays deleted_rules_authorized_entities and disabled_rules_authorized_entities are used to exclude authorized entities from triggering alerts.

// Tunable parameters for the tampering and health checks.
let lookback_time = 30d;
// Caller names listed here are excluded from the deletion / disabling reports (case-insensitive match).
let deleted_rules_authorized_entities = dynamic([]);
let disabled_rules_authorized_entities = dynamic([]);
// Failure-rate thresholds, expressed as percentages.
let monthly_failures_ratio_threshold = 20;
let daily_failures_ratio_threshold = 70;
// Successful analytic rule deletions recorded in SentinelAudit by callers that are not on the authorized list.
let rule_deleted_results = SentinelAudit
| where TimeGenerated > ago(lookback_time)
| where Status == 'Success'
| where SentinelResourceType == 'Analytic Rule'
| where Description == "Analytics rule deleted"
| extend caller_name = tostring(ExtendedProperties.CallerName), caller_ip_address = tostring(ExtendedProperties.CallerIpAddress), RuleId = tostring(ExtendedProperties.ResourceId)
| where caller_name !in~ (deleted_rules_authorized_entities)
// Shape the row as a work item: title, HTML description and severity.
| project TimeGenerated, SentinelResourceName, original_description = Description, RuleId, work_item_title = strcat('🗑️ Analytics Rule Deleted [RuleId: ', RuleId, ']'), work_item_description = strcat('The analytics rule ', SentinelResourceName, ' was deleted on ', format_datetime(TimeGenerated, 'MM-dd-yyyy HH:mm:ss'), ', by the entity ', caller_name, ' who logged in using the IP address ', caller_ip_address, '.\n<br />It is recommended to confirm the above activity.'), severity = 'High' ;
// Analytic rules switched from enabled to disabled by callers that are not on the authorized list.
let rule_disabled_results = SentinelAudit
// Restrict to the lookback window, consistent with the other sub-queries of this report.
| where TimeGenerated > ago(lookback_time)
| where Status == 'Success'
| where SentinelResourceType == 'Analytic Rule'
// Only audit records whose changed members include the rule's Enabled property.
| where ExtendedProperties.ResourceDiffMemberNames has 'Properties.Enabled'
| extend caller_name = tostring(ExtendedProperties.CallerName), caller_ip_address = tostring(ExtendedProperties.CallerIpAddress), RuleId = tostring(ExtendedProperties.ResourceId)
| where caller_name !in~ (disabled_rules_authorized_entities)
// Extract the "enabled" flag from the serialized resource state before and after the change.
| extend WasEnabled = extract(@',"enabled":(\w+),.*}}', 1, tostring(ExtendedProperties.OriginalResourceState)), IsEnabled = extract(@',"enabled":(\w+),.*}}', 1, tostring(ExtendedProperties.UpdatedResourceState))
// Keep only rules that were enabled before, and are disabled now
| where WasEnabled =~ 'true' and IsEnabled =~ 'false'
| project TimeGenerated, SentinelResourceName, original_description = Description, RuleId, work_item_title = strcat('🚩 Analytics Rule Disabled [RuleId: ', RuleId, ']'), work_item_description = strcat('The analytics rule ', SentinelResourceName, ' was disabled on ', format_datetime(TimeGenerated, 'MM-dd-yyyy HH:mm:ss'), ', by the entity ', caller_name, ' who logged in using the IP address ', caller_ip_address, '.\n<br />It is recommended to confirm the above activity.'), severity = 'High' ;
// Rules for which SentinelHealth reports that they were disabled after failing to run on multiple occasions.
let rule_auto_disabled_results = SentinelHealth
| where TimeGenerated > ago(lookback_time)
| where SentinelResourceType == 'Analytics Rule'
| where Description has "Rule failed to run on multiple occasions and has been disabled"
| extend RuleId = tostring(ExtendedProperties.RuleId)
| project TimeGenerated, SentinelResourceName, Reason, original_description = Description, RuleId, work_item_title = strcat('🚨 Analytics Rule Auto-Disabled [RuleId: ', RuleId, ']'), work_item_description = strcat('The analytics rule ', SentinelResourceName, ' was automatically disabled on ', format_datetime(TimeGenerated, 'MM-dd-yyyy HH:mm:ss'), ', and it will stop executing based on its configured schedule.'), severity = 'High' ;
// Per rule and per day: execution counts and success/failure percentages; keeps only days above the daily threshold.
let rule_failed_to_run_daily_rates = SentinelHealth
| where TimeGenerated > ago(lookback_time)
| where SentinelResourceType == 'Analytics Rule'
| where OperationName endswith "rule run"
// Counts are converted to double so the ratios below are computed with floating-point division.
| summarize DailyRuleExecutions = todouble(count()), DailyRuleUnsuccessfulExecutions = todouble(countif(Status == 'Failure')), DailyRuleSuccessfulExecutions = todouble(countif(Status == 'Success')), make_set(Status, 5), FailuresDescription = make_set_if(Description, Status == 'Failure', 10), FailuresReason = make_set_if(Reason, Status == 'Failure', 20) by SentinelResourceName, RuleId = tostring(ExtendedProperties.RuleId), bin(TimeGenerated, 1d)
| extend daily_success_rate = round((100.0 * (DailyRuleSuccessfulExecutions / DailyRuleExecutions)), 2), daily_failure_rate = round((100.0 * (DailyRuleUnsuccessfulExecutions / DailyRuleExecutions)), 2)
| project-reorder SentinelResourceName, daily_success_rate, daily_failure_rate, RuleId
// Strictly greater than the daily threshold.
| where daily_failure_rate > daily_failures_ratio_threshold
| extend work_item_title = strcat('⚠️ Daily Rule Failure Ratio Reached [RuleId: ', RuleId, ']'), work_item_description = strcat('The analytics rule ', SentinelResourceName, ' exceeded the daily failure threshold on ', format_datetime(TimeGenerated, 'MM-dd-yyyy'), '.\n<br />\n<br />Daily Failure Rate: ', daily_failure_rate, '\n<br />Daily Success Rate: ', daily_success_rate, '\n<br />Daily Rule Executions: ', DailyRuleExecutions, '\n<br />Daily Rule Successful Executions: ', DailyRuleSuccessfulExecutions, '\n<br />Daily Rule Failed Executions: ', DailyRuleUnsuccessfulExecutions, '\n<br />\n<br />It is recommended to investigate the execution errors and tune the rule query.'), severity = 'Medium' ;
// Per rule over the whole lookback window: execution counts and success/failure percentages.
let rule_failed_to_run_monthly_rates = SentinelHealth
| where TimeGenerated > ago(lookback_time)
| where SentinelResourceType == 'Analytics Rule'
| where OperationName endswith "rule run"
// Counts are converted to double so the ratios below are computed with floating-point division.
| summarize MonthlyRuleExecutions = todouble(count()), MonthlyRuleUnsuccessfulExecutions = todouble(countif(Status == 'Failure')), MonthlyRuleSuccessfulExecutions = todouble(countif(Status == 'Success')), make_set(Status, 5), FailuresDescription = make_set_if(Description, Status == 'Failure', 10), FailuresReason = make_set_if(Reason, Status == 'Failure', 20) by SentinelResourceName, RuleId = tostring(ExtendedProperties.RuleId)
| extend monthly_success_rate = round((100.0 * (MonthlyRuleSuccessfulExecutions / MonthlyRuleExecutions)), 2), monthly_failure_rate = round((100.0 * (MonthlyRuleUnsuccessfulExecutions / MonthlyRuleExecutions)), 2)
| project-reorder SentinelResourceName, monthly_success_rate, monthly_failure_rate, RuleId
// NOTE(review): this check uses >= while the daily check uses >; confirm which comparison is intended.
| where monthly_failure_rate >= monthly_failures_ratio_threshold
// The reported period is derived from lookback_time so it matches the window that was actually queried.
| extend work_item_title = strcat('⚠️ Monthly Rule Failure Ratio Reached [RuleId: ', RuleId, ']'), work_item_description = strcat('The analytics rule ', SentinelResourceName, ' exceeded the monthly failure threshold for the period ', format_datetime(ago(lookback_time), "MM-dd-yyyy"), ' to ', format_datetime(now(), "MM-dd-yyyy"), '.\n<br />\n<br />Monthly Failure Rate: ', monthly_failure_rate, '\n<br />Monthly Success Rate: ', monthly_success_rate, '\n<br />Monthly Rule Executions: ', MonthlyRuleExecutions, '\n<br />Monthly Rule Successful Executions: ', MonthlyRuleSuccessfulExecutions, '\n<br />Monthly Rule Failed Executions: ', MonthlyRuleUnsuccessfulExecutions, '\n<br />\n<br />It is recommended to investigate the execution errors and tune the rule query.'), severity = 'Low' ;
// Combine all five checks into one result set and put the work item columns first.
union rule_auto_disabled_results, rule_deleted_results, rule_failed_to_run_monthly_rates, rule_failed_to_run_daily_rates, rule_disabled_results
| project-reorder work_item_title, work_item_description, severity

Automating Work Item Creation from the Pipelines

The article uses the "CreateWorkItem" extension from the Azure DevOps marketplace to automate work item creation. This extension enables defining work items as a JSON array within a pipeline step. The preventDuplicates, keyFields, updateDuplicates, and updateRules parameters control how duplicates are detected and how existing work items are updated.

# Example pipeline step: create work items in bulk with the CreateWorkItem extension.
# The work items are passed as a JSON array in `workItemsJson`; each element carries its
# own type, title, field mappings and duplicate-handling settings
# (preventDuplicates, keyFields, updateDuplicates, updateRules).
- task: CreateWorkItem@2
  displayName: 'Bulk Create Work Items'
  inputs:
    bulkCreate: true
    workItemsJson: |
      [
        {
          "workItemType": "Task",
          "title": "Work item title",
          "fieldMappings": [
            "Description=Insert description here",
            "Severity=Low"
          ],
          "preventDuplicates": true,
          "keyFields": [
            "Title"
          ],
          "updateDuplicates": true,
          "updateRules": "Description|=Appended Description"
        }
      ]

Jinja Templating

Jinja is used to control the format of created work items. The article provides templates for both detection trigger rates and tampering/health checks.

  • Detection Trigger Rate and Entity Appearance Template:
{# Renders one work item JSON object per entry in `results` (trigger rate / entity appearance report).
   Severity is High when FullSearchDailyAvg > 100, Medium when > 10, otherwise Low.
   The same report text is used for the initial Description and for the appended update.
   NOTE(review): the last filter, replace('"', "\""), looks like it maps a double quote to itself;
   confirm whether an escaped quote was intended. -#}
[{% for result in results %}
{
  "workItemType": "Task",
  "title": "{{result.Title}}",
  "fieldMappings": [
    "Description=Report generated at: {{ now_timestamp }}<br />Title: {{ result.Title }}<br />Incidents observed from {{ result.MinDateTime }} to {{ result.MaxDateTime }}<br />Search duration: {{ result.SearchDurationDays }} days<br />Total incidents: {{ result.IncidentCount }}<br />Active days: {{ result.Occurences }} out of {{ result.DaysBetweenMinMax }}<br />Average per active day: {{ result.DailyOccurenceAvg }}<br />StdDev per active day: {{ result.DailyOccurenceStdDev }}<br />Average per day (full search): {{ result.FullSearchDailyAvg }}<br />StdDev per day (full search): {{ result.FullSearchDailyStdDev }}<br />Entities:<br /> {{ result.EntitiesAppearancesFormatted | replace('["', "") | replace('"]', "") | replace('","', "") | replace('"', "\"") }}<br />",
    "Severity={{ 'High' if result.FullSearchDailyAvg > 100 else 'Medium' if result.FullSearchDailyAvg > 10 else 'Low' }}"
  ],
  "preventDuplicates": true,
  "keyFields": ["Title"],
  "updateDuplicates": true,
  "updateRules": "Description|=<br />---<br />Report generated at: {{ now_timestamp }}<br />Title: {{ result.Title }}<br />Incidents observed from {{ result.MinDateTime }} to {{ result.MaxDateTime }}<br />Search duration: {{ result.SearchDurationDays }} days<br />Total incidents: {{ result.IncidentCount }}<br />Active days: {{ result.Occurences }} out of {{ result.DaysBetweenMinMax }}<br />Average per active day: {{ result.DailyOccurenceAvg }}<br />StdDev per active day: {{ result.DailyOccurenceStdDev }}<br />Average per day (full search): {{ result.FullSearchDailyAvg }}<br />StdDev per day (full search): {{ result.FullSearchDailyStdDev }}<br />Entities:<br /> {{ result.EntitiesAppearancesFormatted | replace('["', "") | replace('"]', "") | replace('","', "") | replace('"', "\"") }}<br />"
}{% if not loop.last %},{% endif %}{% endfor %}
]
  • Detection Tampering and Health Checks Template:
{# Renders one work item JSON object per entry in `results` (tampering / health report).
   Title, description and severity are taken from the work_item_title, work_item_description
   and severity fields of each result; a comma is emitted between items but not after the last one. -#}
[{% for result in results %}
{
  "workItemType": "Task",
  "title": "{{result.work_item_title}}",
  "fieldMappings": [
    "Description=Report generated at: {{ now_timestamp }}<br />Title: {{ result.work_item_title }}<br />{{ result.work_item_description }}<br /><br />Rule ID: {{ result.RuleId }}<br />",
    "Severity={{ result.severity }}"
  ],
  "preventDuplicates": true,
  "keyFields": ["Title"],
  "updateDuplicates": true,
  "updateRules": "Description|=<br />---<br />Update generated at: {{ now_timestamp }}<br />Title: {{ result.work_item_title }}<br />{{ result.work_item_description }}<br /><br />Rule ID: {{ result.RuleId }}<br />"
}{% if not loop.last %},{% endif %}{% endfor %}
]

Detection Monitoring Script

A Python script is provided that accepts command-line arguments, executes the KQL queries, filters the results to include only detections defined in the local repository, and generates JSON for work item creation.

Key functionalities of the script:

  • get_repository_detection_info(): Scans the local "detections" directory for metadata YAML files, creating a dictionary mapping detection IDs to titles and versions.
  • get_results_for_detections_in_repo(): Filters query results, matching detection IDs against the local repository data.
  • run_query_and_generate_work_item_dict(): Executes the query, filters results, renders the Jinja2 template to create JSON, and saves results to JSON and CSV files.
  • Main function (main) handles argument parsing and calls run_query_and_generate_work_item_dict.
import argparse
import os
import sys
import json
import yaml
import csv
from lib.platforms import Sentinel
from tabulate import tabulate
from datetime import datetime, timezone
from jinja2 import Environment, FileSystemLoader

def get_repository_detection_info() -> dict:
    """
    Scan the local ``detections`` folder for ``*_meta.yml`` files and build a
    dictionary mapping each detection ID to its title and version.

    Returns:
        dict: ``{detection_id: {"title": ..., "version": ...}}``. The result
        is always a dictionary (empty when nothing is found).

    Files that fail to parse are reported as a pipeline error and skipped, so
    one broken metadata file does not hide the remaining detections. Files
    whose content is not a mapping, or that have no ``id``, are ignored.
    """
    detection_info = {}
    for root, _, files in os.walk("detections"):
        for file in files:
            if not file.endswith("_meta.yml"):
                continue
            file_path = os.path.join(root, file)
            with open(file_path, "r", encoding="utf-8") as f:
                try:
                    yaml_content = yaml.safe_load(f)
                except yaml.YAMLError as e:
                    print(f"##vso[task.logissue type=error]Error reading {file_path}: {e}")
                    continue
            # An empty file loads as None and a scalar/list has no .get(); skip both.
            if not isinstance(yaml_content, dict):
                continue
            file_id = yaml_content.get("id")
            if file_id:
                detection_info[file_id] = {
                    "title": yaml_content.get("title"),
                    "version": yaml_content.get("version"),
                }
    # Returned on every path, not only after a parse error.
    return detection_info

def get_results_for_detections_in_repo(results_kv: list, compare_field: str):
    """
    Keep only the query results that refer to a detection present in the
    local repository.

    Args:
        results_kv: Query results as a list of dictionaries (column -> value).
        compare_field: Name of the result column that holds either a single
            detection ID or a JSON-encoded list of detection IDs.

    Returns:
        list: The entries of ``results_kv`` (in their original order) whose
        ``compare_field`` matches at least one repository detection ID.

    Raises:
        KeyError: If ``compare_field`` is not a column of a result row.
    """
    # Load detection metadata from the repo; fall back to "no detections" if
    # the lookup yields nothing usable.
    repository_detections = get_repository_detection_info() or {}
    results_for_detections_in_repo = []
    for result_line in results_kv:
        compare_value = result_line[compare_field]
        if isinstance(compare_value, str) and compare_value.startswith("["):
            # The field is a JSON-serialized list of IDs.
            try:
                detection_rule_ids = json.loads(compare_value)
            except json.JSONDecodeError:
                print(f"##vso[task.logissue type=error]Error loading {compare_field} for {result_line}")
                continue
        elif isinstance(compare_value, (list, tuple)):
            # The field was already deserialized into a list of IDs.
            detection_rule_ids = compare_value
        elif isinstance(compare_value, str):
            detection_rule_ids = [compare_value]
        else:
            # Null or otherwise non-textual value: nothing to match against.
            continue
        # One matching ID is enough to keep the row.
        if any(
            isinstance(rule_id, str) and rule_id in repository_detections
            for rule_id in detection_rule_ids
        ):
            results_for_detections_in_repo.append(result_line)
    return results_for_detections_in_repo

def run_query_and_generate_work_item_dict(
    tenant: str, platform: str, query: str, template: str, compare_field: str
):
    """
    Run a query against Sentinel, keep the results that belong to detections
    defined in the local repository, and publish them for the pipeline.

    Steps performed on a successful query:
      1. Print the filtered results as a table.
      2. If ``template`` is given, render it with the filtered results and
         expose the rendered text (newlines replaced by spaces) as a pipeline
         variable named after the template file without its ``.jinja`` suffix.
      3. If there are filtered results, save them under ``pipelines/results``
         as JSON and CSV.

    Args:
        tenant: Tenant identifier passed to the ``Sentinel`` platform object.
        platform: Platform name. Currently unused: ``Sentinel`` is always used.
        query: Query text to execute.
        template: Jinja template file name (looked up in
            ``pipelines/scripts/templates``), or a falsy value to skip rendering.
        compare_field: Result column matched against repository detection IDs.

    Returns:
        None. A failed query is reported as a pipeline error and nothing else
        is done.
    """
    # Initialize platform object and run query
    pl = Sentinel(tenant)
    platform_response = pl.run_query(query)

    if not platform_response.success:
        # Surface the failure instead of finishing silently with no output.
        print("##vso[task.logissue type=error]Query execution failed; no results were processed.")
        return

    # Extract rows and column headers from the query result
    data = platform_response.data
    columns = [column["name"] for column in data["tables"][0]["columns"]]
    rows = data["tables"][0]["rows"]
    results_kv = [dict(zip(columns, row)) for row in rows]  # Convert to list of dictionaries

    # Keep only results that match detection IDs in the repo
    results_for_detections_in_repo = get_results_for_detections_in_repo(results_kv, compare_field)

    # Display the filtered results in a formatted table
    print(tabulate(results_for_detections_in_repo, headers="keys", tablefmt="grid"))

    now_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H.%M.%S UTC")

    template_name = None
    if template:
        env = Environment(loader=FileSystemLoader("pipelines/scripts/templates"))
        tmplt = env.get_template(template)
        template_name = os.path.basename(template).removesuffix(".jinja")
        template_content = tmplt.render(results=results_for_detections_in_repo, now_timestamp=now_timestamp)
        print(template_content)
        # Flatten outside the f-string: a backslash inside an f-string
        # expression is a syntax error before Python 3.12.
        single_line_content = template_content.replace("\n", " ")
        print(f"##vso[task.setvariable variable={template_name}]{single_line_content}")

    # Nothing to save when no result matched a repository detection.
    if len(results_for_detections_in_repo) == 0:
        return

    result_filename = f"{template_name}_results" if template else f"results_{now_timestamp}"
    results_dir = "pipelines/results"
    try:
        # Make sure the output folder exists before writing into it.
        os.makedirs(results_dir, exist_ok=True)
    except OSError as e:
        print(f"##vso[task.logissue type=error]Error creating results folder: {str(e)}")

    try:
        # Save as JSON file
        with open(f"{results_dir}/{result_filename}.json", 'w') as json_file:
            json_file.write(json.dumps(results_for_detections_in_repo, indent=4))
    except Exception as e:
        print(f"##vso[task.logissue type=error]Error saving JSON file: {str(e)}")

    try:
        # Save as CSV file; the header comes from the keys of the first result.
        columns_names = results_for_detections_in_repo[0].keys()
        with open(f"{results_dir}/{result_filename}.csv", 'w', newline="", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=columns_names, quoting=csv.QUOTE_ALL)
            writer.writeheader()
            writer.writerows(results_for_detections_in_repo)
    except Exception as e:
        print(f"##vso[task.logissue type=error]Error saving CSV file: {str(e)}")


def main():
    """
    Command-line entry point.

    Parses the command-line options, reads the query text from the ``QUERY``
    environment variable (exiting with status 1 when it is not set) and hands
    everything to ``run_query_and_generate_work_item_dict``.
    """
    # (flag, required, help text), in the order the options are registered.
    option_specs = (
        ("--tenants", True, "Tenant to gather stats."),
        ("--platform", True, "Platform of tenant."),
        ("--template", False, "Templates to use for result output."),
        (
            "--detection-compare-field",
            True,
            "Query results field to match to repository detection IDs.",
        ),
    )
    cli = argparse.ArgumentParser(description="Alert Monitoring Script")
    for flag, is_required, help_text in option_specs:
        cli.add_argument(flag, type=str, required=is_required, help=help_text)
    options = cli.parse_args()

    # The query is supplied through the environment rather than as an option.
    kql_text = os.environ.get("QUERY")
    if kql_text is None:
        print("QUERY environment variable is not set.")
        sys.exit(1)

    run_query_and_generate_work_item_dict(
        tenant=options.tenants,
        platform=options.platform,
        query=kql_text,
        template=options.template,
        compare_field=options.detection_compare_field,
    )


# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()

Detection Monitoring Pipeline

The article details an Azure DevOps pipeline designed to execute KQL queries, process data with the Python script, and manage work items.

The pipeline:

  • Runs on a weekly schedule.
  • Accepts parameters for lookback time, incident thresholds, and failure ratios.
  • Uses two KQL queries defined in pipeline variables (kql_query and kql_query2).
  • Installs Python dependencies, executes the detection_monitoring.py script twice (once for trigger rates and entity appearances, and once for tampering and health checks), publishes the results as artifacts, and conditionally creates/updates work items based on pipeline parameters.
  • Includes a step to publish the results as pipeline artifacts.

```yaml name: Detection Monitoring trigger: none schedules: - cron: "0 2 * * 1" # At 01:00 UTC every Monday displayName: Weekly run branches: include: - main parameters: - name: lookback_time displayName: Lookback Time (days) type: number default: 30 - name: incident_count_threshold displayName: Incident Count Threshold type: number default: -1 # -1 disables this check - name: incident_avg_threshold displayName: Incident Average Threshold type: number default: -1 # -1 disables this check - name: monthly_failures_ratio_threshold displayName: Monthly Failures Ratio threshold. type: number default: 20 - name: daily_failures_ratio_threshold displayName: Daily Failures Ratio threshold. type: number default: 70 - name: create_work_items displayName: Create work items. type: boolean default: true variables: kql_query: | let lookback_time = ${{ parameters.lookback_time }}d; let incident_count_threshold = ${{ parameters.incident_count_threshold }}; let incident_avg_threshold = ${{ parameters.incident_avg_threshold }}; let incidents = SecurityIncident | where TimeGenerated > ago(lookback_time) | where ProviderName == "Microsoft XDR" | mv-expand RelatedAnalyticRuleId = RelatedAnalyticRuleIds | extend RelatedAnalyticRuleId = tostring(RelatedAnalyticRuleId) | project Title, TimeGenerated, Day = startofday(TimeGenerated), RelatedAnalyticRuleId; let daily_counts = incidents | summarize DailyCount = count() by Title, RelatedAnalyticRuleId, Day; daily_counts | join kind=inner ( incidents | summarize IncidentCount = count(), MinDateTime = min(TimeGenerated), MaxDateTime = max(TimeGenerated) by Title, RelatedAnalyticRuleId ) on Title and RelatedAnalyticRuleId | summarize IncidentCount = sum(DailyCount), MinDateTime = max(MinDateTime), MaxDateTime = max(MaxDateTime), DailyOccurenceAvg = round(avg(DailyCount), 2), Occurences = count(), DailyOccurenceStdDev = round(stdev(DailyCount), 2) by Title, RelatedAnalyticRuleId | extend DaysBetweenMinMax = datetime_diff("day", MaxDateTime, 
MinDateTime) + 1 | extend SearchDurationDays = datetime_diff("day", now(), ago(30d)) + 1 | extend FullSearchDailyAvg = round(todouble(IncidentCount) / todouble(SearchDurationDays), 2) | extend FullSearchDailyStdDev = round(DailyOccurenceStdDev * sqrt(todouble(Occurences) / todouble(SearchDurationDays)), 2) | where (incident_count_threshold != -1 and incident_avg_threshold != -1 and (IncidentCount > incident_count_threshold or FullSearchDailyAvg > incident_avg_threshold)) or (incident_count_threshold != -1 and incident_avg_threshold == -1 and IncidentCount > incident_count_threshold) or (incident_count_threshold == -1 and incident_avg_threshold != -1 and FullSearchDailyAvg > incident_avg_threshold) or (incident_count_threshold == -1 and incident_avg_threshold == -1) | project-reorder Title, RelatedAnalyticRuleId, IncidentCount, MinDateTime, MaxDateTime, DaysBetweenMinMax, Occurences, DailyOccurenceAvg, DailyOccurenceStdDev, SearchDurationDays, FullSearchDailyAvg, FullSearchDailyStdDev | join kind = inner (SecurityIncident | where TimeGenerated > ago(lookback_time) | where ProviderName == "Microsoft XDR" | project IncidentNumber, IncidentName, Title, RelatedAnalyticRuleIds, AlertIds | mv-expand AlertId = AlertIds | extend AlertId = tostring(AlertId) | join kind = inner ( SecurityAlert | where TimeGenerated > ago(lookback_time) | where ProductName == "Azure Sentinel" and ProductComponentName == "Scheduled Alerts" | project SystemAlertId, AlertName, Entities, AlertType | extend AnalyticRuleId = split(AlertType, "_")[-1] | mv-expand Entities = todynamic(Entities) | mv-expand Entities = todynamic(Entities) | mv-expand kind=array key = bag_keys(Entities) | extend PropertyName = tostring(key), PropertyValue = tostring(Entities[tostring(key)]) | where not(PropertyName startswith "$") and PropertyName != "" | where PropertyName != "Type" ) on $left.AlertId == $right.SystemAlertId | extend AnalyticRuleId = tostring(AnalyticRuleId) | summarize EntitiesIncidentCount = 
count_distinct(IncidentNumber), EntitiesAlertCount = count_distinct(SystemAlertId), IncidentNumbers=make_set(IncidentNumber), SystemAlertIds=make_set(SystemAlertId), AlertNames = make_set(AlertName) by Title, AnalyticRuleId, PropertyName, PropertyValue | project-reorder Title, AnalyticRuleId, PropertyName, PropertyValue, EntitiesIncidentCount, EntitiesAlertCount, AlertNames, IncidentNumbers, SystemAlertIds | extend EntitiesAppearancesFormatted = strcat(PropertyName, ": ", PropertyValue, ", ", "IncidentCount: ", EntitiesIncidentCount, "
") | extend EntitiesAppearances = pack(PropertyName, PropertyValue, "IncidentCount", EntitiesIncidentCount) | order by toint(EntitiesAppearances.IncidentCount) desc | summarize EntitiesAppearancesFormatted = make_list(EntitiesAppearancesFormatted), EntitiesAppearances = make_list(EntitiesAppearances) by AnalyticRuleId, Title ) on $left.Title == $right.Title and $left.RelatedAnalyticRuleId == $right.AnalyticRuleId | project-away Title1, AnalyticRuleId kql_query2: | let lookback_time = ${{ parameters.lookback_time }}d; let deleted_rules_authorized_entities = dynamic([]); let disabled_rules_authorized_entities = dynamic([]); let monthly_failures_ratio_threshold = ${{ parameters.monthly_failures_ratio_threshold }}; let daily_failures_ratio_threshold = ${{ parameters.daily_failures_ratio_threshold }}; let rule_auto_disabled_results = SentinelHealth | where Time