Processors module¶
This module contains the base definitions of dataset pipeline processors and also the core processors shipped with the Cyber Range Kyoushi Dataset package.
ProcessorList
¶
Type alias for a list of processors
ComponentTemplateCreateProcessor
pydantic-model
¶
Processor for creating Elasticsearch index component templates.
This processor can be used to create Elasticsearch index component templates to prepare the Elasticsearch instance for the parsing phase. See the index templates doc for more details.
Examples:
- name: Add pcap component template
type: elasticsearch.component_template
template: processing/logstash/pcap-component-template.json
template_name: pcap
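The file referenced by `template` is expected to hold a standard Elasticsearch component template body. A minimal, purely illustrative sketch of what such a file might contain (the settings and mappings below are assumptions, not the package's actual pcap template):

```python
# Illustrative component template body; in practice this would live in the
# JSON file referenced by `template` (e.g. pcap-component-template.json).
pcap_component_template = {
    "template": {
        "settings": {"index.mapping.total_fields.limit": 10000},
        "mappings": {
            "properties": {
                "source": {"properties": {"ip": {"type": "ip"}}},
                "destination": {"properties": {"ip": {"type": "ip"}}},
            }
        },
    }
}
```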
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
create_only: bool
pydantic-field
¶
If true then an existing template with the given name will not be replaced.
name: str
pydantic-field
required
¶
The processor's name
template: FilePath
pydantic-field
required
¶
The index component template to add to elasticsearch
template_name: str
pydantic-field
required
¶
The name to use for the index component template
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and configure Elasticsearch index component template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and configure Elasticsearch index component template.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
template_data = load_file(self.template)
ies = ClusterClient(es)
ies.put_component_template(
name=self.template_name,
body=template_data,
create=self.create_only,
)
CreateDirectoryProcessor
pydantic-model
¶
Processor for creating file directories.
Examples:
- name: Ensure processing config directory exists
type: mkdir
path: processing/config
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
name: str
pydantic-field
required
¶
The processor's name
path: Path
pydantic-field
required
¶
The directory path to create
recursive: bool
pydantic-field
¶
If all missing parent directories should also be created
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and create the directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and create the directory.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
self.path.mkdir(parents=self.recursive, exist_ok=True)
ForEachProcessor
pydantic-model
¶
For each processor
This is a special processor container allowing for the dynamic creation of a list of processors based on a list of items.
Examples:
- name: Render labeling rules
type: foreach
# processing/templates/rules
items:
- src: 0_auth.yaml.j2
dest: 0_auth.yaml
- src: apache.yaml.j2
dest: apache.yaml
- src: audit.yaml.j2
dest: audit.yaml
- src: openvpn.yaml.j2
dest: openvpn.yaml
processor:
type: template
name: Rendering labeling rule {{ item.src }}
template_context:
var_files:
attacker: processing/config/attacker/attacker.yaml
escalate: processing/config/attacker/escalate.yaml
foothold: processing/config/attacker/foothold.yaml
servers: processing/config/servers.yaml
src: "processing/templates/rules/{{ item.src }}"
dest: "rules/{{ item.dest }}"
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
items: Any
pydantic-field
required
¶
List of items to create processors for
loop_var: str
pydantic-field
¶
The variable name to use for the current loop's item in the processor context
name: str
pydantic-field
required
¶
The processor's name
processor: Any
pydantic-field
required
¶
The processor template config to create multiple instances of
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Executes the processor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
):
"""Executes the processor.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
raise NotImplementedError("Incomplete processor implementation!")
processors(self)
¶
Create a list of processors for each item.
Returns:
Type | Description |
---|---|
List[Dict[str, Any]] | List of processors based on the given items and processor template. |
Source code in dataset/processors.py
def processors(self) -> List[Dict[str, Any]]:
"""Create a list of processors for each item.
Returns:
List of processors based on the given items and
processor template.
"""
processors = []
for item in self.items:
processor = copy.deepcopy(self.processor)
# set the loop var
if "context" in processor:
context = processor["context"]
else:
context = self.context.dict()
processor["context"] = context
variables = context.setdefault("vars", {})
variables[self.loop_var] = item
processors.append(processor)
return processors
GzipProcessor
pydantic-model
¶
Processor for decompressing gzip files.
It is possible to either define a `glob` of gzip files or a `path` to a single gzip file. If a `glob` is defined it is resolved relative to the defined `path` (default=`<dataset dir>`).
Examples:
- name: Decompress all GZIP logs
type: gzip
path: gather
glob: "*/logs/**/*.gz"
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
glob: str
pydantic-field
¶
The file glob expression to use
name: str
pydantic-field
required
¶
The processor's name
path: Path
pydantic-field
¶
The base path to search for the gzipped files.
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and decompress the files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and decompress the files.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
files: Iterable
if self.glob is None:
files = [self.path]
else:
files = self.path.glob(self.glob)
for gzip_file in files:
with gzip.open(gzip_file, "rb") as f_in:
# with suffix replaces .gz ending
with open(gzip_file.with_suffix(""), "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
# delete the gzip file
gzip_file.unlink()
IngestCreateProcessor
pydantic-model
¶
Processor for creating Elasticsearch ingest pipelines.
This processor can be used to create Elasticsearch ingest pipelines for parsing log events. The log file parsing can then be configured to use the pipelines for upstream parsing instead of local Logstash parsing.
Examples:
- name: Add auditd ingest pipeline to elasticsearch
type: elasticsearch.ingest
ingest_pipeline: processing/logstash/auditd-ingest.yml
ingest_pipeline_id: auditd-logs
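The file referenced by `ingest_pipeline` holds a standard Elasticsearch ingest pipeline definition. A minimal, purely illustrative sketch (the grok pattern below is an assumption, not the package's actual auditd pipeline):

```python
# Illustrative ingest pipeline body; in practice this would live in the
# YAML/JSON file referenced by `ingest_pipeline` (e.g. auditd-ingest.yml).
auditd_pipeline = {
    "description": "Parse auditd log lines",
    "processors": [
        {
            "grok": {
                "field": "message",
                "patterns": ["type=%{WORD:auditd.record_type} %{GREEDYDATA:auditd.data}"],
            }
        }
    ],
}
```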
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
ingest_pipeline: FilePath
pydantic-field
required
¶
The ingest pipeline to add to elasticsearch
ingest_pipeline_id: str
pydantic-field
required
¶
The id to use for the ingest pipeline
name: str
pydantic-field
required
¶
The processor's name
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and configure Elasticsearch ingest pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and configure Elasticsearch ingest pipeline.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
pipeline_data = load_file(self.ingest_pipeline)
ies = IngestClient(es)
ies.put_pipeline(id=self.ingest_pipeline_id, body=pipeline_data)
LegacyTemplateCreateProcessor
pydantic-model
¶
Processor for configuring Elasticsearch legacy index templates.
This processor can be used to configure Elasticsearch index templates to prepare the Elasticsearch instance for the parsing phase. See the legacy index templates doc for more details.
Examples:
- name: Add pcap index mapping
type: elasticsearch.legacy_template
template: processing/logstash/pcap-index-template.json
template_name: pcap
index_patterns: ["pcap-*"]
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
create_only: bool
pydantic-field
¶
If true then an existing template with the given name will not be replaced.
index_patterns: str
pydantic-field
¶
The index patterns the template should be applied to. If this is not set then the index template file must contain this information already!
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, `<DATASET.name>-` is automatically prefixed to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.
name: str
pydantic-field
required
¶
The processor's name
order: int
pydantic-field
¶
The order to assign to this index template (higher values take precedence).
template: FilePath
pydantic-field
required
¶
The index template to add to elasticsearch
template_name: str
pydantic-field
required
¶
The name to use for the index template
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and configure Elasticsearch index template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and configure Elasticsearch index template.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
template_data = load_file(self.template)
# configure the index patterns
if self.index_patterns is not None:
template_data["index_patterns"] = (
# if prefix is on add the prefix to all patterns
[f"{dataset_config.name}-{p}" for p in self.index_patterns]
if self.indices_prefix_dataset
# else add list as is
else self.index_patterns
)
ies = IndicesClient(es)
ies.put_template(
name=self.template_name,
body=template_data,
create=self.create_only,
order=self.order,
)
LogstashSetupProcessor
pydantic-model
¶
Logstash parser setup processor.
This processor is used to create all the configuration files required for the Logstash parser (e.g., input and filter configs). Unless you provide a static Logstash parsing configuration, you must invoke this processor at some point during the pre-processing phase.
Note
The processor only does the basic setup; any Logstash parsing filters used for processing specific log events must be prepared separately.
Examples:
- name: Setup logstash pipeline
type: logstash.setup
context:
var_files:
servers: processing/config/servers.yaml
servers: "{{ servers }}"
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
index_template_template: Path
pydantic-field
¶
The template to use for the elasticsearch dataset index patterns index template
input_config_name: str
pydantic-field
¶
The name of the log inputs config file. (relative to the pipeline config dir)
input_template: Path
pydantic-field
¶
The template to use for the file input plugin configuration
legacy_index_template_template: Path
pydantic-field
¶
The template to use for the elasticsearch dataset legacy index patterns index template
logstash_template: Path
pydantic-field
¶
The template to use for the logstash configuration
name: str
pydantic-field
required
¶
The processor's name
output_config_name: str
pydantic-field
¶
The name of the log outputs config file. (relative to the pipeline config dir)
output_template: Path
pydantic-field
¶
The template to use for the file output plugin configuration
piplines_template: Path
pydantic-field
¶
The template to use for the logstash pipelines configuration
pre_process_name: str
pydantic-field
¶
The file name to use for the pre process filters config. This is prefixed with 0000_ to ensure that the filters are run first.
pre_process_template: Path
pydantic-field
¶
The template to use for the pre process filter configuration
servers: Any
pydantic-field
required
¶
Dictionary of servers and their log configurations
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
use_legacy_template: bool
pydantic-field
¶
If the output config should use the legacy index template or the modern index template
default_server_timezone(v)
classmethod
¶
Validate that each log config entry has a timezone or set default.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v | Dict[str, Any] | A single log configuration element | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The validated and parsed log configuration element |
Source code in dataset/processors.py
@validator("servers", each_item=True)
def default_server_timezone(cls, v: Dict[str, Any]) -> Dict[str, Any]:
"""Validate that each log config entry has a timezone or set default.
Args:
v: A single log configuration element
Returns:
The validated and parsed log configuration element
"""
if "timezone" not in v:
v["timezone"] = "UTC"
return v
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and configure Logstash
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and configure Logstash
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
variables = self.context.load()
variables.update(
{
"DATASET_DIR": dataset_dir,
"DATASET": dataset_config,
"PARSER": parser_config,
"servers": self.servers,
"USE_LEGACY_TEMPLATE": self.use_legacy_template,
}
)
# add elasticsearch connection variables
variables.update(get_transport_variables(es))
# create all logstash directories
create_dirs(
[
parser_config.settings_dir,
parser_config.conf_dir,
parser_config.data_dir,
parser_config.log_dir,
]
)
# copy jvm and log4j config to settings dir if they don't exist
copy_package_file(
"cr_kyoushi.dataset.files",
"jvm.options",
parser_config.settings_dir.joinpath("jvm.options"),
)
copy_package_file(
"cr_kyoushi.dataset.files",
"log4j2.properties",
parser_config.settings_dir.joinpath("log4j2.properties"),
)
# write logstash configuration
write_template(
self.logstash_template,
parser_config.settings_dir.joinpath("logstash.yml"),
variables,
es,
)
# write pipelines configuration
write_template(
self.piplines_template,
parser_config.settings_dir.joinpath("pipelines.yml"),
variables,
es,
)
# write index template
write_template(
self.index_template_template,
parser_config.settings_dir.joinpath(
f"{dataset_config.name}-index-template.json"
),
variables,
es,
)
# write legacy index template
write_template(
self.legacy_index_template_template,
parser_config.settings_dir.joinpath(
f"{dataset_config.name}-legacy-index-template.json"
),
variables,
es,
)
# write input configuration
write_template(
self.input_template,
parser_config.conf_dir.joinpath(self.input_config_name),
variables,
es,
)
# write output configuration
write_template(
self.output_template,
parser_config.conf_dir.joinpath(self.output_config_name),
variables,
es,
)
# write pre process configuration
write_template(
self.pre_process_template,
parser_config.conf_dir.joinpath(self.pre_process_name),
variables,
es,
)
validate_servers(v)
classmethod
¶
Validate the server logs configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v | Dict[str, Any] | A single log configuration element | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The validated and parsed log configuration element |
Source code in dataset/processors.py
@validator("servers", each_item=True)
def validate_servers(cls, v: Dict[str, Any]) -> Dict[str, Any]:
"""Validate the server logs configuration.
Args:
v: A single log configuration element
Returns:
The validated and parsed log configuration element
"""
assert "logs" in v, "Each server must have a logs configuration"
v["logs"] = parse_obj_as(List[LogstashLogConfig], v["logs"])
return v
PcapElasticsearchProcessor
pydantic-model
¶
Processor for converting PCAP files to ndjson format.
This processor uses tshark to convert PCAP files to a line based JSON format (`ek` output).
Examples:
- name: Convert attacker pcap to elasticsearch json
type: pcap.elasticsearch
pcap: gather/attacker_0/logs/ait.aecid.attacker.wpdiscuz/traffic.pcap
dest: gather/attacker_0/logs/ait.aecid.attacker.wpdiscuz/traffic.json
tls_keylog: gather/attacker_0/logs/ait.aecid.attacker.wpdiscuz/premaster.txt
read_filter: "tcp or udp or icmp"
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
create_destination_dirs: bool
pydantic-field
¶
If the processor should create missing destination parent directories
dest: Path
pydantic-field
required
¶
The destination file
force: bool
pydantic-field
¶
If the conversion should be run even when the destination file already exists.
name: str
pydantic-field
required
¶
The processor's name
packet_details: bool
pydantic-field
¶
If the packet details should be included; when packet_summary=False, details are always included (-V option).
packet_summary: bool
pydantic-field
¶
If the packet summaries should be included (-P option).
pcap: FilePath
pydantic-field
required
¶
The pcap file to convert
protocol_match_filter: str
pydantic-field
¶
Display filter for protocols and their fields (-J option). Parent and child nodes are included for all matches; lower level protocols must be added explicitly.
protocol_match_filter_parent: str
pydantic-field
¶
Display filter for protocols and their fields. Only parent nodes are included (-j option).
read_filter: str
pydantic-field
¶
The read filter to use when reading the pcap file; useful to reduce the number of packets (-Y option)
remove_filtered: bool
pydantic-field
¶
Remove filtered fields from the event dicts.
remove_index_messages: bool
pydantic-field
¶
If the elasticsearch bulk API index messages should be stripped from the output file. Useful when using logstash or similar instead of the bulk API.
tls_keylog: FilePath
pydantic-field
¶
TLS keylog file to decrypt TLS on the fly.
tshark_bin: FilePath
pydantic-field
¶
Path to your tshark binary (searches in common paths if not supplied)
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and convert the pcap file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and convert the pcap file.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
if self.create_destination_dirs:
# create destination parent directory if it does not exist
self.dest.parent.mkdir(parents=True, exist_ok=True)
if self.force or not self.dest.exists():
# convert the file
convert_pcap_to_ecs(
self.pcap,
self.dest,
self.tls_keylog,
self.tshark_bin,
self.remove_index_messages,
self.remove_filtered,
self.packet_summary,
self.packet_details,
self.read_filter,
self.protocol_match_filter,
self.protocol_match_filter_parent,
)
PrintProcessor
pydantic-model
¶
Debug processor that simply prints a message.
Examples:
- name: Print Hello World
type: print
msg: Hello World
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
msg: str
pydantic-field
required
¶
The message to print
name: str
pydantic-field
required
¶
The processor's name
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Print the msg
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Print the `msg`
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
print(self.msg)
Processor
¶
Cyber Range Kyoushi Dataset processor interface
For the Cyber Range Kyoushi Dataset tool, processors are used during the pre- and post-processing phases. A processor class exposes configuration variables and is executed to achieve a certain task during the processing phases. They work similarly to Ansible modules, and it is possible to use Jinja2 templates and context variables to define partial configurations.
Examples:
- name: Render foo template
type: template
context:
vars:
foo: bar
var_files: gather/server/foo-bar.yaml
src: processing/templates/foo.yaml.j2
dest: processing/bar.yaml.j2
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Executes the processor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Executes the processor.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
render(context, data)
classmethod
¶
Renders all template strings of a processor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | ProcessorContext | The processor's context variables | required |
data | Dict[str, Any] | The raw templated processor configuration. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The rendered processor configuration. |
Source code in dataset/processors.py
@classmethod
def render(cls, context: ProcessorContext, data: Dict[str, Any]) -> Dict[str, Any]:
"""Renders all template strings of a processor.
Args:
context: The processors context variables
data: The raw templated processor configuration.
Returns:
The rendered processor configuration.
"""
...
ProcessorBase
pydantic-model
¶
Pydantic base model of Cyber Range Kyoushi Dataset processor.
Use this base model to implement processors; it ensures that rendering and data loading are done correctly.
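A minimal sketch of a custom processor built on this base model (the `touch` type name, the `type_` class attribute, and the import paths are assumptions for illustration; registration of custom processors works via `ProcessorPipeline`'s `processor_map`):

```python
from pathlib import Path
from typing import ClassVar

from elasticsearch import Elasticsearch

# NOTE: import paths are assumptions; ProcessorBase is defined in
# dataset/processors.py, the config models in the package's config module.
from cr_kyoushi.dataset.config import DatasetConfig, LogstashParserConfig
from cr_kyoushi.dataset.processors import ProcessorBase


class TouchFileProcessor(ProcessorBase):
    """Hypothetical processor that creates an empty marker file."""

    # value used to select this processor via `type:` in the config
    # (assumed to mirror how the shipped processors register themselves)
    type_: ClassVar[str] = "touch"
    path: Path

    def execute(
        self,
        dataset_dir: Path,
        dataset_config: DatasetConfig,
        parser_config: LogstashParserConfig,
        es: Elasticsearch,
    ) -> None:
        # resolve relative to the dataset directory and create the file
        (dataset_dir / self.path).touch(exist_ok=True)
```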
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
name: str
pydantic-field
required
¶
The processor's name
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Executes the processor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
):
"""Executes the processor.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
raise NotImplementedError("Incomplete processor implementation!")
render(context, data, es)
classmethod
¶
Renders all template strings of a processor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context | ProcessorContext | The processor's context variables | required |
data | Dict[str, Any] | The raw templated processor configuration. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The rendered processor configuration. |
Source code in dataset/processors.py
@classmethod
def render(
cls,
context: ProcessorContext,
data: Dict[str, Any],
es: Elasticsearch,
) -> Dict[str, Any]:
"""Renders all template strings of a processor.
Args:
context: The processors context variables
data: The raw templated processor configuration.
Returns:
The rendered processor configuration.
"""
# handle main dict
data_rendered = {}
for key, val in data.items():
# do not render excluded fields
if key not in cls.context_render_exclude:
data_rendered[key] = cls._render(context, val, es)
else:
data_rendered[key] = val
return data_rendered
ProcessorContainer
¶
Cyber Range Kyoushi Dataset processor container interface
This interface defines the API used for classes implementing processor container types.
execute(self, dataset_dir, dataset_config, parser_config, es)
inherited
¶
Executes the processor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Executes the processor.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
processors(self)
¶
Returns a list of processors contained in this container.
Returns:
Type | Description |
---|---|
List[Dict[str, Any]] | List of processors |
Source code in dataset/processors.py
def processors(self) -> List[Dict[str, Any]]:
"""Returns a list of processors contained in this container.
Returns:
List of processors
"""
...
ProcessorContext
pydantic-model
¶
Processor context model
This model is used to configure the variable context used for rendering a processor. It is possible to either define variables directly (`variables`) or load them from variable files (`variable_files`).
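A short sketch of how a context might be declared and resolved (the `vars`/`var_files` keys follow the configuration examples on this page; the file path is illustrative):

```python
# Build a context from inline variables plus a variable file and resolve it.
context = ProcessorContext.parse_obj(
    {
        "vars": {"foo": "bar"},
        "var_files": {"servers": "processing/config/servers.yaml"},
    }
)
variables = context.load()  # file contents merged with the inline vars
```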
variable_files: Union[pathlib.Path, Dict[str, pathlib.Path]]
pydantic-field
¶
Config files to load into the render context
variables: Any
pydantic-field
¶
Context variables to use during rendering
load(self)
¶
Load the configured context variables into a combined dict.
The loaded context variables are cached so that variable files are only read on the first call of `load()`.
Returns:
Type | Description |
---|---|
Dict[str, Any] | A single dict containing all context variables. |
Source code in dataset/processors.py
def load(self) -> Dict[str, Any]:
"""Load the configured context variables into a combined dict.
The loaded context variables are cached so that variable files
are only read on the first call of `load()`.
Returns:
A single dict containing all context variables.
"""
if self._loaded_variables is None:
self._loaded_variables = load_variables(self.variable_files)
self._loaded_variables.update(self.variables)
return self._loaded_variables
ProcessorPipeline
¶
The Cyber Range Kyoushi Dataset processing pipeline implementation.
This class is used to configure, parse and execute the pre and post processing steps.
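A minimal usage sketch (the processor entries reuse the `print` and `mkdir` examples from this page; `dataset_config` and `parser_config` are assumed to be already loaded configuration objects):

```python
from pathlib import Path

from elasticsearch import Elasticsearch

# Run two of the built-in processors against a dataset directory.
pipeline = ProcessorPipeline()
pipeline.execute(
    data=[
        {"name": "Print Hello World", "type": "print", "msg": "Hello World"},
        {"name": "Ensure config dir exists", "type": "mkdir", "path": "processing/config"},
    ],
    dataset_dir=Path("."),
    dataset_config=dataset_config,    # assumed to be loaded elsewhere
    parser_config=parser_config,      # assumed to be loaded elsewhere
    es=Elasticsearch(),
)
```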
__init__(self, processor_map=None)
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
processor_map | Optional[Dict[str, Any]] | Dict of processors to execute | None |
Source code in dataset/processors.py
def __init__(self, processor_map: Optional[Dict[str, Any]] = None):
"""
Args:
processor_map: Dict of processors to execute
"""
if processor_map is None:
processor_map = {}
self.processor_map: Dict[str, Any] = processor_map
self.processor_map.update(
{
PrintProcessor.type_: PrintProcessor,
TemplateProcessor.type_: TemplateProcessor,
ForEachProcessor.type_: ForEachProcessor,
CreateDirectoryProcessor.type_: CreateDirectoryProcessor,
GzipProcessor.type_: GzipProcessor,
LogstashSetupProcessor.type_: LogstashSetupProcessor,
PcapElasticsearchProcessor.type_: PcapElasticsearchProcessor,
ComponentTemplateCreateProcessor.type_: ComponentTemplateCreateProcessor,
TemplateCreateProcessor.type_: TemplateCreateProcessor,
LegacyTemplateCreateProcessor.type_: LegacyTemplateCreateProcessor,
IngestCreateProcessor.type_: IngestCreateProcessor,
TrimProcessor.type_: TrimProcessor,
}
)
execute(self, data, dataset_dir, dataset_config, parser_config, es)
¶
Executes the processor pipeline by running all the configured processors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data | List[Dict[str, Any]] | The raw processor information | required |
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
data: List[Dict[str, Any]],
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
):
"""Executes the processor pipeline by running all the configured processors.
Args:
data: The raw processor information
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
# pre-validate the processor list
# check if all processors have a name and type
parse_obj_as(ProcessorList, data)
for p in data:
# get the processor context and class
context = p.setdefault("context", {})
processor_class = self.processor_map[p["type"]]
# render the processor template and parse it
p_rendered = processor_class.render(
context=ProcessorContext.parse_obj(context),
data=p,
es=es,
)
processor = processor_class.parse_obj(p_rendered)
if isinstance(processor, ProcessorContainer):
print(f"Expanding processor container - {processor.name} ...")
self.execute(
processor.processors(),
dataset_dir,
dataset_config,
parser_config,
es,
)
else:
print(f"Executing - {processor.name} ...")
processor.execute(dataset_dir, dataset_config, parser_config, es)
TemplateCreateProcessor
pydantic-model
¶
Processor for configuring Elasticsearch index templates.
This processor can be used to configure Elasticsearch index templates to prepare the Elasticsearch instance for the parsing phase. See the index templates doc for more details.
Examples:
- name: Add pcap index mapping
type: elasticsearch.template
template: processing/logstash/pcap-index-template.json
template_name: pcap
index_patterns: ["pcap-*"]
composed_of: str
pydantic-field
¶
Optional list of component templates the index template should be composed of.
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
create_only: bool
pydantic-field
¶
If true then an existing template with the given name will not be replaced.
index_patterns: str
pydantic-field
¶
The index patterns the template should be applied to. If this is not set then the index template file must contain this information already!
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, `<DATASET.name>-` is automatically prefixed to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.
name: str
pydantic-field
required
¶
The processor's name
priority: int
pydantic-field
¶
The priority to assign to this index template (higher values take precedence).
template: FilePath
pydantic-field
required
¶
The index template to add to elasticsearch
template_name: str
pydantic-field
required
¶
The name to use for the index template
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and configure Elasticsearch index template.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and configure Elasticsearch index template.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
template_data = load_file(self.template)
# configure the index patterns
if self.index_patterns is not None:
template_data["index_patterns"] = (
# if prefix is on add the prefix to all patterns
[f"{dataset_config.name}-{p}" for p in self.index_patterns]
if self.indices_prefix_dataset
# else add list as is
else self.index_patterns
)
# set template priority to given value
template_data["priority"] = self.priority
if self.composed_of is not None:
template_data["composed_of"] = self.composed_of
ies = IndicesClient(es)
ies.put_index_template(
name=self.template_name,
body=template_data,
create=self.create_only,
)
TemplateProcessor
pydantic-model
¶
Processor for rendering template files.
In addition to the normal processor context it is also possible to define a `template_context`. If `template_context` is defined it will be used for rendering the template, otherwise the normal processor context will be used.
Examples:
- type: template
name: Rendering labeling rule {{ item.src }}
template_context:
var_files:
attacker: processing/config/attacker/attacker.yaml
escalate: processing/config/attacker/escalate.yaml
foothold: processing/config/attacker/foothold.yaml
servers: processing/config/servers.yaml
src: "processing/templates/rules/{{ item.src }}"
dest: "rules/{{ item.dest }}"
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
dest: Path
pydantic-field
required
¶
The destination to save the rendered file to
name: str
pydantic-field
required
¶
The processor's name
src: Path
pydantic-field
required
¶
The template file to render
template_context: ProcessorContext
pydantic-field
¶
Optional template context; if this is not set the processor context is used instead
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Load and render the template file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Load and render the template file.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
if self.template_context is not None:
variables = self.template_context.load()
else:
variables = self.context.load()
variables["DATASET_DIR"] = dataset_dir
variables["DATASET"] = dataset_config
variables["PARSER"] = parser_config
write_template(self.src, self.dest.absolute(), variables, es, dataset_config)
TrimProcessor
pydantic-model
¶
Processor for trimming log files to a defined time frame.
This processor can be used to remove all log lines outside of defined dataset observation times.
Note
Currently only simple time frames with a single start and end time are supported.
Examples:
- name: Trim server logs to observation time
type: dataset.trim
context:
var_files:
groups: processing/config/groups.yaml
# we only want to trim the logs of servers that will be part
# of the IDS dataset
indices:
- attacker_0-*
context: ProcessorContext
pydantic-field
¶
The variable context for the processor
end: datetime
pydantic-field
¶
The end time to trim the logs to (defaults to dataset end)
exclude: str
pydantic-field
¶
Indices to exclude from trimming. This will overwrite/exclude indices from any patterns supplied in indices
indices: str
pydantic-field
¶
The log indices to trim (defaults to `<dataset>-*`)
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, `<DATASET.name>-` is automatically prefixed to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.
name: str
pydantic-field
required
¶
The processor's name
start: datetime
pydantic-field
¶
The start time to trim the logs to (defaults to dataset start)
type_field: str
pydantic-field
required
¶
The processor type as passed in from the config
execute(self, dataset_dir, dataset_config, parser_config, es)
¶
Execute the processor and trim the log files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
parser_config | LogstashParserConfig | The dataset parser configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Source code in dataset/processors.py
def execute(
self,
dataset_dir: Path,
dataset_config: DatasetConfig,
parser_config: LogstashParserConfig,
es: Elasticsearch,
) -> None:
"""Execute the processor and trim the log files.
Args:
dataset_dir: The dataset path
dataset_config: The dataset configuration
parser_config: The dataset parser configuration
es: The elasticsearch client object
"""
if self.indices is None:
# if not explicitly set use dataset root indices pattern
indices = [f"{dataset_config.name}-*"]
else:
indices = (
# if given and prefix flag is True add dataset name prefix
[f"{dataset_config.name}-{ind}" for ind in self.indices]
if self.indices_prefix_dataset
else
# otherwise use as is
self.indices
)
exclude = (
# add negative match indicator '-' and dataset name prefix
[f"-{dataset_config.name}-{exc}" for exc in self.exclude]
if self.indices_prefix_dataset
# add negative match indicator only
else [f"-{exc}" for exc in self.exclude]
)
start = self.start or dataset_config.start
end = self.end or dataset_config.end
# exclude must be after indices for negative patterns to work as expected
index = indices + exclude
# get documents before trim
docs_before = {
bucket.key.path: bucket.doc_count
for bucket in self.get_line_stats(es, index)
}
remove = Search(using=es, index=index)
# setup trim range filter
# start >= @timestamp < end
valid_range = Range(**{"@timestamp": {"gte": start, "lt": end}})
# remove all elements outside of range i.e., `not` valid_range
remove = remove.filter(~valid_range)
# refresh="true" is important to ensure consecutive queries
# use up to date information
print(remove.params(refresh="true", request_cache=False).delete().to_dict())
# lines after trim
lines_after = self.get_line_stats(es, index)
# trim each file
for bucket in lines_after:
first_line = int(bucket.min_line.value)
last_line: Optional[int] = int(bucket.max_line.value)
if docs_before[bucket.key.path] - (first_line - 1) == last_line:
# if our last line is already correct we set it to None and skip truncate
# since truncating requires us to read up to the truncate point
last_line = None
trim_file(bucket.key.path, first_line, last_line)
# delete entry for this file so we later can detect if a file must be deleted completely
del docs_before[bucket.key.path]
# any file that still has a docs before entry
# does not have any logs within the trim range and thus should be deleted
for path in docs_before.keys():
print(
f"Removing {path} as it does not have any log lines within the observation time."
)
# delete the file
Path(path).unlink()
# update entries in elastic search
update_lines = UpdateByQuery(using=es, index=index)
# adjust map for shifting line numbers in the db to start at our new min line
adjust_map = {
bucket.key.path: int(bucket.min_line.value - 1)
for bucket in lines_after
# only include paths that need actual changing
if int(bucket.min_line.value - 1) > 0
}
# we only have entries to update if the adjust map is non empty
if len(adjust_map) > 0:
# pre filter our update query to only include file paths we
# want to update
update_lines = update_lines.filter(
"terms",
log__file__path=list(adjust_map.keys()),
)
# ToDO might be better as async query due to threat of timeouts
# (i.e., update_lines.to_dict() and then use low level async API)
update_lines.script(
lang="painless",
# subtract matching the entries log file path
source="ctx._source.log.file.line -= params[ctx._source.log.file.path]",
params=adjust_map,
).execute()
get_doc_stats(self, es, index)
¶
Get a list of unique log file paths.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es | Elasticsearch | The elasticsearch client object | required |
index | List[str] | The indices to get the data for | required |
Returns:
Type | Description |
---|---|
List[elasticsearch_dsl.response.aggs.Bucket] | List of log file paths. |
Source code in dataset/processors.py
def get_doc_stats(self, es: Elasticsearch, index: List[str]) -> List[Bucket]:
"""Get a list of unique log file paths.
Args:
es: The elasticsearch client object
index: The indices to get the data for
Returns:
List of log file paths.
"""
# disable request cache to ensure we always get latest info
search_lines = Search(using=es, index=index).params(request_cache=False)
# setup aggregations
search_lines.aggs.bucket(
"files",
"composite",
sources=[{"path": {"terms": {"field": "log.file.path"}}}],
)
# use custom scan function to ensure we get all the buckets
return scan_composite(search_lines, "files")
get_line_stats(self, es, index)
¶
Retrieve minimum and maximum line numbers for the log files.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es | Elasticsearch | The elasticsearch client object | required |
index | List[str] | The indices to get the stats for | required |
Returns:
Type | Description |
---|---|
List[elasticsearch_dsl.response.aggs.Bucket] | List of min and max line numbers per file |
Source code in dataset/processors.py
def get_line_stats(self, es: Elasticsearch, index: List[str]) -> List[Bucket]:
"""Retrieve minimum and maximum line numbers for the log files.
Args:
es: The elasticsearch client object
index: The indices to get the stats for
Returns:
List of min and max line numbers per file
"""
# disable request cache to ensure we always get latest info
search_lines = Search(using=es, index=index).params(request_cache=False)
# setup aggregations
search_lines.aggs.bucket(
"files",
"composite",
sources=[{"path": {"terms": {"field": "log.file.path"}}}],
)
search_lines.aggs["files"].metric("min_line", "min", field="log.file.line")
search_lines.aggs["files"].metric("max_line", "max", field="log.file.line")
# use custom scan function to ensure we get all the buckets
return scan_composite(search_lines, "files")