Skip to content

Elasticsearch module

This module contains Elasticsearch related utility functions

get_transport_variables(es)

Utility function for getting Elasticsearch connection info.

This function takes an Elasticsearch client object and extracts the host connection info and returns it in a dict.

Parameters:

Name Type Description Default
es Elasticsearch

Elasticsearch client object

required

Exceptions:

Type Description
TypeError

If the passed client object is not connected to a host

Returns:

Type Description
Dict[str, Any]

Host connection information dict containing the following fields:

  • ELASTICSEARCH_HOST: The host IP/FQDN and port
  • ELASTICSEARCH_HOST_PATH: The HTTP path if it is part of the full address
  • ELASTICSEARCH_SSL: Boolean indicating SSL on or off
  • ELASTICSEARCH_USER: The username if HTTP auth is used
  • ELASTICSEARCH_PASSWORD: The password if HTTP auth is used
Source code in dataset/elasticsearch.py
def get_transport_variables(es: Elasticsearch) -> Dict[str, Any]:
    """Utility function for getting Elasticsearch connection info.

    This function takes an Elasticsearch client object and
    extracts the host connection info and returns it in a dict.

    Args:
        es: Elasticsearch client object

    Raises:
        TypeError: If the passed client object is not connected to a host

    Returns:
        Host connection information dict containing the following fields:

          - `ELASTICSEARCH_HOST`: The host IP/FQDN and port
          - `ELASTICSEARCH_HOST_PATH`: The HTTP path if it is part of the full address
          - `ELASTICSEARCH_SSL`: Boolean indicating SSL on or off
          - `ELASTICSEARCH_USER`: The username if HTTP auth is used
          - `ELASTICSEARCH_PASSWORD`: The password if HTTP auth is used
    """
    if es.transport.hosts is not None and len(es.transport.hosts) > 0:
        # get the first host
        host = es.transport.hosts[0]

        # these we can always set
        host_variables = {
            "ELASTICSEARCH_HOST": f"{host['host']}:{host.get('port', 80)}",
            "ELASTICSEARCH_SSL": host.get("use_ssl", False),
        }
        if "http_auth" in host:
            # split RFC http auth into user and password pair
            (
                host_variables["ELASTICSEARCH_USER"],
                host_variables["ELASTICSEARCH_PASSWORD"],
            ) = host["http_auth"].split(":", 1)

        if "url_prefix" in host:
            host_variables["ELASTICSEARCH_HOST_PATH"] = host["url_prefix"]

        return host_variables
    raise TypeError("Uninitialized elasticsearch client!")

scan_composite(search, name)

Utility function for getting all result buckets of a composite aggregate.

Parameters:

Name Type Description Default
search Search

The composite aggregate search object.

required
name str

The name of the aggregate

required

Returns:

Type Description
List[elasticsearch_dsl.response.aggs.Bucket]

List of result buckets

Source code in dataset/elasticsearch.py
def scan_composite(search: Search, name: str) -> List[Bucket]:
    """Utility function for getting all result buckets of a composite aggregate.

    Args:
        search: The composite aggregate search object.
        name: The name of the aggregate

    Returns:
        List of result buckets
    """
    # ensure that we do not get documents for no reason
    search = search.extra(size=0)
    buckets = []
    while True:
        # need to disable cache or the API will keep returning the same result
        result = search.execute(ignore_cache=True)
        buckets.extend(result.aggregations[name].buckets)
        if "after_key" not in result.aggregations[name]:
            # no after key indicates we got everything
            return buckets

        # resume query after the key
        search.aggs[name].after = result.aggregations[name].after_key

search_eql(es, index, body, check_interval=0.5)

Utility function for issueing and receiving the result of a EQL query.

Parameters:

Name Type Description Default
es Elasticsearch

The Elasticsearch client object

required
index Union[Sequence[str], str]

The indices to search on

required
body Dict[str, Any]

The EQL query body

required
check_interval float

The time to wait in between query ready checks.

0.5

Returns:

Type Description
Dict[str, Any]

The EQL query result

Source code in dataset/elasticsearch.py
def search_eql(
    es: Elasticsearch,
    index: Union[Sequence[str], str, None],
    body: Dict[str, Any],
    check_interval: float = 0.5,
) -> Dict[str, Any]:
    """Utility function for issueing and receiving the result of a EQL query.

    Args:
        es: The Elasticsearch client object
        index: The indices to search on
        body: The EQL query body
        check_interval: The time to wait in between query ready checks.

    Returns:
        The EQL query result
    """
    result = es.eql.search(index=index, body=body, wait_for_completion_timeout="0s")
    result_id = None
    while result["is_running"]:
        result_id = result["id"]
        sleep(check_interval)
        result = es.eql.get_status(id=result_id)

    # if result id is not none then we have a async request and have
    # to retrieve the actual result and delete the async data
    if result_id is not None:
        result = es.eql.get(id=result_id)
        # delete the async request once we have its data
        es.eql.delete(id=result_id)

    return result["hits"]