この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので十分ご注意ください。
皆様、
こんにちは。章でございます。
この度、CWPPのことを少し勉強しました。理論まとめ、ハンズオン事例を作成し、共有させていただきます。
©zhangzy
前回、CWPPの理論まとめを共有いたしました(CWPPに関すること(1/2):理論まとめ)。今回、具体的なハンズオン事例を作成し、共有させていただきます。
Prisma Cloudアラートメールレポーター
機能:カウントを含むアラート(違反している上位のポリシー)の新規/高リスクカウントに関するテナントおよびアカウントレベルの詳細を確認し、アラートメールで実施すること。
まずは、config.ymlを書く
prisma_cloud:
username: #access key
password: #secret key
customer_name: #option for multiple tenants
api_base: api.prismacloud.io #appX.prismacloud.io(X is the stack number)
つきまして、__init__.py、config_help.py、email_help.py、prismacloud_sdk.pyを作り
# __init__.py
from .prismacloud_sdk import PCSession
from .config_helper import ConfigHelper
from .email_helper import EmailHelper
# config_help.py
import os
import yaml
class ConfigHelper(object):
def __init__(self):
config = self.read_yml('configs')
self.pc_user = config["prisma_cloud"]["username"]
self.pc_pass = config["prisma_cloud"]["password"]
self.pc_cust = config["prisma_cloud"]["customer_name"]
self.pc_api_base = config["prisma_cloud"]["api_base"]
@classmethod
def read_yml(self, f):
yml_path = os.path.join(os.path.dirname(__file__), "../config/%s.yml" % f)
with open(yml_path,'r') as stream:
return yaml.safe_load(stream)
# email_helper.py
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class EmailHelper(object):
def __init__(self):
self.msg = MIMEMultipart()
self.from_address = ""
self.email_srv = ""
self.email_srv_port = ""
self.username = ""
self.password = ""
def send_email(self, subject, bodycontent, tolist):
self.msg['From'] = self.from_address
self.msg['To'] = tolist
self.msg['Subject'] = subject
self.msg.attach(MIMEText(bodycontent, 'plain'))
if self.email_srv_port == "465":
server = smtplib.SMTP_SSL(self.email_srv, self.email_srv_port)
elif self.email_srv_port == "587":
server = smtplib.SMTP(self.email_srv, self.email_srv_port)
server.starttls()
else:
server = smtplib.SMTP(self.email_srv, self.email_srv_port)
server.login(self.username, self.password)
text = self.msg.as_string()
server.sendmail(self.from_address, tolist.split(','), text)
server.quit()
# prismacloud_sdk.py
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
import time
import json
class PCSession(object):
max_retries = 5
retry_statuses = [429, 500, 502, 503, 504]
def __init__(self,userid,userpass,customer_name,api_base):
self.api_id = userid
self.api_pass = userpass
self.cust_name = customer_name
self.api_base_url = api_base
self.auth_token = None
self.build_client()
return None
def build_client(self):
self.client = requests.Session()
self.retries = Retry(total=self.max_retries,
status_forcelist=self.retry_statuses,
backoff_factor=1)
self.redlock_http_adapter = HTTPAdapter(pool_connections=1,
pool_maxsize=10,
max_retries=self.retries)
self.session_mount = "https://"
self.client.mount(self.session_mount, self.redlock_http_adapter)
return None
def get_auth_token(self, endpoint, body):
token = None
resp = self.client.post(endpoint, json=body)
if resp.status_code == 200:
auth_resp_json = resp.json()
token = auth_resp_json["token"]
if resp.status_code == 401:
token = "BAD"
return token
def authenticate_client(self):
success = False
prefix = "https://" + self.api_base_url
endpoint = prefix + "/login"
body = {"username":self.api_id,"password":self.api_pass,"customerName":self.cust_name}
max_tries = 5
for _ in range(max_tries):
token = self.get_auth_token(endpoint, body)
if token == "BAD":
print("Invalid credentials - can not obtain session token.")
if token is not None:
self.auth_token = token
success = True
break
else:
time.sleep(1)
self.client.headers.update(self.build_header())
return success
def build_header(self):
header = {"x-redlock-auth": self.auth_token,
"Content-Type": "application/json"}
return header
def interact(self, verb, endpoint, params=None, reqbody=None):
url = "%s%s" % (self.build_endpoint_prefix(), endpoint)
success = False
if self.auth_token is None:
self.authenticate_client()
success, response, exception = self.try_wrapper(verb, url, params,
reqbody)
if success:
return response
raise exception
def try_wrapper(self, verb, url, params, reqbody):
verb_mapping = {'get': self.client.get,
'post': self.client.post,
'put': self.client.put,
'delete': self.client.delete}
if verb not in verb_mapping:
raise ValueError("Invalid HTTP verb for API: %s" % verb)
if self.auth_token is None:
self.authenticate_client()
success, response, exception = self.get_response(verb_mapping[verb],
verb, url, params,
reqbody)
if response.status_code == 401:
self.authenticate_client()
success, response, exception = self.get_response(verb_mapping[verb],
verb, url, params,
reqbody)
return success, response, exception
def get_response(self, client_method, verb, url, params, reqbody):
if verb in ['get', 'delete']:
response = client_method(url, params=params)
else:
response = client_method(url, data=json.dumps(reqbody))
success, exception = utility.parse_status(url, response.status_code,
response.text)
return success, response, exception
後は本番のrunner.pyを作成
import lib
import sys
from collections import defaultdict
class PCAlertEmailReport():
def __init__(self):
self.config = lib.ConfigHelper()
self.pc_sess = lib.PCSession(self.config.pc_user, self.config.pc_pass, self.config.pc_cust,
self.config.pc_api_base)
self.email_send = lib.EmailHelper()
if self.config.pc_user is None:
print("No access key specified, please fix and re-run script")
sys.exit()
if self.config.pc_pass is None:
print("No secret key specified, please fix and re-run script")
sys.exit()
def map_callback(self):
if self.config.pc_api_base == "api.prismacloud.io":
callback_base = "app.prismacloud.io"
elif self.config.pc_api_base == "api2.prismacloud.io":
callback_base = "app2.prismacloud.io"
elif self.config.pc_api_base == "api3.prismacloud.io":
callback_base = "app3.prismacloud.io"
return callback_base
def build_email_list(self):
email_list = defaultdict(list)
self.pc_sess.authenticate_client()
self.url = "https://" + self.config.pc_api_base + "/user/role"
roles = self.pc_sess.client.get(self.url)
self.url = "https://" + self.config.pc_api_base + "/cloud/group"
acct_grps = self.pc_sess.client.get(self.url)
self.url = "https://" + self.config.pc_api_base + "/cloud/name?onlyActive=true"
activeaccts = self.pc_sess.client.get(self.url)
allacctids = []
for acct in activeaccts.json():
allacctids.append(acct['id'])
for role in roles.json():
if role['roleType'] == "Build and Deploy Security":
continue
elif role['roleType'] != "System Admin":
for email in role['associatedUsers']:
if email in email_list.keys():
for acct in role['accountGroupIds']:
for target in acct_grps.json():
if target['id'] == acct:
if target['accountIds']:
email_list[email].extend(target['accountIds'])
else:
for acct in role['accountGroupIds']:
for target in acct_grps.json():
if target['id'] == acct:
if target['accountIds']:
email_list[email] = target['accountIds']
else:
for email in role['associatedUsers']:
if email in email_list.keys():
email_list[email].extend(allacctids)
else:
email_list[email].extend(allacctids)
for key,value in email_list.items():
email_list[key] = list(set(value))
return email_list
def gather_toplevel(self):
topinfo = {}
self.pc_sess.authenticate_client()
self.url = "https://" + self.config.pc_api_base + "/v2/alert?timeType=relative&timeAmount=24&timeUnit=hour" \
"&detailed=false&fields=alert.status&alert.status=open"
top_new_open = self.pc_sess.client.get(self.url)
topinfo['top_new_open'] = top_new_open.json()['totalRows']
self.url = "https://" + self.config.pc_api_base + "/v2/alert?timeType=relative&timeAmount=24&timeUnit=hour" \
"&detailed=false&fields=alert.status&alert.status=open&policy.severity=high"
top_new_high = self.pc_sess.client.get(self.url)
topinfo['top_new_high'] = top_new_high.json()['totalRows']
self.url = "https://" + self.config.pc_api_base + "/alert/count/open"
top_total = self.pc_sess.client.get(self.url)
topinfo['top_total'] = top_total.json()["count"]
return topinfo
def gather_acctlevel(self):
acctinfo = defaultdict(dict)
self.pc_sess.authenticate_client()
self.url = "https://" + self.config.pc_api_base + "/cloud/name?onlyActive=true"
activeaccts = self.pc_sess.client.get(self.url)
for acct in activeaccts.json():
self.url = "https://" + self.config.pc_api_base + "/v2/alert?timeType=relative&timeAmount=24&timeUnit=hour" \
"&detailed=false&fields=alert.status&alert.status=open" \
"&cloud.accountId=" + acct["id"]
top_new_open = self.pc_sess.client.get(self.url)
acctinfo[acct['id']]['top_new_open'] = top_new_open.json()['totalRows']
self.url = "https://" + self.config.pc_api_base + "/v2/alert?timeType=relative&timeAmount=24&timeUnit=hour" \
"&detailed=false&fields=alert.status&alert.status=open" \
"&policy.severity=high&cloud.accountId=" + acct["id"]
top_new_high = self.pc_sess.client.get(self.url)
acctinfo[acct['id']]['top_new_high'] = top_new_high.json()['totalRows']
self.url = "https://" + self.config.pc_api_base + "/v2/alert?&timeType=to_now&timeUnit=epoch" \
"&detailed=false&alert.status=open" \
"&fields=alert.status&cloud.accountId=" + acct["id"]
top_total = self.pc_sess.client.get(self.url)
acctinfo[acct['id']]['top_total'] = top_total.json()['totalRows']
return acctinfo
def gather_top_risks(self):
toprisk = {}
self.pc_sess.authenticate_client()
self.url = "https://" + self.config.pc_api_base + "/alert/policy?timeType=to_now&timeUnit=epoch" \
"&detailed=false&alert.status=open"
policies = self.pc_sess.client.get(self.url)
for policy in policies.json():
toprisk[policy['policy']['name']] = policy['alertCount']
return toprisk
def build_email(self, toplevel, acctlevel, summarylevel, callback_base, email_addr):
top = toplevel
accts = acctlevel
risks = summarylevel
callback_url_top_new_open = \
"https://" + callback_base + "/alerts/overview#alert.status=open&timeAmount=24" \
"&timeType=relative&timeUnit=hour"
callback_url_top_new_high = \
"https://" + callback_base + "/alerts/overview#alert.status=open&policy.severity=high" \
"&timeAmount=24&timeType=relative&timeUnit=hour"
callback_url_top_total = \
"https://" + callback_base + "/alerts/overview#alert.status=open&timeType=to_now&timeUnit=epoch"
body = f"Daily Risk Summary\n\nNew risks identified: {top['top_new_open']} ({callback_url_top_new_open})\n" \
f"New high risk alerts: {top['top_new_high']} ({callback_url_top_new_high})\n" \
f"Total risks identified: {top['top_total']} ({callback_url_top_total})\n\n"
for acct,acct_data in accts.items():
if ((acct_data['top_new_open'] == 0) and (acct_data['top_new_high'] == 0)):
continue
else:
callback_url_acct_top_new_open = "https://" + callback_base + "/alerts/overview#alert.status=open" \
"&timeAmount=24&timeType=relative" \
"&timeUnit=hour&cloud.accountId={}".format(acct)
body2 = f"{acct} ({callback_url_acct_top_new_open})\n" \
f"New risks identified: {acct_data['top_new_open']}\n" \
f"New high risk alerts: {acct_data['top_new_high']}\n" \
f"Total risks identified: {acct_data['top_total']}\n\n"
body += body2
body3 = f"Top risks\n\n"
body += body3
risks_sorted = {val[0]: val[1] for val in sorted(risks.items(), key=lambda x: (-x[1], x[0]))}
top5risks = dict(list(risks_sorted.items())[0: 5])
for policy, count in top5risks.items():
body4 = f"{policy}:{count}\n"
body += body4
self.email_send.send_email('Daily Risk Summary', body, email_addr)
def build_email_acct_list(self, toplevel_data, acctlevel_data, summarylevel_data, callback, email_list):
new_accts = defaultdict(dict)
for emailaddr, acctids in email_list.items():
for acct in acctids:
for acct_id,data in acctlevel_data.items():
if acct == acct_id:
new_accts[acct_id].update(data)
self.build_email(toplevel_data, new_accts, summarylevel_data, callback, emailaddr)
def run(self):
callback = self.map_callback()
print("Gathering top level data from tenant...")
toplevel_data = self.gather_toplevel()
print("Gathering account level data from tenant...")
acctlevel_data = self.gather_acctlevel()
print("Gathering summary level risk data from tenant...")
summarylevel_data = self.gather_top_risks()
print("Constructing email recipient list")
email_list = self.build_email_list()
print("Building and sending email...")
self.build_email_acct_list(toplevel_data, acctlevel_data, summarylevel_data, callback, email_list)
def main():
PCAlertEmailReport().run()
if __name__ == "__main__":
main()
後は現場状況を合わせて修正実施してみます。