From 31e782e479c3b45e8926e671533d5ba92ecbd940 Mon Sep 17 00:00:00 2001 From: Seth Grover Date: Mon, 2 Dec 2024 15:50:49 -0700 Subject: [PATCH] for cisagov/Malcolm#518, added some tests to make sure the ICSNPP protocols generate the data they should --- README.md | 13 +++++++------ src/maltest/tests/test_icsnpp_protocols.py | 12 ++++++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index b31e88d..c518b34 100644 --- a/README.md +++ b/README.md @@ -280,12 +280,7 @@ Package source highlights (under [`./src/maltest`](src/maltest)): New tests should be placed in the [`./src/maltest/tests/`](src/maltest/tests/) directory. Tests have access to the connection information for the running Malcolm instance through [fixtures](https://docs.pytest.org/en/stable/reference/fixtures.html#conftest-py-sharing-fixtures-across-multiple-files) provided by [`./src/maltest/tests/conftest.py`](src/maltest/tests/conftest.py). -See the following tests for examples of how to access and use these fixtures: - -* [test_malcolm_response.py](src/maltest/tests/test_malcolm_response.py) - querying the [Malcolm API](https://idaholab.github.io/Malcolm/docs/api.html#API) using the [Requests](https://requests.readthedocs.io/en/latest/) library -* [test_malcolm_db_health.py](src/maltest/tests/test_malcolm_db_health.py) - querying the [data store](https://idaholab.github.io/Malcolm/docs/opensearch-instances.html#OpenSearchInstance) directly using the [OpenSearch](https://opensearch.org/docs/latest/clients/python-low-level/) or [Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html) client - -Use `UPLOAD_ARTIFACTS` to specify a PCAP file required by your test. This example test would succeed if both `foobar.pcap` and `barbaz.pcap` were uploaded to Malcolm and their hashes stored in the global `pcap_hash_map`: +Use `UPLOAD_ARTIFACTS` to specify a PCAP file or files required by your test. This example test would succeed if both `foobar.pcap` and `barbaz.pcap` were uploaded to Malcolm and their hashes stored in the global `pcap_hash_map`: ```python UPLOAD_ARTIFACTS = ['foobar.pcap', 'barbaz.pcap'] @@ -297,3 +292,9 @@ def test_malcolm_pcap_hash( ``` As PCAP files are uploaded to Malcolm by `malcolm-test`, they are [hashed](https://docs.python.org/3/library/hashlib.html#hashlib.shake_256) and stored in `pcap_hash_map` which maps the PCAP filename to its hash. The PCAP file is renamed to the hex-representation of the hash digest and the file extension (e.g., if the contents of `foobar.pcap` hashed to `52b92cdcec4af0e1`, the file would be uploaded as `52b92cdcec4af0e1.pcap`). This is done to ensure that conflicting PCAP filenames among different tests are resolved prior to processing. Since Malcolm automatically [assigns tags](https://idaholab.github.io/Malcolm/docs/upload.html#Tagging) to uploaded PCAP files, this hash should be used as a filter for the `tags` field for any network log-based queries for data related to that PCAP file. This way your tests can have reproducible outputs without being affected by PCAPs for other tests. + +See the following tests for examples of how to access and use these fixtures: + +* [test_malcolm_response.py](src/maltest/tests/test_malcolm_response.py) - querying the [Malcolm API](https://idaholab.github.io/Malcolm/docs/api.html#API) using the [Requests](https://requests.readthedocs.io/en/latest/) library +* [test_malcolm_db_health.py](src/maltest/tests/test_malcolm_db_health.py) - querying the [data store](https://idaholab.github.io/Malcolm/docs/opensearch-instances.html#OpenSearchInstance) directly using the [OpenSearch](https://opensearch.org/docs/latest/clients/python-low-level/) or [Elasticsearch](https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/index.html) client +* [test_icsnpp_protocols.py](src/maltest/tests/test_icsnpp_protocols.py) - querying the [Malcolm Field Aggregation API](https://idaholab.github.io/Malcolm/docs/api-aggregations.html), specifying a `from` query start time filter to search all historical data, a filter on `event.provider` to limit the result set to records from Zeek, and a `tags` filter to limit the matching records to the tags represented by the uploaded PCAPs (see above) diff --git a/src/maltest/tests/test_icsnpp_protocols.py b/src/maltest/tests/test_icsnpp_protocols.py index 1a425fe..1e06322 100644 --- a/src/maltest/tests/test_icsnpp_protocols.py +++ b/src/maltest/tests/test_icsnpp_protocols.py @@ -1,6 +1,8 @@ -import logging import mmguero import requests +import logging + +LOGGER = logging.getLogger(__name__) UPLOAD_ARTIFACTS = [ 'Protocols/BACnet.pcap', @@ -109,8 +111,6 @@ "tds_sql_batch", ] -LOGGER = logging.getLogger(__name__) - HEADERS = {"Content-Type": "application/json"} @@ -126,7 +126,10 @@ def test_icsnpp_protocols( headers=HEADERS, json={ "from": "0", - "filter": {"event.provider": "zeek"}, + "filter": { + "event.provider": "zeek", + "tags": [pcap_hash_map[x] for x in mmguero.GetIterable(UPLOAD_ARTIFACTS)], + }, }, allow_redirects=True, auth=malcolm_http_auth, @@ -136,4 +139,5 @@ def test_icsnpp_protocols( buckets = { item['key']: item['doc_count'] for item in mmguero.DeepGet(response.json(), ['event.dataset', 'buckets'], []) } + LOGGER.info(buckets) assert all([(buckets.get(x, 0) > 0) for x in EXPECTED_DATASETS])