#! /usr/bin/env python3
r"""
Submit a Terraform plan or CloudFormation template to the insightCloudSec IaC scan API.

```
terraform init
terraform plan -out plan.out && terraform show -json plan.out > plan.json
export DIVVY_API_KEY=YOUR_API_KEY
./api_test.py BASE_URL CONFIG_NAME plan.json \
    --provider=PROVIDER \
    --scan_name=SCAN_NAME \
    --author=AUTHOR \
    --html_out=HTML_REPORT_FILE \
    --json_out=JSON_REPORT_FILE \
    --auth_for_submission \
    --parameters=PARAMETERS_FILE \
    --overrides=OVERRIDES_FILE
```

Notes on the options above:

* `--auth_for_submission` is only required if your insightCloudSec IaC
  installation requires auth.
* `--parameters` and `--overrides` are currently CFT-only.
* Instead of `DIVVY_API_KEY` you may export `DIVVY_USERNAME` and
  `DIVVY_PASSWORD`; the API key is preferred when both are present.

This script is designed to be used in a CI/CD pipeline to make requests to the
insightCloudSec IaC Security scanning endpoint. Pass in a JSON plan as
specified above, with one or both of the `--html_out` and `--json_out`
parameters. This will generate HTML and/or JSON output and save it to the
specified file or files.

The script is currently configured to never fail on scan findings; you will
have to change the failure conditions at the bottom of the script to start
breaking the build on the basis of these results. We recommend that, when you
first deploy IaC, you begin by always passing. This gives you time to use the
results to familiarize yourself with how scanning works against your Terraform
or CFT codebase, confirm the results align with your expectations, and to
educate your DevOps teams on how to read and address any failures. After
collaborative evaluation of the results, DevOps users will be better prepared
to address issues independently.
"""

import argparse
import json
import logging
import os
import sys
import urllib.parse
import warnings

import requests

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
console = logging.StreamHandler()
logger.addHandler(console)

parser = argparse.ArgumentParser()
parser.add_argument("divvy_url", help="The Base URL for your insightCloudSec installation.")
parser.add_argument("config_name", help="The name of the IaC configuration to use for the scan.")
parser.add_argument("template", help="Terraform plan output JSON file or CFT template file.")
parser.add_argument(
    "--provider", default="terraform", choices=["terraform", "cft"], help="The provider to use for the scan."
)
parser.add_argument("--scan_name", default="API Scan", help="Name for the job that will appear in insightCloudSec.")
parser.add_argument("--author", default="API Author", help="Author of the job that will appear in insightCloudSec.")
parser.add_argument(
    "--html_out", default=None, help="Takes a filename. If specified, download an HTML report to the specified file."
)
parser.add_argument(
    "--json_out", default=None, help="Takes a filename. If specified, download a JSON report to the specified file."
)
parser.add_argument(
    "--auth_for_submission",
    action="store_true",
    help=(
        "If you've configured IaC to require authentication for scan submission, "
        "set this flag to authenticate for that submission."
    ),
)
parser.add_argument(
    "--parameters",
    default=None,
    type=argparse.FileType(mode="r"),
    help=(
        "Takes a filename. If specified, ICS will substitute the values specified for parameters "
        "in the template. Currently CFT-only."
    ),
)
parser.add_argument(
    "--overrides",
    default=None,
    type=argparse.FileType(mode="r"),
    help=(
        "Takes a filename. If specified, ICS will substitute the values specified for parameters "
        "in the template. These can include pseudo parameters and take precedent over values in "
        "the file passed to --parameters. Currently CFT-only."
    ),
)
mu_group = parser.add_mutually_exclusive_group()
mu_group.add_argument(
    "--no_ssl_verification",
    dest="ssl_verification",
    action="store_false",
    help="Disables SSL verification for all API calls to Divvy",
)
mu_group.add_argument(
    "--ssl_trusted_authorities",
    default=None,
    help=(
        "Sets the trusted authorities for SSL verification "
        "using a CA bundle file (.pem) or directory of trusted certificates."
    ),
)

api_key = os.environ.get("DIVVY_API_KEY")
username = None
password = None

# Prefer API key authentication over username & password
if not api_key:
    # Username/password to authenticate against the API
    username, password = os.environ.get("DIVVY_USERNAME"), os.environ.get("DIVVY_PASSWORD")


def _fatal_ssl_error(error):
    """Log an SSL failure and terminate with the configuration-error exit status."""
    logger.error("Fatal SSL error: {}".format(error))
    sys.exit(os.EX_CONFIG)


def get_auth_token(login_url, ssl_verification=True):
    """
    Log in to Divvy's authentication API and return a session token for
    performing authenticated actions.

    The credentials are the module-level `username` and `password`, which are
    read from the DIVVY_USERNAME / DIVVY_PASSWORD environment variables.

    :param login_url: Full URL of the login endpoint.
    :param ssl_verification: Passed to `requests` as `verify`; `True`, `False`,
        or a path to a CA bundle file / directory. Defaults to `True`, which is
        what the request used before this parameter existed.
    :returns: The `session_id` value from the login response.
    :raises AssertionError: If the login response is not successful.
    """
    response = requests.post(
        url=login_url,
        data=json.dumps({"username": username, "password": password}),
        headers={"Content-Type": "application/json"},
        verify=ssl_verification,
    )
    if not response.ok:
        # Raised explicitly instead of using `assert` so the check still runs
        # under `python -O`; the exception type is kept for compatibility.
        raise AssertionError("Authentication failed with message: {}".format(response.text))
    return response.json()["session_id"]


def scan_template(
    base_url,
    scan_mode,
    scan_filename,
    provider="terraform",
    api_key=None,
    session_token=None,
    ssl_verification=True,
    parameters=None,
    overrides=None,
    template_path=None,
    scan_name=None,
    author=None,
    config_name=None,
):
    """
    Use Divvy's `/scan` API to submit a template file.

    The response body is written to `scan_filename` regardless of the HTTP
    status, and the response object is returned so the caller can inspect it.

    :param base_url: Base URL of the installation; `/v3/iac/scan` is appended.
    :param scan_mode: `"json"` or `"html"`; selects the Accept header.
    :param scan_filename: File the response body is written to.
    :param provider: Value sent as `iac_provider`.
    :param api_key: Sent as the `Api-Key` header when truthy.
    :param session_token: Sent as `X-Auth-Token` when no API key is given.
    :param ssl_verification: Passed to `requests` as `verify`.
    :param parameters: Optional string sent as `parameters`.
    :param overrides: Optional string sent as `overrides`.
    :param template_path: Path of the template to submit.
    :param scan_name: Value sent as `scan_name`.
    :param author: Value sent as `author_name`.
    :param config_name: Value sent as `config_name`.
    :returns: The `requests` response of the scan request.
    :raises ValueError: If `scan_mode` is neither `"json"` nor `"html"`.

    `template_path`, `scan_name`, `author` and `config_name` fall back to the
    module-level values set by the command-line entry point when left as
    `None`, which is where this function originally read them from.
    """
    # Prepare Accept headers and URL parameters to request JSON or HTML response
    params = {}
    if scan_mode == "json":
        accept_value = "application/json"
        params["readable"] = "true"
    elif scan_mode == "html":
        accept_value = "text/html"
    else:
        raise ValueError("Invalid value {} for scan_mode".format(scan_mode))

    # Legacy fallback to the globals created under `if __name__ == "__main__"`.
    cli_globals = globals()
    if template_path is None:
        template_path = cli_globals["args"].template
    if scan_name is None:
        scan_name = cli_globals["scan_name"]
    if author is None:
        author = cli_globals["author"]
    if config_name is None:
        config_name = cli_globals["config_name"]

    # The request declares charset=UTF-8, so read the template as UTF-8 rather
    # than with the platform default encoding.
    with open(template_path, encoding="utf-8") as template_file:
        template_str = template_file.read()

    data = {
        "scan_name": scan_name,
        "author_name": author,
        "scan_template": template_str,
        "config_name": config_name,
        "iac_provider": provider,
    }
    if parameters:
        data["parameters"] = parameters
    if overrides:
        data["overrides"] = overrides

    headers = {
        "Content-Type": "application/json;charset=UTF-8",
        "Accept": accept_value,
    }
    if api_key:
        headers["Api-Key"] = api_key
    elif session_token:
        headers["X-Auth-Token"] = session_token

    response = requests.post(
        url=base_url + "/v3/iac/scan", headers=headers, data=json.dumps(data), params=params, verify=ssl_verification
    )

    with open(scan_filename, "w", encoding="utf-8") as f:
        f.write(response.text)

    return response


if __name__ == "__main__":
    args = parser.parse_args()

    ssl_verification = args.ssl_verification
    if ssl_verification:
        if args.ssl_trusted_authorities:
            if os.path.isfile(args.ssl_trusted_authorities) or os.path.isdir(args.ssl_trusted_authorities):
                # `requests` accepts a CA bundle path in place of `True`.
                ssl_verification = args.ssl_trusted_authorities
            else:
                parser.error("Invalid value for --ssl_trusted_authorities; Path to a file or folder expected")
    else:
        # Disables SSL verification, allowing use of HTTPS with unsigned certs
        requests.packages.urllib3.disable_warnings()

    parsed_url = urllib.parse.urlparse(args.divvy_url)
    if parsed_url.port not in (443, 8443, None) and parsed_url.scheme == "https":
        warnings.warn(
            "insightCloudSec URL [{}] uses HTTPS, but its port is not 443 or 8443. "
            "Please confirm that port {} is using SSL and if not, "
            "use HTTP instead.".format(args.divvy_url, parsed_url.port)
        )

    html_out = args.html_out
    json_out = args.json_out
    two_phases = html_out and json_out

    if two_phases or json_out:
        scan_mode = "json"
        scan_filename = json_out
    elif html_out:
        scan_mode = "html"
        scan_filename = html_out
    else:
        raise ValueError("Must specify at least one of `--html_out ` and/or `--json_out `")

    config_name = args.config_name
    scan_name = args.scan_name
    provider = args.provider
    author = args.author
    base_url = args.divvy_url
    if "/v3/iac/scan" in base_url:
        base_url = base_url.replace("/v3/iac/scan", "")

    # If `/scan` requires authentication (`--auth_for_submission`), or if we
    # will be getting both JSON and HTML reports and thus need to hit the
    # authenticated `/scans/id` API, get a session token.
    session_token = None
    if args.auth_for_submission or two_phases:
        if not api_key and not username and not password:
            logger.error(
                "Authentication is required for this action, but "
                "you didn't provide an API key via DIVVY_API_KEY."
            )
            sys.exit(os.EX_NOPERM)
        if not api_key:
            if not username:
                logger.error(
                    "Authentication is required for this action, but "
                    "you didn't provide a username via DIVVY_USERNAME."
                )
                sys.exit(os.EX_NOPERM)
            if password is None:
                logger.error(
                    "Authentication is required for this action, but "
                    "you didn't provide a password via DIVVY_PASSWORD."
                )
                sys.exit(os.EX_NOPERM)
            try:
                # Honour the same SSL settings as every other API call.
                session_token = get_auth_token(
                    base_url + "/v2/public/user/login", ssl_verification=ssl_verification
                )
            except requests.exceptions.SSLError as e:
                _fatal_ssl_error(e)

    parameters = None
    if args.parameters:
        # argparse.FileType hands us an open file; close it once read.
        with args.parameters as parameters_file:
            parameters = parameters_file.read()

    overrides = None
    if args.overrides:
        with args.overrides as overrides_file:
            overrides = overrides_file.read()

    try:
        result = scan_template(
            base_url=base_url,
            scan_mode=scan_mode,
            scan_filename=scan_filename,
            provider=args.provider,
            api_key=api_key,  # May be `None`
            session_token=session_token,  # May be `None`
            ssl_verification=ssl_verification,
            parameters=parameters,
            overrides=overrides,
            template_path=args.template,
            scan_name=scan_name,
            author=author,
            config_name=config_name,
        )
    except requests.exceptions.SSLError as e:
        _fatal_ssl_error(e)

    status_code = result.status_code

    if two_phases:
        # Hit the `get_scans` endpoint to get the HTML report. This endpoint is
        # authenticated, so it is sent the API key or the session token.
        try:
            build_id = json.loads(result.text)["build_id"]
        except (ValueError, KeyError, TypeError):
            # The scan response carries no build id (e.g. the request itself
            # failed); skip the HTML download so the status handling below can
            # report the real problem instead of crashing here.
            build_id = None
            logger.warning(
                "[insightCloudSec]: Scan response did not include a build_id; skipping the HTML report download."
            )

        if build_id is not None:
            headers = {
                "Content-Type": "application/json;charset=UTF-8",
                "Accept": "text/html",
            }
            if api_key:
                headers["Api-Key"] = api_key
            elif session_token:
                headers["X-Auth-Token"] = session_token

            try:
                response = requests.get(
                    url=base_url + "/v3/iac/scans/{}".format(build_id),
                    headers=headers,
                    verify=ssl_verification,
                )
            except requests.exceptions.SSLError as e:
                _fatal_ssl_error(e)

            if not response.ok:
                logger.warning(
                    "[insightCloudSec]: HTML report request returned HTTP status {}.".format(response.status_code)
                )
            with open(html_out, "w", encoding="utf-8") as f:
                f.write(response.text)

    # Fail based on the API results from the `/scan` request. Customize this to
    # your use case.
    if status_code == 200:
        message = "[insightCloudSec]: Scan completed successfully. All insights have passed."
        exit_code = 0
    elif status_code == 202:
        message = (
            "[insightCloudSec]: Scan completed successfully, but with warnings. "
            "All failure-inducing insights have passed, but some warning-inducing insights did not."
        )
        exit_code = 0  # Change to a nonzero positive integer to fail the build
    elif status_code == 406:
        message = (
            "[insightCloudSec]: Scan completed, but one or more insights have failed. "
            "Please check the insightCloudSec console for more information."
        )
        exit_code = 0  # Change to a nonzero positive integer to fail the build
    else:
        http_error = "[Not available; contact insightCloudSec Support]"
        try:
            result.raise_for_status()
        except requests.exceptions.HTTPError as e:
            try:
                error_json = e.response.json()
            except ValueError:
                # Error body is not JSON (for example an HTML error page).
                error_json = None
            if isinstance(error_json, dict):
                http_error = error_json.get("error_message") or http_error
        message = (
            f"[insightCloudSec]: IaC Endpoint Request returned HTTP Error {status_code}. "
            f"Exception text:\n\n{http_error}"
        )
        logger.error(message)
        sys.exit(1)

    logger.info(message)
    sys.exit(exit_code)