Use Case Demo: Fields Quality (CIM and non-CIM)


  • This white paper describes a new concept for performing continuous fields quality assessment on Splunk data.

  • This leverages different scalable Splunk techniques and TrackMe components to perform this task.

  • Fields quality assessment is a crucial aspect of Splunk monitoring, as it helps ensure that the data is accurate and consistent, ready to serve use cases.

  • By implementing this, you will be able to build a robust and scalable solution to monitor the quality of fields parsing for your various sourcetypes in Splunk, and to receive automated alerts when fields quality issues are detected.

  • These concepts are applicable to both Splunk Common Information Model (CIM) and non-CIM data.

  • Finally, we leverage a TrackMe restricted component called Flex Objects (splk-flx) to perform the continuous fields quality assessment, although parts of this logic are available to TrackMe Community Edition users.

  • TrackMe version 2.1.18 and later is required to leverage this feature.

  • TrackMe version 2.1.19 added several utilities to support this use case and ease the implementation: trackmefieldsqualityextract, trackmefieldsqualitygendict, and trackmefieldsqualitygensummary.

  • This work was made possible thanks to the support and deep collaboration of a major fellow TrackMe customer, thank you!

High level workflow and diagram

The following diagram shows the high-level workflow for fields quality assessment:

diagram.png

From a high-level perspective, the workflow is as follows:

Step 1: Collect

  • The user defines a set of Splunk scheduled searches that leverage Splunk Sampling to sample data from the CIM data models or events of their choice.

  • These searches call the streaming TrackMe backend trackmefieldsquality, which performs the assessment of the fields quality:

      - Using one of the methods supported by the command, define the fields of interest to monitor.

      - The command checks for common issues: missing, empty or null, equal to unknown.

      - The command can also check the content of the fields using a regular expression submitted as part of a model provided as input.

      - The command generates a JSON object with the results of the assessment, as well as the global summary of the assessment for the sampled event.

      - Metadata (index, sourcetype, etc.) are stored in the JSON object and can be extended as needed.

  • The search finally calls the Splunk collect command to index the JSON results using the TrackMe sourcetype trackme:fields_quality

Step 2: Monitor & Alert

  • A TrackMe Virtual Tenant is created and enables the TrackMe Flex Objects (splk-flx) component.

  • A Flex Object tracker is created, which consumes the resulting JSON events, defines the associated entities, and tracks the quality of the sampling over time.

  • Thresholds can be defined for each entity using TrackMe capabilities, so that automated alerts are generated when the percentage of compliance breaches the defined thresholds.

Hint

Common Information Model (CIM) context versus raw events context

  • These concepts apply equally to the Common Information Model and to any raw events in Splunk.

  • CIM parsing quality is generally a critical topic when use cases heavily rely on CIM, but there can be many use cases where you need to ensure parsing quality outside of a CIM context.

  • You can use the same approach for both CIM and non-CIM data.

Phase 1: Collect

The primary step is to define what needs to be monitored depending on your needs and objectives.

The collect phase is highly flexible and scalable. In short, the concept is the following:

  • Create a set of scheduled searches that you will execute on a regular basis, for instance once per day during off-peak night hours.

  • Each search will use the Splunk sampling feature, which randomly selects a subset of events from the scope of the search.

  • It then calls the trackmefieldsquality command with different parameters, which performs the assessment of the fields quality and generates a JSON object per event.

  • Finally, the search will call the collect command to index the JSON results using the TrackMe sourcetype trackme:fields_quality.

About the Common Information Model (CIM)

  • If your objective is to monitor the quality of your CIM parsing, from the lens of the CIM data models, you will likely want to have at least one search per CIM data model and node.

  • Example:

      - 1 search for the Web data model

      - 1 search for the Network_Traffic data model

      - 1 search for the Malware data model

      - 1 search for the Endpoint.Processes data model node

      - etc.

Hint

About TrackMe system level sharing

  • By default, TrackMe shares its content, including the command trackmefieldsquality, at the application level only.

  • This means that you cannot execute this command outside of the TrackMe application unless you share TrackMe at the system level.

  • You can update your system configuration to change this behavior. Go to Splunk Web, navigate to Manage Apps, and update permissions on TrackMe so that Apply selected role permissions to is set to All apps (system).

CIM: generate the JSON dictionary models

Hint

About the command trackmefieldsqualitygendict

  • This command is available in TrackMe version 2.1.19 and later.

  • It allows generating the JSON dictionary model for a given CIM data model.

  • It behaves similarly to the CIM Vladiator application and can be restricted to only the recommended fields per data model.

In this example, we use the utility trackmefieldsqualitygendict to generate the JSON dictionary models for the Authentication, Web and Network_Traffic CIM data models. We store these in a CSV lookup table so that they can be modified as needed.

| trackmefieldsqualitygendict datamodel=Authentication show_only_recommended_fields=True
| append [ | trackmefieldsqualitygendict datamodel=Web show_only_recommended_fields=True ]
| append [ | trackmefieldsqualitygendict datamodel=Network_Traffic show_only_recommended_fields=True ]

| table datamodel, json_dict
| trackmeprettyjson fields=json_dict

| outputlookup cim_datamodels_dict.csv
gen-dict.png

Final collect example

In this example, we monitor the fields quality of the following data models:

  • Authentication

  • Web

  • Network_Traffic

This looks like the following:

final_collect_example.png

Phase 2: Monitor & Alert

This is the simplest part of the work!

In short, we will:

  • Create a new TrackMe Virtual Tenant dedicated to the purposes of monitoring the fields quality.

  • The Virtual Tenant enables the TrackMe Flex Objects (splk-flx) component.

  • We will create a Flex Object tracker using the out-of-the-box use case called splk_splunk_fields_quality.

  • The Flex Object tracker relies on our metadata convention to automatically identify the entities and track the quality of the sampling over time.

Creating the Virtual Tenant

We create a new Virtual Tenant called fields-quality:

virtual_tenant1.png virtual_tenant2.png virtual_tenant3.png

Creating the Flex Object tracker

Once we have a Virtual Tenant, we can create a Flex Object tracker using the out-of-the-box use case called splk_splunk_fields_quality:

flex_object_tracker1.png

We then complete the creation of the Flex Object tracker:

flex_object_tracker2.png flex_object_tracker3.png

After a first execution, entities are created and ordered by Data Model:

flex_object_tracker4.png

Key Performance Indicators start to be collected:

flex_object_tracker5.png flex_object_tracker6.png

By default, you can enable and define a threshold according to your needs:

flex_object_tracker7.png

The entity then turns red if the threshold is breached:

flex_object_tracker8.png

The Status message contains the details of the fields, including figures and the most representative values (maxvals=15, as in the CIM Vladiator application).

flex_object_tracker9.png

We now have a complete and flexible solution to monitor the quality of the fields of your data models over time!

The details of the fields are stored in the KVstore, in a JSON structure within the field extra_attributes. You can easily access the fields and values:

| trackmegetcoll tenant_id=fields-quality component=flx
| table object, alias, extra_attributes
| trackmeprettyjson fields=extra_attributes

We also have access to sampled events so that we can easily build additional dashboards for investigation and analytic purposes.

Annex: usage and options for the command trackmefieldsquality

The command trackmefieldsquality is used in the collect phase to parse and validate the compliance of fields. It is a powerful and flexible Python backend that provides various options.

The command accepts the following parameters:

| argument | description | default | example or valid values |
|---|---|---|---|
| fields_to_check_list | The list of fields to verify, provided as a comma-separated list | None | "action,app,bytes,url" |
| fields_to_check_fieldname | Name of the field containing the list of fields to check (comma-separated) | None | "fields_list" |
| fields_to_check_dict | JSON string containing a dictionary of fields to check with optional regex patterns | None | '{"field1": {"name": "field1", "regex": "^[A-Z]+$"}, "field2": {"name": "field2"}}' |
| fields_to_check_dict_path | Path to a JSON file containing a dictionary of fields to check with optional regex patterns | None | "/opt/splunk/etc/apps/myapp/mydir/web_datamodel.json" |
| fields_to_check_dict_fieldname | Name of the field containing a JSON string with a dictionary of fields to check | None | "fields_dict" |
| include_field_values | Boolean option to include field values in the JSON summary | False | True/False |
| pretty_print_json | Boolean option to pretty print the JSON summary | True | True/False |
| output_mode | The mode to output the results (json or raw) | json | json, raw |
| metadata_fields | CSV list of metadata fields to include in the metadata section of the JSON | index,sourcetype,host,source | "datamodel" |
| summary_fieldname | Defines the name of the summary field | summary | "summary" |
| metadata_fieldname | Defines the name of the metadata field added to the summary JSON | metadata | "metadata" |

The first five options are mutually exclusive: only one of them can be used at a time.

  • fields_to_check_list

  • fields_to_check_fieldname

  • fields_to_check_dict

  • fields_to_check_dict_path

  • fields_to_check_dict_fieldname

These options exist to cover all use cases. They provide the flexibility to use different Splunk techniques, such as subsearches, storing the list of fields or models, or generating them dynamically in SPL.

Let's walk through an example for each of these options:

Argument: fields_to_check_list

This is the simplest use case:

  • Provide the list of fields to be checked as a comma-separated list

  • For each field, we check whether it is:

      - Missing

      - Empty

      - Null

      - Equal to unknown

If the field passes all these checks, it is declared valid with a status of success; otherwise, its status is failure.

The command stores a section for the field within the JSON object, which includes a flag for each check, with a boolean value True or False:

  • is_missing

  • is_empty

  • is_unknown

  • regex_failure

In addition, the command accounts for the field and its status in the summary section of the JSON object:

  • overall_status: the overall status of the field checks, either success or failure

  • total_fields_checked: the total number of fields checked

  • total_fields_failed: the total number of fields that failed

  • total_fields_passed: the total number of fields that passed

  • percentage_failed: the percentage of fields that failed

  • percentage_passed: the percentage of fields that passed

Note: this argument does NOT process a regular expression to check the content, therefore the regex_failure flag will always be False (see the next options for this).
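
To make the checks and the summary arithmetic concrete, here is a minimal Python sketch. It is not the TrackMe implementation: the function names check_field and assess_event are hypothetical, null and empty values are folded into the single is_empty flag, and the comparison with unknown is assumed to be case-insensitive.

```python
from typing import Any, Dict


def check_field(event: Dict[str, Any], name: str) -> Dict[str, Any]:
    """Run the basic checks (missing, empty or null, unknown) on one field."""
    is_missing = name not in event
    value = event.get(name)
    # ASSUMPTION: null and empty values share the is_empty flag in this sketch.
    is_empty = (not is_missing) and (value is None or str(value).strip() == "")
    # ASSUMPTION: the comparison with "unknown" is case-insensitive.
    is_unknown = (not is_missing) and str(value).strip().lower() == "unknown"
    failed = is_missing or is_empty or is_unknown
    return {
        "status": "failure" if failed else "success",
        "is_missing": is_missing,
        "is_empty": is_empty,
        "is_unknown": is_unknown,
        "regex_failure": False,  # no regex is evaluated with a plain list
    }


def assess_event(event: Dict[str, Any], fields_to_check_list: str) -> Dict[str, Any]:
    """Check every listed field and append the summary section."""
    names = [f.strip() for f in fields_to_check_list.split(",") if f.strip()]
    result: Dict[str, Any] = {name: check_field(event, name) for name in names}
    total = len(names)
    failed = sum(1 for n in names if result[n]["status"] == "failure")
    passed = total - failed
    result["summary"] = {
        "overall_status": "failure" if failed else "success",
        "total_fields_checked": total,
        "total_fields_failed": failed,
        "total_fields_passed": passed,
        "percentage_failed": round(failed / total * 100, 2) if total else 0.0,
        "percentage_passed": round(passed / total * 100, 2) if total else 0.0,
    }
    return result


# "app" is empty and "url" is missing, so 2 of the 4 fields fail.
report = assess_event(
    {"action": "success", "app": "", "bytes": "309"},
    "action,app,bytes,url",
)
```

In this sketch, a single failing field is enough to set overall_status to failure, which is consistent with the JSON output example shown later in this document.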

Argument: fields_to_check_fieldname

This does exactly the same as fields_to_check_list, but instead of providing the list of fields to check as a comma-separated list, we provide the name of a field that contains the list of fields to check.

You would therefore call the command as follows:

| eval fields_list="action,app,bytes,url"
| trackmefieldsquality fields_to_check_fieldname="fields_list"

The point of this option is that you could, for instance, use a Splunk subsearch to generate the list of fields dynamically, by accessing a lookup table where you store the fields depending on your criteria, or any other solution of your choice.

Hint

Providing a JSON dictionary model

  • The next 3 options allow you to provide a JSON dictionary that models the fields to check, as well as optional parameters for each field.

  • In particular, you can define a regular expression with the field regex to be applied against the value, which allows validating the content of the field according to your needs.

  • You can also set the field allow_unknown to True or False, which can be used to disable the is_unknown check for the field.

Argument: fields_to_check_dict

This option is more sophisticated: it allows you to define a dictionary that models the fields to check, as well as an optional regular expression to be applied against the value.

For instance, the following dictionary would verify the field bytes, including the fact that it should be a numerical value, using a regular expression. It would also verify the field action, which would accept only success or failure, and finally the field http_referrer, for which we would only perform the basic checks without verifying that its value matches certain criteria.

| trackmefieldsquality fields_to_check_dict="{\"bytes\": {\"name\": \"bytes\", \"regex\": \"^\\\d*\"}, \"action\": {\"name\": \"action\", \"regex\": \"^(success|failure)$\"}, \"http_referrer\": {\"name\": \"http_referrer\"}}" pretty_print_json=False output_mode=json metadata_fields="datamodel" include_field_values=True

In this case, if a regular expression is provided, the regex_failure flag is set to True if the value does not match the regular expression, and False otherwise. This flag contributes to the status of the field in addition to the other checks.

Example of JSON output:

{
    "time": 1747261746,
    "bytes": {
        "status": "success",
        "description": "Field exists and is valid.",
        "is_missing": false,
        "is_empty": false,
        "is_unknown": false,
        "regex_failure": false,
        "value": "309"
    },
    "action": {
        "status": "failure",
        "description": "Field exists but value does not match the required pattern.",
        "is_missing": false,
        "is_empty": false,
        "is_unknown": false,
        "regex_failure": true,
        "value": "Bad Request"
    },
    "http_referrer": {
        "status": "failure",
        "description": "Field is 'unknown'.",
        "is_missing": false,
        "is_empty": false,
        "is_unknown": true,
        "regex_failure": false,
        "value": "unknown"
    },
    "summary": {
        "overall_status": "failure",
        "total_fields_checked": 3,
        "total_fields_failed": 2,
        "total_fields_passed": 1,
        "percentage_failed": 66.67,
        "percentage_passed": 33.33
    },
    "metadata": {
        "time_epoch": 1747261746,
        "time_human": "Wed May 14 22:29:06 2025 UTC",
        "index": "webserver",
        "sourcetype": "nginx:plus:kv",
        "host": "trackme-solutions.com",
        "source": "/var/log/nginx/access.log",
        "datamodel": "Web"
    },
    "event_id": "f5ed1437ee8486a3782ebaea846dad37c52d47b825d1913c1ae7d085ba01f943"
}
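
The statuses in the output above can be reproduced with a short Python sketch of dictionary-driven checks. This is an illustration under stated assumptions rather than the backend's code: the helper check_with_model is hypothetical, the regular expression is assumed to be applied with re.match semantics, and it is only evaluated when the field actually has a value.

```python
import re
from typing import Any, Dict

# Same dictionary as in the example above, written as a Python dict.
FIELDS_DICT: Dict[str, Dict[str, Any]] = {
    "bytes": {"name": "bytes", "regex": r"^\d*"},
    "action": {"name": "action", "regex": r"^(success|failure)$"},
    "http_referrer": {"name": "http_referrer"},
}


def check_with_model(event: Dict[str, Any], model: Dict[str, Any]) -> Dict[str, Any]:
    """Apply the basic checks plus the optional regex of one field model."""
    name = model["name"]
    is_missing = name not in event
    raw = event.get(name)
    value = "" if raw is None else str(raw)
    is_empty = (not is_missing) and value.strip() == ""
    # allow_unknown=True disables the "unknown" check for this field.
    is_unknown = value.strip().lower() == "unknown" and not model.get("allow_unknown", False)
    regex = model.get("regex")
    has_value = not is_missing and not is_empty
    # ASSUMPTION: re.match semantics, evaluated only when a value is present.
    regex_failure = bool(regex) and has_value and re.match(regex, value) is None
    failed = is_missing or is_empty or is_unknown or regex_failure
    return {
        "status": "failure" if failed else "success",
        "is_missing": is_missing,
        "is_empty": is_empty,
        "is_unknown": is_unknown,
        "regex_failure": regex_failure,
    }


event = {"bytes": "309", "action": "Bad Request", "http_referrer": "unknown"}
results = {name: check_with_model(event, model) for name, model in FIELDS_DICT.items()}
```

Under re.match semantics, the pattern ^\d* also accepts a non-numeric value such as abc, because it can match zero digits. If the intent is to enforce a purely numerical value, a stricter pattern such as ^\d+$ may be preferable.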

Hint

Escaping backslashes and special characters

  • The tricky part is the escaping of the JSON provided as input to the command.

  • In particular, double quotes within the JSON string need to be escaped.

  • The regular expression also needs to be escaped, for instance ^\\\d*, which would be written ^\d* in normal circumstances.

Argument: fields_to_check_dict_fieldname

Similarly to fields_to_check_fieldname, this option lets you provide the name of a field that contains the dictionary of fields to check, as in the previous example.

This allows you to use a Splunk subsearch to generate the dictionary dynamically, for instance by accessing a lookup table where you store the fields depending on your criteria, or any other solution of your choice.

Example:

| eval fields_dict="{\"bytes\": {\"name\": \"bytes\", \"regex\": \"^\\\d*\"}, \"action\": {\"name\": \"action\", \"regex\": \"^(success|failure)$\"}, \"http_referrer\": {\"name\": \"http_referrer\"}}"
| trackmefieldsquality fields_to_check_dict_fieldname="fields_dict" pretty_print_json=False output_mode=json metadata_fields="datamodel" include_field_values=True

Argument: fields_to_check_dict_path

This option is similar to fields_to_check_dict_fieldname, but instead of providing the dictionary as a JSON string, we provide the path to a JSON file that contains the dictionary.

The file must exist on the file system of the Splunk instance, and the path must be provided as a string.

Our JSON file would look like this:

{
    "action": {
        "name": "action",
        "regex": "^(success|failure)$"
    },
    "bytes": {
        "name": "bytes",
        "regex": "^\\d*"
    },
    "http_referrer": {
        "name": "http_referrer"
    }
}

Example:

| trackmefieldsquality fields_to_check_dict_path="/opt/splunk/etc/apps/myapp/mydir/web_datamodel.json" pretty_print_json=False output_mode=json metadata_fields="datamodel" include_field_values=True
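
Writing the file by hand requires doubling the backslashes of each regular expression. A minimal Python sketch, assuming the file is produced by a script, lets json.dump handle this escaping. The temporary directory is a hypothetical stand-in for the application directory used in the example path.

```python
import json
import os
import tempfile

# Regular expressions are written as raw strings, exactly as they should be applied.
model = {
    "action": {"name": "action", "regex": r"^(success|failure)$"},
    "bytes": {"name": "bytes", "regex": r"^\d*"},
    "http_referrer": {"name": "http_referrer"},
}

# Hypothetical location: a temporary directory stands in for the app directory.
path = os.path.join(tempfile.mkdtemp(), "web_datamodel.json")
with open(path, "w", encoding="utf-8") as fh:
    json.dump(model, fh, indent=4)

# Read the file back: on disk the backslash is doubled, in memory it is not.
with open(path, encoding="utf-8") as fh:
    text = fh.read()
reloaded = json.loads(text)
```

The resulting file contains "^\\d*" for the bytes field, as in the JSON file shown above, and loading it back yields the original pattern ^\d*.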

Argument: include_field_values

This option includes the field values in the JSON output, which can be useful for analytics and reporting purposes.

include_field_values.png

Argument: pretty_print_json

This option pretty prints the JSON output, which can be useful for debugging purposes.

pretty_print_json1.png pretty_print_json2.png

Argument: output_mode

This option specifies the output mode, which can be json or raw.

We recommend using the json mode for most use cases. This document focuses on the JSON output, which is indexed using the TrackMe sourcetype trackme:fields_quality.

output_mode=json

In JSON mode, the command verifies the fields and generates a JSON object per event:

json_output1.png json_output2.png json_output3.png

output_mode=raw

In raw mode, the command generates the events as they are, and adds a field called json_summary which contains our JSON object:

raw_output1.png raw_output2.png

Argument: metadata_fields

This option specifies, as a comma-separated list, the metadata fields to include in the JSON output.

The metadata fields always include the following:

  • index: the index of the event

  • sourcetype: the sourcetype of the event

  • host: the host of the event

  • source: the source of the event

By defining the metadata_fields parameter, you can add additional fields to the JSON output, for instance the datamodel field which in our implementation is used to identify the data model of the event.

In our example, we are defining a field called datamodel using an eval which will be added to the JSON output:

| eval datamodel="Web"
| trackmefieldsquality fields_to_check_list="action,app,bytes,url" pretty_print_json=False output_mode=json metadata_fields="datamodel" include_field_values=True
metadata_fields.png

Argument: summary_fieldname

This option specifies the name of the summary field in the JSON output. It defaults to summary but can be customised if needed, for instance if there is a conflict with a field from the data model or events.

In this example, instead of summary, we use quality_summary:

| trackmefieldsquality fields_to_check_list="action,app,bytes,url" pretty_print_json=False output_mode=json metadata_fields="datamodel" include_field_values=True summary_fieldname="quality_summary"
summary_fieldname.png

Argument: metadata_fieldname

This option specifies the name of the metadata field in the JSON output. It defaults to metadata but can be customised if needed, for instance if there is a conflict with a field from the data model or events.

In this example, instead of metadata, we use quality_metadata:

| trackmefieldsquality fields_to_check_list="action,app,bytes,url" pretty_print_json=False output_mode=json metadata_fields="datamodel" include_field_values=True metadata_fieldname="quality_metadata"
metadata_fieldname.png

Annex: usage and options for the command trackmefieldsqualitygendict

The command trackmefieldsqualitygendict is used to generate JSON dictionary models for CIM data models, which can then be used with the trackmefieldsquality command for fields quality assessment.

The command accepts the following parameters:

| argument | description | default | example or valid values |
|---|---|---|---|
| datamodel | The name of the CIM data model to generate the dictionary for | None (required) | "Authentication", "Web", "Network_Traffic" |
| show_only_recommended_fields | Boolean option to only include recommended fields from the data model | False | True/False |

Usage:

| trackmefieldsqualitygendict datamodel=<datamodel name> show_only_recommended_fields=<boolean>

Examples:

Generate dictionary for Authentication data model with only recommended fields:

| trackmefieldsqualitygendict datamodel="Authentication" show_only_recommended_fields=true

Generate dictionary for Web data model with all fields:

| trackmefieldsqualitygendict datamodel="Web" show_only_recommended_fields=false

Generate dictionary for Network_Traffic data model with only recommended fields:

| trackmefieldsqualitygendict datamodel="Network_Traffic" show_only_recommended_fields=true

Output:

The command generates a JSON dictionary that can be used with the trackmefieldsquality command. The output includes:

  • Field names as keys

  • Field configuration objects as values, including:

      - name: The field name

      - regex: Optional regular expression pattern for validation

      - allow_unknown: Boolean flag to allow "unknown" values

Example output for Web data model:

{
    "action": {
        "name": "action",
        "regex": "^(success|failure|allowed|blocked|deferred)$",
        "allow_unknown": false
    },
    "app": {
        "name": "app",
        "regex": "^[\\w:\\-\\d\\s]+$",
        "allow_unknown": false
    },
    "dest": {
        "name": "dest",
        "regex": "^[\\w\\.-]+$",
        "allow_unknown": false
    },
    "signature": {
        "name": "signature",
        "regex": "^.{3,100}$",
        "allow_unknown": false
    },
    "src": {
        "name": "src",
        "regex": "^[\\w\\.-]+$",
        "allow_unknown": false
    },
    "src_user": {
        "name": "src_user",
        "regex": "^[\\w\/\\\\\\-\\.$]{1,30}$",
        "allow_unknown": false
    },
    "user": {
        "name": "user",
        "regex": "^[\\w\/\\\\\\-\\.$]{1,30}$",
        "allow_unknown": false
    }
}
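
Because the generated dictionaries are meant to be edited as needed, it can help to verify that every regular expression still compiles before relying on a modified model. The following Python sketch is a hypothetical helper, not part of TrackMe, and assumes the patterns are meant to be compatible with Python's re module. It is shown on two entries taken from the output above.

```python
import json
import re
from typing import Dict

# Two entries copied from the example output, kept as raw JSON text.
json_dict = r'''
{
    "action": {
        "name": "action",
        "regex": "^(success|failure|allowed|blocked|deferred)$",
        "allow_unknown": false
    },
    "src_user": {
        "name": "src_user",
        "regex": "^[\\w\/\\\\\\-\\.$]{1,30}$",
        "allow_unknown": false
    }
}
'''


def validate_dictionary(raw: str) -> Dict[str, str]:
    """Return {field: error message} for each regex that does not compile."""
    problems: Dict[str, str] = {}
    for key, model in json.loads(raw).items():
        regex = model.get("regex")
        if regex is None:
            continue  # the regex is optional
        try:
            re.compile(regex)
        except re.error as exc:
            problems[key] = f"invalid regex: {exc}"
    return problems


problems = validate_dictionary(json_dict)
src_user_regex = json.loads(json_dict)["src_user"]["regex"]
```

An empty result means that all patterns compile. Once decoded from JSON, the src_user pattern accepts a value such as DOMAIN\jdoe and rejects a value containing a space.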

Integration with fields quality workflow:

This command is typically used in the first phase of the fields quality workflow to generate the dictionary models that will be stored in lookup tables and used by the collect scheduled searches:

| trackmefieldsqualitygendict datamodel=Authentication show_only_recommended_fields=True
| append [ | trackmefieldsqualitygendict datamodel=Web show_only_recommended_fields=True ]
| append [ | trackmefieldsqualitygendict datamodel=Network_Traffic show_only_recommended_fields=True ]
| table datamodel, json_dict
| trackmeprettyjson fields=json_dict
| outputlookup cim_datamodels_dict.csv

The generated lookup table can then be used in the collect scheduled searches:

| lookup cim_datamodels_dict.csv datamodel OUTPUT json_dict as quality_dict
| trackmefieldsquality fields_to_check_dict_fieldname="quality_dict" output_mode=json metadata_fields="datamodel,nodename" include_field_values=True

Annex: usage and options for the command trackmefieldsqualitygensummary

The command trackmefieldsqualitygensummary is used to generate a summary of the quality of fields in records that have been processed by the trackmefieldsquality command. This command is typically used in the monitoring phase to aggregate and summarize field quality data for reporting and analysis purposes.

The command accepts the following parameters:

| argument | description | default | example or valid values |
|---|---|---|---|
| maxvals | Max number of distinct values to report in field_values | 15 | 10, 20, 50 |
| fieldvalues_format | Format of field_values, either list or csv | csv | list, csv |
| groupby_metadata_fields | Comma-separated list of metadata fields to group by in addition to fieldname | "" (empty) | "metadata.datamodel,metadata.nodename,metadata.index,metadata.sourcetype" |

Usage:

| trackmefieldsqualitygensummary maxvals=<max number of distinct values to report> fieldvalues_format=<format of field_values, either list or csv> groupby_metadata_fields=<comma separated list of metadata fields to group by in addition to fieldname>

Examples:

Generate summary with default settings:

| trackmefieldsqualitygensummary

Generate summary with custom maxvals and list format:

| trackmefieldsqualitygensummary maxvals=20 fieldvalues_format=list

Generate summary with grouping by metadata fields:

| trackmefieldsqualitygensummary maxvals=15 fieldvalues_format=csv groupby_metadata_fields="metadata.datamodel,metadata.nodename,metadata.index,metadata.sourcetype"

Output:

The command generates a summary that includes:

  • fieldname: The name of the field being analyzed

  • total_events: Total number of events processed for this field

  • distinct_value_count: Number of distinct values found for this field

  • percent_coverage: Percentage of events where this field has a value

  • field_values: Summary of the most common values for this field (limited by maxvals)

Example output:

trackmefieldsqualitygensummary_output.png

Example of field_values output:

The field_values column contains a summary of the most representative values for each field. For example:

  • CSV format: “98.65% success,1.35% unknown”

Note: the list format renders the same information but as a proper list, which will appear in Splunk as a multi-value field.

Integration with fields quality workflow:

This command is typically used in the monitoring phase, specifically within the Flex Object tracker SPL logic to generate summaries of field quality data:

| search index=summary sourcetype=trackme:fields_quality metadata.sourcetype=* metadata.source=* metadata.host=*
| trackmefieldsqualityextract
| table _time, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, value
| sort 0 _time
| trackmefieldsqualitygensummary maxvals=15 fieldvalues_format=csv groupby_metadata_fields="metadata.index,metadata.sourcetype,metadata.datamodel,metadata.nodename"

The output from this command is then used to populate the extra_attributes field in the Flex Object tracker, providing detailed field quality information for each entity.

Argument: maxvals

This option controls the maximum number of distinct values to report in the field_values column. This is useful for limiting the output size while still providing meaningful insights into the most common values for each field.

Example:

| trackmefieldsqualitygensummary maxvals=10

This would limit the field_values output to show only the top 10 most common values for each field.
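
As an illustration of how maxvals shapes the output, here is a minimal Python sketch that builds the summary columns for one field in the csv format. It is not the command's implementation: the function summarize_field is hypothetical, and the percentages are assumed to be relative to the total number of events.

```python
from collections import Counter
from typing import Dict, List, Optional


def summarize_field(values: List[Optional[str]], maxvals: int = 15) -> Dict[str, object]:
    """Summarize the sampled values of one field, keeping the top maxvals values."""
    total_events = len(values)
    # Events where the field has no value do not count towards coverage.
    present = [v for v in values if v not in (None, "")]
    counts = Counter(present)
    top = counts.most_common(maxvals)
    # ASSUMPTION: percentages are computed against the total number of events.
    field_values = ",".join(
        f"{count / total_events * 100:.2f}% {value}" for value, count in top
    )
    coverage = round(len(present) / total_events * 100, 2) if total_events else 0.0
    return {
        "total_events": total_events,
        "distinct_value_count": len(counts),
        "percent_coverage": coverage,
        "field_values": field_values,
    }


sample = ["success", "success", "success", "unknown"]
full = summarize_field(sample)
top_one = summarize_field(sample, maxvals=1)
```

With maxvals=1, only the most common value is kept in field_values, while distinct_value_count still reports every distinct value observed.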

Argument: fieldvalues_format

This option controls the format of the field_values output. Two formats are supported:

  • csv: Comma-separated values format (default)

  • list: List format with percentages in parentheses

CSV format example: “98.65% success,1.35% unknown”

List format example: “success (98.65%), unknown (1.35%)”

Example:

| trackmefieldsqualitygensummary fieldvalues_format=list

Argument: groupby_metadata_fields

This option allows grouping the summary by additional metadata fields beyond the default fieldname grouping. This is particularly useful when you want to analyze field quality across different dimensions such as data models, nodes, indexes, or sourcetypes.

Example:

| trackmefieldsqualitygensummary groupby_metadata_fields="metadata.datamodel,metadata.nodename,metadata.index,metadata.sourcetype"

This would generate separate summaries for each combination of datamodel, nodename, index, and sourcetype, allowing for more granular analysis of field quality across different contexts.

Integration example in Flex Object tracker:

In the Flex Object tracker source code, this command is used to generate detailed field summaries that are stored in the entity’s extra_attributes:

| join type=outer metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname [ search index=summary sourcetype=trackme:fields_quality metadata.sourcetype=* metadata.source=* metadata.host=*
| trackmefieldsqualityextract
| table _time, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, value
| sort 0 _time
| trackmefieldsqualitygensummary maxvals=15 fieldvalues_format=csv groupby_metadata_fields="metadata.index,metadata.sourcetype,metadata.datamodel,metadata.nodename"
| fields metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, total_events, distinct_value_count, percent_coverage, field_values | fields - _time, _raw
]

``` generate the per field json for reporting purposes, we also rename fieldname to @fieldname so it appears first in the JSON ```
| rename fieldname as @fieldname, fieldstatus as @fieldstatus
| tojson
| rename _raw as per_field_json

``` calculate ```
| stats values(eval(if('@fieldstatus'=="success", '@fieldname', null()))) as list_fields_passed, values(eval(if('@fieldstatus'=="failure", '@fieldname', null()))) as list_fields_failed, values(per_field_json) as per_field_json, max(total_events) as total_events_parsed by metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename

``` format the per_field_json ```
| eval per_field_json = "[" . mvjoin(per_field_json, ", ") . "]"

``` build the list of fields that passed and failed ```
| eval all_fields = mvappend(list_fields_failed, list_fields_passed)
| eval all_fields = mvdedup(all_fields)
| eval final_state = mvmap(
all_fields,
if(
mvfind(list_fields_failed, "^" . all_fields . "$") >= 0,
all_fields . "|failed",
all_fields . "|success"
)
)
| eval success_fields = mvfilter(match(final_state, "\|success$"))
| eval failed_fields = mvfilter(match(final_state, "\|failed$"))
| eval success_fields = mvmap(success_fields, mvindex(split(success_fields, "|"), 0))
| eval failed_fields  = mvmap(failed_fields,  mvindex(split(failed_fields, "|"), 0))
| fields - final_state
| eval success_fields=if(isnull(success_fields), "", success_fields), failed_fields=if(isnull(failed_fields), "", failed_fields)
| fields - list_fields_passed, list_fields_failed

``` calculate ```
| eventstats dc(all_fields) as total_fields_checked, dc(success_fields) as total_fields_passed, dc(failed_fields) as total_fields_failed by metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename
| eval percentage_passed=round(total_fields_passed/total_fields_checked*100, 2), percentage_failed=round(total_fields_failed/total_fields_checked*100, 2)

``` rename ```
| rename metadata.* as "*"

``` save this as parts of extra attributes ```
| eval extra_attributes = "{" . "\"success_fields\": \"" . mvjoin(success_fields, ",") . "\", \"failed_fields\": \"" . mvjoin(failed_fields, ",") . "\", " . "\"fields\":" . per_field_json . "}"
| fields - per_field_json

``` set principal metadata for the flex entity ```
| eval group = datamodel
| eval object = nodename . ":" . index . ":" . sourcetype, alias=index . ":" . sourcetype
| eval object_description = "CIM Quality for DM: " . datamodel . ":" . nodename . ", index:" . index . ", sourcetype:" . sourcetype

``` gen metrics ```
| eval metrics = "{" .
"\"fields_quality.percentage_passed\": " . if(isnum(percentage_passed), percentage_passed, 0) . ", " .
"\"fields_quality.percentage_failed\": " . if(isnum(percentage_failed), percentage_failed, 0) . ", " .
"\"fields_quality.total_fields_checked\": " . if(isnum(total_fields_checked), total_fields_checked, 0) . ", " .
"\"fields_quality.total_fields_failed\": " . if(isnum(total_fields_failed), total_fields_failed, 0) . ", " .
"\"fields_quality.total_fields_passed\": " . if(isnum(total_fields_passed), total_fields_passed, 0) . ", " .
"\"fields_quality.total_events_parsed\": " . if(isnum(total_events_parsed), total_events_parsed, 0) . "}"

``` no outliers for now ```
| eval outliers_metrics="{}"

``` basic status, thresholds can be defined on a per entity basis ```
| eval status=1
| eval status_description="DM Quality: " . metrics
| eval status_description_short="% passed: " . percentage_passed . ", checked: " . total_fields_checked . ", passed: " . total_fields_passed . ", failed: " . total_fields_failed

``` this sets a default threshold, which can then be overridden on a per entity basis via the TrackMe UI ```
| eval default_threshold = "{'metric_name': 'fields_quality.percentage_passed', 'operator': '>=', 'value': 95, 'condition_true': 1}"

```alert if inactive for more than 2 days```
| eval max_sec_inactive=86400*2

Annex: Understanding & troubleshooting the percentage of success

The percentage of success is calculated by the main Flex tracker SPL logic. It represents the percentage of sampled events in which the field passes the quality check. A field passes the quality check when all of the following are true:

  • is_empty: false

  • is_null: false

  • is_unknown: false (subject to whether a dictionary is used and, if so, whether it allows unknown for this field)

  • regex_failure: false (subject to whether a dictionary is used and, if so, whether it defines a regular expression for this field)

The following search calculates the percentage of success per field:

index=summary sourcetype=trackme:fields_quality metadata.sourcetype=* metadata.source=* metadata.host=*
| table _time, _raw
| trackmefieldsqualityextract

| stats count as count_total, count(eval(status=="success")) as count_success, count(eval(status=="failure")) as count_failure by fieldname, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename
| eval percentage_success=round(count_success/count_total*100, 2)

In this example, we are looking at the percentage of success for the field “action” in the datamodel Network_Traffic and nodename All_Traffic, for the sourcetype cisco:asa:

index=summary sourcetype=trackme:fields_quality metadata.datamodel=Network_Traffic metadata.nodename=All_Traffic metadata.sourcetype=cisco:asa metadata.source=* metadata.host=*
| table _time, _raw
| trackmefieldsqualityextract

| stats values(value) as values, values(regex_expression) as regex_expression, count as count_total, count(eval(status=="success")) as count_success, count(eval(status=="failure")) as count_failure by fieldname, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename
| eval percentage_success=round(count_success/count_total*100, 2)

``` set the threshold per field, this defines if the field is considered as passed or failed globally, this threshold has to be part of the SPL logic ```
| eval threshold=99

``` flag field ```
| eval fieldstatus=if(percentage_success>=threshold, "success", "failure")

| where fieldname="action"
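
The roll-up performed by the stats and eval steps above can be illustrated outside Splunk. The following is a minimal sketch in plain Python, not part of TrackMe: the sample rows are made up, and only the column names (fieldname, status) and the threshold of 99 come from the searches in this annex.

```python
from collections import defaultdict

# Hypothetical per-event check results, shaped like the rows available
# after trackmefieldsqualityextract (only the columns used here).
rows = [
    {"fieldname": "action", "status": "success"},
    {"fieldname": "action", "status": "success"},
    {"fieldname": "action", "status": "success"},
    {"fieldname": "action", "status": "failure"},
    {"fieldname": "src", "status": "success"},
    {"fieldname": "src", "status": "success"},
]

THRESHOLD = 99  # same value as "eval threshold=99" in the SPL


def summarize(rows, threshold=THRESHOLD):
    """Return {fieldname: (percentage_success, fieldstatus)}."""
    counts = defaultdict(lambda: {"total": 0, "success": 0})
    for row in rows:
        c = counts[row["fieldname"]]
        c["total"] += 1
        if row["status"] == "success":
            c["success"] += 1

    result = {}
    for field, c in counts.items():
        pct = round(c["success"] / c["total"] * 100, 2)
        # A field is flagged "success" only if it reaches the threshold
        result[field] = (pct, "success" if pct >= threshold else "failure")
    return result


summary = summarize(rows)
print(summary["action"])  # (75.0, 'failure')
print(summary["src"])     # (100.0, 'success')
```

With a threshold of 99, a single failing event out of four is enough to flag the field as failed, which is why inspecting the values and regex_expression columns returned by the search helps to locate the offending values.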

Annex: Understanding and troubleshooting the percentage of coverage and other summary information per field

The percentage of coverage is reported by the trackmefieldsqualitygensummary command. It represents the percentage of events in which the field has a value that is neither null nor empty (the same metric appears, for instance, in the CIM Vladiator table).

In the Flex tracker, this corresponds to the join subsearch:

search index=summary sourcetype=trackme:fields_quality metadata.sourcetype=* metadata.source=* metadata.host=*
| trackmefieldsqualityextract
| table _time, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, value, regex_expression
``` sort is mandatory to force all records to be retrieved before we call the gen summary command ```
| sort 0 _time
| trackmefieldsqualitygensummary maxvals=15 fieldvalues_format=csv groupby_metadata_fields="metadata.index,metadata.sourcetype,metadata.datamodel,metadata.nodename"
| fields metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, total_events, distinct_value_count, percent_coverage, field_values, regex_expression | fields - _time, _raw

You can for example run this search to get the percentage of coverage for the field “action” in the datamodel Network_Traffic and nodename All_Traffic, for the sourcetype cisco:asa:

index=summary sourcetype=trackme:fields_quality metadata.datamodel=Network_Traffic metadata.nodename=All_Traffic metadata.sourcetype=cisco:asa metadata.source=* metadata.host=*
| table _time, _raw
| trackmefieldsqualityextract

| table _time, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, value, regex_expression
``` sort is mandatory to force all records to be retrieved before we call the gen summary command ```
| sort 0 _time
| trackmefieldsqualitygensummary maxvals=15 fieldvalues_format=csv groupby_metadata_fields="metadata.index,metadata.sourcetype,metadata.datamodel,metadata.nodename"
| fields metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, total_events, distinct_value_count, percent_coverage, field_values, regex_expression | fields - _time, _raw

| where fieldname="action"

This would be equivalent to running the following search:

index=summary sourcetype=trackme:fields_quality metadata.datamodel=Network_Traffic metadata.nodename=All_Traffic metadata.sourcetype=cisco:asa metadata.source=* metadata.host=* | head 100
| table _time, _raw
| trackmefieldsqualityextract
| table _time, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, value, is_empty, is_missing

| where fieldname="action"

| stats count, count(eval(is_empty=0 AND is_missing=0)) as count_covered, count(eval(is_empty=1 OR is_missing=1)) as count_not_covered
| eval percentage_covered=count_covered/count*100
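
The same coverage calculation can be sketched in plain Python to make the definition concrete. This is only an illustration under stated assumptions: the sample events are invented, the is_missing and is_empty flags are modelled as booleans (the SPL compares them to 0 and 1), and returning 0.0 for an empty input is a choice made for this sketch.

```python
def percent_coverage(events):
    """Percentage of events where the field is neither missing nor empty."""
    if not events:
        return 0.0  # assumption for this sketch: no events means no coverage
    covered = sum(
        1 for e in events if not e["is_missing"] and not e["is_empty"]
    )
    return round(covered / len(events) * 100, 2)


# Hypothetical extracted records for the field "action"
events = [
    {"value": "allowed", "is_missing": False, "is_empty": False},
    {"value": "blocked", "is_missing": False, "is_empty": False},
    {"value": "", "is_missing": False, "is_empty": True},
    {"value": None, "is_missing": True, "is_empty": False},
]

print(percent_coverage(events))  # 50.0
```

By this definition, coverage only reflects whether a value is present. A value that fails a regular expression check still counts as covered, so coverage and percentage of success can differ for the same field.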

Annex: Per field table statistics

The following search example shows the statistics per field:

index=summary sourcetype=trackme:fields_quality
``` you can filter on metadata fields to focus on a specific sourcetype, index, etc...```
| search metadata.datamodel="Web"

``` stats ```
| fields - summary.* metadata.*
| stats first(*status) as "*status" by event_id
| rename "*.status" as "*"

``` untable ```
| untable event_id, fieldname, value
| stats count, count(eval(value=="failure")) as count_failure, count(eval(value=="success")) as count_success by event_id, fieldname

``` calculate ```
| eval pct_compliance=round(count_success/count*100, 2)

``` aggreg ```
| stats avg(pct_compliance) as avg_pct_compliance by fieldname
| foreach *pct* [ eval <<FIELD>> = round('<<FIELD>>', 2) ]
| rename fieldname as field
per_field_table_statistics.png

Annex: Looking after a specific field

The following search example shows the quality check results for a specific field (here, action):

index=summary sourcetype=trackme:fields_quality
``` you can filter on metadata fields to focus on a specific sourcetype, index, etc...```
| search metadata.datamodel="Web"
| table _time, action.*
looking_after_a_specific_field.png

Annex: Flex Object tracker SPL logic

The following search example shows the Flex Object tracker SPL logic:

index=summary sourcetype=trackme:fields_quality metadata.sourcetype=* metadata.source=* metadata.host=*
| table _time, _raw
| trackmefieldsqualityextract

| stats count as count_total, count(eval(status=="success")) as count_success, count(eval(status=="failure")) as count_failure by fieldname, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename
| eval percentage_success=round(count_success/count_total*100, 2)

``` set the threshold per field, this defines if the field is considered as passed or failed globally, this threshold has to be part of the SPL logic ```
| eval threshold=99

``` flag field ```
| eval fieldstatus=if(percentage_success>=threshold, "success", "failure")

``` join field summary ```
| join type=outer metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname [ search index=summary sourcetype=trackme:fields_quality metadata.sourcetype=* metadata.source=* metadata.host=*
| trackmefieldsqualityextract
| table _time, metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, value, regex_expression
``` sort is mandatory to force all records to be retrieved before we call the gen summary command ```
| sort 0 _time
| trackmefieldsqualitygensummary maxvals=15 fieldvalues_format=csv groupby_metadata_fields="metadata.index,metadata.sourcetype,metadata.datamodel,metadata.nodename"
| fields metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename, fieldname, total_events, distinct_value_count, percent_coverage, field_values, regex_expression | fields - _time, _raw
]

``` generate the per field json for reporting purposes, we also rename fieldname to @fieldname so it appears first in the JSON ```
| rename fieldname as @fieldname, fieldstatus as @fieldstatus
| tojson
| rename _raw as per_field_json

``` calculate ```
| stats values(eval(if('@fieldstatus'=="success", '@fieldname', null()))) as list_fields_passed, values(eval(if('@fieldstatus'=="failure", '@fieldname', null()))) as list_fields_failed, values(per_field_json) as per_field_json, max(total_events) as total_events_parsed by metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename

``` format the per_field_json ```
| eval per_field_json = "[" . mvjoin(per_field_json, ", ") . "]"

``` build the list of fields that passed and failed ```
| eval all_fields = mvappend(list_fields_failed, list_fields_passed)
| eval all_fields = mvdedup(all_fields)
| eval final_state = mvmap(
all_fields,
if(
mvfind(list_fields_failed, "^" . all_fields . "$") >= 0,
all_fields . "|failed",
all_fields . "|success"
)
)
| eval success_fields = mvfilter(match(final_state, "\|success$"))
| eval failed_fields = mvfilter(match(final_state, "\|failed$"))
| eval success_fields = mvmap(success_fields, mvindex(split(success_fields, "|"), 0))
| eval failed_fields  = mvmap(failed_fields,  mvindex(split(failed_fields, "|"), 0))
| fields - final_state
| eval success_fields=if(isnull(success_fields), "", success_fields), failed_fields=if(isnull(failed_fields), "", failed_fields)
| fields - list_fields_passed, list_fields_failed

``` calculate ```
| eventstats dc(all_fields) as total_fields_checked, dc(success_fields) as total_fields_passed, dc(failed_fields) as total_fields_failed by metadata.index, metadata.sourcetype, metadata.datamodel, metadata.nodename
| eval percentage_passed=round(total_fields_passed/total_fields_checked*100, 2), percentage_failed=round(total_fields_failed/total_fields_checked*100, 2)

``` rename ```
| rename metadata.* as "*"

``` save this as part of the extra attributes ```
| eval extra_attributes = "{" . "\"success_fields\": \"" . mvjoin(success_fields, ",") . "\", \"failed_fields\": \"" . mvjoin(failed_fields, ",") . "\", " . "\"fields\":" . per_field_json . "}"
| fields - per_field_json

``` set principal metadata for the flex entity ```
| eval group = datamodel
| eval object = nodename . ":" . index . ":" . sourcetype, alias=index . ":" . sourcetype
| eval object_description = "CIM Quality for DM: " . datamodel . ":" . nodename . ", index:" . index . ", sourcetype:" . sourcetype

``` gen metrics ```
| eval metrics = "{" .
"\"fields_quality.percentage_passed\": " . if(isnum(percentage_passed), percentage_passed, 0) . ", " .
"\"fields_quality.percentage_failed\": " . if(isnum(percentage_failed), percentage_failed, 0) . ", " .
"\"fields_quality.total_fields_checked\": " . if(isnum(total_fields_checked), total_fields_checked, 0) . ", " .
"\"fields_quality.total_fields_failed\": " . if(isnum(total_fields_failed), total_fields_failed, 0) . ", " .
"\"fields_quality.total_fields_passed\": " . if(isnum(total_fields_passed), total_fields_passed, 0) . ", " .
"\"fields_quality.total_events_parsed\": " . if(isnum(total_events_parsed), total_events_parsed, 0) . "}"

``` no outliers for now ```
| eval outliers_metrics="{}"

``` basic status, thresholds can be defined on a per entity basis ```
| eval status=1
| eval status_description="DM Quality: " . metrics
| eval status_description_short="% passed: " . percentage_passed . ", checked: " . total_fields_checked . ", passed: " . total_fields_passed . ", failed: " . total_fields_failed

``` this sets a default threshold, which can then be overridden on a per entity basis via the TrackMe UI ```
| eval default_threshold = "{'metric_name': 'fields_quality.percentage_passed', 'operator': '>=', 'value': 95, 'condition_true': 1}"

```alert if inactive for more than 2 days```
| eval max_sec_inactive=86400*2
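
The final part of the tracker rolls the per-field statuses up into entity-level metrics. The following plain Python sketch reproduces that arithmetic as an illustration only: the function name entity_metrics and the sample statuses are hypothetical, while the metric names and the value 95 are taken from the SPL above.

```python
import json


def entity_metrics(field_status, total_events_parsed):
    """Roll per-field statuses up into the entity-level metrics."""
    passed = sorted(f for f, s in field_status.items() if s == "success")
    failed = sorted(f for f, s in field_status.items() if s == "failure")
    checked = len(field_status)

    def pct(part):
        # Fall back to 0 when nothing was checked, like the isnum() guards in the SPL
        return round(part / checked * 100, 2) if checked else 0

    metrics = {
        "fields_quality.percentage_passed": pct(len(passed)),
        "fields_quality.percentage_failed": pct(len(failed)),
        "fields_quality.total_fields_checked": checked,
        "fields_quality.total_fields_failed": len(failed),
        "fields_quality.total_fields_passed": len(passed),
        "fields_quality.total_events_parsed": total_events_parsed,
    }
    return passed, failed, metrics


# Hypothetical per-field statuses for one entity
field_status = {
    "action": "failure",
    "src": "success",
    "dest": "success",
    "user": "success",
}
passed, failed, metrics = entity_metrics(field_status, total_events_parsed=1000)

print(json.dumps(metrics, indent=2))
# Comparison used by the default threshold in the SPL (>= 95)
print(metrics["fields_quality.percentage_passed"] >= 95)  # False
```

In this made-up case, one failed field out of four gives a percentage_passed of 75.0, which does not satisfy the default comparison against 95.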