
Processors module

This module contains the base definitions of dataset pipeline processors and also the core processors shipped with the Cyber Range Kyoushi Dataset package.

ProcessorList

Type alias for a list of processors

ComponentTemplateCreateProcessor pydantic-model

Processor for creating Elasticsearch index component templates.

This processor can be used to create Elasticsearch index component templates to prepare the Elasticsearch instance for the parsing phase. See the index templates doc for more details.

Examples:

- name: Add pcap component template
  type: elasticsearch.component_template
  template: processing/logstash/pcap-component-template.json
  template_name: pcap

context: ProcessorContext pydantic-field

The variable context for the processor

create_only: bool pydantic-field

If true then an existing template with the given name will not be replaced.

name: str pydantic-field required

The processor's name

template: FilePath pydantic-field required

The index component template to add to elasticsearch

template_name: str pydantic-field required

The name to use for the index component template

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and configure Elasticsearch index component template.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and configure Elasticsearch index component template.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    template_data = load_file(self.template)

    ies = ClusterClient(es)

    ies.put_component_template(
        name=self.template_name,
        body=template_data,
        create=self.create_only,
    )
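
The template file referenced above is loaded verbatim and passed as the request body to the put_component_template call shown in the source. For orientation, such a body generally has the shape sketched below; the settings and mapping fields are purely illustrative and not taken from the package.

component_template_body = {
    "template": {
        "settings": {"number_of_shards": 1},
        "mappings": {
            "properties": {
                # field names here are examples, not the package's actual pcap mapping
                "@timestamp": {"type": "date"},
            }
        },
    }
}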

CreateDirectoryProcessor pydantic-model

Processor for creating file directories.

Examples:

- name: Ensure processing config directory exists
  type: mkdir
  path: processing/config

context: ProcessorContext pydantic-field

The variable context for the processor

name: str pydantic-field required

The processor's name

path: Path pydantic-field required

The directory path to create

recursive: bool pydantic-field

If all missing parent directories should also be created

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and create the directory.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and create the directory.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    self.path.mkdir(parents=self.recursive, exist_ok=True)

ForEachProcessor pydantic-model

For each processor

This is a special processor container allowing for the dynamic creation of a list of processors based on a list of items.

Examples:

- name: Render labeling rules
  type: foreach
  # processing/templates/rules
  items:
    - src: 0_auth.yaml.j2
      dest: 0_auth.yaml
    - src: apache.yaml.j2
      dest: apache.yaml
    - src: audit.yaml.j2
      dest: audit.yaml
    - src: openvpn.yaml.j2
      dest: openvpn.yaml
  processor:
    type: template
    name: Rendering labeling rule {{ item.src }}
    template_context:
      var_files:
        attacker: processing/config/attacker/attacker.yaml
        escalate: processing/config/attacker/escalate.yaml
        foothold: processing/config/attacker/foothold.yaml
        servers: processing/config/servers.yaml
    src: "processing/templates/rules/{{ item.src }}"
    dest: "rules/{{ item.dest }}"

context: ProcessorContext pydantic-field

The variable context for the processor

items: Any pydantic-field required

List of items to create processors for

loop_var: str pydantic-field

The variable name to use for the current loop item in the processor context

name: str pydantic-field required

The processor's name

processor: Any pydantic-field required

The processor template config to create multiple instances of

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Executes the processor.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
):
    """Executes the processor.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    raise NotImplementedError("Incomplete processor implementation!")

processors(self)

Create a list of processors for each item.

Returns:

Type Description
List[Dict[str, Any]]

List of processors based on the given items and processor template.

Source code in dataset/processors.py
def processors(self) -> List[Dict[str, Any]]:
    """Create a list of processors for each item.

    Returns:
        List of processors based on the given items and
        processor template.
    """
    processors = []
    for item in self.items:
        processor = copy.deepcopy(self.processor)
        # set the loop var
        if "context" in processor:
            context = processor["context"]
        else:
            context = self.context.dict()
            processor["context"] = context

        variables = context.setdefault("vars", {})
        variables[self.loop_var] = item

        processors.append(processor)
    return processors
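
As a standalone illustration of the expansion above (not part of the package), the following sketch shows how each item ends up under the configured loop variable in the generated processor's context:

import copy

# hypothetical stand-ins for the YAML example above
items = [{"src": "apache.yaml.j2", "dest": "apache.yaml"}]
processor_template = {"type": "template", "name": "Rendering labeling rule {{ item.src }}"}
loop_var = "item"

expanded = []
for item in items:
    processor = copy.deepcopy(processor_template)
    # every generated processor gets its own context with the loop item injected
    context = processor.setdefault("context", {})
    context.setdefault("vars", {})[loop_var] = item
    expanded.append(processor)

# expanded[0]["context"]["vars"]["item"] == {"src": "apache.yaml.j2", "dest": "apache.yaml"}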

GzipProcessor pydantic-model

Processor for decompressing gzip files.

It is possible to either define a glob of gzip files or a path to a single gzip file. If a glob is defined it is resolved relative to the defined path (default=<dataset dir>).

Examples:

- name: Decompress all GZIP logs
  type: gzip
  path: gather
  glob: "*/logs/**/*.gz"

context: ProcessorContext pydantic-field

The variable context for the processor

glob: str pydantic-field

The file glob expression to use

name: str pydantic-field required

The processor's name

path: Path pydantic-field

The base path to search for the gzipped files.

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and decompress the files.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and decompress the files.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    files: Iterable
    if self.glob is None:
        files = [self.path]
    else:
        files = self.path.glob(self.glob)
    for gzip_file in files:
        with gzip.open(gzip_file, "rb") as f_in:
            # with suffix replaces .gz ending
            with open(gzip_file.with_suffix(""), "wb") as f_out:
                shutil.copyfileobj(f_in, f_out)
        # delete the gzip file
        gzip_file.unlink()

IngestCreateProcessor pydantic-model

Processor for creating Elasticsearch ingest pipelines.

This processor can be used to create Elasticsearch ingest pipelines for parsing log events. The log file parsing can then be configured to use the pipelines for upstream parsing instead of local Logstash parsing.

Examples:

- name: Add auditd ingest pipeline to elasticsearch
  type: elasticsearch.ingest
  ingest_pipeline: processing/logstash/auditd-ingest.yml
  ingest_pipeline_id: auditd-logs

context: ProcessorContext pydantic-field

The variable context for the processor

ingest_pipeline: FilePath pydantic-field required

The ingest pipeline to add to elasticsearch

ingest_pipeline_id: str pydantic-field required

The id to use for the ingest pipeline

name: str pydantic-field required

The processor's name

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and configure Elasticsearch ingest pipeline.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and configure Elasticsearch ingest pipeline.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    pipeline_data = load_file(self.ingest_pipeline)

    ies = IngestClient(es)

    ies.put_pipeline(id=self.ingest_pipeline_id, body=pipeline_data)
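
The referenced pipeline file is loaded and sent as-is to the put_pipeline call shown in the source. An Elasticsearch ingest pipeline body generally looks like the sketch below; the description and processors shown are illustrative and not the package's auditd pipeline.

ingest_pipeline_body = {
    "description": "Example pipeline (illustrative only)",
    "processors": [
        # each entry is a standard Elasticsearch ingest processor definition
        {"set": {"field": "event.module", "value": "example"}},
    ],
}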

LegacyTemplateCreateProcessor pydantic-model

Processor for configuring Elasticsearch legacy index templates.

This processor can be used to configure Elasticsearch index templates to prepare the Elasticsearch instance for the parsing phase. See the legacy index templates doc for more details.

Examples:

- name: Add pcap index mapping
  type: elasticsearch.legacy_template
  template: processing/logstash/pcap-index-template.json
  template_name: pcap
  index_patterns: ["pcap-*"]

context: ProcessorContext pydantic-field

The variable context for the processor

create_only: bool pydantic-field

If true then an existing template with the given name will not be replaced.

index_patterns: str pydantic-field

The index patterns the template should be applied to. If this is not set then the index template file must contain this information already!

indices_prefix_dataset: bool pydantic-field

If set to true, <DATASET.name>- is automatically prefixed to each pattern. This is a convenience setting, since by default all dataset indices start with this prefix.

name: str pydantic-field required

The processor's name

order: int pydantic-field

The order to assign to this index template (higher values take precedence).

template: FilePath pydantic-field required

The index template to add to elasticsearch

template_name: str pydantic-field required

The name to use for the index template

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and configure Elasticsearch index template.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and configure Elasticsearch index template.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    template_data = load_file(self.template)

    # configure the index patterns
    if self.index_patterns is not None:
        template_data["index_patterns"] = (
            # if prefix is on add the prefix to all patterns
            [f"{dataset_config.name}-{p}" for p in self.index_patterns]
            if self.indices_prefix_dataset
            # else add list as is
            else self.index_patterns
        )

    ies = IndicesClient(es)

    ies.put_template(
        name=self.template_name,
        body=template_data,
        create=self.create_only,
        order=self.order,
    )

LogstashSetupProcessor pydantic-model

Logstash parser setup processor.

This processor is used to create all the configuration files required for the Logstash parser (e.g., input and filter configs). Unless you provide a static Logstash parsing configuration, you must invoke this processor at some point during the pre-processing phase.

Note

The processor only does the basic setup; any Logstash parsing filters used for processing specific log events must be prepared separately.

Examples:

- name: Setup logstash pipeline
  type: logstash.setup
  context:
    var_files:
      servers: processing/config/servers.yaml
  servers: "{{ servers }}"

context: ProcessorContext pydantic-field

The variable context for the processor

index_template_template: Path pydantic-field

The template to use for the elasticsearch dataset index patterns index template

input_config_name: str pydantic-field

The name of the log inputs config file (relative to the pipeline config dir).

input_template: Path pydantic-field

The template to use for the file input plugin configuration

legacy_index_template_template: Path pydantic-field

The template to use for the elasticsearch dataset legacy index patterns index template

logstash_template: Path pydantic-field

The template to use for the logstash configuration

name: str pydantic-field required

The processor's name

output_config_name: str pydantic-field

The name of the log outputs config file (relative to the pipeline config dir).

output_template: Path pydantic-field

The template to use for the file output plugin configuration

piplines_template: Path pydantic-field

The template to use for the logstash pipelines configuration

pre_process_name: str pydantic-field

The file name to use for the pre process filters config. This is prefixed with 0000_ to ensure that the filters are run first.

pre_process_template: Path pydantic-field

The template to use for the pre process filters configuration

servers: Any pydantic-field required

Dictionary of servers and their log configurations

type_field: str pydantic-field required

The processor type as passed in from the config

use_legacy_template: bool pydantic-field

If the output config should use the legacy index template or the modern index template

default_server_timezone(v) classmethod

Validate that each log config entry has a timezone or set default.

Parameters:

Name Type Description Default
v Dict[str, Any]

A single log configuration element

required

Returns:

Type Description
Dict[str, Any]

The validated and parsed log configuration element

Source code in dataset/processors.py
@validator("servers", each_item=True)
def default_server_timezone(cls, v: Dict[str, Any]) -> Dict[str, Any]:
    """Validate that each log config entry has a timezone or set default.

    Args:
        v: A single log configuration element

    Returns:
        The validated and parsed log configuration element
    """
    if "timezone" not in v:
        v["timezone"] = "UTC"
    return v

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and configure Logstash

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and configure Logstash

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """

    variables = self.context.load()
    variables.update(
        {
            "DATASET_DIR": dataset_dir,
            "DATASET": dataset_config,
            "PARSER": parser_config,
            "servers": self.servers,
            "USE_LEGACY_TEMPLATE": self.use_legacy_template,
        }
    )
    # add elasticsearch connection variables
    variables.update(get_transport_variables(es))

    # create all logstash directories
    create_dirs(
        [
            parser_config.settings_dir,
            parser_config.conf_dir,
            parser_config.data_dir,
            parser_config.log_dir,
        ]
    )

    # copy jvm and log4j config to settings dir if they don't exist
    copy_package_file(
        "cr_kyoushi.dataset.files",
        "jvm.options",
        parser_config.settings_dir.joinpath("jvm.options"),
    )
    copy_package_file(
        "cr_kyoushi.dataset.files",
        "log4j2.properties",
        parser_config.settings_dir.joinpath("log4j2.properties"),
    )

    # write logstash configuration
    write_template(
        self.logstash_template,
        parser_config.settings_dir.joinpath("logstash.yml"),
        variables,
        es,
    )

    # write pipelines configuration
    write_template(
        self.piplines_template,
        parser_config.settings_dir.joinpath("pipelines.yml"),
        variables,
        es,
    )

    # write index template
    write_template(
        self.index_template_template,
        parser_config.settings_dir.joinpath(
            f"{dataset_config.name}-index-template.json"
        ),
        variables,
        es,
    )

    # write legacy index template
    write_template(
        self.legacy_index_template_template,
        parser_config.settings_dir.joinpath(
            f"{dataset_config.name}-legacy-index-template.json"
        ),
        variables,
        es,
    )

    # write input configuration
    write_template(
        self.input_template,
        parser_config.conf_dir.joinpath(self.input_config_name),
        variables,
        es,
    )

    # write output configuration
    write_template(
        self.output_template,
        parser_config.conf_dir.joinpath(self.output_config_name),
        variables,
        es,
    )

    # write pre process configuration
    write_template(
        self.pre_process_template,
        parser_config.conf_dir.joinpath(self.pre_process_name),
        variables,
        es,
    )

validate_servers(v) classmethod

Validate the server logs configuration.

Parameters:

Name Type Description Default
v Dict[str, Any]

A single log configuration element

required

Returns:

Type Description
Dict[str, Any]

The validated and parsed log configuration element

Source code in dataset/processors.py
@validator("servers", each_item=True)
def validate_servers(cls, v: Dict[str, Any]) -> Dict[str, Any]:
    """Validate the server logs configuration.

    Args:
        v: A single log configuration element

    Returns:
        The validated and parsed log configuration element
    """
    assert "logs" in v, "Each server must have a logs configuration"
    v["logs"] = parse_obj_as(List[LogstashLogConfig], v["logs"])
    return v

PcapElasticsearchProcessor pydantic-model

Processor for converting PCAP files to ndjson format.

This processor uses tshark to convert PCAP files to a line-based JSON format (ek output).

Examples:

- name: Convert attacker pcap to elasticsearch json
  type: pcap.elasticsearch
  pcap: gather/attacker_0/logs/ait.aecid.attacker.wpdiscuz/traffic.pcap
  dest: gather/attacker_0/logs/ait.aecid.attacker.wpdiscuz/traffic.json
  tls_keylog: gather/attacker_0/logs/ait.aecid.attacker.wpdiscuz/premaster.txt
  read_filter: "tcp or udp or icmp"

context: ProcessorContext pydantic-field

The variable context for the processor

create_destination_dirs: bool pydantic-field

If the processor should create missing destination parent directories

dest: Path pydantic-field required

The destination file

force: bool pydantic-field

If the destination file should be created even when it already exists.

name: str pydantic-field required

The processor's name

packet_details: bool pydantic-field

If the packet details should be included; when packet_summary=False the details are always included (-V option).

packet_summary: bool pydantic-field

If the packet summaries should be included (-P option).

pcap: FilePath pydantic-field required

The pcap file to convert

protocol_match_filter: str pydantic-field

Display filter for protocols and their fields (-J option). Parent and child nodes are included for all matches; lower-level protocols must be added explicitly.

protocol_match_filter_parent: str pydantic-field

Display filter for protocols and their fields. Only parent nodes are included (-j option).

read_filter: str pydantic-field

The read filter to use when reading the pcap file; useful for reducing the number of packets (-Y option)

remove_filtered: bool pydantic-field

Remove filtered fields from the event dicts.

remove_index_messages: bool pydantic-field

If the elasticsearch bulk API index messages should be stripped from the output file. Useful when using logstash or similar instead of the bulk API.

tls_keylog: FilePath pydantic-field

TLS keylog file to decrypt TLS on the fly.

tshark_bin: FilePath pydantic-field

Path to your tshark binary (searches in common paths if not supplied)

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and convert the pcap file.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and convert the pcap file.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    if self.create_destination_dirs:
        # create destination parent directory if it does not exist
        self.dest.parent.mkdir(parents=True, exist_ok=True)

    if self.force or not self.dest.exists():
        # convert the file
        convert_pcap_to_ecs(
            self.pcap,
            self.dest,
            self.tls_keylog,
            self.tshark_bin,
            self.remove_index_messages,
            self.remove_filtered,
            self.packet_summary,
            self.packet_details,
            self.read_filter,
            self.protocol_match_filter,
            self.protocol_match_filter_parent,
        )

PrintProcessor pydantic-model

Debug processor that simply prints a message.

Examples:

- name: Print Hello World
  type: print
  msg: Hello World

context: ProcessorContext pydantic-field

The variable context for the processor

msg: str pydantic-field required

The message to print

name: str pydantic-field required

The processor's name

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Print the msg

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Print the `msg`

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    print(self.msg)

Processor

Cyber Range Kyoushi Dataset processor interface

In the Cyber Range Kyoushi Dataset tool, processors are used during the pre- and post-processing phases. A processor class exposes configuration variables and is executed to achieve a certain task during the processing phases. Processors work similarly to Ansible modules, and it is possible to use Jinja2 templates and context variables to define partial configurations.

Examples:

  - name: Render foo template
    type: template
    context:
      vars:
        foo: bar
      var_files: gather/server/foo-bar.yaml
    src: processing/templates/foo.yaml.j2
    dest: processing/bar.yaml.j2

execute(self, dataset_dir, dataset_config, parser_config, es)

Executes the processor.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Executes the processor.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """

render(context, data) classmethod

Renders all template strings of a processor.

Parameters:

Name Type Description Default
context ProcessorContext

The processor's context variables

required
data Dict[str, Any]

The raw templated processor configuration.

required

Returns:

Type Description
Dict[str, Any]

The rendered processor configuration.

Source code in dataset/processors.py
@classmethod
def render(cls, context: ProcessorContext, data: Dict[str, Any]) -> Dict[str, Any]:
    """Renders all template strings of a processor.

    Args:
        context: The processors context variables
        data: The raw templated processor configuration.

    Returns:
        The rendered processor configuration.
    """
    ...

ProcessorBase pydantic-model

Pydantic base model of Cyber Range Kyoushi Dataset processor.

Use this base model to implement processors; it ensures that rendering and data loading are done correctly.

context: ProcessorContext pydantic-field

The variable context for the processor

name: str pydantic-field required

The processor's name

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Executes the processor.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
):
    """Executes the processor.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    raise NotImplementedError("Incomplete processor implementation!")

render(context, data, es) classmethod

Renders all template strings of a processor.

Parameters:

Name Type Description Default
context ProcessorContext

The processor's context variables

required
data Dict[str, Any]

The raw templated processor configuration.

required
es Elasticsearch

The elasticsearch client object

required

Returns:

Type Description
Dict[str, Any]

The rendered processor configuration.

Source code in dataset/processors.py
@classmethod
def render(
    cls,
    context: ProcessorContext,
    data: Dict[str, Any],
    es: Elasticsearch,
) -> Dict[str, Any]:
    """Renders all template strings of a processor.

    Args:
        context: The processors context variables
        data: The raw templated processor configuration.

    Returns:
        The rendered processor configuration.
    """
    # handle main dict
    data_rendered = {}
    for key, val in data.items():
        # do not render excluded fields
        if key not in cls.context_render_exclude:
            data_rendered[key] = cls._render(context, val, es)
        else:
            data_rendered[key] = val
    return data_rendered
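
To show the intended extension point, here is a minimal sketch of a custom processor built on ProcessorBase. Only the execute signature is taken from the documentation above; the module paths, the type_ class attribute, and the processor itself are assumptions made for illustration.

from pathlib import Path
from typing import ClassVar

from elasticsearch import Elasticsearch

# import paths assumed from the package naming seen elsewhere on this page
from cr_kyoushi.dataset.config import DatasetConfig, LogstashParserConfig
from cr_kyoushi.dataset.processors import ProcessorBase


class TouchFileProcessor(ProcessorBase):
    """Hypothetical processor that creates an empty marker file."""

    # assumed: the class-level type key used when registering with ProcessorPipeline
    type_: ClassVar[str] = "touch"
    path: Path

    def execute(
        self,
        dataset_dir: Path,
        dataset_config: DatasetConfig,
        parser_config: LogstashParserConfig,
        es: Elasticsearch,
    ) -> None:
        # create missing parent directories and the (empty) marker file
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self.path.touch(exist_ok=True)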

ProcessorContainer

Cyber Range Kyoushi Dataset processor container interface

This interface defines the API used by classes implementing processor container types.

execute(self, dataset_dir, dataset_config, parser_config, es) inherited

Executes the processor.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Executes the processor.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """

processors(self)

Returns a list of processors contained in this container.

Returns:

Type Description
List[Dict[str, Any]]

List of processors

Source code in dataset/processors.py
def processors(self) -> List[Dict[str, Any]]:
    """Returns a list of processors contained in this container.

    Returns:
        List of processors
    """
    ...

ProcessorContext pydantic-model

Processor context model

This model is used to configure the variable context used for rendering a processor. It is possible to either define variables directly (variables) or load them from variable files (variable_files).

variable_files: Union[pathlib.Path, Dict[str, pathlib.Path]] pydantic-field

Config files to load into the render context

variables: Any pydantic-field

Context variables to use during rendering

load(self)

Load the configured context variables into a combined dict.

The loaded context variables are cached so that variable files are only read on the first call of load().

Returns:

Type Description
Dict[str, Any]

A single dict containing all context variables.

Source code in dataset/processors.py
def load(self) -> Dict[str, Any]:
    """Load the configured context variables into a combined dict.

    The loaded context variables are cached so that variable files
    are only read on the first call of `load()`.

    Returns:
        A single dict containing all context variables.
    """
    if self._loaded_variables is None:
        self._loaded_variables = load_variables(self.variable_files)
        self._loaded_variables.update(self.variables)
    return self._loaded_variables
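
A small usage sketch of the context model (values and import path are made up; load() merges the file-based variables with the directly defined ones and caches the result):

# hypothetical usage, assuming the module path below
from cr_kyoushi.dataset.processors import ProcessorContext

context = ProcessorContext(
    variables={"foo": "bar"},
    # assuming processing/config/servers.yaml exists relative to the working directory
    variable_files={"servers": "processing/config/servers.yaml"},
)

# reads the variable files once, merges in `variables`, and caches the dict
merged = context.load()
assert merged["foo"] == "bar"
# merged["servers"] now holds the parsed content of servers.yaml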

ProcessorPipeline

The Cyber Range Kyoushi Dataset processing pipeline implementation.

This class is used to configure, parse, and execute the pre- and post-processing steps.

__init__(self, processor_map=None) special

Parameters:

Name Type Description Default
processor_map Optional[Dict[str, Any]]

Dict of processors to execute

None
Source code in dataset/processors.py
def __init__(self, processor_map: Optional[Dict[str, Any]] = None):
    """
    Args:
        processor_map: Dict of processors to execute
    """

    if processor_map is None:
        processor_map = {}

    self.processor_map: Dict[str, Any] = processor_map
    self.processor_map.update(
        {
            PrintProcessor.type_: PrintProcessor,
            TemplateProcessor.type_: TemplateProcessor,
            ForEachProcessor.type_: ForEachProcessor,
            CreateDirectoryProcessor.type_: CreateDirectoryProcessor,
            GzipProcessor.type_: GzipProcessor,
            LogstashSetupProcessor.type_: LogstashSetupProcessor,
            PcapElasticsearchProcessor.type_: PcapElasticsearchProcessor,
            ComponentTemplateCreateProcessor.type_: ComponentTemplateCreateProcessor,
            TemplateCreateProcessor.type_: TemplateCreateProcessor,
            LegacyTemplateCreateProcessor.type_: LegacyTemplateCreateProcessor,
            IngestCreateProcessor.type_: IngestCreateProcessor,
            TrimProcessor.type_: TrimProcessor,
        }
    )
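
Because the built-in processor map is merged into whatever is passed in, custom processors can be registered by keying them on their type string. A hedged sketch, reusing the hypothetical TouchFileProcessor from the ProcessorBase section above:

# hypothetical registration of a custom processor (import path assumed)
from cr_kyoushi.dataset.processors import ProcessorPipeline

pipeline = ProcessorPipeline(
    processor_map={TouchFileProcessor.type_: TouchFileProcessor},
)
# the built-in processors (print, template, foreach, ...) are merged on top,
# so processor configs may now also use entries with `type: touch`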

execute(self, data, dataset_dir, dataset_config, parser_config, es)

Executes the processor pipeline by running all the configured processors.

Parameters:

Name Type Description Default
data List[Dict[str, Any]]

The raw processor information

required
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    data: List[Dict[str, Any]],
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
):
    """Executes the processor pipeline by running all the configured processors.

    Args:
        data: The raw processor information
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    # pre-validate the processor list
    # check if all processors have a name and type
    parse_obj_as(ProcessorList, data)

    for p in data:
        # get the processor context and class
        context = p.setdefault("context", {})
        processor_class = self.processor_map[p["type"]]

        # render the processor template and parse it
        p_rendered = processor_class.render(
            context=ProcessorContext.parse_obj(context),
            data=p,
            es=es,
        )
        processor = processor_class.parse_obj(p_rendered)

        if isinstance(processor, ProcessorContainer):
            print(f"Expanding processor container - {processor.name} ...")
            self.execute(
                processor.processors(),
                dataset_dir,
                dataset_config,
                parser_config,
                es,
            )
        else:
            print(f"Executing - {processor.name} ...")
            processor.execute(dataset_dir, dataset_config, parser_config, es)

TemplateCreateProcessor pydantic-model

Processor for configuring Elasticsearch index templates.

This processor can be used to configure Elasticsearch index templates to prepare the Elasticsearch instance for the parsing phase. See the index templates doc for more details.

Examples:

- name: Add pcap index mapping
  type: elasticsearch.template
  template: processing/logstash/pcap-index-template.json
  template_name: pcap
  index_patterns: ["pcap-*"]

composed_of: str pydantic-field

Optional list of component templates the index template should be composed of.

context: ProcessorContext pydantic-field

The variable context for the processor

create_only: bool pydantic-field

If true then an existing template with the given name will not be replaced.

index_patterns: str pydantic-field

The index patterns the template should be applied to. If this is not set then the index template file must contain this information already!

indices_prefix_dataset: bool pydantic-field

If set to true, <DATASET.name>- is automatically prefixed to each pattern. This is a convenience setting, since by default all dataset indices start with this prefix.

name: str pydantic-field required

The processor's name

priority: int pydantic-field

The priority to assign to this index template (higher values take precedence).

template: FilePath pydantic-field required

The index template to add to elasticsearch

template_name: str pydantic-field required

The name to use for the index template

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and configure Elasticsearch index template.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and configure Elasticsearch index template.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    template_data = load_file(self.template)

    # configure the index patterns
    if self.index_patterns is not None:
        template_data["index_patterns"] = (
            # if prefix is on add the prefix to all patterns
            [f"{dataset_config.name}-{p}" for p in self.index_patterns]
            if self.indices_prefix_dataset
            # else add list as is
            else self.index_patterns
        )

    # set template priority to given value
    template_data["priority"] = self.priority

    if self.composed_of is not None:
        template_data["composed_of"] = self.composed_of

    ies = IndicesClient(es)

    ies.put_index_template(
        name=self.template_name,
        body=template_data,
        create=self.create_only,
    )

TemplateProcessor pydantic-model

Processor for rendering template files.

In addition to the normal processor context, it is also possible to define a template_context. If template_context is defined, it is used for rendering the template; otherwise the normal processor context is used.

Examples:

- type: template
  name: Rendering labeling rule {{ item.src }}
  template_context:
    var_files:
      attacker: processing/config/attacker/attacker.yaml
      escalate: processing/config/attacker/escalate.yaml
      foothold: processing/config/attacker/foothold.yaml
      servers: processing/config/servers.yaml
  src: "processing/templates/rules/{{ item.src }}"
  dest: "rules/{{ item.dest }}"

context: ProcessorContext pydantic-field

The variable context for the processor

dest: Path pydantic-field required

The destination to save the rendered file to

name: str pydantic-field required

The processor's name

src: Path pydantic-field required

The template file to render

template_context: ProcessorContext pydantic-field

Optional template context; if this is not set the processor context is used instead

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Load and render the template file.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Load and render the template file.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    if self.template_context is not None:
        variables = self.template_context.load()
    else:
        variables = self.context.load()

    variables["DATASET_DIR"] = dataset_dir
    variables["DATASET"] = dataset_config
    variables["PARSER"] = parser_config

    write_template(self.src, self.dest.absolute(), variables, es, dataset_config)
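
The actual rendering goes through write_template, so the standalone jinja2 sketch below only illustrates that DATASET_DIR, DATASET and PARSER are available in the template namespace alongside the context variables; the template text and values are invented.

from pathlib import Path
from types import SimpleNamespace

from jinja2 import Template

# illustrative template text; a real template would be loaded from the `src` file
template_text = "dataset: {{ DATASET.name }}\nlogs: {{ DATASET_DIR }}/gather\nfoo: {{ foo }}"

variables = {"foo": "bar"}  # would come from context / template_context
variables["DATASET_DIR"] = Path("/datasets/example")
# stand-in for the real DatasetConfig object injected by the processor
variables["DATASET"] = SimpleNamespace(name="example")

print(Template(template_text).render(**variables))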

TrimProcessor pydantic-model

Processor for trimming log files to a defined time frame.

This processor can be used to remove all log lines outside of defined dataset observation times.

Note

Currently only supports simple time frames with a single start and end time.

Examples:

- name: Trim server logs to observation time
  type: dataset.trim
  context:
    var_files:
      groups: processing/config/groups.yaml
  # we only want to trim the logs of servers that will be part
  # of the IDS dataset
  indices:
    - attacker_0-*

context: ProcessorContext pydantic-field

The variable context for the processor

end: datetime pydantic-field

The end time to trim the logs to (defaults to dataset end)

exclude: str pydantic-field

Indices to exclude from trimming. This will overwrite/exclude indices from any patterns supplied in indices

indices: str pydantic-field

The log indices to trim (defaults to <dataset>-*)

indices_prefix_dataset: bool pydantic-field

If set to true, <DATASET.name>- is automatically prefixed to each pattern. This is a convenience setting, since by default all dataset indices start with this prefix.

name: str pydantic-field required

The processor's name

start: datetime pydantic-field

The start time to trim the logs to (defaults to dataset start)

type_field: str pydantic-field required

The processor type as passed in from the config

execute(self, dataset_dir, dataset_config, parser_config, es)

Execute the processor and trim the log files.

Parameters:

Name Type Description Default
dataset_dir Path

The dataset path

required
dataset_config DatasetConfig

The dataset configuration

required
parser_config LogstashParserConfig

The dataset parser configuration

required
es Elasticsearch

The elasticsearch client object

required
Source code in dataset/processors.py
def execute(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    parser_config: LogstashParserConfig,
    es: Elasticsearch,
) -> None:
    """Execute the processor and trim the log files.

    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        parser_config: The dataset parser configuration
        es: The elasticsearch client object
    """
    if self.indices is None:
        # if not explicitly set use dataset root indices pattern
        indices = [f"{dataset_config.name}-*"]
    else:
        indices = (
            # if given and prefix flag is True add dataset name prefix
            [f"{dataset_config.name}-{ind}" for ind in self.indices]
            if self.indices_prefix_dataset
            else
            # otherwise use as is
            self.indices
        )

    exclude = (
        # add negative match indicator '-' and dataset name prefix
        [f"-{dataset_config.name}-{exc}" for exc in self.exclude]
        if self.indices_prefix_dataset
        # add negative match indicator only
        else [f"-{exc}" for exc in self.exclude]
    )
    start = self.start or dataset_config.start
    end = self.end or dataset_config.end
    # exclude must be after indices for negative patterns to work as expected
    index = indices + exclude

    # get documents before trim
    docs_before = {
        bucket.key.path: bucket.doc_count
        for bucket in self.get_line_stats(es, index)
    }

    remove = Search(using=es, index=index)
    # setup trim range filter
    # start >= @timestamp < end
    valid_range = Range(**{"@timestamp": {"gte": start, "lt": end}})

    # remove all elements outside of range i.e., `not` valid_range
    remove = remove.filter(~valid_range)
    # refresh="true" is important to ensure consecutive queries
    # use up to date information
    print(remove.params(refresh="true", request_cache=False).delete().to_dict())

    # lines after trim
    lines_after = self.get_line_stats(es, index)

    # trim each file
    for bucket in lines_after:
        first_line = int(bucket.min_line.value)
        last_line: Optional[int] = int(bucket.max_line.value)
        if docs_before[bucket.key.path] - (first_line - 1) == last_line:
            # if our last line is already correct we set it to None and skip truncate
            # since truncating requires us to read up to the truncate point
            last_line = None

        trim_file(bucket.key.path, first_line, last_line)
        # delete entry for this file so we later can detect if a file must be deleted completely
        del docs_before[bucket.key.path]

    # any file that still has a docs before entry
    # does not have any logs within the trim range and thus should be deleted
    for path in docs_before.keys():
        print(
            f"Removing {path} as it does not have any log lines within the observation time."
        )
        # delete the file
        Path(path).unlink()

    # update entries in elastic search
    update_lines = UpdateByQuery(using=es, index=index)
    # adjust map for shifting line numbers in the db to start at our new min line
    adjust_map = {
        bucket.key.path: int(bucket.min_line.value - 1)
        for bucket in lines_after
        # only include paths that need actual changing
        if int(bucket.min_line.value - 1) > 0
    }

    # we only have entries to update if the adjust map is non empty
    if len(adjust_map) > 0:
        # pre filter our update query to only include file paths we
        # want to update
        update_lines = update_lines.filter(
            "terms",
            log__file__path=list(adjust_map.keys()),
        )

        # ToDO might be better as async query due to threat of timeouts
        # (i.e., update_lines.to_dict() and then use low level async API)
        update_lines.script(
            lang="painless",
            # subtract matching the entries log file path
            source="ctx._source.log.file.line -= params[ctx._source.log.file.path]",
            params=adjust_map,
        ).execute()
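
The line-number bookkeeping at the end is easiest to see with a small worked example (numbers invented): if the first remaining event of a file sits at log.file.line 21, the adjustment map stores a shift of 20 for that path, and the painless script subtracts it from every matching event so the renumbered lines start at 1 again.

# worked example of the adjust_map arithmetic above (all numbers invented)
min_line_after_trim = 21

shift = min_line_after_trim - 1          # value stored in adjust_map for this file
remaining_lines = [21, 22, 23, 24]       # log.file.line values still in the index

renumbered = [line - shift for line in remaining_lines]
assert renumbered == [1, 2, 3, 4]        # same subtraction the painless script applies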

get_doc_stats(self, es, index)

Get a list of unique log file paths.

Parameters:

Name Type Description Default
es Elasticsearch

The elasticsearch client object

required
index List[str]

The indices to get the data for

required

Returns:

Type Description
List[elasticsearch_dsl.response.aggs.Bucket]

List of log file paths.

Source code in dataset/processors.py
def get_doc_stats(self, es: Elasticsearch, index: List[str]) -> List[Bucket]:
    """Get a list of unique log file paths.

    Args:
        es: The elasticsearch client object
        index: The indices to get the data for

    Returns:
        List of log file paths.
    """
    # disable request cache to ensure we always get latest info
    search_lines = Search(using=es, index=index).params(request_cache=False)

    # setup aggregations
    search_lines.aggs.bucket(
        "files",
        "composite",
        sources=[{"path": {"terms": {"field": "log.file.path"}}}],
    )

    # use custom scan function to ensure we get all the buckets
    return scan_composite(search_lines, "files")

get_line_stats(self, es, index)

Retrieve minimum and maximum line numbers for the log files.

Parameters:

Name Type Description Default
es Elasticsearch

The elasticsearch client object

required
index List[str]

The indices to get the stats for

required

Returns:

Type Description
List[elasticsearch_dsl.response.aggs.Bucket]

List of min and max line numbers per file

Source code in dataset/processors.py
def get_line_stats(self, es: Elasticsearch, index: List[str]) -> List[Bucket]:
    """Retrieve minimum and maximum line numbers for the log files.

    Args:
        es: The elasticsearch client object
        index: The indices to get the stats for

    Returns:
        List of min and max line numbers per file
    """
    # disable request cache to ensure we always get latest info
    search_lines = Search(using=es, index=index).params(request_cache=False)

    # setup aggregations
    search_lines.aggs.bucket(
        "files",
        "composite",
        sources=[{"path": {"terms": {"field": "log.file.path"}}}],
    )
    search_lines.aggs["files"].metric("min_line", "min", field="log.file.line")
    search_lines.aggs["files"].metric("max_line", "max", field="log.file.line")
    # use custom scan function to ensure we get all the buckets
    return scan_composite(search_lines, "files")