Sample module¶
This module contains utility functions used for sampling processed datasets.
get_sample(es, label_filter_script_id, labels, files=None, index=None, label_object='kyoushi_labels', size=10, seed=None, seed_field='_seq_no', start=None, stop=None)
¶
Retrieve a list of sample log lines.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es |
Elasticsearch |
The elasticsearch client object |
required |
label_filter_script_id |
str |
The kyoushi filter scripts ID |
required |
labels |
Optional[List[str]] |
The labels to sample from |
required |
files |
Optional[List[str]] |
The log files to sample from |
None |
index |
Union[List[str], str] |
The elasticsearch indices to sample from |
None |
label_object |
str |
The field that contains the labeling data |
'kyoushi_labels' |
size |
int |
The number of lines to sample |
10 |
seed |
Optional[int] |
The seed to use for the sample randomization |
None |
seed_field |
str |
The elasticsearch field to use for the random sample order |
'_seq_no' |
start |
Union[str, datetime.datetime, float] |
The minimum time stamp to sample from |
None |
stop |
Union[str, datetime.datetime, float] |
The maximum time stamp to sample from |
None |
Returns:
Type | Description |
---|---|
List[elasticsearch_dsl.response.hit.Hit] |
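List of randomly sampled log lines. Each line is represented as a dict with the fields `@timestamp` (the log event timestamp), `log` (the elasticsearch log field containing line number, original log line, etc.), `<label_object>.list` (list of labels), `<label_object>.rules` (map of labeling rules applied to the line) and `type` (the log type).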
|
Source code in dataset/sample.py
def get_sample(
    es: Elasticsearch,
    label_filter_script_id: str,
    labels: Optional[List[str]],
    files: Optional[List[str]] = None,
    index: Union[List[str], str, None] = None,
    label_object: str = "kyoushi_labels",
    size: int = 10,
    seed: Optional[int] = None,
    seed_field: str = "_seq_no",
    start: Union[str, datetime, float, None] = None,
    stop: Union[str, datetime, float, None] = None,
) -> List[Hit]:
    """Fetch a random sample of log lines from elasticsearch.

    The query is ordered by a `function_score` random score, limited to
    `size` hits and then narrowed down by label, time range and log file.

    Args:
        es: The elasticsearch client object
        label_filter_script_id: The kyoushi filter scripts ID
        labels: The labels to sample from. If `None` or empty only lines
                without any labeling rules are sampled.
        files: The log files to sample from
        index: The elasticsearch indices to sample from
        label_object: The field that contains the labeling data
        size: The number of lines to sample
        seed: The seed to use for the sample randomization
        seed_field: The elasticsearch field to use for the random sample order
        start: The minimum time stamp to sample from
        stop: The maximum time stamp to sample from

    Returns:
        List of randomly sampled log lines. Each line is
        represented as a dict of the following format:
        ```
        - @timestamp: The log event timestamp
          log: The elasticsearch log field (containing line number, original log line, etc.)
          <label_object>.list: List of labels
          <label_object>.rules: Map of labeling rules applied to the line
          type: Log type
        ```
    """
    # the seed configuration is only passed along when a seed was given,
    # otherwise the random score is requested with empty settings
    if seed is None:
        score_config = {}
    else:
        score_config = {"seed": seed, "field": seed_field}

    # order by the random score so the first `size` hits form the sample
    query = Search(using=es, index=index)
    query = query.query("function_score", random_score=score_config)
    query = query.sort("_score")
    query = query.extra(size=size)

    if labels:
        # restrict the sample to the requested labels via the stored
        # filter script
        label_script = {"id": label_filter_script_id, "params": {"labels": labels}}
        query = query.filter("script", script=label_script)
    else:
        # without labels we explicitly sample only from log rows that
        # have no labeling rules attached
        query = query.exclude("exists", field=f"{label_object}.rules")

    # collect only those time bounds that were actually requested
    bounds = {
        operator: value
        for operator, value in (("gte", start), ("lte", stop))
        if value is not None
    }
    if bounds:
        query = query.filter(Range(**{"@timestamp": bounds}))

    if files:
        # a line qualifies if it matches any of the given file paths
        file_clauses = [{"match": {"log.file.path": f}} for f in files]
        query = query.filter("bool", should=file_clauses)

    returned_fields = [
        "@timestamp",
        "log",
        f"{label_object}.list",
        f"{label_object}.rules",
        "type",
        "_score",
        "_seq_no",
    ]
    query = query.source(returned_fields)

    response = query.execute()
    return response.hits
get_sample_log(es, sample, label, gather_dir, before=5, after=5, related=None, index=None)
¶
Retrieves additional information for a sampled log entry.
This function can be used to retrieve additional information, such as the lines before or after the sample. This information can be helpful when analyzing sampled log lines.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
es |
Elasticsearch |
The elasticsearch client object |
required |
sample |
Hit |
The sample log line |
required |
label |
str |
The label that the sample is for |
required |
gather_dir |
Path |
The dataset gather directory |
required |
before |
int |
The number of lines before the sample to fetch |
5 |
after |
int |
The number of lines after the sample to fetch |
5 |
related |
Optional[List[str]] |
List of related elasticsearch indices to retrieve neighbor logs from |
None |
index |
Union[List[str], str] |
The index the sample was retrieved from |
None |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
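Dictionary containing verbose information about the sample log, with the keys `label` (the label the sample is for), `rules` (list of labeling rules applied to the sample log line), `path` (the sample's log file path relative to the gather directory), `line_no` (the sample's line number), `before` (list of log lines before the sample), `line` (the sample log line), `after` (list of log lines after the sample) and `related` (list of log lines in related files with timestamps close to the sample).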
|
Source code in dataset/sample.py
def get_sample_log(
    es: Elasticsearch,
    sample: Hit,
    label: str,
    gather_dir: Path,
    before: int = 5,
    after: int = 5,
    related: Optional[List[str]] = None,
    index: Union[List[str], str, None] = None,
) -> Dict[str, Any]:
    """Retrieves additional information for a sampled log entry.

    This function can be used to retrieve additional information,
    such as the lines before or after the sample. The information can
    be helpful when analyzing sampled log lines.

    Args:
        es: The elasticsearch client object
        sample: The sample log line
        label: The label that the sample is for
        gather_dir: The dataset gather directory
        before: The number of lines before the sample to fetch
        after: The number of lines after the sample to fetch
        related: List of related elasticsearch indices to retrieve neighbor logs from
        index: The index the sample was retrieved from
               (accepted for interface compatibility, not used by this function)

    Returns:
        Dictionary containing verbose information about the sample log.
        Format:
        ```
        label: <The label the sample is for>
        rules: <List of labeling rules applied to the sample log line>
        path: <The samples log files relative path>
        line_no: <The samples line number>
        before: <List of log lines before the sample>
        line: <The sample log line>
        after: <List of log lines after the sample>
        related: <List of log lines in related files with timestamps close to the sample.>
        ```

    Raises:
        ValueError: If the sample's log file does not contain the sample's
                    line number, or if a path is not located below `gather_dir`.
    """
    path = Path(sample.log.file.path)
    line_no = sample.log.file.line

    # line numbers are 1-based, clamping at 0 simply means
    # "start at the beginning of the file"
    start = max(0, line_no - before)
    end = line_no + after

    before_lines: List[str] = []
    sample_line: Optional[str] = None
    after_lines: List[str] = []

    # read the sample line and the requested surrounding lines
    with open(path, "r") as f:
        for i, line in enumerate(f, 1):
            if start <= i < line_no:
                before_lines.append(line)
            elif i == line_no:
                sample_line = line
            elif line_no < i <= end:
                after_lines.append(line)
            elif i > end:
                # everything of interest has been read already
                break

    if sample_line is None:
        # fail with a clear message instead of an unbound local variable
        # error when the file is shorter than the sampled line number
        raise ValueError(f"Line {line_no} not found in log file '{path}'")

    _related: List[Dict[str, Any]] = []
    for rel in related or []:
        closest: Optional[Hit] = _get_closest(
            es=es, related=rel, timestamp=sample["@timestamp"]
        )
        # skip indices without a hit and hits that point back into
        # the sample's own log file
        if closest is None or closest.log.file.path == str(path):
            continue
        _related.append(
            {
                "path": str(Path(closest.log.file.path).relative_to(gather_dir)),
                "line_no": closest.log.file.line,
                "timestamp": closest["@timestamp"],
            }
        )

    # NOTE: the label object name is hard-coded to "kyoushi_labels" here
    rules = list(sample.kyoushi_labels.rules) if "kyoushi_labels" in sample else []

    return {
        "label": label,
        "rules": rules,
        "path": str(path.relative_to(gather_dir)),
        "line_no": line_no,
        "before": before_lines,
        "line": sample_line,
        "after": after_lines,
        "related": _related,
    }