Automating Detection Monitoring with Detection-as-Code
Introduction
This article, part 7 of a series on Detection Engineering, focuses on automating the monitoring of deployed detections. Effective maintenance is crucial in detection engineering to prevent alert fatigue and ensure rules remain relevant. The article addresses the limitations of built-in SIEM/EDR features for rule monitoring and tuning, advocating for a proactive approach through automation to catch issues before they impact security operations.
Monitoring Checks
The article outlines four key types of monitoring checks to automate:
- Trigger Rate of Detections: Monitoring the average rate at which detections are triggered.
- Entities Triggering Rate: Identifying the frequency of entity appearances in detections.
- Detection Tampering: Tracking and reporting when detections are disabled or deleted.
- Detection Health: Monitoring daily and monthly failure rates of detections.
The goal is to automate these checks and generate Azure DevOps work items for identified issues.
App Registration
To access the Log Analytics API for querying detection information, an App registration in Azure is required. This app needs the "Log Analytics Reader" role. This role is assigned in the Log Analytics workspace under Access Control (IAM) -> Role Assignments.
Monitoring Detection Trigger Rate and Entities Appearances
The article provides a Kusto Query Language (KQL) query to analyze security incidents in Sentinel. This query calculates various metrics:
- Incident Title and Related Analytic Rule ID
- Incident Count within the lookback period
- First and last occurrence timestamps
- Daily Occurrence Average and Standard Deviation
- Search Duration and Overall Daily Average/StdDev
- Entities Appearances (entities and their counts)
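The query uses incident_count_threshold and incident_avg_threshold to filter results: only detections whose incident count or full-search daily average exceeds the configured threshold are returned. Setting a threshold to -1 disables that check; when both are -1, every result is returned.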
// Detection trigger-rate and entity-appearance report for Sentinel incidents.
// Thresholds: a value of -1 disables the corresponding filter.
let lookback_time = 30d;
let incident_count_threshold = -1;
let incident_avg_threshold = -1;
// One row per (incident record, related analytic rule).
// NOTE(review): SecurityIncident may contain several records per incident (one per update) —
// confirm whether de-duplication by IncidentNumber is needed before counting.
let incidents = SecurityIncident
    | where TimeGenerated > ago(lookback_time)
    | where ProviderName == "Microsoft XDR"
    | mv-expand RelatedAnalyticRuleId = RelatedAnalyticRuleIds
    | extend RelatedAnalyticRuleId = tostring(RelatedAnalyticRuleId)
    | project Title, TimeGenerated, Day = startofday(TimeGenerated), RelatedAnalyticRuleId;
// Number of incident rows per title / rule / calendar day (only days with at least one incident).
let daily_counts = incidents
    | summarize DailyCount = count() by Title, RelatedAnalyticRuleId, Day;
daily_counts
| join kind=inner (
    incidents
    | summarize IncidentCount = count(), MinDateTime = min(TimeGenerated), MaxDateTime = max(TimeGenerated) by Title, RelatedAnalyticRuleId
    ) on Title and RelatedAnalyticRuleId
// Collapse the per-day rows into one row per title / rule.
// Occurences = number of active days; the Avg/StdDev here are over active days only.
| summarize IncidentCount = sum(DailyCount), MinDateTime = min(MinDateTime), MaxDateTime = max(MaxDateTime), DailyOccurenceAvg = round(avg(DailyCount), 2), Occurences = count(), DailyOccurenceStdDev = round(stdev(DailyCount), 2) by Title, RelatedAnalyticRuleId
| extend DaysBetweenMinMax = datetime_diff("day", MaxDateTime, MinDateTime) + 1
// Derive the search duration from lookback_time (was hard-coded to 30d) so the
// full-search statistics stay correct when the look-back window is changed.
| extend SearchDurationDays = datetime_diff("day", now(), ago(lookback_time)) + 1
| extend FullSearchDailyAvg = round(todouble(IncidentCount) / todouble(SearchDurationDays), 2)
// NOTE(review): this scales the active-day StdDev by sqrt(active days / search days);
// it looks like an approximation rather than an exact StdDev over all days — confirm.
| extend FullSearchDailyStdDev = round(DailyOccurenceStdDev * sqrt(todouble(Occurences) / todouble(SearchDurationDays)), 2)
// Threshold filter: each threshold applies only when it is not -1; with both disabled every row passes.
| where (incident_count_threshold != -1 and incident_avg_threshold != -1 and (IncidentCount > incident_count_threshold or FullSearchDailyAvg > incident_avg_threshold))
    or (incident_count_threshold != -1 and incident_avg_threshold == -1 and IncidentCount > incident_count_threshold)
    or (incident_count_threshold == -1 and incident_avg_threshold != -1 and FullSearchDailyAvg > incident_avg_threshold)
    or (incident_count_threshold == -1 and incident_avg_threshold == -1)
| project-reorder Title, RelatedAnalyticRuleId, IncidentCount, MinDateTime, MaxDateTime, DaysBetweenMinMax, Occurences, DailyOccurenceAvg, DailyOccurenceStdDev, SearchDurationDays, FullSearchDailyAvg, FullSearchDailyStdDev
// Attach, per title / rule, the entity property values seen in the related scheduled alerts.
| join kind = leftouter (
    SecurityIncident
    | where TimeGenerated > ago(lookback_time)
    | where ProviderName == "Microsoft XDR"
    | project IncidentNumber, IncidentName, Title, RelatedAnalyticRuleIds, AlertIds
    | mv-expand AlertId = AlertIds
    | extend AlertId = tostring(AlertId)
    | join kind = inner (
        SecurityAlert
        | where TimeGenerated > ago(lookback_time)
        | where ProductName == "Azure Sentinel" and ProductComponentName == "Scheduled Alerts"
        | project SystemAlertId, AlertName, Entities, AlertType
        // The rule ID is taken as the last "_"-separated segment of AlertType.
        | extend AnalyticRuleId = split(AlertType, "_")[-1]
        // First expansion: one row per entity; second expansion: one row per entity property.
        | mv-expand Entities = todynamic(Entities)
        | mv-expand Entities = todynamic(Entities)
        | mv-expand kind=array key = bag_keys(Entities)
        | extend PropertyName = tostring(key), PropertyValue = tostring(Entities[tostring(key)])
        // Drop "$"-prefixed and empty property names, and the "Type" property.
        | where not(PropertyName startswith "$") and PropertyName != ""
        | where PropertyName != "Type"
        ) on $left.AlertId == $right.SystemAlertId
    | extend AnalyticRuleId = tostring(AnalyticRuleId)
    | summarize EntitiesIncidentCount = count_distinct(IncidentNumber), EntitiesAlertCount = count_distinct(SystemAlertId), IncidentNumbers=make_set(IncidentNumber), SystemAlertIds=make_set(SystemAlertId), AlertNames = make_set(AlertName) by Title, AnalyticRuleId, PropertyName, PropertyValue
    | project-reorder Title, AnalyticRuleId, PropertyName, PropertyValue, EntitiesIncidentCount, EntitiesAlertCount, AlertNames, IncidentNumbers, SystemAlertIds
    | extend EntitiesAppearancesFormatted = strcat(PropertyName, ": ", PropertyValue, ", ", "IncidentCount: ", EntitiesIncidentCount, "<br />")
    | extend EntitiesAppearances = pack(PropertyName, PropertyValue, "IncidentCount", EntitiesIncidentCount)
    | order by toint(EntitiesAppearances.IncidentCount) desc
    | summarize EntitiesAppearancesFormatted = make_list(EntitiesAppearancesFormatted), EntitiesAppearances = make_list(EntitiesAppearances) by AnalyticRuleId, Title
    ) on $left.Title == $right.Title and $left.RelatedAnalyticRuleId == $right.AnalyticRuleId
| project-away Title1, AnalyticRuleId
Monitoring Detection Tampering and Health
This section focuses on detecting tampering (rule deletion/disabling) and monitoring the health of detection rules. Enabling health and audit logging in Microsoft Sentinel is a prerequisite. The article provides KQL queries for:
- Identifying unauthorized deletions of analytic rules.
- Detecting instances where analytic rules were disabled.
- Identifying rules automatically disabled by Sentinel due to consecutive failures.
- Calculating daily and monthly execution success/failure ratios.
The queries use variables like monthly_failures_ratio_threshold and daily_failures_ratio_threshold to define acceptable failure rates. The arrays deleted_rules_authorized_entities and disabled_rules_authorized_entities are used to exclude authorized entities from triggering alerts.
// Detection tampering and health report: every sub-query emits work_item_title,
// work_item_description and severity so the results can be unioned into one table.
let lookback_time = 30d;
// Callers listed here are excluded from the deleted / disabled findings.
let deleted_rules_authorized_entities = dynamic([]);
let disabled_rules_authorized_entities = dynamic([]);
// Failure-rate thresholds, in percent.
let monthly_failures_ratio_threshold = 20;
let daily_failures_ratio_threshold = 70;
// Analytic rules that were deleted by a caller not in the authorized list.
let rule_deleted_results = SentinelAudit
    | where TimeGenerated > ago(lookback_time)
    | where Status == 'Success'
    | where SentinelResourceType == 'Analytic Rule'
    | where Description == "Analytics rule deleted"
    | extend caller_name = tostring(ExtendedProperties.CallerName), caller_ip_address = tostring(ExtendedProperties.CallerIpAddress), RuleId = tostring(ExtendedProperties.ResourceId)
    | where caller_name !in~ (deleted_rules_authorized_entities)
    | project TimeGenerated, SentinelResourceName, original_description = Description, RuleId,
        work_item_title = strcat('🗑️ Analytics Rule Deleted [RuleId: ', RuleId, ']'),
        work_item_description = strcat('The analytics rule ', SentinelResourceName, ' was deleted on ', format_datetime(TimeGenerated, 'MM-dd-yyyy HH:mm:ss'), ', by the entity ', caller_name, ' who logged in using the IP address ', caller_ip_address, '.\n<br />It is recommended to confirm the above activity.'),
        severity = 'High' ;
// Analytic rules switched from enabled to disabled by a caller not in the authorized list.
let rule_disabled_results = SentinelAudit
    // Restrict to the look-back window like every other sub-query (this filter was missing).
    | where TimeGenerated > ago(lookback_time)
    | where Status == 'Success'
    | where SentinelResourceType == 'Analytic Rule'
    | where ExtendedProperties.ResourceDiffMemberNames has 'Properties.Enabled'
    | extend caller_name = tostring(ExtendedProperties.CallerName), caller_ip_address = tostring(ExtendedProperties.CallerIpAddress), RuleId = tostring(ExtendedProperties.ResourceId)
    | where caller_name !in~ (disabled_rules_authorized_entities)
    | extend WasEnabled = extract(@',"enabled":(\w+),.*}}', 1, tostring(ExtendedProperties.OriginalResourceState)), IsEnabled = extract(@',"enabled":(\w+),.*}}', 1, tostring(ExtendedProperties.UpdatedResourceState))
    // Keep only rules that were enabled before, and are disabled now
    | where WasEnabled =~ 'true' and IsEnabled =~ 'false'
    | project TimeGenerated, SentinelResourceName, original_description = Description, RuleId,
        work_item_title = strcat('🚩 Analytics Rule Disabled [RuleId: ', RuleId, ']'),
        work_item_description = strcat('The analytics rule ', SentinelResourceName, ' was disabled on ', format_datetime(TimeGenerated, 'MM-dd-yyyy HH:mm:ss'), ', by the entity ', caller_name, ' who logged in using the IP address ', caller_ip_address, '.\n<br />It is recommended to confirm the above activity.'),
        severity = 'High' ;
// Rules whose health record reports they were disabled after repeated failed runs.
// NOTE(review): SentinelHealth is filtered on 'Analytics Rule' while SentinelAudit uses
// 'Analytic Rule' — the spelling differs per table here; verify against the table schemas.
let rule_auto_disabled_results = SentinelHealth
    | where TimeGenerated > ago(lookback_time)
    | where SentinelResourceType == 'Analytics Rule'
    | where Description has "Rule failed to run on multiple occasions and has been disabled"
    | extend RuleId = tostring(ExtendedProperties.RuleId)
    | project TimeGenerated, SentinelResourceName, Reason, original_description = Description, RuleId,
        work_item_title = strcat('🚨 Analytics Rule Auto-Disabled [RuleId: ', RuleId, ']'),
        work_item_description = strcat('The analytics rule ', SentinelResourceName, ' was automatically disabled on ', format_datetime(TimeGenerated, 'MM-dd-yyyy HH:mm:ss'), ', and it will stop executing based on its configured schedule.'),
        severity = 'High' ;
// Per rule and per day: share of failed executions; keeps days strictly above the daily threshold.
let rule_failed_to_run_daily_rates = SentinelHealth
    | where TimeGenerated > ago(lookback_time)
    | where SentinelResourceType == 'Analytics Rule'
    | where OperationName endswith "rule run"
    | summarize DailyRuleExecutions = todouble(count()), DailyRuleUnsuccessfulExecutions = todouble(countif(Status == 'Failure')), DailyRuleSuccessfulExecutions = todouble(countif(Status == 'Success')), make_set(Status, 5), FailuresDescription = make_set_if(Description, Status == 'Failure', 10), FailuresReason = make_set_if(Reason, Status == 'Failure', 20) by SentinelResourceName, RuleId = tostring(ExtendedProperties.RuleId), bin(TimeGenerated, 1d)
    | extend daily_success_rate = round((100.0 * (DailyRuleSuccessfulExecutions / DailyRuleExecutions)), 2), daily_failure_rate = round((100.0 * (DailyRuleUnsuccessfulExecutions / DailyRuleExecutions)), 2)
    | project-reorder SentinelResourceName, daily_success_rate, daily_failure_rate, RuleId
    | where daily_failure_rate > daily_failures_ratio_threshold
    | extend work_item_title = strcat('⚠️ Daily Rule Failure Ratio Reached [RuleId: ', RuleId, ']'),
        work_item_description = strcat('The analytics rule ', SentinelResourceName, ' exceeded the daily failure threshold on ', format_datetime(TimeGenerated, 'MM-dd-yyyy'), '.\n<br />\n<br />Daily Failure Rate: ', daily_failure_rate, '\n<br />Daily Success Rate: ', daily_success_rate, '\n<br />Daily Rule Executions: ', DailyRuleExecutions, '\n<br />Daily Rule Successful Executions: ', DailyRuleSuccessfulExecutions, '\n<br />Daily Rule Failed Executions: ', DailyRuleUnsuccessfulExecutions, '\n<br />\n<br />It is recommended to investigate the execution errors and tune the rule query.'),
        severity = 'Medium' ;
// Per rule over the whole look-back window: share of failed executions.
// NOTE(review): this comparison is ">=" while the daily one is ">" — confirm which is intended.
let rule_failed_to_run_monthly_rates = SentinelHealth
    | where TimeGenerated > ago(lookback_time)
    | where SentinelResourceType == 'Analytics Rule'
    | where OperationName endswith "rule run"
    | summarize MonthlyRuleExecutions = todouble(count()), MonthlyRuleUnsuccessfulExecutions = todouble(countif(Status == 'Failure')), MonthlyRuleSuccessfulExecutions = todouble(countif(Status == 'Success')), make_set(Status, 5), FailuresDescription = make_set_if(Description, Status == 'Failure', 10), FailuresReason = make_set_if(Reason, Status == 'Failure', 20) by SentinelResourceName, RuleId = tostring(ExtendedProperties.RuleId)
    | extend monthly_success_rate = round((100.0 * (MonthlyRuleSuccessfulExecutions / MonthlyRuleExecutions)), 2), monthly_failure_rate = round((100.0 * (MonthlyRuleUnsuccessfulExecutions / MonthlyRuleExecutions)), 2)
    | project-reorder SentinelResourceName, monthly_success_rate, monthly_failure_rate, RuleId
    | where monthly_failure_rate >= monthly_failures_ratio_threshold
    // The reported period follows lookback_time (was hard-coded to 30d); "montly" typo fixed.
    | extend work_item_title = strcat('⚠️ Monthly Rule Failure Ratio Reached [RuleId: ', RuleId, ']'),
        work_item_description = strcat('The analytics rule ', SentinelResourceName, ' exceeded the monthly failure threshold for the period ', format_datetime(ago(lookback_time), "MM-dd-yyyy"), ' to ', format_datetime(now(), "MM-dd-yyyy"), '.\n<br />\n<br />Monthly Failure Rate: ', monthly_failure_rate, '\n<br />Monthly Success Rate: ', monthly_success_rate, '\n<br />Monthly Rule Executions: ', MonthlyRuleExecutions, '\n<br />Monthly Rule Successful Executions: ', MonthlyRuleSuccessfulExecutions, '\n<br />Monthly Rule Failed Executions: ', MonthlyRuleUnsuccessfulExecutions, '\n<br />\n<br />It is recommended to investigate the execution errors and tune the rule query.'),
        severity = 'Low' ;
union rule_auto_disabled_results, rule_deleted_results, rule_failed_to_run_monthly_rates, rule_failed_to_run_daily_rates, rule_disabled_results
| project-reorder work_item_title, work_item_description, severity
Automating Work Item Creation from the Pipelines
The article uses the "CreateWorkItem" extension from the Azure DevOps marketplace to automate work item creation. This extension enables defining work items as a JSON array within a pipeline step. The preventDuplicates, keyFields, updateDuplicates, and updateRules parameters control how duplicates are detected and how existing work items are updated.
# Pipeline step that creates (or updates) work items in bulk from a JSON array.
- task: CreateWorkItem@2
  displayName: 'Bulk Create Work Items'
  inputs:
    bulkCreate: true
    # JSON array with one object per work item. In each object:
    #   preventDuplicates / keyFields  - duplicate handling keyed on the listed fields
    #   updateDuplicates / updateRules - how an existing (duplicate) work item is updated
    workItemsJson: |
      [
        {
          "workItemType": "Task",
          "title": "Work item title",
          "fieldMappings": [
            "Description=Insert description here",
            "Severity=Low"
          ],
          "preventDuplicates": true,
          "keyFields": [
            "Title"
          ],
          "updateDuplicates": true,
          "updateRules": "Description|=Appended Description"
        }
      ]
Jinja Templating
Jinja is used to control the format of created work items. The article provides templates for both detection trigger rates and tampering/health checks.
- Detection Trigger Rate and Entity Appearance Template:
{# Work items for detection trigger rate / entity appearances: renders a JSON array with one object per entry in `results`. #}
{# json_escape: escapes backslashes and double quotes so a value can be embedded inside a JSON string literal. #}
{% macro json_escape(value) %}{{ value | string | replace('\\', '\\\\') | replace('"', '\\"') }}{% endmacro %}
[{% for result in results %}
{# The report body is built once and reused for both the initial Description and the appended update. #}
{# NOTE(review): the EntitiesAppearancesFormatted filter chain is kept exactly as before; it presumably receives a JSON-serialized list whose entries are already JSON-escaped — confirm. #}
{% set report %}Report generated at: {{ now_timestamp }}<br />Title: {{ json_escape(result.Title) }}<br />Incidents observed from {{ result.MinDateTime }} to {{ result.MaxDateTime }}<br />Search duration: {{ result.SearchDurationDays }} days<br />Total incidents: {{ result.IncidentCount }}<br />Active days: {{ result.Occurences }} out of {{ result.DaysBetweenMinMax }}<br />Average per active day: {{ result.DailyOccurenceAvg }}<br />StdDev per active day: {{ result.DailyOccurenceStdDev }}<br />Average per day (full search): {{ result.FullSearchDailyAvg }}<br />StdDev per day (full search): {{ result.FullSearchDailyStdDev }}<br />Entities:<br /> {{ result.EntitiesAppearancesFormatted | replace('["', "") | replace('"]', "") | replace('","', "") | replace('"', "\"") }}<br />{% endset %}
{
  "workItemType": "Task",
  "title": "{{ json_escape(result.Title) }}",
  "fieldMappings": [
    "Description={{ report }}",
    "Severity={{ 'High' if result.FullSearchDailyAvg > 100 else 'Medium' if result.FullSearchDailyAvg > 10 else 'Low' }}"
  ],
  "preventDuplicates": true,
  "keyFields": ["Title"],
  "updateDuplicates": true,
  "updateRules": "Description|=<br />---<br />{{ report }}"
}{% if not loop.last %},{% endif %}{% endfor %}
]
- Detection Tampering and Health Checks Template:
{# Work items for detection tampering / health checks: renders a JSON array with one object per entry in `results`. #}
{# json_escape: escapes backslashes and double quotes so a value can be embedded inside a JSON string literal. #}
{% macro json_escape(value) %}{{ value | string | replace('\\', '\\\\') | replace('"', '\\"') }}{% endmacro %}
[{% for result in results %}
{# Shared tail of the initial Description and of the appended update. #}
{% set details %}Title: {{ json_escape(result.work_item_title) }}<br />{{ json_escape(result.work_item_description) }}<br /><br />Rule ID: {{ json_escape(result.RuleId) }}<br />{% endset %}
{
  "workItemType": "Task",
  "title": "{{ json_escape(result.work_item_title) }}",
  "fieldMappings": [
    "Description=Report generated at: {{ now_timestamp }}<br />{{ details }}",
    "Severity={{ result.severity }}"
  ],
  "preventDuplicates": true,
  "keyFields": ["Title"],
  "updateDuplicates": true,
  "updateRules": "Description|=<br />---<br />Update generated at: {{ now_timestamp }}<br />{{ details }}"
}{% if not loop.last %},{% endif %}{% endfor %}
]
Detection Monitoring Script
A Python script is provided that accepts command-line arguments, executes the KQL queries, filters the results to include only detections defined in the local repository, and generates JSON for work item creation.
Key functionalities of the script:
- get_repository_detection_info(): Scans the local "detections" directory for metadata YAML files, creating a dictionary mapping detection IDs to titles and versions.
- get_results_for_detections_in_repo(): Filters query results, matching detection IDs against the local repository data.
- run_query_and_generate_work_item_dict(): Executes the query, filters results, renders the Jinja2 template to create JSON, and saves results to JSON and CSV files.
- main(): Handles argument parsing and calls run_query_and_generate_work_item_dict().
import argparse
import os
import sys
import json
import yaml
import csv
from lib.platforms import Sentinel
from tabulate import tabulate
from datetime import datetime, timezone
from jinja2 import Environment, FileSystemLoader
def get_repository_detection_info() -> dict:
    """
    Scan the local ``detections`` folder for ``*_meta.yml`` files and build a
    dictionary mapping detection IDs to their title and version.

    Returns:
        dict: ``{detection_id: {"title": <title>, "version": <version>}}``.
        Files that cannot be read or parsed are reported as Azure DevOps errors
        and skipped; files whose content is not a mapping (for example an empty
        file) or that have no ``id`` are skipped as well.
    """
    detection_info = {}
    for root, _, files in os.walk("detections"):
        for file in files:
            if not file.endswith("_meta.yml"):
                continue
            file_path = os.path.join(root, file)
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    yaml_content = yaml.safe_load(f)
            except (OSError, UnicodeDecodeError, yaml.YAMLError) as e:
                print(f"##vso[task.logissue type=error]Error reading {file_path}: {e}")
                continue
            # safe_load returns None for an empty file and may return a list or
            # scalar; only a mapping can carry the id/title/version keys.
            if not isinstance(yaml_content, dict):
                print(f"##vso[task.logissue type=warning]Skipping {file_path}: content is not a YAML mapping")
                continue
            file_id = yaml_content.get("id")
            if file_id:
                detection_info[file_id] = {
                    "title": yaml_content.get("title"),
                    "version": yaml_content.get("version"),
                }
    return detection_info
def get_results_for_detections_in_repo(results_kv: list, compare_field: str):
    """
    Keep only the query results that refer to a detection defined in the local
    repository.

    Args:
        results_kv: Query result rows, each a dict of column name -> value.
        compare_field: Name of the column holding either a single detection ID
            or a list of IDs (a JSON-serialized list string, or an actual list).

    Returns:
        list: The rows, in their original order, for which at least one ID
        found in ``compare_field`` is a key of the repository detection
        dictionary. Rows whose value is empty, not a string/list, or not valid
        JSON are skipped.

    Raises:
        KeyError: If a row does not contain ``compare_field``.
    """
    results_for_detections_in_repo = []
    # Load detection metadata from the repo
    repository_detections = get_repository_detection_info()

    def _is_repo_detection(candidate) -> bool:
        # Unhashable values (e.g. nested lists/dicts) can never be a dict key.
        try:
            return candidate in repository_detections
        except TypeError:
            return False

    for result_line in results_kv:
        compare_value = result_line[compare_field]
        if isinstance(compare_value, (list, tuple)):
            # Field already holds a list of IDs
            detection_rule_ids = list(compare_value)
        elif isinstance(compare_value, str) and compare_value.startswith("["):
            # Field is a serialized list of IDs
            try:
                detection_rule_ids = json.loads(compare_value)
            except json.JSONDecodeError:
                print(f"##vso[task.logissue type=error]Error loading {compare_field} for {result_line}")
                continue
        elif isinstance(compare_value, str):
            # Field is a single ID
            detection_rule_ids = [compare_value]
        else:
            # Null or unexpected type: nothing to match against
            continue
        # One matching ID is enough to keep the row (and it is kept only once)
        if any(_is_repo_detection(rule_id) for rule_id in detection_rule_ids):
            results_for_detections_in_repo.append(result_line)
    return results_for_detections_in_repo
def run_query_and_generate_work_item_dict(
    tenant: str, platform: str, query: str, template: str, compare_field: str
):
    """
    Run a query through the Sentinel platform object, keep only the results that
    belong to detections defined in the local repository, print them as a table
    and, when there are results, save them as JSON and CSV.

    Args:
        tenant: Tenant passed to the ``Sentinel`` platform object.
        platform: Accepted for interface compatibility; this function always
            uses ``Sentinel`` and does not read this value.
        query: Query text to execute.
        template: Optional Jinja template name (looked up in
            ``pipelines/scripts/templates``). When given, the rendered template
            is printed and published as a pipeline variable named after the
            template file without its ``.jinja`` suffix.
        compare_field: Result column matched against repository detection IDs.

    Returns:
        None. A failed query is reported as an Azure DevOps error and nothing
        else is done.
    """
    # Initialize platform object and run query
    pl = Sentinel(tenant)
    platform_response = pl.run_query(query)
    if not platform_response.success:
        print("##vso[task.logissue type=error]Query execution failed; no results were processed.")
        return

    # Extract rows and column headers from the first table of the query result
    table = platform_response.data["tables"][0]
    columns = [column["name"] for column in table["columns"]]
    results_kv = [dict(zip(columns, row)) for row in table["rows"]]  # Convert to list of dictionaries

    # Keep the results that match detection IDs in the repo
    results_for_detections_in_repo = get_results_for_detections_in_repo(results_kv, compare_field)

    # Display the filtered results in a formatted table
    print(tabulate(results_for_detections_in_repo, headers="keys", tablefmt="grid"))

    now_timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H.%M.%S UTC")
    result_filename = f"results_{now_timestamp}"

    # Create work items definition from the template
    if template:
        env = Environment(loader=FileSystemLoader("pipelines/scripts/templates"))
        template_name = os.path.basename(template).removesuffix(".jinja")
        template_content = env.get_template(template).render(
            results=results_for_detections_in_repo, now_timestamp=now_timestamp
        )
        print(template_content)
        # Computed outside the f-string: a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        single_line_content = template_content.replace("\n", " ")
        print(f"##vso[task.setvariable variable={template_name}]{single_line_content}")
        result_filename = f"{template_name}_results"

    # Nothing to save without results
    if not results_for_detections_in_repo:
        return

    try:
        # The output folder may not exist on a fresh checkout
        os.makedirs("pipelines/results", exist_ok=True)
    except OSError as e:
        print(f"##vso[task.logissue type=error]Error creating results folder: {str(e)}")

    try:
        # Save as JSON file
        with open(f"pipelines/results/{result_filename}.json", 'w') as json_file:
            json_file.write(json.dumps(results_for_detections_in_repo, indent=4))
    except Exception as e:
        print(f"##vso[task.logissue type=error]Error saving JSON file: {str(e)}")

    try:
        # Save as CSV file, using the first result's keys as the header
        columns_names = results_for_detections_in_repo[0].keys()
        with open(f"pipelines/results/{result_filename}.csv", 'w', newline="", encoding="utf-8") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=columns_names, quoting=csv.QUOTE_ALL)
            writer.writeheader()
            writer.writerows(results_for_detections_in_repo)
    except Exception as e:
        print(f"##vso[task.logissue type=error]Error saving CSV file: {str(e)}")
def main():
    """
    Command-line entry point.

    Parses the command-line arguments, reads the query text from the ``QUERY``
    environment variable and passes everything to
    ``run_query_and_generate_work_item_dict``.

    Exits with status 1 when ``QUERY`` is unset, empty or only whitespace.
    """
    parser = argparse.ArgumentParser(description="Alert Monitoring Script")
    parser.add_argument("--tenants", type=str, required=True, help="Tenant to gather stats.")
    parser.add_argument("--platform", type=str, required=True, help="Platform of tenant.")
    parser.add_argument("--template", type=str, help="Templates to use for result output.")
    parser.add_argument(
        "--detection-compare-field",
        type=str,
        required=True,
        help="Query results field to match to repository detection IDs.",
    )
    args = parser.parse_args()

    # Read query from environment variable; an empty value is treated like a
    # missing one so a blank query is never sent to the platform.
    query = os.getenv("QUERY")
    if query is None or not query.strip():
        print("##vso[task.logissue type=error]QUERY environment variable is not set or is empty.")
        sys.exit(1)

    run_query_and_generate_work_item_dict(
        tenant=args.tenants,
        platform=args.platform,
        query=query,
        template=args.template,
        compare_field=args.detection_compare_field,
    )


# Standard Python entry point check
if __name__ == "__main__":
    main()
Detection Monitoring Pipeline
The article details an Azure DevOps pipeline designed to execute KQL queries, process data with the Python script, and manage work items.
The pipeline:
- Runs on a weekly schedule.
- Accepts parameters for lookback time, incident thresholds, and failure ratios.
- Uses two KQL queries defined in pipeline variables (kql_query and kql_query2).
- Installs Python dependencies, executes the detection_monitoring.py script twice (once for trigger rates and once for tampering and health checks), publishes the results as artifacts, and conditionally creates/updates work items based on pipeline parameters.
- Includes a step to publish the results as pipeline artifacts.
```yaml
name: Detection Monitoring
trigger: none
schedules:
- cron: "0 2 * * 1" # At 02:00 UTC every Monday
displayName: Weekly run
branches:
include:
- main
parameters:
- name: lookback_time
displayName: Lookback Time (days)
type: number
default: 30
- name: incident_count_threshold
displayName: Incident Count Threshold
type: number
default: -1 # -1 disables this check
- name: incident_avg_threshold
displayName: Incident Average Threshold
type: number
default: -1 # -1 disables this check
- name: monthly_failures_ratio_threshold
displayName: Monthly Failures Ratio threshold.
type: number
default: 20
- name: daily_failures_ratio_threshold
displayName: Daily Failures Ratio threshold.
type: number
default: 70
- name: create_work_items
displayName: Create work items.
type: boolean
default: true
variables:
kql_query: |
let lookback_time = ${{ parameters.lookback_time }}d;
let incident_count_threshold = ${{ parameters.incident_count_threshold }};
let incident_avg_threshold = ${{ parameters.incident_avg_threshold }};
let incidents = SecurityIncident
| where TimeGenerated > ago(lookback_time)
| where ProviderName == "Microsoft XDR"
| mv-expand RelatedAnalyticRuleId = RelatedAnalyticRuleIds
| extend RelatedAnalyticRuleId = tostring(RelatedAnalyticRuleId)
| project Title, TimeGenerated, Day = startofday(TimeGenerated), RelatedAnalyticRuleId;
let daily_counts = incidents
| summarize DailyCount = count() by Title, RelatedAnalyticRuleId, Day;
daily_counts
| join kind=inner (
incidents
| summarize IncidentCount = count(), MinDateTime = min(TimeGenerated), MaxDateTime = max(TimeGenerated) by Title, RelatedAnalyticRuleId
) on Title and RelatedAnalyticRuleId
| summarize IncidentCount = sum(DailyCount), MinDateTime = max(MinDateTime), MaxDateTime = max(MaxDateTime), DailyOccurenceAvg = round(avg(DailyCount), 2), Occurences = count(), DailyOccurenceStdDev = round(stdev(DailyCount), 2) by Title, RelatedAnalyticRuleId
| extend DaysBetweenMinMax = datetime_diff("day", MaxDateTime, MinDateTime) + 1
| extend SearchDurationDays = datetime_diff("day", now(), ago(30d)) + 1
| extend FullSearchDailyAvg = round(todouble(IncidentCount) / todouble(SearchDurationDays), 2)
| extend FullSearchDailyStdDev = round(DailyOccurenceStdDev * sqrt(todouble(Occurences) / todouble(SearchDurationDays)), 2)
| where (incident_count_threshold != -1 and incident_avg_threshold != -1 and (IncidentCount > incident_count_threshold or FullSearchDailyAvg > incident_avg_threshold))
or (incident_count_threshold != -1 and incident_avg_threshold == -1 and IncidentCount > incident_count_threshold)
or (incident_count_threshold == -1 and incident_avg_threshold != -1 and FullSearchDailyAvg > incident_avg_threshold)
or (incident_count_threshold == -1 and incident_avg_threshold == -1)
| project-reorder Title, RelatedAnalyticRuleId, IncidentCount, MinDateTime, MaxDateTime, DaysBetweenMinMax, Occurences, DailyOccurenceAvg, DailyOccurenceStdDev, SearchDurationDays, FullSearchDailyAvg, FullSearchDailyStdDev
| join kind = inner (SecurityIncident
| where TimeGenerated > ago(lookback_time)
| where ProviderName == "Microsoft XDR"
| project IncidentNumber, IncidentName, Title, RelatedAnalyticRuleIds, AlertIds
| mv-expand AlertId = AlertIds
| extend AlertId = tostring(AlertId)
| join kind = inner ( SecurityAlert
| where TimeGenerated > ago(lookback_time)
| where ProductName == "Azure Sentinel" and ProductComponentName == "Scheduled Alerts"
| project SystemAlertId, AlertName, Entities, AlertType
| extend AnalyticRuleId = split(AlertType, "_")[-1]
| mv-expand Entities = todynamic(Entities)
| mv-expand Entities = todynamic(Entities)
| mv-expand kind=array key = bag_keys(Entities)
| extend PropertyName = tostring(key), PropertyValue = tostring(Entities[tostring(key)])
| where not(PropertyName startswith "$") and PropertyName != ""
| where PropertyName != "Type"
) on $left.AlertId == $right.SystemAlertId
| extend AnalyticRuleId = tostring(AnalyticRuleId)
| summarize EntitiesIncidentCount = count_distinct(IncidentNumber), EntitiesAlertCount = count_distinct(SystemAlertId), IncidentNumbers=make_set(IncidentNumber), SystemAlertIds=make_set(SystemAlertId), AlertNames = make_set(AlertName) by Title, AnalyticRuleId, PropertyName, PropertyValue
| project-reorder Title, AnalyticRuleId, PropertyName, PropertyValue, EntitiesIncidentCount, EntitiesAlertCount, AlertNames, IncidentNumbers, SystemAlertIds
| extend EntitiesAppearancesFormatted = strcat(PropertyName, ": ", PropertyValue, ", ", "IncidentCount: ", EntitiesIncidentCount, "
")
| extend EntitiesAppearances = pack(PropertyName, PropertyValue, "IncidentCount", EntitiesIncidentCount)
| order by toint(EntitiesAppearances.IncidentCount) desc
| summarize EntitiesAppearancesFormatted = make_list(EntitiesAppearancesFormatted), EntitiesAppearances = make_list(EntitiesAppearances) by AnalyticRuleId, Title
) on $left.Title == $right.Title and $left.RelatedAnalyticRuleId == $right.AnalyticRuleId
| project-away Title1, AnalyticRuleId
kql_query2: |
let lookback_time = ${{ parameters.lookback_time }}d;
let deleted_rules_authorized_entities = dynamic([]);
let disabled_rules_authorized_entities = dynamic([]);
let monthly_failures_ratio_threshold = ${{ parameters.monthly_failures_ratio_threshold }};
let daily_failures_ratio_threshold = ${{ parameters.daily_failures_ratio_threshold }};
let rule_auto_disabled_results = SentinelHealth
| where Time