CWPPに関すること (2/2):ハンズオン事例


この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので十分ご注意ください。

皆様、

こんにちは。章でございます。

この度、CWPPのことを少し勉強しました。理論まとめ、ハンズオン事例を作成し、共有させていただきます。

©zhangzy

前回、CWPPの理論まとめを共有いたしました(CWPPに関すること(1/2):理論まとめ)。今回、具体的なハンズオン事例を作成し、共有させていただきます。


Prisma Cloudアラートメールレポーター


機能:テナント全体およびアカウント単位で、新規アラート数(過去24時間)・新規の高リスクアラート数・オープン中のアラート総数を集計し、違反件数の多い上位ポリシーと合わせてメールで通知すること。

まずは、設定ファイルを書きます(後述の ConfigHelper は lib ディレクトリと同じ階層にある config/configs.yml を読み込みます)。

# Prisma Cloud API connection settings.
prisma_cloud:
  username:  # access key (sent as "username" in the /login request)
  password:  # secret key (sent as "password" in the /login request)
  customer_name: # optional; only needed when the access key belongs to multiple tenants
  api_base: api.prismacloud.io # API host: apiX.prismacloud.io (X is the stack number, e.g. api2 / api3)

続きまして、lib ディレクトリに __init__.py、config_helper.py、email_helper.py、prismacloud_sdk.py を作成します。

# __init__.py
from .prismacloud_sdk import PCSession
from .config_helper import ConfigHelper
from .email_helper import EmailHelper
# config_helper.py
import os
import yaml
class ConfigHelper(object):
    """Expose the ``prisma_cloud`` section of the YAML config as attributes.

    On construction the file ``../config/configs.yml`` (relative to this
    module's directory) is parsed and these attributes are set:

    * ``pc_user``     - value of ``username``
    * ``pc_pass``     - value of ``password``
    * ``pc_cust``     - value of ``customer_name``; ``None`` when the key is
      empty or absent
    * ``pc_api_base`` - value of ``api_base``

    Raises:
        KeyError: if the ``prisma_cloud`` section or one of the required
            keys (``username``, ``password``, ``api_base``) is missing.
        OSError: if the config file cannot be opened.
    """

    def __init__(self):
        section = self.read_yml('configs')["prisma_cloud"]
        self.pc_user = section["username"]
        self.pc_pass = section["password"]
        # customer_name is an optional setting, so do not fail when the key
        # has been removed from the file entirely.
        self.pc_cust = section.get("customer_name")
        self.pc_api_base = section["api_base"]

    @classmethod
    def read_yml(cls, f):
        """Parse ``../config/<f>.yml`` and return the loaded document.

        Args:
            f: Base name of the YAML file, without directory or extension.

        Returns:
            Whatever ``yaml.safe_load`` produces for the file content.
        """
        yml_path = os.path.join(os.path.dirname(__file__), "../config/%s.yml" % f)
        # Explicit encoding keeps parsing independent of the platform default.
        with open(yml_path, 'r', encoding='utf-8') as stream:
            return yaml.safe_load(stream)
# email_helper.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EmailHelper(object):
    """Send plain-text emails through an SMTP server.

    The connection settings (``from_address``, ``email_srv``,
    ``email_srv_port``, ``username``, ``password``) are initialised to empty
    strings and must be filled in before ``send_email`` is called.
    ``msg`` holds the most recently built message.
    """

    def __init__(self):
        self.msg = MIMEMultipart()
        self.from_address = ""
        self.email_srv = ""
        self.email_srv_port = ""
        self.username = ""
        self.password = ""

    def send_email(self, subject, bodycontent, tolist):
        """Build and send one plain-text message.

        Args:
            subject: Value of the ``Subject`` header.
            bodycontent: Plain-text body.
            tolist: Comma-separated recipient addresses; used verbatim for the
                ``To`` header and split on ``,`` for the SMTP envelope.

        Raises:
            smtplib.SMTPException / OSError: propagated from ``smtplib`` when
                connecting, logging in or sending fails.
        """
        # A fresh message is required for every call: assigning a header on an
        # existing Message appends a duplicate instead of replacing it, and
        # attach() keeps earlier bodies, so reusing one object would leak the
        # previous recipients' headers and content into later emails.
        message = MIMEMultipart()
        message['From'] = self.from_address
        message['To'] = tolist
        message['Subject'] = subject
        message.attach(MIMEText(bodycontent, 'plain'))
        self.msg = message

        recipients = [addr.strip() for addr in tolist.split(',') if addr.strip()]

        # Compare as text so that both 465 and "465" select the same branch.
        port = str(self.email_srv_port)
        if port == "465":
            server = smtplib.SMTP_SSL(self.email_srv, self.email_srv_port)
        else:
            server = smtplib.SMTP(self.email_srv, self.email_srv_port)

        # The context manager issues QUIT and closes the socket even when
        # login or sending raises.
        with server:
            if port == "587":
                server.starttls()
            server.login(self.username, self.password)
            server.sendmail(self.from_address, recipients, message.as_string())
# prismacloud_sdk.py
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
import time
import json
class PCAPIError(Exception):
    """Raised by ``PCSession.interact`` when a call ends with a non-2xx status."""


class PCSession(object):
    """Authenticated HTTP session for the Prisma Cloud REST API.

    Wraps a ``requests.Session`` whose ``https://`` adapter is configured with
    ``max_retries`` retries for the status codes in ``retry_statuses``. After
    ``authenticate_client`` succeeds, the token is sent in the
    ``x-redlock-auth`` header of every request made through ``client``.
    """

    max_retries = 5
    retry_statuses = [429, 500, 502, 503, 504]
    # HTTP verbs accepted by interact() / try_wrapper().
    _verbs = ('get', 'post', 'put', 'delete')

    def __init__(self, userid, userpass, customer_name, api_base):
        """Store the credentials and build the HTTP client.

        Args:
            userid: Sent as ``username`` in the login request.
            userpass: Sent as ``password`` in the login request.
            customer_name: Sent as ``customerName`` in the login request.
            api_base: API host name without scheme, e.g. ``api.prismacloud.io``.
        """
        self.api_id = userid
        self.api_pass = userpass
        self.cust_name = customer_name
        self.api_base_url = api_base
        self.auth_token = None
        self.build_client()

    def build_client(self):
        """Create ``self.client`` and mount the retrying HTTPS adapter."""
        self.client = requests.Session()
        self.retries = Retry(total=self.max_retries,
                             status_forcelist=self.retry_statuses,
                             backoff_factor=1)
        self.redlock_http_adapter = HTTPAdapter(pool_connections=1,
                                                pool_maxsize=10,
                                                max_retries=self.retries)
        self.session_mount = "https://"
        self.client.mount(self.session_mount, self.redlock_http_adapter)
        return None

    def build_endpoint_prefix(self):
        """Return the scheme + host prefix that API paths are appended to."""
        return "https://" + self.api_base_url

    def get_auth_token(self, endpoint, body):
        """POST the login body and classify the answer.

        Returns:
            The ``token`` field of the JSON response on HTTP 200, the sentinel
            string ``"BAD"`` on HTTP 401, and ``None`` for any other status.
        """
        resp = self.client.post(endpoint, json=body)
        if resp.status_code == 200:
            return resp.json()["token"]
        if resp.status_code == 401:
            return "BAD"
        return None

    def authenticate_client(self):
        """Log in and install the auth headers on ``self.client``.

        Up to five attempts are made, one second apart, while the login
        endpoint answers with something other than 200 or 401. A 401 stops
        immediately: retrying rejected credentials cannot succeed, and the
        ``"BAD"`` sentinel must never be stored as if it were a token.

        Returns:
            ``True`` when a token was obtained, ``False`` otherwise.
        """
        endpoint = self.build_endpoint_prefix() + "/login"
        body = {"username": self.api_id,
                "password": self.api_pass,
                "customerName": self.cust_name}
        success = False
        max_tries = 5
        for attempt in range(max_tries):
            token = self.get_auth_token(endpoint, body)
            if token == "BAD":
                print("Invalid credentials - can not obtain session token.")
                break
            if token is not None:
                self.auth_token = token
                success = True
                break
            if attempt < max_tries - 1:
                time.sleep(1)
        self.client.headers.update(self.build_header())
        return success

    def build_header(self):
        """Return the default headers carrying the current auth token."""
        header = {"x-redlock-auth": self.auth_token,
                  "Content-Type": "application/json"}
        return header

    def interact(self, verb, endpoint, params=None, reqbody=None):
        """Call an API path and return the response object.

        Args:
            verb: One of ``get``, ``post``, ``put``, ``delete``.
            endpoint: Path appended to ``build_endpoint_prefix()``.
            params: Optional query parameters.
            reqbody: Optional object serialised to JSON for ``post``/``put``.

        Raises:
            ValueError: for an unsupported ``verb``.
            PCAPIError: when the final response status is not 2xx.
        """
        url = "%s%s" % (self.build_endpoint_prefix(), endpoint)
        success, response, exception = self.try_wrapper(verb, url, params,
                                                        reqbody)
        if success:
            return response
        raise exception

    def try_wrapper(self, verb, url, params, reqbody):
        """Issue the request, re-authenticating once after an HTTP 401.

        Returns:
            ``(success, response, exception)`` as produced by ``get_response``.
        """
        if verb not in self._verbs:
            raise ValueError("Invalid HTTP verb for API: %s" % verb)
        if self.auth_token is None:
            self.authenticate_client()
        client_method = getattr(self.client, verb)
        success, response, exception = self.get_response(client_method, verb,
                                                         url, params, reqbody)
        if response.status_code == 401:
            # Presumably the token expired: log in again and replay once.
            self.authenticate_client()
            success, response, exception = self.get_response(client_method,
                                                             verb, url, params,
                                                             reqbody)
        return success, response, exception

    def get_response(self, client_method, verb, url, params, reqbody):
        """Perform one HTTP call and evaluate its status code.

        Returns:
            ``(success, response, exception)`` where ``exception`` is ``None``
            on success and a ``PCAPIError`` instance otherwise.
        """
        if verb in ('get', 'delete'):
            response = client_method(url, params=params)
        else:
            # Query parameters are forwarded for body-carrying verbs as well
            # instead of being silently dropped.
            response = client_method(url, params=params,
                                     data=json.dumps(reqbody))
        success, exception = self._parse_status(url, response.status_code,
                                                response.text)
        return success, response, exception

    @staticmethod
    def _parse_status(url, status_code, text):
        """Map a status code to ``(success, exception)``; 2xx means success."""
        if 200 <= status_code < 300:
            return True, None
        return False, PCAPIError("%s returned HTTP %s: %s"
                                 % (url, status_code, text))

後は本番のrunner.pyを作成

import lib
import sys
from collections import defaultdict
class PCAlertEmailReport():
    """Collect open-alert statistics and email a per-user risk summary.

    ``run`` gathers tenant-level counts, per-account counts and per-policy
    alert counts, derives from the roles which accounts each user may see,
    and sends every user one "Daily Risk Summary" email.
    """

    # Query for open alerts inside the relative 24 hour window.
    _RECENT_OPEN_ALERTS = ("/v2/alert?timeType=relative&timeAmount=24&timeUnit=hour"
                           "&detailed=false&fields=alert.status&alert.status=open")
    # Query for all open alerts of one account (account id is appended).
    _ACCOUNT_OPEN_ALERTS = ("/v2/alert?&timeType=to_now&timeUnit=epoch"
                            "&detailed=false&alert.status=open"
                            "&fields=alert.status&cloud.accountId=")
    # Query for the per-policy alert counts of all open alerts.
    _OPEN_ALERTS_BY_POLICY = ("/alert/policy?timeType=to_now&timeUnit=epoch"
                              "&detailed=false&alert.status=open")

    def __init__(self):
        """Load the configuration and create the API session and mailer.

        Exits the process with status 1 when the access key or secret key is
        not set in the configuration.
        """
        self.config = lib.ConfigHelper()
        # Validate before building anything else; a non-zero status lets
        # schedulers notice the misconfiguration.
        if self.config.pc_user is None:
            print("No access key specified, please fix and re-run script")
            sys.exit(1)
        if self.config.pc_pass is None:
            print("No secret key specified, please fix and re-run script")
            sys.exit(1)
        self.pc_sess = lib.PCSession(self.config.pc_user, self.config.pc_pass, self.config.pc_cust,
                                     self.config.pc_api_base)
        self.email_send = lib.EmailHelper()

    def _get_json(self, path):
        """GET ``path`` on the configured API host and return the parsed JSON.

        The full URL is also stored in ``self.url``. HTTP error statuses raise
        immediately instead of surfacing later as a confusing KeyError.
        """
        self.url = "https://" + self.config.pc_api_base + path
        response = self.pc_sess.client.get(self.url)
        response.raise_for_status()
        return response.json()

    def map_callback(self):
        """Return the console host matching the configured API host.

        The leading ``api`` of ``pc_api_base`` is replaced by ``app``
        (``api2.prismacloud.io`` -> ``app2.prismacloud.io``), which covers the
        hosts without needing one branch per stack.

        Raises:
            ValueError: if ``pc_api_base`` is empty or does not start with
                ``api``.
        """
        api_base = self.config.pc_api_base
        if not api_base or not api_base.startswith("api"):
            raise ValueError("Cannot derive console host from api_base: %r" % (api_base,))
        return "app" + api_base[len("api"):]

    def build_email_list(self):
        """Map each user's email address to the account ids they may see.

        Roles of type "Build and Deploy Security" are skipped. Users of a
        "System Admin" role get every active account. Users of any other role
        get the accounts of all account groups attached to that role; such a
        user is only listed when at least one account results.

        Returns:
            ``defaultdict(list)`` of email address -> list of unique account ids.
        """
        self.pc_sess.authenticate_client()
        roles = self._get_json("/user/role")
        groups = self._get_json("/cloud/group")
        active_accounts = self._get_json("/cloud/name?onlyActive=true")
        all_account_ids = [acct['id'] for acct in active_accounts]

        # Index the groups once instead of rescanning them per user and group.
        group_accounts = defaultdict(list)
        for group in groups:
            group_accounts[group['id']].extend(group.get('accountIds') or [])

        email_list = defaultdict(list)
        for role in roles:
            role_type = role['roleType']
            if role_type == "Build and Deploy Security":
                continue
            if role_type == "System Admin":
                account_ids = all_account_ids
            else:
                # Accumulate over every group of the role; assigning per group
                # would keep only the last group's accounts.
                account_ids = []
                for group_id in role.get('accountGroupIds') or []:
                    account_ids.extend(group_accounts.get(group_id, ()))
                if not account_ids:
                    continue
            for email in role['associatedUsers']:
                email_list[email].extend(account_ids)

        for email, account_ids in email_list.items():
            # De-duplicate while keeping a deterministic order.
            email_list[email] = list(dict.fromkeys(account_ids))
        return email_list

    def gather_toplevel(self):
        """Return tenant-wide alert counts.

        Returns:
            dict with ``top_new_open`` and ``top_new_high`` (``totalRows`` of
            the 24 hour queries) and ``top_total`` (``count`` of
            ``/alert/count/open``).
        """
        self.pc_sess.authenticate_client()
        topinfo = {}
        topinfo['top_new_open'] = self._get_json(self._RECENT_OPEN_ALERTS)['totalRows']
        topinfo['top_new_high'] = self._get_json(
            self._RECENT_OPEN_ALERTS + "&policy.severity=high")['totalRows']
        topinfo['top_total'] = self._get_json("/alert/count/open")["count"]
        return topinfo

    def gather_acctlevel(self):
        """Return the same three counts for every active account.

        Returns:
            ``defaultdict(dict)`` of account id -> dict with ``top_new_open``,
            ``top_new_high`` and ``top_total``.
        """
        acctinfo = defaultdict(dict)
        self.pc_sess.authenticate_client()
        for acct in self._get_json("/cloud/name?onlyActive=true"):
            acct_id = acct["id"]
            counts = acctinfo[acct['id']]
            counts['top_new_open'] = self._get_json(
                self._RECENT_OPEN_ALERTS + "&cloud.accountId=" + acct_id)['totalRows']
            counts['top_new_high'] = self._get_json(
                self._RECENT_OPEN_ALERTS + "&policy.severity=high&cloud.accountId=" + acct_id)['totalRows']
            counts['top_total'] = self._get_json(
                self._ACCOUNT_OPEN_ALERTS + acct_id)['totalRows']
        return acctinfo

    def gather_top_risks(self):
        """Return a dict of policy name -> ``alertCount`` for open alerts."""
        self.pc_sess.authenticate_client()
        toprisk = {}
        for policy in self._get_json(self._OPEN_ALERTS_BY_POLICY):
            toprisk[policy['policy']['name']] = policy['alertCount']
        return toprisk

    def build_email(self, toplevel, acctlevel, summarylevel, callback_base, email_addr):
        """Compose the summary text and send it to ``email_addr``.

        Args:
            toplevel: dict as returned by ``gather_toplevel``.
            acctlevel: mapping of account id -> counts to list in this email;
                accounts with no new and no new high-risk alerts are omitted.
            summarylevel: dict of policy name -> alert count; the five highest
                counts are listed (ties ordered by name).
            callback_base: console host used in the links.
            email_addr: recipient passed to the mailer.
        """
        overview = "https://" + callback_base + "/alerts/overview#alert.status=open"
        recent_window = "&timeAmount=24&timeType=relative&timeUnit=hour"
        url_new_open = overview + recent_window
        url_new_high = overview + "&policy.severity=high" + recent_window
        url_total = overview + "&timeType=to_now&timeUnit=epoch"

        parts = [
            f"Daily Risk Summary\n\nNew risks identified: {toplevel['top_new_open']} ({url_new_open})\n",
            f"New high risk alerts: {toplevel['top_new_high']} ({url_new_high})\n",
            f"Total risks identified: {toplevel['top_total']} ({url_total})\n\n",
        ]
        for acct, acct_data in acctlevel.items():
            if acct_data['top_new_open'] == 0 and acct_data['top_new_high'] == 0:
                continue
            acct_url = overview + recent_window + "&cloud.accountId={}".format(acct)
            parts.append(f"{acct} ({acct_url})\n"
                         f"New risks identified: {acct_data['top_new_open']}\n"
                         f"New high risk alerts: {acct_data['top_new_high']}\n"
                         f"Total risks identified: {acct_data['top_total']}\n\n")

        parts.append("Top risks\n\n")
        ranked = sorted(summarylevel.items(), key=lambda item: (-item[1], item[0]))
        for policy, count in ranked[:5]:
            parts.append(f"{policy}:{count}\n")
        self.email_send.send_email('Daily Risk Summary', "".join(parts), email_addr)

    def build_email_acct_list(self, toplevel_data, acctlevel_data, summarylevel_data, callback, email_list):
        """Send one email per recipient, limited to that recipient's accounts.

        Args:
            toplevel_data: dict as returned by ``gather_toplevel``.
            acctlevel_data: mapping as returned by ``gather_acctlevel``.
            summarylevel_data: dict as returned by ``gather_top_risks``.
            callback: console host used in the links.
            email_list: mapping of email address -> account ids.
        """
        for emailaddr, acctids in email_list.items():
            # Built per recipient: a mapping shared across iterations would
            # also contain the accounts of every earlier recipient.
            recipient_accts = {}
            for acct_id in acctids:
                if acct_id in acctlevel_data:
                    recipient_accts[acct_id] = dict(acctlevel_data[acct_id])
            self.build_email(toplevel_data, recipient_accts, summarylevel_data, callback, emailaddr)

    def run(self):
        """Gather all data and send the emails, printing progress messages."""
        callback = self.map_callback()
        print("Gathering top level data from tenant...")
        toplevel_data = self.gather_toplevel()
        print("Gathering account level data from tenant...")
        acctlevel_data = self.gather_acctlevel()
        print("Gathering summary level risk data from tenant...")
        summarylevel_data = self.gather_top_risks()
        print("Constructing email recipient list")
        email_list = self.build_email_list()
        print("Building and sending email...")
        self.build_email_acct_list(toplevel_data, acctlevel_data, summarylevel_data, callback, email_list)
def main():
    """Script entry point: build the reporter and run one full report cycle."""
    reporter = PCAlertEmailReport()
    reporter.run()


# Only start the report when executed as a script, not when imported.
if __name__ == "__main__":
    main()

後は、現場の状況に合わせて修正してみます。

Last modified: 2022-03-31

Author