Labels module¶
This module contains labeling rule implementations and utility functions used during the labeling process.
EqlQueryBase
pydantic-model
¶
Pydantic model representing an EQL query
batch_size: int
pydantic-field
¶
The number of sequences to update with each batch. Cannot be larger than max_result_window.
by: Union[List[str], str]
pydantic-field
¶
Optional global sequence by fields
event_category_field: str
pydantic-field
¶
The field used to categorize events
filter_: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
¶
The filter(s) used to restrict the query to only those documents that match
index: Union[List[str], str]
pydantic-field
¶
The indices to query (by default prefixed with the dataset name)
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, the <DATASET.name>- prefix is automatically prepended to each index pattern. This is a convenience setting, since by default all dataset indices start with this prefix.
max_result_window: int
pydantic-field
¶
The max result window allowed on the elasticsearch instance
max_span: str
pydantic-field
¶
Optional max time span in which a sequence must occur to be considered a match
sequences: str
pydantic-field
required
¶
Event sequences to search. Must contain at least two events.
tiebreaker_field: str
pydantic-field
¶
(Optional, string) Field used to sort hits with the same timestamp in ascending order.
timestamp_field: str
pydantic-field
¶
The field containing the event timestamp
until: str
pydantic-field
¶
Optional until event marking the end of valid sequences. The until event will not be labeled.
query(self)
¶
Converts the EQL query object into an EQL query string.
Returns:
Type | Description |
---|---|
str | The query in EQL syntax |
Source code in dataset/labels.py
def query(self) -> str:
    """Converts the EQL query object into an EQL query string.
    Returns:
        The query in EQL syntax
    """
    query = "sequence"
    if self.by is not None:
        if isinstance(self.by, Text):
            query += f" by {self.by}"
        elif len(self.by) > 0:
            query += f" by {', '.join(self.by)}"
    if self.max_span is not None:
        query += f" with maxspan={self.max_span}"
    for sequence in self.sequences:
        query += f"\n {sequence}"
    if self.until is not None:
        query += f"\nuntil {self.until}"
    return query
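To make the string-building steps of query() easier to follow, here is a minimal sketch that reproduces only that logic in a plain dataclass. EqlQuerySketch is a hypothetical stand-in, not the library's pydantic model, and the sequence strings are illustrative.

```python
from dataclasses import dataclass
from typing import List, Optional, Union


@dataclass
class EqlQuerySketch:
    """Hypothetical stand-in mirroring only the string building of query()."""

    sequences: List[str]
    by: Optional[Union[List[str], str]] = None
    max_span: Optional[str] = None
    until: Optional[str] = None

    def query(self) -> str:
        query = "sequence"
        # a single field name is used as-is, a list is joined with commas
        if self.by is not None:
            if isinstance(self.by, str):
                query += f" by {self.by}"
            elif len(self.by) > 0:
                query += f" by {', '.join(self.by)}"
        if self.max_span is not None:
            query += f" with maxspan={self.max_span}"
        # every event of the sequence goes on its own line
        for sequence in self.sequences:
            query += f"\n {sequence}"
        if self.until is not None:
            query += f"\nuntil {self.until}"
        return query


example = EqlQuerySketch(
    sequences=[
        '[ apache where url.original == "/" ]',
        '[ apache where url.original == "/?p=5" ]',
    ],
    by="source.address",
    max_span="2m",
)
print(example.query())
```

The first line of the result carries the global by fields and the maxspan, followed by one line per event of the sequence.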
EqlSequenceRule
pydantic-model
¶
Applies labels to a sequence of log events defined by an EQL query.
This labeling rule is defined as an EQL query. The EQL syntax makes it possible to define a sequence of related events and retrieve them. All events that are part of the retrieved sequences are then labeled.
Examples:
- type: elasticsearch.sequence
  id: attacker.webshell.upload.seq
  labels: [webshell_upload]
  description: >-
    This rule labels the web shell upload step by matching the 3 step sequence
    within the foothold phase.
  index:
    - apache_access-intranet_server
  # since we do these requests very fast
  # we need the line number as tie breaker
  tiebreaker_field: log.file.line
  by: source.address
  max_span: 2m
  filter:
    range:
      "@timestamp":
        # foothold phase start
        gte: "2021-03-23 20:31:00+00:00"
        # foothold phase stop
        lte: "2021-03-23 21:13:52+00:00"
  sequences:
    - '[ apache where event.action == "access" and url.original == "/" ]'
    - '[ apache where event.action == "access" and url.original == "/?p=5" ]'
    - '[ apache where event.action == "access" and http.request.method == "POST" and url.original == "/wp-admin/admin-ajax.php" ]'
batch_size: int
pydantic-field
¶
The number of sequences to update with each batch. Cannot be larger than max_result_window.
by: Union[List[str], str]
pydantic-field
¶
Optional global sequence by fields
description: str
pydantic-field
¶
An optional description for the rule
event_category_field: str
pydantic-field
¶
The field used to categorize events
filter_: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
¶
The filter(s) used to restrict the query to only those documents that match
id_: str
pydantic-field
required
¶
The unique rule id
index: Union[List[str], str]
pydantic-field
¶
The indices to query (by default prefixed with the dataset name)
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, the <DATASET.name>- prefix is automatically prepended to each index pattern. This is a convenience setting, since by default all dataset indices start with this prefix.
labels: str
pydantic-field
required
¶
The list of labels to apply to log lines matching this rule
max_result_window: int
pydantic-field
¶
The max result window allowed on the elasticsearch instance
max_span: str
pydantic-field
¶
Optional max time span in which a sequence must occur to be considered a match
sequences: str
pydantic-field
required
¶
Event sequences to search. Must contain at least two events.
tiebreaker_field: str
pydantic-field
¶
(Optional, string) Field used to sort hits with the same timestamp in ascending order.
timestamp_field: str
pydantic-field
¶
The field containing the event timestamp
type_field: str
pydantic-field
required
¶
The rule type as passed in from the config
until: str
pydantic-field
¶
Optional until event marking the end of valid sequences. The until event will not be labeled.
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
¶
Applies the labels to all events part of retrieved sequences.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | The number of rows the labels were applied to. |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to all events part of retrieved sequences.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )
    body = self._make_body(label_object)
    updated = 0
    # as of ELK 7.12 there is no way to ensure we get all events through the EQL API
    # (there is no scan or search after like for the DSL API)
    # so we manually search in batches for sequences with events that are not labeled yet
    # we stop only when we do not get any results anymore, i.e., all events have been labeled
    # this is not the most efficient approach, but it is the best we can do for now
    while True:
        hits = search_eql(es, index, body)
        if hits["total"]["value"] > 0:
            index_ids: Dict[str, List[str]] = {}
            # we have to group the events by index
            # because ids are only guaranteed to be unique per index
            for sequence in hits["sequences"]:
                for event in sequence["events"]:
                    index_ids.setdefault(event["_index"], []).append(event["_id"])
            # add labels to each event per index
            for _index, ids in index_ids.items():
                # split the update requests into chunks of at most max result window
                update_chunks = [
                    ids[i : i + self.max_result_window]
                    for i in range(0, len(ids), self.max_result_window)
                ]
                for chunk in update_chunks:
                    update = UpdateByQuery(using=es, index=_index).query(
                        "ids", values=chunk
                    )
                    # apply labels to events
                    updated += apply_labels_by_update_dsl(
                        update, self.update_params(), update_script_id
                    )
        else:
            # end loop once we do not find new events anymore
            break
    return updated
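The grouping and chunking steps inside the loop can be tried without an Elasticsearch instance. The following sketch uses a hand-written hits dictionary shaped like the fields the source code reads; group_ids_by_index and chunk are hypothetical helper names introduced only for illustration.

```python
from typing import Any, Dict, List


def group_ids_by_index(sequences: List[Dict[str, Any]]) -> Dict[str, List[str]]:
    """Collect event ids per index, since ids are only unique within one index."""
    index_ids: Dict[str, List[str]] = {}
    for sequence in sequences:
        for event in sequence["events"]:
            index_ids.setdefault(event["_index"], []).append(event["_id"])
    return index_ids


def chunk(ids: List[str], size: int) -> List[List[str]]:
    """Split ids into lists of at most `size` entries."""
    return [ids[i : i + size] for i in range(0, len(ids), size)]


# made-up response fragment with two sequences spanning two indices
hits = {
    "sequences": [
        {"events": [{"_index": "a", "_id": "1"}, {"_index": "b", "_id": "1"}]},
        {"events": [{"_index": "a", "_id": "2"}, {"_index": "a", "_id": "3"}]},
    ]
}
grouped = group_ids_by_index(hits["sequences"])
# a window of 2 stands in for max_result_window
chunks = {index: chunk(ids, 2) for index, ids in grouped.items()}
print(chunks)
```

Each inner list corresponds to one update request in the source code above.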
query(self)
inherited
¶
Converts the EQL query object into an EQL query string.
Returns:
Type | Description |
---|---|
str | The query in EQL syntax |
Source code in dataset/labels.py
def query(self) -> str:
    """Converts the EQL query object into an EQL query string.
    Returns:
        The query in EQL syntax
    """
    query = "sequence"
    if self.by is not None:
        if isinstance(self.by, Text):
            query += f" by {self.by}"
        elif len(self.by) > 0:
            query += f" by {', '.join(self.by)}"
    if self.max_span is not None:
        query += f" with maxspan={self.max_span}"
    for sequence in self.sequences:
        query += f"\n {sequence}"
    if self.until is not None:
        query += f"\nuntil {self.until}"
    return query
update_params(self)
inherited
¶
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    return {
        "rule": self.id_,
        "labels": self.labels,
    }
LabelException
¶
Generic exception for errors during labeling phase.
Labeler
¶
Cyber Range Kyoushi labeler
This class is used to configure and execute the labeling process.
__init__(self, rule_types={}, update_script_id='kyoushi_label_update', label_object='kyoushi_labels')
special
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rule_types | Dict[str, Any] | Dictionary containing all available labeling rule types. | {} |
update_script_id | str | The Elasticsearch ID for the update script. | 'kyoushi_label_update' |
label_object | str | The field to store labels in. | 'kyoushi_labels' |
Source code in dataset/labels.py
def __init__(
    self,
    rule_types: Dict[str, Any] = {},
    update_script_id: str = "kyoushi_label_update",
    label_object: str = "kyoushi_labels",
):
    """
    Args:
        rule_types: Dictionary containing all available labeling rule types.
        update_script_id: The Elasticsearch ID for the update script.
        label_object: The field to store labels in.
    """
    self.rule_types: Dict[str, Any] = rule_types
    # default rule types
    self.rule_types.update(
        {
            NoopRule.type_: NoopRule,
            UpdateByQueryRule.type_: UpdateByQueryRule,
            UpdateSubQueryRule.type_: UpdateSubQueryRule,
            UpdateParentQueryRule.type_: UpdateParentQueryRule,
            EqlSequenceRule.type_: EqlSequenceRule,
        }
    )
    self.update_script_id: str = update_script_id
    self.label_object: str = label_object
add_label_object_mapping(self, es, dataset_name, rules)
¶
Utility function for adding the field definitions for the label field.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es | Elasticsearch | The elasticsearch client object. | required |
dataset_name | str | The name of the dataset. | required |
rules | List[cr_kyoushi.dataset.labels.Rule] | List of all rules to be applied. | required |
Source code in dataset/labels.py
def add_label_object_mapping(
    self,
    es: Elasticsearch,
    dataset_name: str,
    rules: List[Rule],
):
    """Utility function for adding the field definitions for the label field.
    Args:
        es: The elasticsearch client object.
        dataset_name: The name of the dataset.
        rules: List of all rules to be applied.
    """
    root = Mapping()
    flat = {}
    list_ = {}
    for rule in rules:
        flat[rule.id_] = Keyword()
        list_[rule.id_] = Keyword(multi=True)
    properties = {
        "flat": {
            "properties": flat,
        },
        "list": {
            "properties": list_,
        },
        "rules": {"type": "keyword"},
    }
    root.field(self.label_object, "object", properties=properties)
    es.indices.put_mapping(index=f"{dataset_name}-*", body=root.to_dict())
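As a rough illustration of the structure this method sets up, the sketch below builds the properties of the label object as a plain dictionary. It is an approximation under the assumption that each keyword field serializes to {"type": "keyword"}; the mapping actually sent by the library may differ in detail, and label_object_properties is a hypothetical helper name.

```python
from typing import Any, Dict, List


def label_object_properties(rule_ids: List[str]) -> Dict[str, Any]:
    """Approximate the sub-fields of the label object for the given rule ids."""
    return {
        # one keyword field per rule holding the flat label value
        "flat": {"properties": {rule_id: {"type": "keyword"} for rule_id in rule_ids}},
        # one keyword field per rule holding the list of labels
        "list": {"properties": {rule_id: {"type": "keyword"} for rule_id in rule_ids}},
        # the rules that labeled a document
        "rules": {"type": "keyword"},
    }


props = label_object_properties(["rule_a", "rule_b"])
print(sorted(props["flat"]["properties"]))
```

Every rule id gets its own field under both flat and list, which is why the mapping has to be created before the rules are applied.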
execute(self, rules, dataset_dir, dataset_config, es)
¶
Parse and apply all labeling rules.
Note
This function only writes labels to the database. The resulting labels can be written to the file system using the write(...) function of the labeler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rules | List[Dict[str, Any]] | The unparsed list of labeling rules. | required |
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Exceptions:
Type | Description |
---|---|
ValidationError | If a labeling rule or configuration is invalid |
LabelException | If an error occurs while applying a labeling rule |
Source code in dataset/labels.py
def execute(
    self,
    rules: List[Dict[str, Any]],
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
):
    """Parse and apply all labeling rules.
    !!! Note
        This function only writes labels to the database.
        The resulting labels can be written to the file system
        using the `write(...)` function of the labeler.
    Args:
        rules: The unparsed list of labeling rules.
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        es: The elasticsearch client object
    Raises:
        ValidationError: If a labeling rule or configuration is invalid
        LabelException: If an error occurs while applying a labeling rule
    """
    # validate the general rule list
    RuleList.parse_obj(rules)
    # convert and validate rule types
    rule_objects: List[Rule] = []
    errors: List[ErrorWrapper] = []
    for r in rules:
        try:
            rule_objects.append(self.rule_types[r["type"]].parse_obj(r))
        except ValidationError as e:
            errors.append(ErrorWrapper(e, r["id"]))
    if len(errors) > 0:
        raise ValidationError(errors, RuleList)
    # create mappings for rule label fields
    # we need to do this since EQL queries cannot check existence of non mapped fields
    self.add_label_object_mapping(es, dataset_config.name, rule_objects)
    # ensure update script exists
    create_kyoushi_scripts(es, dataset_config.name, self.label_object)
    # start labeling process
    for rule in rule_objects:
        try:
            print(f"Applying rule {rule.id_} ...")
            updated = rule.apply(
                dataset_dir,
                dataset_config,
                es,
                f"{dataset_config.name}_{self.update_script_id}",
                self.label_object,
            )
            print(
                f"Rule {rule.id_} applied labels: {rule.labels} to {updated} lines."
            )
        except ElasticsearchRequestError as e:
            raise LabelException(f"Error executing rule '{rule.id_}'", e)
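The parsing step looks up a rule class by the type string of each raw rule and collects all validation errors before raising. A minimal sketch of that pattern follows, with plain functions standing in for the pydantic rule models; parse_rules and parse_noop are hypothetical names, and ValueError replaces the library's ValidationError.

```python
from typing import Any, Callable, Dict, List, Tuple


def parse_rules(
    rules: List[Dict[str, Any]],
    rule_types: Dict[str, Callable[[Dict[str, Any]], Any]],
) -> Tuple[List[Any], List[Tuple[str, Exception]]]:
    """Parse each raw rule with the parser registered for its type.

    Errors are collected per rule id instead of stopping at the first one.
    """
    parsed: List[Any] = []
    errors: List[Tuple[str, Exception]] = []
    for raw in rules:
        try:
            parsed.append(rule_types[raw["type"]](raw))
        except ValueError as err:
            errors.append((raw["id"], err))
    return parsed, errors


def parse_noop(raw: Dict[str, Any]) -> Tuple[str, str]:
    # stand-in validation: labels are required
    if "labels" not in raw:
        raise ValueError("labels are required")
    return ("noop", raw["id"])


parsed, errors = parse_rules(
    [
        {"type": "noop", "id": "a", "labels": ["x"]},
        {"type": "noop", "id": "b"},
    ],
    {"noop": parse_noop},
)
print(parsed, [rule_id for rule_id, _ in errors])
```

Collecting the errors first lets a caller report every invalid rule in one pass rather than fixing them one at a time.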
write(self, dataset_dir, dataset_config, es, index, skip_files)
¶
Write labels stored in the database to the file system.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset path | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
index | List[str] | The indices to consider | required |
skip_files | List[str] | The files to ignore | required |
Source code in dataset/labels.py
def write(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    index: List[str],
    skip_files: List[str],
):
    """Write labels stored in the database to the file system.
    Args:
        dataset_dir: The dataset path
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        index: The indices to consider
        skip_files: The files to ignore
    """
    files = self._get_label_files(dataset_config, es, index, skip_files)
    for current_file in files:
        # disable request cache to ensure we always get latest info
        search_labeled = Search(using=es, index=f"{dataset_config.name}-*").params(
            request_cache=False, preserve_order=True
        )
        search_labeled = search_labeled.filter(
            "exists", field=f"{self.label_object}.rules"
        ).filter("term", log__file__path=current_file)
        search_labeled = search_labeled.sort({"log.file.line": "asc"})
        base_path = dataset_dir.joinpath(LAYOUT.GATHER.value)
        label_path = dataset_dir.joinpath(LAYOUT.LABELS.value)
        label_file_path = label_path.joinpath(
            Path(current_file).relative_to(base_path)
        )
        print(f"Start writing {current_file}")
        self._write_file(search_labeled, label_file_path)
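The path handling at the end of write mirrors the location of each log file from the gather directory into the labels directory. The sketch below shows that mapping with pathlib only. The directory names "gather" and "labels" are assumptions made for this example; the real values come from LAYOUT.GATHER.value and LAYOUT.LABELS.value, which are not shown here.

```python
from pathlib import PurePosixPath

# assumed directory names, standing in for LAYOUT.GATHER.value and LAYOUT.LABELS.value
GATHER_DIR = "gather"
LABELS_DIR = "labels"


def label_file_path(dataset_dir: PurePosixPath, current_file: PurePosixPath) -> PurePosixPath:
    """Map a gathered log file to the matching path below the labels directory."""
    base_path = dataset_dir.joinpath(GATHER_DIR)
    label_path = dataset_dir.joinpath(LABELS_DIR)
    # keep the part of the path below the gather directory unchanged
    return label_path.joinpath(current_file.relative_to(base_path))


result = label_file_path(
    PurePosixPath("/data/set"),
    PurePosixPath("/data/set/gather/web/logs/access.log"),
)
print(result)
```

Note that relative_to raises a ValueError when the file does not lie below the gather directory.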
NoopRule
pydantic-model
¶
A noop rule that does absolutely nothing.
description: str
pydantic-field
¶
An optional description for the rule
id_: str
pydantic-field
required
¶
The unique rule id
labels: str
pydantic-field
required
¶
The list of labels to apply to log lines matching this rule
type_field: str
pydantic-field
required
¶
The rule type as passed in from the config
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
¶
Applies the NoopRule i.e., does nothing.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | Always returns 0 since nothing happens. |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the NoopRule i.e., does nothing.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        Always returns 0 since nothing happens.
    """
    return 0
update_params(self)
inherited
¶
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    return {
        "rule": self.id_,
        "labels": self.labels,
    }
QueryBase
pydantic-model
¶
Base model for DSL query based labeling rules
exclude: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
¶
Similar to filters, but used to exclude results
filter_: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
¶
The filter(s) used to restrict the query to only those documents that match
index: Union[List[str], str]
pydantic-field
¶
The indices to query (by default prefixed with the dataset name)
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, the <DATASET.name>- prefix is automatically prepended to each index pattern. This is a convenience setting, since by default all dataset indices start with this prefix.
query: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
required
¶
The query or queries used to identify the log lines to which the labels are applied.
validate_exclude(value, values)
classmethod
¶
Validator to check the DSL query exclude syntax
Exclude is simply a negative filter clause i.e., not (DSL QUERY).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Union[List[Dict[str, Any]], Dict[str, Any]] | The DSL exclude | required |
values | Dict[str, Any] | The model dictionary | required |
Exceptions:
Type | Description |
---|---|
ValidationError | Should the exclude not be valid |
Returns:
Type | Description |
---|---|
Union[List[Dict[str, Any]], Dict[str, Any]] | The validated exclude |
Source code in dataset/labels.py
@validator("exclude")
def validate_exclude(
    cls, value: Union[List[Dict[str, Any]], Dict[str, Any]], values: Dict[str, Any]
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Validator to check the DSL query exclude syntax
    Exclude is simply a negative filter clause i.e.,
    not (DSL QUERY).
    Args:
        value: The DSL exclude
        values: The model dictionary
    Raises:
        ValidationError: Should the exclude not be valid
    Returns:
        The validated exclude
    """
    # temporary update by query object used to validate the input excludes
    _temp = UpdateByQuery()
    errors = []
    if not isinstance(value, List):
        value = [value]
    for i, exclude in enumerate(value):
        try:
            _temp = _temp.exclude(exclude)
        except (TypeError, UnknownDslObject, ValueError) as e:
            errors.append(ErrorWrapper(e, (i,)))
    if len(errors) > 0:
        raise ValidationError(errors, UpdateByQueryRule)
    return value
validate_filter(value, values)
classmethod
¶
Validator to check the DSL query filter syntax
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Union[List[Dict[str, Any]], Dict[str, Any]] | The DSL filter | required |
values | Dict[str, Any] | The model dictionary | required |
Exceptions:
Type | Description |
---|---|
ValidationError | Should the filter not be valid |
Returns:
Type | Description |
---|---|
Union[List[Dict[str, Any]], Dict[str, Any]] | The validated filter |
Source code in dataset/labels.py
@validator("filter_")
def validate_filter(
    cls, value: Union[List[Dict[str, Any]], Dict[str, Any]], values: Dict[str, Any]
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Validator to check the DSL query filter syntax
    Args:
        value: The DSL filter
        values: The model dictionary
    Raises:
        ValidationError: Should the filter not be valid
    Returns:
        The validated filter
    """
    # temporary update by query object used to validate the input filters
    _temp = UpdateByQuery()
    errors = []
    if not isinstance(value, List):
        value = [value]
    for i, filter_ in enumerate(value):
        try:
            _temp = _temp.filter(filter_)
        except (TypeError, UnknownDslObject, ValueError) as e:
            errors.append(ErrorWrapper(e, (i,)))
    if len(errors) > 0:
        raise ValidationError(errors, UpdateByQueryRule)
    return value
validate_queries(value, values)
classmethod
¶
Validator checking DSL syntax
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value | Union[List[Dict[str, Any]], Dict[str, Any]] | The DSL query | required |
values | Dict[str, Any] | The model dictionary | required |
Exceptions:
Type | Description |
---|---|
ValidationError | Should the given query be invalid |
Returns:
Type | Description |
---|---|
Union[List[Dict[str, Any]], Dict[str, Any]] | The validated query |
Source code in dataset/labels.py
@validator("query")
def validate_queries(
    cls, value: Union[List[Dict[str, Any]], Dict[str, Any]], values: Dict[str, Any]
) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
    """Validator checking DSL syntax
    Args:
        value: The DSL query
        values: The model dictionary
    Raises:
        ValidationError: Should the given query be invalid
    Returns:
        The validated query
    """
    # temporary update by query object used to validate the input queries
    _temp = UpdateByQuery()
    errors = []
    if not isinstance(value, List):
        value = [value]
    for i, query in enumerate(value):
        try:
            _temp = _temp.query(query)
        except (TypeError, UnknownDslObject, ValueError) as e:
            errors.append(ErrorWrapper(e, (i,)))
    if len(errors) > 0:
        raise ValidationError(errors, UpdateByQueryRule)
    return value
Rule
¶
Interface definition for labeling rules.
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
¶
Applies the configured rule and assigns the labels to all matching rows.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | The number of rows the rule was applied to |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the configured rule and assigns the labels to all matching rows.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        The number of rows the rule was applied to
    """
    ...
update_params(self)
¶
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    ...
RuleBase
pydantic-model
¶
Pydantic base model for labeling rules.
This class can be extended to define custom labeling rules.
description: str
pydantic-field
¶
An optional description for the rule
id_: str
pydantic-field
required
¶
The unique rule id
labels: str
pydantic-field
required
¶
The list of labels to apply to log lines matching this rule
type_field: str
pydantic-field
required
¶
The rule type as passed in from the config
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
¶
Applies the configured rule and assigns the labels to all matching rows.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | The number of rows the rule was applied to |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the configured rule and assigns the labels to all matching rows.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        The number of rows the rule was applied to
    """
    raise NotImplementedError()
update_params(self)
¶
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    return {
        "rule": self.id_,
        "labels": self.labels,
    }
validate_label_no_semicolon(val)
classmethod
¶
Validator ensuring label names do not contain ;.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
val | str | A single label | required |
Returns:
Type | Description |
---|---|
str | The validated label |
Source code in dataset/labels.py
@validator("labels", each_item=True)
def validate_label_no_semicolon(cls, val: str) -> str:
    """Validator ensuring label names do not contain `;`.
    Args:
        val: A single label
    Returns:
        The validated label
    """
    assert ";" not in val, f"Labels must not contain semicolons, but got '{val}'"
    return val
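One likely reason for this restriction is that UpdateByQueryRule.apply compares the flat label field against the labels joined with ";". A label containing a semicolon would make that joined value ambiguous. The sketch below illustrates this with a hypothetical helper, flatten_labels, which is not part of the library.

```python
from typing import List


def flatten_labels(labels: List[str]) -> str:
    """Join labels with ';' after rejecting labels that contain the separator."""
    for label in labels:
        if ";" in label:
            raise ValueError(f"Labels must not contain semicolons, but got '{label}'")
    return ";".join(labels)


flat = flatten_labels(["attacker_vpn", "foothold"])
print(flat)

# without the check, two different label lists produce the same flat value
ambiguous = ";".join(["a;b"]) == ";".join(["a", "b"])
print(ambiguous)
```

With the check in place, splitting the flat value on ";" always recovers the original list of labels.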
RuleList
pydantic-model
¶
Model definition of a list of labeling rules.
check_rule_ids_uniq(values)
classmethod
¶
Validator for ensuring that rule IDs are unique.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values | Dict[str, List[cr_kyoushi.dataset.labels.RuleBase]] | The model dictionary | required |
Returns:
Type | Description |
---|---|
Dict[str, List[cr_kyoushi.dataset.labels.RuleBase]] | The validated model dictionary |
Source code in dataset/labels.py
@root_validator
def check_rule_ids_uniq(
    cls, values: Dict[str, List[RuleBase]]
) -> Dict[str, List[RuleBase]]:
    """Validator for ensuring that rule IDs are unique.
    Args:
        values: The model dictionary
    Returns:
        The validated model dictionary
    """
    duplicates = set()
    temp = []
    if "__root__" in values:
        for r in values["__root__"]:
            if r.id_ in temp:
                duplicates.add(r.id_)
            else:
                temp.append(r.id_)
    assert (
        len(duplicates) == 0
    ), f"Rule IDs must be uniq, but got duplicates: {duplicates}"
    return values
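The duplicate check can be reproduced on plain strings. This sketch uses a hypothetical helper, find_duplicate_ids, and tracks seen ids in a set rather than the list used in the source code, which should not change the result.

```python
from typing import Iterable, Set


def find_duplicate_ids(rule_ids: Iterable[str]) -> Set[str]:
    """Return every rule id that occurs more than once."""
    seen: Set[str] = set()
    duplicates: Set[str] = set()
    for rule_id in rule_ids:
        if rule_id in seen:
            duplicates.add(rule_id)
        else:
            seen.add(rule_id)
    return duplicates


duplicates = find_duplicate_ids(["rule.a", "rule.b", "rule.a", "rule.c", "rule.a"])
print(duplicates)
```

An id that appears three times is still reported once, because the duplicates are collected in a set.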
UpdateByQueryRule
pydantic-model
¶
Applies labels based on a simple Elasticsearch DSL query.
Examples:
- type: elasticsearch.query
  id: attacker.foothold.vpn.ip
  labels:
    - attacker_vpn
    - foothold
  description: >-
    This rule applies the labels to all openvpn log rows that have
    the attacker server as source ip and are within the foothold phase.
  index:
    - openvpn-vpn
  filter:
    range:
      "@timestamp":
        # foothold phase start
        gte: "2021-03-23 20:31:00+00:00"
        # foothold phase stop
        lte: "2021-03-23 21:13:52+00:00"
  query:
    - match:
        source.ip: '192.42.0.255'
description: str
pydantic-field
¶
An optional description for the rule
exclude: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
¶
Similar to filters, but used to exclude results
filter_: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
¶
The filter(s) used to restrict the query to only those documents that match
id_: str
pydantic-field
required
¶
The unique rule id
index: Union[List[str], str]
pydantic-field
¶
The indices to query (by default prefixed with the dataset name)
indices_prefix_dataset: bool
pydantic-field
¶
If set to true, the <DATASET.name>- prefix is automatically prepended to each index pattern. This is a convenience setting, since by default all dataset indices start with this prefix.
labels: str
pydantic-field
required
¶
The list of labels to apply to log lines matching this rule
query: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
required
¶
The query or queries used to identify the log lines to which the labels are applied.
type_field: str
pydantic-field
required
¶
The rule type as passed in from the config
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
¶
Applies the labels to all rows matching the DSL query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | The number of rows the labels were applied to. |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to all rows matching the DSL query.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )
    update = UpdateByQuery(using=es, index=index)
    # ensure we have lists
    if not isinstance(self.query, List):
        self.query = [self.query]
    if not isinstance(self.filter_, List):
        self.filter_ = [self.filter_] if self.filter_ is not None else []
    if not isinstance(self.exclude, List):
        self.exclude = [self.exclude] if self.exclude is not None else []
    # exclude already correctly labeled rows from the result set
    update = update.exclude(
        "term",
        **{f"{label_object}.flat.{self.id_}": ";".join(self.labels)},
    )
    for _query in self.query:
        update = update.query(_query)
    for _filter in self.filter_:
        update = update.filter(_filter)
    for _exclude in self.exclude:
        update = update.exclude(_exclude)
    result = apply_labels_by_update_dsl(
        update, self.update_params(), update_script_id
    )
    return result
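To picture how the queries, filters and excludes combine, the sketch below assembles a logically equivalent bool query as a plain dictionary. This is an approximation for illustration only: the body that elasticsearch_dsl actually serializes may nest the clauses differently, and sketch_update_query is a hypothetical helper.

```python
from typing import Any, Dict, List


def sketch_update_query(
    rule_id: str,
    labels: List[str],
    label_object: str,
    queries: List[Dict[str, Any]],
    filters: List[Dict[str, Any]],
    excludes: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Build an approximate bool query for a DSL query rule."""
    # rows that already carry exactly these labels for this rule are skipped
    already_labeled = {
        "term": {f"{label_object}.flat.{rule_id}": ";".join(labels)}
    }
    return {
        "bool": {
            "must": list(queries),
            "filter": list(filters),
            "must_not": [already_labeled, *excludes],
        }
    }


body = sketch_update_query(
    rule_id="attacker.foothold.vpn.ip",
    labels=["attacker_vpn", "foothold"],
    label_object="kyoushi_labels",
    queries=[{"match": {"source.ip": "192.42.0.255"}}],
    filters=[],
    excludes=[],
)
print(body["bool"]["must_not"][0])
```

Because already labeled rows are excluded, applying the same rule a second time should not update them again.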
update_params(self)
inherited
¶
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    return {
        "rule": self.id_,
        "labels": self.labels,
    }
UpdateParentQueryRule
pydantic-model
¶
Applies the labels to all rows of a base query for which a parent query returns results.
This labeling rule first executes a base query to retrieve rows we might want to apply labels to. It then renders and executes a templated parent query for each retrieved row. The result of each parent query determines whether the corresponding base query row is labeled. By default, a row of the base query is labeled if its parent query returns at least one row. This minimum number is configurable, e.g., to require at least two results.
Note
The parent query uses Jinja2 syntax for templating. The information retrieved by the base query can be accessed through the HIT variable.
Examples:
- type: elasticsearch.parent_query
  id: attacker.foothold.apache.error_access
  labels:
    - attacker_http
    - foothold
  description: >-
    This rule looks for unlabeled error messages resulting from VPN server
    traffic within the attack time and tries to match it to an already labeled
    access log row.
  index:
    - apache_error-intranet_server
  query:
    match:
      source.address: "172.16.100.151"
  filter:
    # use script query to match only entries that
    # are not already tagged as attacker http in the foothold phase
    - bool:
        must_not:
          - script:
              script:
                id: test_dataset_kyoushi_label_filter
                params:
                  labels: [attacker_http]
  parent_query:
    index:
      - apache_access-intranet_server
    query:
      - term:
          url.full: "{{ HIT.url.full }}"
      - term:
          source.address: "{{ HIT.source.address }}"
      # we are looking for parents that are labeled as attacker http
      - bool:
          must:
            - script:
                script:
                  id: test_dataset_kyoushi_label_filter
                  params:
                    labels: [attacker_http]
    filter:
      - range:
          # parent must be within +-1s of potential child
          "@timestamp":
            gte: "{{ (HIT['@timestamp'] | as_datetime) - timedelta(seconds=1) }}"
            lte: "{{ ( HIT['@timestamp'] | as_datetime) + timedelta(seconds=1) }}"
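To make the matching logic of the example rule easier to follow, here is a minimal in-memory sketch of the decision it encodes. It does not talk to Elasticsearch. The function `has_enough_parents`, the flat dictionary rows and the `labels` key are illustrative assumptions and not part of the library.

```python
from datetime import datetime, timedelta
from typing import Any, Dict, List


def has_enough_parents(
    hit: Dict[str, Any],
    candidates: List[Dict[str, Any]],
    min_match: int = 1,
    window: timedelta = timedelta(seconds=1),
) -> bool:
    """Return True if at least `min_match` candidates look like parents of `hit`."""
    ts = datetime.fromisoformat(hit["@timestamp"])
    matches = [
        row
        for row in candidates
        # same URL and source address as the base query hit
        if row["url.full"] == hit["url.full"]
        and row["source.address"] == hit["source.address"]
        # the parent must already be labeled as attacker http
        and "attacker_http" in row["labels"]
        # the parent must lie within +-1s of the potential child
        and ts - window <= datetime.fromisoformat(row["@timestamp"]) <= ts + window
    ]
    return len(matches) >= min_match


hit = {
    "@timestamp": "2021-01-01T10:00:00.500000",
    "url.full": "http://intranet/login",
    "source.address": "172.16.100.151",
}
base = {"url.full": hit["url.full"], "source.address": hit["source.address"]}
access_rows = [
    {**base, "@timestamp": "2021-01-01T10:00:00", "labels": ["attacker_http"]},
    {**base, "@timestamp": "2021-01-01T10:00:05", "labels": ["attacker_http"]},
    {**base, "@timestamp": "2021-01-01T10:00:01", "labels": []},
]

print(has_enough_parents(hit, access_rows))               # True
print(has_enough_parents(hit, access_rows, min_match=2))  # False
```

Only the first access row qualifies: the second lies outside the time window and the third is not labeled, so requiring two parents rejects the hit.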
description: str
pydantic-field
An optional description for the rule
exclude: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
Similar to filters, but used to exclude results
filter_: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
The filter(s) used to limit the queried documents to only those that match
id_: str
pydantic-field
required
The unique rule id
index: Union[List[str], str]
pydantic-field
The indices to query (by default prefixed with the dataset name)
indices_prefix_dataset: bool
pydantic-field
If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.
labels: str
pydantic-field
required
The list of labels to apply to log lines matching this rule
max_result_window: int
pydantic-field
The max result window allowed on the elasticsearch instance
min_match: int
pydantic-field
The minimum number of parent matches needed for the main query to be labeled.
parent_query: QueryBase
pydantic-field
required
The templated parent query to check if the labels should be applied to a query hit.
query: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
required
The query or queries used to identify the log lines to apply the labels to.
type_field: str
pydantic-field
required
The rule type as passed in from the config
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
Applies the labels to results of the base query for which a parent was found.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | The number of rows the labels were applied to. |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to results of the base query for which a parent was found.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )
    search = Search(using=es, index=index)
    # ensure we have lists
    if not isinstance(self.query, List):
        self.query = [self.query]
    if not isinstance(self.filter_, List):
        self.filter_ = [self.filter_] if self.filter_ is not None else []
    if not isinstance(self.exclude, List):
        self.exclude = [self.exclude] if self.exclude is not None else []
    # exclude already correctly labeled rows from the result set
    search = search.exclude(
        "term",
        **{f"{label_object}.flat.{self.id_}": ";".join(self.labels)},
    )
    for _query in self.query:
        search = search.query(_query)
    for _filter in self.filter_:
        search = search.filter(_filter)
    for _exclude in self.exclude:
        search = search.exclude(_exclude)
    result = 0
    update_map: Dict[str, List[str]] = {}
    _i = 0
    for hit in search.scan():
        _i += 1
        # make deep copy of parent query so we can template it
        parent_query = self.parent_query.copy(deep=True)
        # render the parent query
        parent_query = render_query_base(hit, parent_query)
        if self.check_parent(parent_query, self.min_match, dataset_config, es):
            update_map.setdefault(hit.meta.index, []).append(hit.meta.id)
    # add labels to each event per index
    for _index, ids in update_map.items():
        # split the update requests into chunks of at most max result window
        update_chunks = [
            ids[i : i + self.max_result_window]
            for i in range(0, len(ids), self.max_result_window)
        ]
        for chunk in update_chunks:
            update = UpdateByQuery(using=es, index=_index).query(
                "ids", values=chunk
            )
            # apply labels to events
            result += apply_labels_by_update_dsl(
                update, self.update_params(), update_script_id
            )
    return result
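The last step of the method splits the matched document IDs of each index into batches of at most `max_result_window` entries before issuing the update requests. The batching itself can be sketched in isolation as follows. The helper name `chunk_ids` and the sample IDs are made up for illustration.

```python
from typing import List


def chunk_ids(ids: List[str], max_result_window: int) -> List[List[str]]:
    """Split `ids` into consecutive chunks of at most `max_result_window` items."""
    return [
        ids[i : i + max_result_window]
        for i in range(0, len(ids), max_result_window)
    ]


chunks = chunk_ids([f"doc-{n}" for n in range(7)], max_result_window=3)
print([len(chunk) for chunk in chunks])  # [3, 3, 1]
```

Each chunk then becomes one `ids` query, so presumably no single update request references more documents than the configured result window allows.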
check_parent(self, parent_query, min_match, dataset_config, es)
Executes a parent query and returns whether there were enough result rows.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parent_query | QueryBase | The parent query to execute | required |
min_match | int | The minimum number of result rows required | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
Returns:
Type | Description |
---|---|
bool | True if the query returned >= min_match rows and False otherwise. |
Source code in dataset/labels.py
def check_parent(
    self,
    parent_query: QueryBase,
    min_match: int,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
) -> bool:
    """Executes a parent query and returns whether there were enough result rows.
    Args:
        parent_query: The parent query to execute
        min_match: The minimum number of result rows required
        dataset_config: The dataset configuration
        es: The elasticsearch client object
    Returns:
        `True` if the query returned >= min_match rows and `False` otherwise.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, parent_query.indices_prefix_dataset, parent_query.index
    )
    search = Search(using=es, index=index)
    # ensure we have lists
    if not isinstance(parent_query.query, List):
        parent_query.query = [parent_query.query]
    if not isinstance(parent_query.filter_, List):
        parent_query.filter_ = (
            [parent_query.filter_] if parent_query.filter_ is not None else []
        )
    if not isinstance(parent_query.exclude, List):
        parent_query.exclude = (
            [parent_query.exclude] if parent_query.exclude is not None else []
        )
    for _query in parent_query.query:
        search = search.query(_query)
    for _filter in parent_query.filter_:
        search = search.filter(_filter)
    for _exclude in parent_query.exclude:
        search = search.exclude(_exclude)
    return search.execute().hits.total.value >= min_match
update_params(self)
inherited
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    return {
        "rule": self.id_,
        "labels": self.labels,
    }
UpdateSubQueryRule
pydantic-model
Labeling rule that labels the results of multiple sub queries.
This labeling rule first executes a base query to retrieve information. It then renders and executes a templated sub query for each row retrieved from the base query. The result rows of these dynamically generated sub queries are then labeled.
Note
The sub query uses Jinja2 syntax for templating. The information retrieved by the base query can be accessed through the HIT variable.
Examples:
- type: elasticsearch.sub_query
  id: attacker.foothold.apache.access_dropped
  labels:
    - attacker_http
    - foothold
  description: >-
    This rule tries to match attacker requests that we were unable to
    match to a labeled response with access log entries. Such cases can
    happen if the corresponding response gets lost in the network or
    otherwise is not sent.
  index:
    - pcap-attacker_0
  # obligatory match all
  query:
    - term:
        destination.ip: "172.16.0.217"
  filter:
    - term:
        event.category: http
    - term:
        event.action: request
    # we are looking for requests that have not been marked as attacker http yet
    # most likely they did not have a matching response due to some network error
    # or timeout
    - bool:
        must_not:
          - script:
              script:
                id: test_dataset_kyoushi_label_filter
                params:
                  labels: [attacker_http]
  sub_query:
    index:
      - apache_access-intranet_server
    query:
      - term:
          url.full: "{{ HIT.url.full }}"
      - term:
          source.address: "172.16.100.151"
    filter:
      - range:
          "@timestamp":
            # the access log entry should be after the request, but since the access log
            # does not have microseconds we drop them here as well
            gte: "{{ (HIT['@timestamp'] | as_datetime).replace(microsecond=0) }}"
            # the type of error we are looking for should create an access log entry almost immediately
            # so we keep the time frame short
            lte: "{{ ( HIT['@timestamp'] | as_datetime).replace(microsecond=0) + timedelta(seconds=1) }}"
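The range filter in the example drops the microseconds of the request timestamp before building the search window. A small standard-library sketch shows why that matters. The function `sub_query_window` and the sample timestamps are illustrative assumptions, and the sketch only mirrors the arithmetic of the template expressions, not the templating itself.

```python
from datetime import datetime, timedelta
from typing import Tuple


def sub_query_window(request_ts: str) -> Tuple[datetime, datetime]:
    """Return the (gte, lte) bounds used to search for the access log entry."""
    # Access log timestamps have second resolution, so truncate the request time.
    start = datetime.fromisoformat(request_ts).replace(microsecond=0)
    return start, start + timedelta(seconds=1)


request_ts = "2021-01-01T10:00:00.734512"
gte, lte = sub_query_window(request_ts)
access_log_ts = datetime.fromisoformat("2021-01-01T10:00:00")

print(gte <= access_log_ts <= lte)                          # True
print(datetime.fromisoformat(request_ts) <= access_log_ts)  # False
```

Without the truncation, an access log entry written in the same second as the request would appear to precede it and fall outside the window.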
description: str
pydantic-field
An optional description for the rule
exclude: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
Similar to filters, but used to exclude results
filter_: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
The filter(s) used to limit the queried documents to only those that match
id_: str
pydantic-field
required
The unique rule id
index: Union[List[str], str]
pydantic-field
The indices to query (by default prefixed with the dataset name)
indices_prefix_dataset: bool
pydantic-field
If set to true, the <DATASET.name>- prefix is automatically added to each pattern. This is a convenience setting, as by default all dataset indices start with this prefix.
labels: str
pydantic-field
required
The list of labels to apply to log lines matching this rule
query: Union[List[Dict[str, Any]], Dict[str, Any]]
pydantic-field
required
The query or queries used to identify the log lines to apply the labels to.
sub_query: QueryBase
pydantic-field
required
The templated sub query to use to apply the labels. Executed for each hit of the base query.
type_field: str
pydantic-field
required
The rule type as passed in from the config
apply(self, dataset_dir, dataset_config, es, update_script_id, label_object)
Applies the labels to all rows matching the created sub queries.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset_dir | Path | The dataset base directory | required |
dataset_config | DatasetConfig | The dataset configuration | required |
es | Elasticsearch | The elasticsearch client object | required |
update_script_id | str | The label update script ID | required |
label_object | str | The field used to store label information | required |
Returns:
Type | Description |
---|---|
int | The number of rows the labels were applied to. |
Source code in dataset/labels.py
def apply(
    self,
    dataset_dir: Path,
    dataset_config: DatasetConfig,
    es: Elasticsearch,
    update_script_id: str,
    label_object: str,
) -> int:
    """Applies the labels to all rows matching the created sub queries.
    Args:
        dataset_dir: The dataset base directory
        dataset_config: The dataset configuration
        es: The elasticsearch client object
        update_script_id: The label update script ID
        label_object: The field used to store label information
    Returns:
        The number of rows the labels were applied to.
    """
    index: Optional[Union[Sequence[str], str]] = resolve_indices(
        dataset_config.name, self.indices_prefix_dataset, self.index
    )
    search = Search(using=es, index=index)
    # ensure we have lists
    if not isinstance(self.query, List):
        self.query = [self.query]
    if not isinstance(self.filter_, List):
        self.filter_ = [self.filter_] if self.filter_ is not None else []
    if not isinstance(self.exclude, List):
        self.exclude = [self.exclude] if self.exclude is not None else []
    # exclude already correctly labeled rows from the result set
    search = search.exclude(
        "term",
        **{f"{label_object}.flat.{self.id_}": ";".join(self.labels)},
    )
    for _query in self.query:
        search = search.query(_query)
    for _filter in self.filter_:
        search = search.filter(_filter)
    for _exclude in self.exclude:
        search = search.exclude(_exclude)
    result = 0
    for hit in search.params(scroll="30m").scan():
        # make deep copy of sub query so we can template it
        sub_query = self.sub_query.copy(deep=True)
        # render the subquery
        sub_query = render_query_base(hit, sub_query)
        sub_rule = UpdateByQueryRule(
            type="elasticsearch.query",
            id=self.id_,
            labels=self.labels,
            description=self.description,
            index=sub_query.index,
            query=sub_query.query,
            filter=sub_query.filter_,
            exclude=sub_query.exclude,
            indices_prefix_dataset=sub_query.indices_prefix_dataset,
        )
        result += sub_rule.apply(
            dataset_dir, dataset_config, es, update_script_id, label_object
        )
    return result
update_params(self)
inherited
Gets the update script parameters required for this rule.
Returns:
Type | Description |
---|---|
Dict[str, Any] | The update script parameters. |
Source code in dataset/labels.py
def update_params(self) -> Dict[str, Any]:
    """Gets the update script parameters required for this rule.
    Returns:
        The update script parameters.
    """
    return {
        "rule": self.id_,
        "labels": self.labels,
    }
apply_labels_by_query(es, query, script_params, update_script_id, index='_all', check_interval=0.5)
Utility function for applying labels based on a DSL query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es | Elasticsearch | The elasticsearch client object | required |
query | Dict[str, Any] | The DSL query dict | required |
script_params | Dict[str, Any] | The parameters for the label update script. | required |
update_script_id | str | The label update script to use | required |
index | Union[List[str], str] | The indices to query | '_all' |
check_interval | float | The amount of time between status checks. | 0.5 |
Returns:
Type | Description |
---|---|
int | The number of updated rows. |
Source code in dataset/labels.py
def apply_labels_by_query(
    es: Elasticsearch,
    query: Dict[str, Any],
    script_params: Dict[str, Any],
    update_script_id: str,
    index: Union[List[str], str] = "_all",
    check_interval: float = 0.5,
) -> int:
    """Utility function for applying labels based on a DSL query.
    Args:
        es: The elasticsearch client object
        query: The DSL query dict
        script_params: The parameters for the label update script.
        update_script_id: The label update script to use
        index: The indices to query
        check_interval: The amount of time between status checks.
    Returns:
        The number of updated rows.
    """
    update = UpdateByQuery(using=es, index=index)
    update = update.update_from_dict(query)
    return apply_labels_by_update_dsl(
        update, script_params, update_script_id, check_interval
    )
apply_labels_by_update_dsl(update, script_params, update_script_id, check_interval=0.5)
Utility function for applying labels based on a DSL query object.
The function takes an update-by-query object and uses the Cyber Range Kyoushi label update script to apply the given labels to all matching rows. This function is blocking, as it waits for the process to complete.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
update | UpdateByQuery | The update by query object containing the DSL query. | required |
script_params | Dict[str, Any] | The parameters for the label update script. | required |
update_script_id | str | The label update script to use | required |
check_interval | float | The amount of time between status checks. | 0.5 |
Returns:
Type | Description |
---|---|
int | The number of updated rows. |
Source code in dataset/labels.py
def apply_labels_by_update_dsl(
    update: UpdateByQuery,
    script_params: Dict[str, Any],
    update_script_id: str,
    check_interval: float = 0.5,
) -> int:
    """Utility function for applying labels based on a DSL query object.
    The function takes an update-by-query object and uses the Cyber Range Kyoushi
    label update script to apply the given labels to all matching rows. This
    function is blocking, as it waits for the process to complete.
    Args:
        update: The update by query object containing the DSL query.
        script_params: The parameters for the label update script.
        update_script_id: The label update script to use
        check_interval: The amount of time between status checks.
    Returns:
        The number of updated rows.
    """
    # refresh=True is important so that consecutive rules
    # have a consistent state
    es: Elasticsearch = get_connection(update._using)
    # add update script
    update = update.script(id=update_script_id, params=script_params)
    # run update task
    task = update.params(refresh=True, wait_for_completion=False).execute().task
    task_info = es.tasks.get(task_id=task)
    while not task_info["completed"]:
        sleep(check_interval)
        task_info = es.tasks.get(task_id=task)
    with warnings.catch_warnings():
        # ToDo: Elasticsearch (7.12) does not provide any API to delete update by query tasks
        #       The only option is to delete the document directly. This will be deprecated
        #       and as such gives warnings. For now we ignore these and wait for elasticsearch
        #       to provide an API for it.
        warnings.simplefilter("ignore")
        es.delete(index=".tasks", doc_type="task", id=task)
    return task_info["response"]["updated"]
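The blocking behaviour comes from a simple polling loop: the task is started asynchronously and its status is fetched every `check_interval` until it reports completion. The sketch below isolates that loop so it can be run without an Elasticsearch instance. The function `wait_for_task`, the injectable `sleep` argument and the canned responses are assumptions made for illustration, and the lambda stands in for the `es.tasks.get(task_id=...)` call seen in the source.

```python
import time
from typing import Any, Callable, Dict


def wait_for_task(
    get_task: Callable[[], Dict[str, Any]],
    check_interval: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll `get_task` until the returned task info reports completion."""
    task_info = get_task()
    while not task_info["completed"]:
        sleep(check_interval)
        task_info = get_task()
    return task_info


# Hypothetical task status responses: the task completes on the third poll.
responses = iter(
    [
        {"completed": False},
        {"completed": False},
        {"completed": True, "response": {"updated": 42}},
    ]
)
info = wait_for_task(lambda: next(responses), check_interval=0.0)
print(info["response"]["updated"])  # 42
```

A shorter `check_interval` reports completion sooner at the cost of more status requests. The default of 0.5 matches the function signature above.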
create_kyoushi_scripts(es, dataset_name, label_object='kyoushi_labels')
Utility function for creating labeling update, filter and field scripts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es | Elasticsearch | The elasticsearch client object | required |
dataset_name | str | The name of the dataset to create the scripts for. This is used as prefix for script names to ensure different versions of these scripts can exist for different datasets in the same Elasticsearch instance. | required |
label_object | str | The name of the field to store labeling information in. | 'kyoushi_labels' |
Source code in dataset/labels.py
def create_kyoushi_scripts(
    es: Elasticsearch,
    dataset_name: str,
    label_object: str = "kyoushi_labels",
):
    """Utility function for creating labeling update, filter and field scripts.
    Args:
        es: The elasticsearch client object
        dataset_name: The name of the dataset to create the scripts for.
                      This is used as prefix for script names to ensure
                      different versions of these scripts can exist for
                      different datasets in the same Elasticsearch instance.
        label_object: The name of the field to store labeling information in.
    """
    update_script = {
        "script": {
            "description": "Kyoushi Dataset - Update by Query label script",
            "lang": "painless",
            "source": UPDATE_SCRIPT.replace("{{ label_object }}", label_object),
        }
    }
    es.put_script(
        id=f"{dataset_name}_kyoushi_label_update", body=update_script, context="update"
    )
    filter_script = {
        "script": {
            "lang": "painless",
            "source": LABEL_FILTER_SCRIPT.replace("{{ label_object }}", label_object),
        }
    }
    es.put_script(
        id=f"{dataset_name}_kyoushi_label_filter",
        body=filter_script,
        context="filter",
    )
    labels_field = {
        "script": {
            "lang": "painless",
            "source": LABELS_FIELD_SCRIPT.replace("{{ label_object }}", label_object),
        }
    }
    es.put_script(
        id=f"{dataset_name}_kyoushi_label_field",
        body=labels_field,
        context="field",
    )
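The naming scheme and the placeholder substitution used above can be sketched without an Elasticsearch client. The helpers `script_ids` and `render_script` are hypothetical, and `EXAMPLE_SCRIPT` is a made-up one-line stand-in: the real `UPDATE_SCRIPT`, `LABEL_FILTER_SCRIPT` and `LABELS_FIELD_SCRIPT` sources are defined elsewhere in the module and are not shown here.

```python
from typing import Dict

# Made-up script source used only to demonstrate the placeholder substitution.
EXAMPLE_SCRIPT = "ctx._source['{{ label_object }}'] = params.labels;"


def script_ids(dataset_name: str) -> Dict[str, str]:
    """Return the stored script IDs created for a dataset, keyed by purpose."""
    return {
        "update": f"{dataset_name}_kyoushi_label_update",
        "filter": f"{dataset_name}_kyoushi_label_filter",
        "field": f"{dataset_name}_kyoushi_label_field",
    }


def render_script(source: str, label_object: str = "kyoushi_labels") -> str:
    """Insert the label object name via plain string replacement."""
    return source.replace("{{ label_object }}", label_object)


print(script_ids("test_dataset")["filter"])  # test_dataset_kyoushi_label_filter
print(render_script(EXAMPLE_SCRIPT))
```

Under this naming scheme, the `test_dataset_kyoushi_label_filter` ID referenced in the rule examples would belong to a dataset named `test_dataset`.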
get_label_counts(es, index=None, label_object='kyoushi_labels')
Utility function for getting number of rows per label.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es | Elasticsearch | The elasticsearch client object | required |
index | Union[List[str], str] | The indices to get the information for | None |
label_object | str | The field containing the label information | 'kyoushi_labels' |
Returns:
Type | Description |
---|---|
List[elasticsearch_dsl.response.aggs.Bucket] | List of result buckets |
Source code in dataset/labels.py
def get_label_counts(
    es: Elasticsearch,
    index: Union[List[str], str, None] = None,
    label_object: str = "kyoushi_labels",
) -> List[Bucket]:
    """Utility function for getting number of rows per label.
    Args:
        es: The elasticsearch client object
        index: The indices to get the information for
        label_object: The field containing the label information
    Returns:
        List of result buckets
    """
    # disable request cache to ensure we always get latest info
    search_labels = Search(using=es, index=index).params(request_cache=False)
    runtime_mappings = {
        "labels": {
            "type": "keyword",
            "script": LABELS_AGGREGATES_FIELD_SCRIPT,
        }
    }
    search_labels = search_labels.extra(runtime_mappings=runtime_mappings)
    # setup aggregations
    search_labels.aggs.bucket(
        "labels",
        "composite",
        sources=[{"label": {"terms": {"field": "labels"}}}],
    )
    search_labels.aggs["labels"].bucket("file", "terms", field="log.file.path")
    return scan_composite(search_labels, "labels")
render_query_base(hit, query)
Utility function for rendering sub or parent query.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
hit | Hit | The Elasticsearch query result row to render for. | required |
query | QueryBase | The templated query definition. | required |
Returns:
Type | Description |
---|---|
QueryBase | The rendered DSL query |
Source code in dataset/labels.py
def render_query_base(hit: Hit, query: QueryBase) -> QueryBase:
    """Utility function for rendering sub or parent query.
    Args:
        hit: The Elasticsearch query result row to render for.
        query: The templated query definition.
    Returns:
        The rendered DSL query
    """
    variables = {"HIT": hit}
    # render the index var
    if isinstance(query.index, str):
        query.index = render_template(query.index, variables)
    elif isinstance(query.index, Sequence):
        query.index = [render_template(i, variables) for i in query.index]
    # ensure we have lists
    if not isinstance(query.query, List):
        query.query = [query.query]
    if not isinstance(query.filter_, List):
        query.filter_ = [query.filter_] if query.filter_ is not None else []
    if not isinstance(query.exclude, List):
        query.exclude = [query.exclude] if query.exclude is not None else []
    query.query = [
        render_template_recursive(_query, variables) for _query in query.query
    ]
    query.filter_ = [
        render_template_recursive(_filter, variables) for _filter in query.filter_
    ]
    query.exclude = [
        render_template_recursive(_exclude, variables) for _exclude in query.exclude
    ]
    return query
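The function relies on `render_template_recursive`, whose source is not shown on this page. The sketch below only illustrates the general idea of walking a nested query structure and rendering every string in it. It is not the library's implementation: a small regular expression stands in for Jinja2 and handles only simple `{{ HIT.a.b }}` placeholders, and the names `lookup` and `render_recursive` are hypothetical.

```python
import re
from typing import Any, Dict

# Stand-in for Jinja2: matches placeholders such as "{{ HIT.url.full }}".
_PLACEHOLDER = re.compile(r"\{\{\s*HIT\.([\w.]+)\s*\}\}")


def lookup(hit: Dict[str, Any], dotted: str) -> Any:
    """Resolve a dotted path such as "url.full" in a nested dict."""
    value: Any = hit
    for key in dotted.split("."):
        value = value[key]
    return value


def render_recursive(data: Any, hit: Dict[str, Any]) -> Any:
    """Render placeholders in every string value of a nested dict/list structure."""
    if isinstance(data, str):
        return _PLACEHOLDER.sub(lambda m: str(lookup(hit, m.group(1))), data)
    if isinstance(data, dict):
        # keys are left untouched, only values are rendered
        return {key: render_recursive(value, hit) for key, value in data.items()}
    if isinstance(data, list):
        return [render_recursive(item, hit) for item in data]
    return data


hit = {
    "url": {"full": "http://intranet/login"},
    "source": {"address": "172.16.100.151"},
}
query = [
    {"term": {"url.full": "{{ HIT.url.full }}"}},
    {"term": {"source.address": "{{ HIT.source.address }}"}},
]
print(render_recursive(query, hit))
```

Non-string leaves such as numbers or booleans pass through unchanged, which keeps values like `min_match` or boolean flags intact in this sketch.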