Improve performance when checking URLs and domains

Some MVT modules, such as the WhatsApp module, can be very slow because they take a naive approach to looking for IOCs. The code was checking URLs (potentially more than 100k) against
thousands of IOC domains, resulting in a quadratic run-time with hundreds of millions of comparisons as the number of IOCs increases.
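
For illustration only (a minimal sketch, not the actual module code; urls and ioc_domains are placeholder names), the old approach amounted to a nested loop:

    matches = []
    for url in urls:                 # potentially 100k+ URLs from a single database
        for ioc in ioc_domains:      # thousands of IOC domains
            if ioc in url:           # one comparison per (URL, IOC) pair -> O(n * m)
                matches.append((url, ioc))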

This commit adds an Aho-Corasick library, which allows efficiently searching a string (the URL in this case) for all matches from a set of keys (the IOCs). This data structure is a perfect fit for this use case.
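
As a rough sketch of how the pyahocorasick automaton is used (illustrative only; ioc_domains and urls are placeholder names, not identifiers from this commit):

    import ahocorasick

    automaton = ahocorasick.Automaton()
    for ioc in ioc_domains:
        automaton.add_word(ioc, ioc)     # key to search for, payload returned on a hit
    automaton.make_automaton()

    for url in urls:
        # A single pass over the URL yields every IOC that occurs as a substring.
        for _end_index, ioc in automaton.iter(url.lower()):
            # Substring hits can be false positives (an IOC embedded in a longer
            # domain), so the match still needs to be confirmed afterwards.
            print(url, "may match", ioc)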

A quick measurement shows an 80% performance improvement for a WhatsApp database with 100k entries. The slow path is now the time spent fetching and expanding short URLs found in the database. This
could also be sped up significantly by fetching each URL asynchronously. That would require reworking modules to split the URL expansion from the IOC check, so I will implement it in a separate PR.
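
A possible shape for that follow-up (purely illustrative; aiohttp and these helper names are assumptions and are not part of this commit):

    import asyncio
    import aiohttp

    async def unshorten(session, url):
        # A HEAD request that follows redirects reveals the final destination URL.
        async with session.head(url, allow_redirects=True) as response:
            return str(response.url)

    async def unshorten_all(urls):
        async with aiohttp.ClientSession() as session:
            return await asyncio.gather(*(unshorten(session, url) for url in urls))

    expanded = asyncio.run(unshorten_all(short_urls))
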
Donncha Ó Cearbhaill 2023-06-29 10:28:56 +02:00
parent 2b01ed7179
commit 41db117168
2 changed files with 55 additions and 9 deletions

View File

@@ -7,7 +7,9 @@ import json
import logging
import os
from typing import Any, Dict, Iterator, List, Optional, Union
from functools import lru_cache
import ahocorasick
from appdirs import user_data_dir
from .url import URL
@@ -241,6 +243,40 @@ class Indicators:
"stix2_file_name": ioc_collection["stix2_file_name"],
}
@lru_cache()
def get_ioc_matcher(
self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None
) -> ahocorasick.Automaton:
"""
Build an Aho-Corasick automaton from a list of IOCs (i.e. indicators)
Returns an Aho-Corasick automaton
This data structure and algorithm allows for fast matching of a large number
of match strings (i.e. IOCs) against a large body of text. This will also
match strings containing the IOC, so it is important to confirm the
match is a valid IOC before using it.
for _, ioc in domains_automaton.iter(url.domain.lower()):
if ioc["value"] == url.domain.lower():
print(ioc)
We use an LRU cache to avoid rebuilding the automaton every time we call a
function such as check_domain().
"""
automaton = ahocorasick.Automaton()
if ioc_type:
iocs = self.get_iocs(ioc_type)
elif ioc_list:
iocs = ioc_list
else:
raise ValueError("Must provide either ioc_tyxpe or ioc_list")
for ioc in iocs:
automaton.add_word(ioc["value"], ioc)
automaton.make_automaton()
return automaton
@lru_cache()
def check_domain(self, url: str) -> Union[dict, None]:
"""Check if a given URL matches any of the provided domain indicators.
@@ -254,6 +290,9 @@ class Indicators:
if not isinstance(url, str):
return None
# Create an Aho-Corasick automaton from the list of domains
domain_matcher = self.get_ioc_matcher("domains")
try:
# First we use the provided URL.
orig_url = URL(url)
@@ -265,6 +304,7 @@ class Indicators:
self.log.debug("Found a shortened URL %s -> %s", url, unshortened)
if unshortened is None:
self.log.warning("Unable to unshorten URL %s", url)
return None
# Now we check for any nested URL shorteners.
@@ -285,12 +325,13 @@ class Indicators:
except Exception:
# If URL parsing failed, we just try to do a simple substring
# match.
for ioc in self.get_iocs("domains"):
for idx, ioc in domain_matcher.iter(url):
if ioc["value"].lower() in url:
self.log.warning(
"Maybe found a known suspicious domain %s "
'matching indicators from "%s"',
'matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
)
return ioc
@@ -300,43 +341,47 @@ class Indicators:
# If all parsing worked, we start walking through available domain
# indicators.
for ioc in self.get_iocs("domains"):
for idx, ioc in domain_matcher.iter(final_url.domain.lower()):
# First we check the full domain.
if final_url.domain.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning(
"Found a known suspicious domain %s "
'shortened as %s matching indicators from "%s"',
'shortened as %s matching indicator "%s" from "%s"',
final_url.url,
orig_url.url,
ioc["value"],
ioc["name"],
)
else:
self.log.warning(
"Found a known suspicious domain %s "
'matching indicators from "%s"',
'matching indicator "%s" from "%s"',
final_url.url,
ioc["value"],
ioc["name"],
)
return ioc
# Then we just check the top level domain.
# Then we just check the top level domain.
for idx, ioc in domain_matcher.iter(final_url.top_level.lower()):
if final_url.top_level.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning(
"Found a sub-domain with suspicious top "
"level %s shortened as %s matching "
'indicators from "%s"',
'indicator "%s" from "%s"',
final_url.url,
orig_url.url,
ioc["value"],
ioc["name"],
)
else:
self.log.warning(
"Found a sub-domain with a suspicious top "
'level %s matching indicators from "%s"',
'level %s matching indicator "%s" from "%s"',
final_url.url,
ioc["value"],
ioc["name"],
)

View File

@@ -33,6 +33,7 @@ install_requires =
libusb1 >=3.0.0
cryptography >=38.0.1
pyyaml >=6.0
pyahocorasick >=2.0.0
[options.packages.find]
where = ./