Labels module

This module contains labeling rule implementations and utility functions used during the labeling process

EqlQueryBase pydantic-model

Pydantic model representing an EQL query

batch_size: int pydantic-field

The number of sequences to update with each batch. Cannot be larger than max_result_window

by: Union[List[str], str] pydantic-field

Optional global sequence by fields

event_category_field: str pydantic-field

The field used to categorize events

filter_: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

The filter/s to limit the queried documents to only those that match the filters

index: Union[List[str], str] pydantic-field

The indices to query (by default prefixed with the dataset name)

indices_prefix_dataset: bool pydantic-field

If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.

max_result_window: int pydantic-field

The max result window allowed on the elasticsearch instance

max_span: str pydantic-field

Optional max time span in which a sequence must occur to be considered a match

sequences: str pydantic-field required

Event sequences to search. Must contain at least two events.

tiebreaker_field: str pydantic-field

(Optional, string) Field used to sort hits with the same timestamp in ascending order.

timestamp_field: str pydantic-field

The field containing the event timestamp

until: str pydantic-field

Optional until event marking the end of valid sequences. The until event will not be labeled.

query(self)

Converts the EQL query object into an EQL query string.

Returns:

Type  Description
str   The query in EQL syntax

Source code in dataset/labels.py
def query(self) -> str:
    """Converts the EQL query object into an EQL query string.

    Returns:
        The query in EQL syntax
    """
    query = "sequence"

    if self.by is not None:
        if isinstance(self.by, Text):
            query += f" by {self.by}"
        elif len(self.by) > 0:
            query += f" by {', '.join(self.by)}"

    if self.max_span is not None:
        query += f" with maxspan={self.max_span}"

    for sequence in self.sequences:
        query += f"\n  {sequence}"

    if self.until is not None:
        query += f"\nuntil {self.until}"
    return query

EqlSequenceRule pydantic-model

Applies labels to a sequence of log events defined by an EQL query.

This labeling rule is defined as an EQL query. Using this syntax, it is possible to define a sequence of related events and retrieve them. All events that are part of retrieved sequences are then labeled.

Examples:

- type: elasticsearch.sequence
  id: attacker.webshell.upload.seq
  labels: [webshell_upload]
  description: >-
    This rule labels the web shell upload step by matching the 3 step sequence
    within the foothold phase.
  index:
    - apache_access-intranet_server
  # since we do these requests very fast
  # we need the line number as tie breaker
  tiebreaker_field: log.file.line
  by: source.address
  max_span: 2m
  filter:
    range:
      "@timestamp":
        # foothold phase start
        gte: "2021-03-23 20:31:00+00:00"
        # foothold phase stop
        lte: "2021-03-23 21:13:52+00:00"
  sequences:
    - '[ apache where event.action == "access" and url.original == "/" ]'
    - '[ apache where event.action == "access" and url.original == "/?p=5" ]'
    - '[ apache where event.action == "access" and http.request.method == "POST" and url.original == "/wp-admin/admin-ajax.php" ]'
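For reference, the query() method shown above renders this rule into the following EQL query string (the filter and tiebreaker_field settings are not part of the EQL string; they are applied separately when the search request is built):

sequence by source.address with maxspan=2m
  [ apache where event.action == "access" and url.original == "/" ]
  [ apache where event.action == "access" and url.original == "/?p=5" ]
  [ apache where event.action == "access" and http.request.method == "POST" and url.original == "/wp-admin/admin-ajax.php" ]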

batch_size: int pydantic-field

The number of sequences to update with each batch. Cannot be larger than max_result_window

by: Union[List[str], str] pydantic-field

Optional global sequence by fields

description: str pydantic-field

An optional description for the rule

event_category_field: str pydantic-field

The field used to categorize events

filter_: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

The filter/s to limit the queried documents to only those that match the filters

id_: str pydantic-field required

The unique rule id

index: Union[List[str], str] pydantic-field

The indices to query (by default prefixed with the dataset name)

indices_prefix_dataset: bool pydantic-field

If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.

labels: str pydantic-field required

The list of labels to apply to log lines matching this rule

max_result_window: int pydantic-field

The max result window allowed on the elasticsearch instance

max_span: str pydantic-field

Optional max time span in which a sequence must occur to be considered a match

sequences: str pydantic-field required

Event sequences to search. Must contain at least two events.

tiebreaker_field: str pydantic-field

(Optional, string) Field used to sort hits with the same timestamp in ascending order.

timestamp_field: str pydantic-field

The field containing the event timestamp

type_field: str pydantic-field required

The rule type as passed in from the config

until: str pydantic-field

Optional until event marking the end of valid sequences. The until event will not be labeled.

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the labels to all events part of retrieved sequences.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   The number of rows the labels were applied to.

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to all events part of retrieved sequences.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )

    body = self._make_body(label_object)

    updated = 0
    # as of ELK 7.12 there is no way to ensure we get all events through the EQL API
    # (there is no scan or search_after like for the DSL API)
    # so we manually search in batches for sequences with events that are not labeled yet
    # we stop only when we do not get any results anymore i.e., all events have been labeled
    # this is obviously not the most efficient approach but it's the best we can do for now
    while True:
        hits = search_eql(es, index, body)
        if hits["total"]["value"] > 0:
            index_ids: Dict[str, List[str]] = {}
            # we have to group the events by index
            # because ids are only guaranteed to be unique per index
            for sequence in hits["sequences"]:
                for event in sequence["events"]:
                    index_ids.setdefault(event["_index"], []).append(event["_id"])
            # add labels to each event per index
            for _index, ids in index_ids.items():
                # split the update requests into chunks of at most max result window
                update_chunks = [
                    ids[i : i + self.max_result_window]
                    for i in range(0, len(ids), self.max_result_window)
                ]
                for chunk in update_chunks:
                    update = UpdateByQuery(using=es, index=_index).query(
                        "ids", values=chunk
                    )
                    # apply labels to events
                    updated += apply_labels_by_update_dsl(
                        update, self.update_params(), update_script_id
                    )
        else:
            # end loop once we do not find new events anymore
            break

    return updated

query(self) inherited

Converts the EQL query object into an EQL query string.

Returns:

Type  Description
str   The query in EQL syntax

Source code in dataset/labels.py
def query(self) -> str:
    """Converts the EQL query object into an EQL query string.

    Returns:
        The query in EQL syntax
    """
    query = "sequence"

    if self.by is not None:
        if isinstance(self.by, Text):
            query += f" by {self.by}"
        elif len(self.by) > 0:
            query += f" by {', '.join(self.by)}"

    if self.max_span is not None:
        query += f" with maxspan={self.max_span}"

    for sequence in self.sequences:
        query += f"\n  {sequence}"

    if self.until is not None:
        query += f"\nuntil {self.until}"
    return query

update_params(self) inherited

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """

    return {
        "rule": self.id_,
        "labels": self.labels,
    }

LabelException

Generic exception for errors during labeling phase.

Labeler

Cyber Range Kyoushi labeler

This class is used to configure and execute the labeling process.
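A minimal usage sketch (the connection details, dataset path, and index pattern are illustrative assumptions; `rules` is the unparsed rule list and `dataset_config` the parsed DatasetConfig, as described for execute() below):

from pathlib import Path

from elasticsearch import Elasticsearch

from cr_kyoushi.dataset.labels import Labeler

es = Elasticsearch(["localhost:9200"])
labeler = Labeler()  # default rule types, update script ID, and label field

# `rules` and `dataset_config` are assumed to be loaded elsewhere,
# e.g., from the dataset's YAML rule and config files
labeler.execute(rules, Path("datasets/example"), dataset_config, es)

# dump the labels from the database to the file system
labeler.write(
    Path("datasets/example"),
    dataset_config,
    es,
    index=["example-*"],
    skip_files=[],
)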

__init__(self, rule_types={}, update_script_id='kyoushi_label_update', label_object='kyoushi_labels') special

Parameters:

Name              Type            Description                                               Default
rule_types        Dict[str, Any]  Dictionary containing all available labeling rule types.  {}
update_script_id  str             The Elasticsearch ID for the update script.               'kyoushi_label_update'
label_object      str             The field to store labels in.                             'kyoushi_labels'
Source code in dataset/labels.py
def __init__(
    self,
    rule_types: Dict[str, Any] = {},
    update_script_id: str = "kyoushi_label_update",
    label_object: str = "kyoushi_labels",
):
    """
    Args:
        rule_types : Dictionary containing all available labeling rule types.
        update_script_id: The Elasticsearch ID for the update script.
        label_object: The field to store labels in.
    """
    self.rule_types: Dict[str, Any] = rule_types
    # default rule types
    self.rule_types.update(
        {
            NoopRule.type_: NoopRule,
            UpdateByQueryRule.type_: UpdateByQueryRule,
            UpdateSubQueryRule.type_: UpdateSubQueryRule,
            UpdateParentQueryRule.type_: UpdateParentQueryRule,
            EqlSequenceRule.type_: EqlSequenceRule,
        }
    )
    self.update_script_id: str = update_script_id
    self.label_object: str = label_object

add_label_object_mapping(self, es, dataset_name, rules)

Utility function for adding the field definitions for the label field.

Parameters:

Name          Type                                   Description                       Default
es            Elasticsearch                          The elasticsearch client object.  required
dataset_name  str                                    The name of the dataset.          required
rules         List[cr_kyoushi.dataset.labels.Rule]  List of all rules to be applied.  required
Source code in dataset/labels.py
def add_label_object_mapping(
    self,
    es: Elasticsearch,
    dataset_name: str,
    rules: List[Rule],
):
    """Utility function for adding the field definitions for the label field.

    Args:
        es: The elasticsearch client object.
        dataset_name: The name of the dataset.
        rules: List of all to be applied rules.
    """
    root = Mapping()
    flat = {}
    list_ = {}

    for rule in rules:
        flat[rule.id_] = Keyword()
        list_[rule.id_] = Keyword(multi=True)

    properties = {
        "flat": {
            "properties": flat,
        },
        "list": {
            "properties": list_,
        },
        "rules": {"type": "keyword"},
    }
    root.field(self.label_object, "object", properties=properties)
    es.indices.put_mapping(index=f"{dataset_name}-*", body=root.to_dict())
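For a single rule with ID my.rule and the default label field, the mapping body sent to Elasticsearch would look roughly like this (a sketch derived from the code above; the exact serialization produced by elasticsearch-dsl may differ in minor details):

# rough shape of root.to_dict() for one rule with ID "my.rule"
{
    "properties": {
        "kyoushi_labels": {
            "properties": {
                "flat": {"properties": {"my.rule": {"type": "keyword"}}},
                "list": {"properties": {"my.rule": {"type": "keyword"}}},
                "rules": {"type": "keyword"},
            }
        }
    }
}

This is also the structure the rules rely on when excluding already labeled rows via term queries on <label_object>.flat.<rule id>.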

execute(self, rules, dataset_dir, dataset_config, es)

Parse and apply all labeling rules.

Note

This function only writes labels to the database. The resulting labels can be written to the file system using the write(...) function of the labeler.

Parameters:

Name            Type                  Description                           Default
rules           List[Dict[str, Any]]  The unparsed list of labeling rules.  required
dataset_dir     Path                  The dataset path                      required
dataset_config  DatasetConfig         The dataset configuration             required
es              Elasticsearch         The elasticsearch client object       required

Exceptions:

Type             Description
ValidationError  If a labeling rule or configuration is invalid
LabelException   If an error occurs while applying a labeling rule

Source code in dataset/labels.py
def execute(
    self,
    rules: List[Dict[str, Any]],
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
):
    """Parse and apply all labeling rules.

    !!! Note
        This function only writes labels to the database.
        The resulting labels can be written to the file system
        using the `write(...)` function of the labeler.

    Args:
        rules: The unparsed list of labeling rules.
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        es: The elasticsearch client object

    Raises:
        ValidationError: If a labeling rule or configuration is invalid
        LabelException: If an error occurs while applying a labeling rule
    """
    # validate the general rule list
    RuleList.parse_obj(rules)

    # convert and validate rule types
    rule_objects: List[Rule] = []
    errors: List[ErrorWrapper] = []
    for r in rules:
        try:
            rule_objects.append(self.rule_types[r["type"]].parse_obj(r))
        except ValidationError as e:
            errors.append(ErrorWrapper(e, r["id"]))
    if len(errors) > 0:
        raise ValidationError(errors, RuleList)

    # create mappings for rule label fields
    # we need to do this since EQL queries cannot check existence of non mapped fields
    self.add_label_object_mapping(es, dataset_config.name, rule_objects)

    # ensure update script exists
    create_kyoushi_scripts(es, dataset_config.name, self.label_object)

    # start labeling process
    for rule in rule_objects:
        try:
            print(f"Applying rule {rule.id_} ...")
            updated = rule.apply(
                dataset_dir,
                dataset_config,
                es,
                f"{dataset_config.name}_{self.update_script_id}",
                self.label_object,
            )
            print(
                f"Rule {rule.id_} applied labels: {rule.labels} to {updated} lines."
            )
        except ElasticsearchRequestError as e:
            raise LabelException(f"Error executing rule '{rule.id_}'", e)

write(self, dataset_dir, dataset_config, es, index, skip_files)

Write labels stored in the database to the file system.

Parameters:

Name            Type           Description                      Default
dataset_dir     Path           The dataset path                 required
dataset_config  DatasetConfig  The dataset configuration        required
es              Elasticsearch  The elasticsearch client object  required
index           List[str]      The indices to consider          required
skip_files      List[str]      The files to ignore              required
Source code in dataset/labels.py
def write(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    index: List[str],
    skip_files: List[str],
):
    """Write labeles stored in the database to the file system.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        index: The indices to consider
        skip_files: The files to ignore
    """
    files = self._get_label_files(dataset_config, es, index, skip_files)

    for current_file in files:
        # disable request cache to ensure we always get latest info
        search_labeled = Search(using=es, index=f"{dataset_config.name}-*").params(
            request_cache=False, preserve_order=True
        )
        search_labeled = search_labeled.filter(
            "exists", field=f"{self.label_object}.rules"
        ).filter("term", log__file__path=current_file)

        search_labeled = search_labeled.sort({"log.file.line": "asc"})

        base_path = dataset_dir.joinpath(LAYOUT.GATHER.value)
        label_path = dataset_dir.joinpath(LAYOUT.LABELS.value)

        label_file_path = label_path.joinpath(
            Path(current_file).relative_to(base_path)
        )
        print(f"Start writing {current_file}")
        self._write_file(search_labeled, label_file_path)

NoopRule pydantic-model

A noop rule that does absolutely nothing.

description: str pydantic-field

An optional description for the rule

id_: str pydantic-field required

The unique rule id

labels: str pydantic-field required

The list of labels to apply to log lines matching this rule

type_field: str pydantic-field required

The rule type as passed in from the config

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the NoopRule i.e., does nothing.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   Always returns 0 since nothing happens.

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the NoopRule i.e., does nothing.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        Always returns 0 since nothing happens.
    """
    return 0

update_params(self) inherited

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """

    return {
        "rule": self.id_,
        "labels": self.labels,
    }

QueryBase pydantic-model

Base model for DSL query based labeling rules

exclude: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

Similar to filters, but used to exclude results

filter_: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

The filter/s to limit the queried documents to only those that match the filters

index: Union[List[str], str] pydantic-field

The indices to query (by default prefixed with the dataset name)

indices_prefix_dataset: bool pydantic-field

If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.

query: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field required

The query/s to use for identifying log lines to apply the labels to.

validate_exclude(value, values) classmethod

Validator to check the DSL query exclude syntax

Exclude is simply a negative filter clause i.e., not (DSL QUERY).

Parameters:

Name    Type                                          Description           Default
value   Union[List[Dict[str, Any]], Dict[str, Any]]  The DSL exclude       required
values  Dict[str, Any]                                The model dictionary  required

Exceptions:

Type             Description
ValidationError  Should the exclude not be valid

Returns:

Type                                          Description
Union[List[Dict[str, Any]], Dict[str, Any]]  The validated exclude

Source code in dataset/labels.py
@validator("exclude")
def validate_exclude(
    cls, value: Union[List[Dict[str, Any]], Dict[str, Any]], values: Dict[str, Any]
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Validator to check the DSL query exclude syntax

    Exclude is simply a negative filter clause i.e.,
    not (DSL QUERY).

    Args:
        value: The DSL exclude
        values: The model dictionary

    Raises:
        ValidationError: Should the exclude not be valid

    Returns:
        The validated exclude
    """
    # temporary update by query object used to validate the input excludes
    _temp = UpdateByQuery()
    errors = []
    if not isinstance(value, List):
        value = [value]
    for i, exclude in enumerate(value):
        try:
            _temp = _temp.exclude(exclude)
        except (TypeError, UnknownDslObject, ValueError) as e:
            errors.append(ErrorWrapper(e, (i,)))
    if len(errors) > 0:
        raise ValidationError(errors, UpdateByQueryRule)
    return value

validate_filter(value, values) classmethod

Validator to check the DSL query filter syntax

Parameters:

Name    Type                                          Description           Default
value   Union[List[Dict[str, Any]], Dict[str, Any]]  The DSL filter        required
values  Dict[str, Any]                                The model dictionary  required

Exceptions:

Type             Description
ValidationError  Should the filter not be valid

Returns:

Type                                          Description
Union[List[Dict[str, Any]], Dict[str, Any]]  The validated filter

Source code in dataset/labels.py
@validator("filter_")
def validate_filter(
    cls, value: Union[List[Dict[str, Any]], Dict[str, Any]], values: Dict[str, Any]
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Validator to check the DSL query filter syntax

    Args:
        value: The DSL filter
        values: The model dictionary

    Raises:
        ValidationError: Should the filter not be valid

    Returns:
        The validated filter
    """
    # temporary update by query object used to validate the input filters
    _temp = UpdateByQuery()
    errors = []
    if not isinstance(value, List):
        value = [value]
    for i, filter_ in enumerate(value):
        try:
            _temp = _temp.filter(filter_)
        except (TypeError, UnknownDslObject, ValueError) as e:
            errors.append(ErrorWrapper(e, (i,)))
    if len(errors) > 0:
        raise ValidationError(errors, UpdateByQueryRule)
    return value

validate_queries(value, values) classmethod

Validator checking DSL syntax

Parameters:

Name    Type                                          Description           Default
value   Union[List[Dict[str, Any]], Dict[str, Any]]  The DSL query         required
values  Dict[str, Any]                                The model dictionary  required

Exceptions:

Type             Description
ValidationError  Should the given query be invalid

Returns:

Type                                          Description
Union[List[Dict[str, Any]], Dict[str, Any]]  The validated query

Source code in dataset/labels.py
@validator("query")
def validate_queries(
    cls, value: Union[List[Dict[str, Any]], Dict[str, Any]], values: Dict[str, Any]
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Validator checking DSL syntax

    Args:
        value: The DSL query
        values: The model dictionary

    Raises:
        ValidationError: Should the given query be invalid

    Returns:
        The validated query
    """
    # temporary update by query object used to validate the input queries
    _temp = UpdateByQuery()
    errors = []
    if not isinstance(value, List):
        value = [value]
    for i, query in enumerate(value):
        try:
            _temp = _temp.query(query)
        except (TypeError, UnknownDslObject, ValueError) as e:
            errors.append(ErrorWrapper(e, (i,)))
    if len(errors) > 0:
        raise ValidationError(errors, UpdateByQueryRule)
    return value

Rule

Interface definition for labeling rules.

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the configured rule and assigns the labels to all matching rows.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   The number of rows the rule was applied to

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the configured rule and assigns the labels to all matching rows.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        The number of rows the rule was applied to
    """
    ...

update_params(self)

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """
    ...

RuleBase pydantic-model

Pydantic base model for labeling rules.

This class can be extended to define custom labeling rules.
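A minimal sketch of such an extension (the rule type name is hypothetical and the DatasetConfig import path is assumed; a real rule would build a query and delegate to a helper such as apply_labels_by_update_dsl):

from pathlib import Path

from elasticsearch import Elasticsearch

from cr_kyoushi.dataset.config import DatasetConfig  # assumed import path
from cr_kyoushi.dataset.labels import RuleBase


class LogOnlyRule(RuleBase):
    """Toy rule that only reports what it would label."""

    type_: str = "custom.log_only"

    def apply(
        self,
        dataset_dir: Path,
        dataset_config: DatasetConfig,
        es: Elasticsearch,
        update_script_id: str,
        label_object: str,
    ) -> int:
        # a real implementation would query Elasticsearch here, apply its
        # labels, and return the number of updated rows
        print(f"rule {self.id_} would apply labels {self.labels}")
        return 0

Custom rule types can then be registered by passing them to the Labeler constructor, e.g. Labeler(rule_types={"custom.log_only": LogOnlyRule}).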

description: str pydantic-field

An optional description for the rule

id_: str pydantic-field required

The unique rule id

labels: str pydantic-field required

The list of labels to apply to log lines matching this rule

type_field: str pydantic-field required

The rule type as passed in from the config

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the configured rule and assigns the labels to all matching rows.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   The number of rows the rule was applied to

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the configured rule and assigns the labels to all matching rows.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        The number of rows the rule was applied to
    """

    raise NotImplementedError()

update_params(self)

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """

    return {
        "rule": self.id_,
        "labels": self.labels,
    }

validate_label_no_semicolon(val) classmethod

Validator ensuring label names do not contain ;.

Parameters:

Name  Type  Description     Default
val   str   A single label  required

Returns:

Type  Description
str   The validated label

Source code in dataset/labels.py
@validator("labels", each_item=True)
def validate_label_no_semicolon(cls, val: str) -> str:
    """Validator ensuring label names do not contain `;`.

    Args:
        val: A single label

    Returns:
        The validated label
    """
    assert ";" not in val, f"Labels must not contain semicolons, but got '{val}'"
    return val

RuleList pydantic-model

Model definition of a list of labeling rules.

check_rule_ids_uniq(values) classmethod

Validator for ensuring that rule IDs are unique.

Parameters:

Name    Type                                                  Description           Default
values  Dict[str, List[cr_kyoushi.dataset.labels.RuleBase]]  The model dictionary  required

Returns:

Type                                                  Description
Dict[str, List[cr_kyoushi.dataset.labels.RuleBase]]  The validated model dictionary

Source code in dataset/labels.py
@root_validator
def check_rule_ids_uniq(
    cls, values: Dict[str, List[RuleBase]]
) -> Dict[str, List[RuleBase]]:
    """Validator for ensuring that rule IDs are unique.

    Args:
        values: The model dictionary

    Returns:
        The validated model dictionary
    """
    duplicates = set()
    temp = []
    if "__root__" in values:
        for r in values["__root__"]:
            if r.id_ in temp:
                duplicates.add(r.id_)
            else:
                temp.append(r.id_)
        assert (
            len(duplicates) == 0
        ), f"Rule IDs must be uniq, but got duplicates: {duplicates}"
    return values

UpdateByQueryRule pydantic-model

Applies labels based on a simple Elasticsearch DSL query.

Examples:

- type: elasticsearch.query
  id: attacker.foothold.vpn.ip
  labels:
    - attacker_vpn
    - foothold
  description: >-
    This rule applies the labels to all openvpn log rows that have
    the attacker server as source ip and are within the foothold phase.
  index:
    - openvpn-vpn
  filter:
    range:
    "@timestamp":
        # foothold phase start
        gte: "2021-03-23 20:31:00+00:00"
        # foothold phase stop
        lte: "2021-03-23 21:13:52+00:00"
  query:
    - match:
        source.ip: '192.42.0.255'

description: str pydantic-field

An optional description for the rule

exclude: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

Similar to filters, but used to exclude results

filter_: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

The filter/s to limit the queried documents to only those that match the filters

id_: str pydantic-field required

The unique rule id

index: Union[List[str], str] pydantic-field

The indices to query (by default prefixed with the dataset name)

indices_prefix_dataset: bool pydantic-field

If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.

labels: str pydantic-field required

The list of labels to apply to log lines matching this rule

query: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field required

The query/s to use for identifying log lines to apply the labels to.

type_field: str pydantic-field required

The rule type as passed in from the config

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the labels to all rows matching the DSL query.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   The number of rows the labels were applied to.

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to all rows matching the DSL query.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )
    update = UpdateByQuery(using=es, index=index)

    # ensure we have lists
    if not isinstance(self.query, List):
        self.query = [self.query]
    if not isinstance(self.filter_, List):
        self.filter_ = [self.filter_] if self.filter_ is not None else []
    if not isinstance(self.exclude, List):
        self.exclude = [self.exclude] if self.exclude is not None else []

    # exclude already correctly labeled rows from the result set
    update = update.exclude(
        "term",
        **{f"{label_object}.flat.{self.id_}": ";".join(self.labels)},
    )

    for _query in self.query:
        update = update.query(_query)
    for _filter in self.filter_:
        update = update.filter(_filter)
    for _exclude in self.exclude:
        update = update.exclude(_exclude)

    result = apply_labels_by_update_dsl(
        update, self.update_params(), update_script_id
    )

    return result

update_params(self) inherited

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """

    return {
        "rule": self.id_,
        "labels": self.labels,
    }

UpdateParentQueryRule pydantic-model

Applies the labels to all rows of a base query for which a parent query returns results.

This labeling rule first executes a base query to retrieve the rows we might want to apply labels to. It then renders and executes a templated parent query for each retrieved row; the parent query result indicates whether the initial row should be labeled. By default, a base query row is labeled if the corresponding parent query returns at least one row. This minimum number can be configured via min_match, e.g., to require at least two results.

Note

The parent query uses Jinja2 syntax for templating. The information retrieved by the base query can be accessed through the HIT variable.

Examples:

- type: elasticsearch.parent_query
  id: attacker.foothold.apache.error_access
  labels:
    - attacker_http
    - foothold
  description: >-
    This rule looks for unlabeled error messages resulting from VPN server
    traffic within the attack time and tries to match it to an already labeled
    access log row.
  index:
    - apache_error-intranet_server
  query:
    match:
      source.address: "172.16.100.151"
  filter:
    # use script query to match only entries that
    # are not already tagged as attacker http in the foothold phase
    - bool:
        must_not:
          - script:
              script:
                id: test_dataset_kyoushi_label_filter
                params:
                  labels: [attacker_http]
  parent_query:
    index:
      - apache_access-intranet_server
    query:
      - term:
          url.full: "{{ HIT.url.full }}"
      - term:
          source.address: "{{ HIT.source.address }}"
      # we are looking for parents that are labeled as attacker http
      - bool:
          must:
            - script:
                script:
                  id: test_dataset_kyoushi_label_filter
                  params:
                    labels: [attacker_http]
    filter:
      # parent must be within +-1s of potential child
      - range:
          "@timestamp":
            gte: "{{ (HIT['@timestamp'] | as_datetime) - timedelta(seconds=1) }}"
            lte: "{{ (HIT['@timestamp'] | as_datetime) + timedelta(seconds=1) }}"

description: str pydantic-field

An optional description for the rule

exclude: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

Similar to filters, but used to exclude results

filter_: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

The filter/s to limit the queried documents to only those that match the filters

id_: str pydantic-field required

The unique rule id

index: Union[List[str], str] pydantic-field

The indices to query (by default prefixed with the dataset name)

indices_prefix_dataset: bool pydantic-field

If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.

labels: str pydantic-field required

The list of labels to apply to log lines matching this rule

max_result_window: int pydantic-field

The max result window allowed on the elasticsearch instance

min_match: int pydantic-field

The minimum number of parent matches needed for the main query to be labeled.

parent_query: QueryBase pydantic-field required

The templated parent query to check if the labels should be applied to a query hit.

query: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field required

The query/s to use for identifying log lines to apply the labels to.

type_field: str pydantic-field required

The rule type as passed in from the config

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the labels to results from the base query for which a parent was found.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   The number of rows the labels were applied to.

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to result from the base query for which a parent was found.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )
    search = Search(using=es, index=index)

    # ensure we have lists
    if not isinstance(self.query, List):
        self.query = [self.query]
    if not isinstance(self.filter_, List):
        self.filter_ = [self.filter_] if self.filter_ is not None else []
    if not isinstance(self.exclude, List):
        self.exclude = [self.exclude] if self.exclude is not None else []

    # exclude already correctly labeled rows from the result set
    search = search.exclude(
        "term",
        **{f"{label_object}.flat.{self.id_}": ";".join(self.labels)},
    )

    for _query in self.query:
        search = search.query(_query)
    for _filter in self.filter_:
        search = search.filter(_filter)
    for _exclude in self.exclude:
        search = search.exclude(_exclude)

    result = 0
    update_map: Dict[str, List[str]] = {}
    _i = 0
    for hit in search.scan():
        _i += 1
        # make deep copy of parent query so we can template it
        parent_query = self.parent_query.copy(deep=True)

        # render the subquery
        parent_query = render_query_base(hit, parent_query)

        if self.check_parent(parent_query, self.min_match, dataset_config, es):
            update_map.setdefault(hit.meta.index, []).append(hit.meta.id)

    # add labels to each event per index
    for _index, ids in update_map.items():
        # split the update requests into chunks of at most max result window
        update_chunks = [
            ids[i : i + self.max_result_window]
            for i in range(0, len(ids), self.max_result_window)
        ]
        for chunk in update_chunks:
            update = UpdateByQuery(using=es, index=_index).query(
                "ids", values=chunk
            )
            # apply labels to events
            result += apply_labels_by_update_dsl(
                update, self.update_params(), update_script_id
            )

    return result

check_parent(self, parent_query, min_match, dataset_config, es)

Executes a parent query and returns whether there were enough result rows.

Parameters:

Name            Type           Description                                 Default
parent_query    QueryBase      The parent query to execute                 required
min_match       int            The minimum number of result rows required  required
dataset_config  DatasetConfig  The dataset configuration                   required
es              Elasticsearch  The elasticsearch client object             required

Returns:

Type  Description
bool  True if the query returned >= min_match rows and False otherwise.

Source code in dataset/labels.py
def check_parent(
    self,
    parent_query: QueryBase,
    min_match: int,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
) -> bool:
    """Executes a parent query and returns if there were enough result rows.

    Args:
        parent_query: The parent query to execute
        min_match: The minimum number of result rows required
        dataset_config: The dataset configuration
        es: The elasticsearch client object

    Returns:
        `True` if the query returned >= min_match rows and `False` otherwise.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, parent_query.indices_prefix_dataset, parent_query.index
    )
    search = Search(using=es, index=index)

    # ensure we have lists
    if not isinstance(parent_query.query, List):
        parent_query.query = [parent_query.query]
    if not isinstance(parent_query.filter_, List):
        parent_query.filter_ = (
            [parent_query.filter_] if parent_query.filter_ is not None else []
        )
    if not isinstance(parent_query.exclude, List):
        parent_query.exclude = (
            [parent_query.exclude] if parent_query.exclude is not None else []
        )

    for _query in parent_query.query:
        search = search.query(_query)
    for _filter in parent_query.filter_:
        search = search.filter(_filter)
    for _exclude in parent_query.exclude:
        search = search.exclude(_exclude)

    return search.execute().hits.total.value >= min_match

update_params(self) inherited

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """

    return {
        "rule": self.id_,
        "labels": self.labels,
    }

UpdateSubQueryRule pydantic-model

Labeling rule that labels the results of multiple sub queries.

This labeling rule first executes a base query to retrieve information. It then renders and executes a templated sub query for each row retrieved from the base query. The result rows of these dynamically generated sub queries are then labeled.

Note

The sub query uses Jinja2 syntax for templating. The information retrieved by the base query can be accessed through the HIT variable.

Examples:

- type: elasticsearch.sub_query
  id: attacker.foothold.apache.access_dropped
  labels:
    - attacker_http
    - foothold
  description: >-
    This rule tries to match attacker requests that we were unable to
    match to a labeled response with access log entries. Such cases can
    happen if the corresponding response gets lost in the network or
    otherwise is not sent.
  index:
    - pcap-attacker_0
  # obligatory match all
  query:
    - term:
        destination.ip: "172.16.0.217"
  filter:
    - term:
        event.category: http
    - term:
        event.action: request
    # we are looking for requests that have not been marked as attacker http yet
    # most likely they did not have a matching response due to some network error
    # or timeout
    - bool:
        must_not:
          - script:
              script:
                id: test_dataset_kyoushi_label_filter
                params:
                  labels: [attacker_http]
  sub_query:
    index:
      - apache_access-intranet_server
    query:
      - term:
          url.full: "{{ HIT.url.full }}"
      - term:
          source.address: "172.16.100.151"
    filter:
      - range:
          "@timestamp":
            # the access log entry should be after the request, but since the access log
            # does not have microseconds we drop them here as well
            gte: "{{ (HIT['@timestamp'] | as_datetime).replace(microsecond=0) }}"
            # the type of error we are looking for should create an access log entry almost immediately
            # so we keep the time frame short
            lte: "{{ (HIT['@timestamp'] | as_datetime).replace(microsecond=0) + timedelta(seconds=1) }}"

description: str pydantic-field

An optional description for the rule

exclude: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

Similar to filters, but used to exclude results

filter_: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field

The filter/s to limit the queried documents to only those that match the filters

id_: str pydantic-field required

The unique rule id

index: Union[List[str], str] pydantic-field

The indices to query (by default prefixed with the dataset name)

indices_prefix_dataset: bool pydantic-field

If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.

labels: str pydantic-field required

The list of labels to apply to log lines matching this rule

query: Union[List[Dict[str, Any]], Dict[str, Any]] pydantic-field required

The query/s to use for identifying log lines to apply the labels to.

sub_query: QueryBase pydantic-field required

The templated sub query to use to apply the labels. Executed for each hit of the parent query.

type_field: str pydantic-field required

The rule type as passed in from the config

apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)

Applies the labels to all rows matching the created sub queries.

Parameters:

Name              Type           Description                                 Default
dataset_dir       Path           The dataset base directory                  required
dataset_config    DatasetConfig  The dataset configuration                   required
es                Elasticsearch  The elasticsearch client object             required
update_script_id  str            The label update script ID                  required
label_object      str            The field used to store label information   required

Returns:

Type  Description
int   The number of rows the labels were applied to.

Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to all rows matching the created sub queries.

    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information

    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )

    search = Search(using=es, index=index)

    # ensure we have lists
    if not isinstance(self.query, List):
        self.query = [self.query]
    if not isinstance(self.filter_, List):
        self.filter_ = [self.filter_] if self.filter_ is not None else []
    if not isinstance(self.exclude, List):
        self.exclude = [self.exclude] if self.exclude is not None else []

    # exclude already correctly labeled rows from the result set
    search = search.exclude(
        "term",
        **{f"{label_object}.flat.{self.id_}": ";".join(self.labels)},
    )

    for _query in self.query:
        search = search.query(_query)
    for _filter in self.filter_:
        search = search.filter(_filter)
    for _exclude in self.exclude:
        search = search.exclude(_exclude)

    result = 0
    for hit in search.params(scroll="30m").scan():
        # make deep copy of sub query so we can template it
        sub_query = self.sub_query.copy(deep=True)

        # render the subquery
        sub_query = render_query_base(hit, sub_query)

        sub_rule = UpdateByQueryRule(
            type="elasticsearch.query",
            id=self.id_,
            labels=self.labels,
            description=self.description,
            index=sub_query.index,
            query=sub_query.query,
            filter=sub_query.filter_,
            exclude=sub_query.exclude,
            indices_prefix_dataset=sub_query.indices_prefix_dataset,
        )
        result += sub_rule.apply(
            dataset_dir, dataset_config, es, update_script_id, label_object
        )
    return result

update_params(self) inherited

Gets the update script parameters required for this rule.

Returns:

Type            Description
Dict[str, Any]  The update script parameters.

Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.

    Returns:
        The update script parameters.
    """

    return {
        "rule": self.id_,
        "labels": self.labels,
    }

apply_labels_by_query(es, query, script_params, update_script_id, index='_all', check_interval=0.5)

Utility function for applying labels based on a DSL query.

Parameters:

Name              Type                   Description                                  Default
es                Elasticsearch          The elasticsearch client object              required
query             Dict[str, Any]         The DSL query dict                           required
script_params     Dict[str, Any]         The parameters for the label update script.  required
update_script_id  str                    The label update script to use               required
index             Union[List[str], str]  The indices to query                         '_all'
check_interval    float                  The amount of time between status checks.    0.5

Returns:

Type  Description
int   The number of updated rows.

Source code in dataset/labels.py
def apply_labels_by_query(
    es: Elasticsearch,
    query: Dict[str, Any],
    script_params: Dict[str, Any],
    update_script_id: str,
    index: Union[List[str], str] = "_all",
    check_interval: float = 0.5,
) -> int:
    """Utility function for applying labels based on a DSL query.

    Args:
        es: The elasticsearch client object
        query: The DSL query dict
        script_params: The parameters for the label update script.
        update_script_id: The label update script to use
        index: The indices to query
        check_interval: The amount of time between status checks.

    Returns:
        The number of updated rows.
    """
    update = UpdateByQuery(using=es, index=index)
    update = update.update_from_dict(query)
    return apply_labels_by_update_dsl(
        update, script_params, update_script_id, check_interval
    )
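A usage sketch (the dataset name, index, and rule values are illustrative; the script ID follows the <dataset_name>_kyoushi_label_update naming used by create_kyoushi_scripts below):

updated = apply_labels_by_query(
    es,
    query={"query": {"term": {"source.ip": "192.42.0.255"}}},
    script_params={
        "rule": "attacker.foothold.vpn.ip",
        "labels": ["attacker_vpn", "foothold"],
    },
    update_script_id="example_kyoushi_label_update",
    index="example-openvpn-vpn",
)
print(f"updated {updated} rows")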

apply_labels_by_update_dsl(update, script_params, update_script_id, check_interval=0.5)

Utility function for applying labels based on a DSL query object.

The function takes an update by query object and uses the Cyber Range Kyoushi label update script to apply the given labels to all matching rows. This function is blocking as it waits for the process to complete.

Parameters:

Name              Type            Description                                            Default
update            UpdateByQuery   The update by query object containing the DSL query.  required
script_params     Dict[str, Any]  The parameters for the label update script.            required
update_script_id  str             The label update script to use                         required
check_interval    float           The amount of time between status checks.              0.5

Returns:

Type  Description
int   The number of updated rows.

Source code in dataset/labels.py
def apply_labels_by_update_dsl(
    update: UpdateByQuery,
    script_params: Dict[str, Any],
    update_script_id: str,
    check_interval: float = 0.5,
) -> int:
    """Utility function for applying labels based on a DSL query object.

    The function takes an update by query object and uses the Cyber Range Kyoushi
    label update script to apply the given labels to all matching rows. This
    function is blocking as it waits for the process to complete.

    Args:
        update: The update by query object containing the DSL query.
        script_params: The parameters for the label update script.
        update_script_id: The label update script to use
        check_interval: The amount of time between status checks.

    Returns:
        The number of updated rows.
    """
    # refresh=True is important so that consecutive rules
    # have a consistent state
    es: Elasticsearch = get_connection(update._using)

    # add update script
    update = update.script(id=update_script_id, params=script_params)

    # run update task
    task = update.params(refresh=True, wait_for_completion=False).execute().task
    task_info = es.tasks.get(task_id=task)

    while not task_info["completed"]:
        sleep(check_interval)
        task_info = es.tasks.get(task_id=task)

    with warnings.catch_warnings():
        # ToDo: Elasticsearch (7.12) does not provide any API to delete update by query tasks
        #       The only option is to delete the document directly; this will be deprecated
        #       and as such gives warnings. For now we ignore these and wait for elasticsearch
        #       to provide an API for it.
        warnings.simplefilter("ignore")
        es.delete(index=".tasks", doc_type="task", id=task)
    return task_info["response"]["updated"]

create_kyoushi_scripts(es, dataset_name, label_object='kyoushi_labels')

Utility function for creating labeling update, filter and field scripts.

Parameters:

Name          Type           Description                                               Default
es            Elasticsearch  The elasticsearch client object                           required
dataset_name  str            The name of the dataset to create the scripts for.       required
                             This is used as a prefix for script names to ensure
                             different versions of these scripts can exist for
                             different datasets in the same Elasticsearch instance.
label_object  str            The name of the field to store labeling information in.  'kyoushi_labels'
Source code in dataset/labels.py
def create_kyoushi_scripts(
    es: Elasticsearch,
    dataset_name: str,
    label_object: str = "kyoushi_labels",
):
    """Utility function for creating labeling update, filter and field scripts.

    Args:
        es: The elasticsearch client object
        dataset_name: The name of the dataset to create the scripts for.
                      This is used as prefix for script names to ensure
                      different versions of these scripts can exist for
                      different datasets in the same Elasticsearch instance.
        label_object: The name of field to store labeling information in.
    """
    update_script = {
        "script": {
            "description": "Kyoushi Dataset - Update by Query label script",
            "lang": "painless",
            "source": UPDATE_SCRIPT.replace("{{ label_object }}", label_object),
        }
    }
    es.put_script(
        id=f"{dataset_name}_kyoushi_label_update", body=update_script, context="update"
    )

    filter_script = {
        "script": {
            "lang": "painless",
            "source": LABEL_FILTER_SCRIPT.replace("{{ label_object }}", label_object),
        }
    }
    es.put_script(
        id=f"{dataset_name}_kyoushi_label_filter",
        body=filter_script,
        context="filter",
    )

    labels_field = {
        "script": {
            "lang": "painless",
            "source": LABELS_FIELD_SCRIPT.replace("{{ label_object }}", label_object),
        }
    }

    es.put_script(
        id=f"{dataset_name}_kyoushi_label_field",
        body=labels_field,
        context="field",
    )
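For example, creating the scripts for a dataset named example registers them under the following IDs (this is also done automatically by Labeler.execute() before any rules are applied):

create_kyoushi_scripts(es, "example")
# registered script IDs:
#   example_kyoushi_label_update  (update context, writes label information)
#   example_kyoushi_label_filter  (filter context, usable in rule queries)
#   example_kyoushi_label_field   (field context, exposes the labels)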

get_label_counts(es, index=None, label_object='kyoushi_labels')

Utility function for getting the number of rows per label.

Parameters:

Name          Type                   Description                                 Default
es            Elasticsearch          The elasticsearch client object             required
index         Union[List[str], str]  The indices to get the information for      None
label_object  str                    The field containing the label information  'kyoushi_labels'

Returns:

Type                                          Description
List[elasticsearch_dsl.response.aggs.Bucket]  List of result buckets

Source code in dataset/labels.py
def get_label_counts(
    es: Elasticsearch,
    index: Union[List[str], str, None] = None,
    label_object: str = "kyoushi_labels",
) -> List[Bucket]:
    """Utility function for getting number of rows per label.

    Args:
        es: The elasticsearch client object
        index: The indices to get the information for
        label_object: The field containing the label information

    Returns:
        List of result buckets
    """
    # disable request cache to ensure we always get latest info
    search_labels = Search(using=es, index=index).params(request_cache=False)
    runtime_mappings = {
        "labels": {
            "type": "keyword",
            "script": LABELS_AGGREGATES_FIELD_SCRIPT,
        }
    }
    search_labels = search_labels.extra(runtime_mappings=runtime_mappings)

    # setup aggregations
    search_labels.aggs.bucket(
        "labels",
        "composite",
        sources=[{"label": {"terms": {"field": "labels"}}}],
    )

    search_labels.aggs["labels"].bucket("file", "terms", field="log.file.path")

    return scan_composite(search_labels, "labels")
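A usage sketch (the index pattern is illustrative; bucket access follows the composite aggregation sources defined above):

for bucket in get_label_counts(es, index="example-*"):
    # each bucket is keyed by the label name and carries the row count
    print(bucket.key.label, bucket.doc_count)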

render_query_base(hit, query)

Utility function for rendering sub or parent query.

Parameters:

Name   Type       Description                                        Default
hit    Hit        The Elasticsearch query result row to render for.  required
query  QueryBase  The templated query definition.                    required

Returns:

Type       Description
QueryBase  The rendered DSL query

Source code in dataset/labels.py
def render_query_base(hit: Hit, query: QueryBase) -> QueryBase:
    """Utility function for rendering sub or parent query.

    Args:
        hit: The Elasticsearch query result row to render for.
        query: The templated query definition.

    Returns:
        The rendered DSL query
    """
    variables = {"HIT": hit}

    # render the index var
    if isinstance(query.index, str):
        query.index = render_template(query.index, variables)
    elif isinstance(query.index, Sequence):
        query.index = [render_template(i, variables) for i in query.index]

    # ensure we have lists
    if not isinstance(query.query, List):
        query.query = [query.query]
    if not isinstance(query.filter_, List):
        query.filter_ = [query.filter_] if query.filter_ is not None else []
    if not isinstance(query.exclude, List):
        query.exclude = [query.exclude] if query.exclude is not None else []

    query.query = [
        render_template_recursive(_query, variables) for _query in query.query
    ]
    query.filter_ = [
        render_template_recursive(_filter, variables) for _filter in query.filter_
    ]
    query.exclude = [
        render_template_recursive(_exclude, variables) for _exclude in query.exclude
    ]

    return query