Source code for greynoise.util

"""Utility functions."""

import configparser
import logging
import os
import re
from importlib import resources
from ipaddress import IPv6Address, ip_address

# Per-user configuration file: ~/.config/greynoise/config
CONFIG_FILE = os.path.expanduser(
    os.path.join("~", ".config", "greynoise", "config")
)

# Module-level logger shared by the helpers in this module.
LOGGER = logging.getLogger(__name__)

# Fallback value for every supported option. load_config() seeds its parser
# with these and save_config() writes them when creating a new file.
DEFAULT_CONFIG = dict(
    api_key="",
    api_server="https://api.greynoise.io",
    timeout=60,  # presumably seconds -- confirm against the API client
    proxy="",
    offering="enterprise",
    cache_max_size=1000000,
    cache_ttl=3600,  # presumably seconds -- confirm against the cache user
    use_cache=True,
)


def load_config():
    """Load configuration.

    Values are resolved in increasing order of precedence: ``DEFAULT_CONFIG``,
    the ``[greynoise]`` section of ``CONFIG_FILE`` (if the file exists), then
    the ``GREYNOISE_*`` environment variables. Integer options (``timeout``,
    ``cache_max_size``, ``cache_ttl``) and the boolean ``use_cache`` option
    fall back to their default when the resolved value cannot be parsed.

    :returns:
        Current configuration based on configuration file and environment
        variables.
    :rtype: dict

    """
    config_parser = configparser.ConfigParser(
        {key: str(value) for key, value in DEFAULT_CONFIG.items()}
    )
    config_parser.add_section("greynoise")

    if os.path.isfile(CONFIG_FILE):
        LOGGER.debug("Parsing configuration file: %s...", CONFIG_FILE)
        with open(CONFIG_FILE) as config_file:
            config_parser.read_file(config_file)
    else:
        LOGGER.debug("Configuration file not found: %s", CONFIG_FILE)

    # Environment variables take precedence over configuration file content.
    if "GREYNOISE_API_KEY" in os.environ:
        # The key is a credential: report that it was found, never its value.
        LOGGER.debug("API key found in environment variable")
        config_parser.set("greynoise", "api_key", os.environ["GREYNOISE_API_KEY"])

    text_overrides = (
        ("GREYNOISE_API_SERVER", "api_server", "API server"),
        ("GREYNOISE_PROXY", "proxy", "Proxy"),
        ("GREYNOISE_OFFERING", "offering", "Offering"),
    )
    for env_name, option, label in text_overrides:
        if env_name in os.environ:
            env_value = os.environ[env_name]
            LOGGER.debug("%s found in environment variable: %s", label, env_value)
            config_parser.set("greynoise", option, env_value)

    integer_overrides = (
        ("GREYNOISE_TIMEOUT", "timeout"),
        ("GREYNOISE_CACHE_MAX_SIZE", "cache_max_size"),
        ("GREYNOISE_CACHE_TTL", "cache_ttl"),
    )
    for env_name, option in integer_overrides:
        default_text = str(DEFAULT_CONFIG[option])
        if env_name in os.environ:
            env_value = os.environ[env_name]
            # An unparsable override resets the option to its default rather
            # than falling back to whatever the configuration file contained.
            if not _is_integer_text(env_value):
                env_value = default_text
            config_parser.set("greynoise", option, env_value)
        # Validate the final value unconditionally so that an empty or
        # malformed entry in the file cannot make getint() fail below.
        if not _is_integer_text(config_parser.get("greynoise", option)):
            config_parser.set("greynoise", option, default_text)

    try:
        use_cache = config_parser.getboolean("greynoise", "use_cache")
    except ValueError:
        # Same policy as the integer options: invalid value -> default.
        use_cache = DEFAULT_CONFIG["use_cache"]

    return {
        "api_key": config_parser.get("greynoise", "api_key"),
        "api_server": config_parser.get("greynoise", "api_server"),
        "timeout": config_parser.getint("greynoise", "timeout"),
        "proxy": config_parser.get("greynoise", "proxy"),
        "offering": config_parser.get("greynoise", "offering"),
        "cache_max_size": config_parser.getint("greynoise", "cache_max_size"),
        "cache_ttl": config_parser.getint("greynoise", "cache_ttl"),
        "use_cache": use_cache,
    }


def _is_integer_text(text):
    """Return True when ``int(text)`` succeeds, False when it raises ValueError."""
    try:
        int(text)
    except ValueError:
        return False
    return True
def save_config(config):
    """Save configuration.

    Only the options present in ``config`` are written. When ``CONFIG_FILE``
    does not exist yet, every option missing from ``config`` is filled in from
    ``DEFAULT_CONFIG`` so that a complete file is created. An existing file is
    overwritten, so options it contained that are absent from ``config`` are
    not carried over.

    :param config: Data to be written to the configuration file.
    :type config: dict

    """
    config_parser = configparser.ConfigParser()
    config_parser.add_section("greynoise")

    # Written in this fixed order; the text options are stored as given, the
    # remaining (numeric / boolean) ones are converted with str() first.
    option_order = (
        "api_key",
        "api_server",
        "timeout",
        "proxy",
        "offering",
        "cache_max_size",
        "cache_ttl",
        "use_cache",
    )
    text_options = {"api_key", "api_server", "proxy", "offering"}
    for option in option_order:
        if option not in config:
            continue
        value = config[option]
        if option not in text_options:
            value = str(value)
        config_parser.set("greynoise", option, value)

    config_dir = os.path.dirname(CONFIG_FILE)
    if config_dir:
        # exist_ok avoids the check-then-create race of isdir() + makedirs();
        # the guard skips makedirs("") when the path has no directory part.
        os.makedirs(config_dir, exist_ok=True)

    # If file doesn't exist, create it with default values
    if not os.path.isfile(CONFIG_FILE):
        for key, value in DEFAULT_CONFIG.items():
            if not config_parser.has_option("greynoise", key):
                config_parser.set("greynoise", key, str(value))

    with open(CONFIG_FILE, "w") as config_file:
        config_parser.write(config_file)
def validate_ip(ip, strict=True, print_warning=True):
    """Check that a value is a valid, globally routable IPv4 address.

    Validation fails when the value cannot be parsed as an IP address, when it
    is an IPv6 address (not supported), or when the address is not global.

    :param ip: IP address value to validate.
    :type ip: str
    :param strict: Whether to raise exception if validation fails.
    :type strict: bool
    :param print_warning: Whether to log a warning if validation fails.
    :type print_warning: bool
    :returns:
        True when the address is valid; False when validation fails and
        strict is set to False.
    :rtype: bool
    :raises ValueError: When validation fails and strict is set to True.

    """
    try:
        parsed = ip_address(ip)
    except ValueError:
        error_message = "Invalid IP address: {!r}".format(ip)
    else:
        if isinstance(parsed, IPv6Address):
            error_message = "IPv6 addresses are not supported: {!r}".format(ip)
        elif not parsed.is_global:
            error_message = "Non-Routable IP address: {!r}".format(ip)
        else:
            return True

    # The message is always built before this point, so a strict failure
    # carries a useful message even when warnings are disabled.
    if print_warning:
        LOGGER.warning(error_message)
    if strict:
        raise ValueError(error_message)
    return False
def validate_timeline_field_value(field):
    """Check that a Timeline field name is one of the supported values.

    :param field: field value to validate.
    :type field: str
    :returns: True when the field name is supported.
    :rtype: bool
    :raises ValueError: When the field name is not supported.

    """
    valid_field_names = [
        "destination_port",
        "http_path",
        "http_user_agent",
        "source_asn",
        "source_org",
        "source_rdns",
        "tag_ids",
        "classification",
    ]
    # Guard clause: reject anything outside the allowed list up front.
    if field not in valid_field_names:
        raise ValueError(
            f"Field must be one of the following values: {valid_field_names}"
        )
    return True
def validate_timeline_days(days):
    """Check that the Timeline days value is an integer between 1 and 90.

    :param days: field value to validate.
    :type days: int
    :returns: True when the value is valid.
    :rtype: bool
    :raises ValueError:
        When the value is a string, is not an integer, or is outside 1-90.

    """
    if isinstance(days, str):
        raise ValueError(
            "Days must be a valid integer between 1 and 90. Current input is a "
            "string."
        )
    # bool is a subclass of int, so True would otherwise pass as "1 day".
    is_integer = isinstance(days, int) and not isinstance(days, bool)
    if is_integer and 1 <= days <= 90:
        return True
    raise ValueError("Days must be a valid integer between 1 and 90.")
def validate_timeline_granularity(granularity):
    """Check that the Timeline granularity is one of the supported values.

    :param granularity: field value to validate.
    :type granularity: str
    :returns: True when the granularity is "1h" or "1d".
    :rtype: bool
    :raises ValueError: When the granularity is any other value.

    """
    if granularity in ("1h", "1d"):
        return True
    raise ValueError("Granularity currently only supports a value of 1d or 1h")
def validate_similar_min_score(min_score):
    """Check that the Similarity min_score is an integer between 0 and 100.

    :param min_score: field value to validate.
    :type min_score: int
    :returns: True when the value is valid.
    :rtype: bool
    :raises ValueError:
        When the value is a string, is not an integer, or is outside 0-100.

    """
    if isinstance(min_score, str):
        raise ValueError(
            "Min Score must be a valid integer between 0 and 100. Current input is a "
            "string."
        )
    # bool is a subclass of int, so True/False would otherwise pass as 1/0.
    is_integer = isinstance(min_score, int) and not isinstance(min_score, bool)
    if is_integer and 0 <= min_score <= 100:
        return True
    raise ValueError("Min Score must be a valid integer between 0 and 100.")
# "CVE-" + 4-digit year + "-" + 4 to 7 digit sequence number (ASCII digits).
_CVE_ID_PATTERN = re.compile(r"CVE-[0-9]{4}-[0-9]{4,7}")


def validate_cve_id(cve_id):
    """Check if provided value is a valid CVE ID.

    The whole value must have the form ``CVE-YYYY-NNNN`` with a 4 to 7 digit
    sequence number; leading or trailing extra characters are rejected.

    :param cve_id: field value to validate.
    :type cve_id: str
    :returns: True when the value is a well-formed CVE ID.
    :rtype: bool
    :raises ValueError: When the value is not a string or is malformed.

    """
    # fullmatch() anchors both ends; match() alone would accept any string
    # that merely starts with a valid CVE ID.
    if isinstance(cve_id, str) and _CVE_ID_PATTERN.fullmatch(cve_id):
        return True
    raise ValueError("Invalid CVE ID format: {!r}".format(cve_id))
def load_template(template_name: str) -> str:
    """Load a template from the templates directory.

    The template is looked up as ``templates/<template_name>`` inside the
    ``greynoise`` package resources.

    Args:
        template_name: Name of the template to load

    Returns:
        Template content as a string

    """
    template_path = resources.files("greynoise").joinpath(f"templates/{template_name}")
    # Decode explicitly instead of relying on the platform's locale encoding.
    # NOTE(review): assumes the bundled templates are UTF-8 (ASCII included)
    # -- confirm against the packaged files.
    return template_path.read_text(encoding="utf-8")