import logging
from datetime import datetime
import pytz
from google.cloud import securitycenter
from google.cloud.securitycenter_v1 import Finding
from google.protobuf import field_mask_pb2
from google.protobuf.json_format import ParseError
from inflection import camelize
from inflection import underscore
_LOGGER = logging.getLogger(__name__)
[docs]
def get_all_findings(
filter, gcp_org_id, order_by=None, page_size=1000, credentials=None, client=None
):
"""Returns an iterator for all findings matching a particular filter.
.. code:: python
from bibt.gcp.scc import get_all_findings
for _ in get_all_findings(
filter='category="PUBLIC_BUCKET_ACL"',
order_by='eventTime desc',
gcp_org_id=123123
):
print(_.finding.name, _.resource.name)
:type filter: :py:class:`str`
:param filter: the filter to use. See
`here <https://googleapis.dev/python/securitycenter/latest/securitycenter_v1/types.html#google.cloud.securitycenter_v1.types.ListFindingsRequest.filter>`__
for more on valid filter syntax.
:type gcp_org_id: :py:class:`str`
:param gcp_org_id: the GCP organization ID under which to search.
:type order_by: :py:class:`str`
:param order_by: (optional) the sort order of the findings. See
`here <https://googleapis.dev/python/securitycenter/latest/securitycenter_v1/types.html#google.cloud.securitycenter_v1.types.ListFindingsRequest.order_by>`__
for more on valid arguments. Default is None.
:type page_size: :py:class:`int`
:param page_size: (optional) the page size for the API requests.
max and default is ``1000`` .
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: (optional) the credentials object to use when making the
API call, if not to use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls.
will generate one if not passed.
:rtype: :py:class:`gcp_scc:google.cloud.securitycenter_v1.types.ListFindingsResponse`
:returns: an iterator for all findings matching the filter.
""" # noqa: E501
return _get_all_findings_iter(
request={
"parent": f"organizations/{gcp_org_id}/sources/-",
"filter": filter,
"page_size": page_size,
"order_by": order_by,
},
credentials=credentials,
client=client,
)
[docs]
def get_value(obj, path, raise_exception=True):
"""Fetches the value in the given ``obj`` according to the given ``path``.
Works on objects and dicts. Supports arrays in a few ways:
* if the ``path`` is ``resource.folders[].resource_folder_display_name`` OR
``resource.folders[0].resource_folder_display_name``,
it will just consider the first element in the array.
* if the ``path`` is ``resource.folders[*].resource_folder_display_name``,
it will return a list of ``resource_folder_display_name`` values,
one for each folder.
Additionally, if unsuccessful with exactly what was passed as ``path``, it
will convert and try both camelized and underscored attribute names
(``resource_folder_display_name`` and ``resourceFolderDisplayName``).
As a last resort it will try a key lookup (e.g. ``obj[key]``).
.. code:: python
from bibt.gcp import scc
f = scc.get_finding(
name="organizations/123123/sources/123123/findings/123123",
gcp_org_id=123123
)
v = scc.get_value(
f,
"finding.source_properties.abuse_target_ips"
)
print(v)
:type obj: :py:class:`object`
:param obj: the object from which to extract a value.
:type path: :py:class:`str`
:param path: the path to follow to find the desired value(s).
:type raise_exception: :py:class:`bool`
:param raise_exception: whether it should raise an exception if the path isn't
resolved successfully, or just return None.
:returns: whatever it finds at the end of the specified ``path``.
:raises KeyError: if the next part of the path cannot be found.
"""
if path == "":
return obj
attr, _, remaining = path.partition(".")
grab_one = grab_all = False
if attr.endswith("[]"):
attr = attr[:-2]
grab_one = True
elif attr.endswith("[0]"):
attr = attr[:-3]
grab_one = True
elif attr.endswith("[*]"):
attr = attr[:-3]
grab_all = True
obj = _get(obj, attr, raise_exception=raise_exception)
if not obj:
return None
if grab_one:
obj = obj[0]
elif grab_all:
return [
get_value(_obj, remaining, raise_exception=raise_exception) for _obj in obj
]
return get_value(obj, remaining, raise_exception=raise_exception)
[docs]
def get_finding(name, gcp_org_id, credentials=None, client=None):
"""This function returns the finding object specified by name.
.. code:: python
from bibt.gcp import scc
f = scc.get_finding(
name="organizations/123123/sources/123123/findings/123123",
gcp_org_id=123123
)
print(f.finding.name, f.resource.name)
:type name: :py:class:`str`
:param name: the ``finding.name`` to fetch.
:type gcp_org_id: :py:class:`str`
:param gcp_org_id: the GCP organization ID under which to search.
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call, if not to
use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls.
will generate one if not passed.
:rtype: :py:class:`gcp_scc:google.cloud.securitycenter_v1.types.ListFindingsResponse.ListFindingsResult`
:returns: the specified finding object, paired with its resource information.
:raises ValueError: if no finding under the supplied name is found.
""" # noqa: E501
findings = _get_all_findings_iter(
request={
"parent": f"organizations/{gcp_org_id}/sources/-",
"filter": f'name="{name}"',
"page_size": 1,
},
credentials=credentials,
client=client,
)
try:
_, f = next(enumerate(findings))
return f
except StopIteration:
raise ValueError(
f'No finding object returned for name="{name}" in '
f"organizations/{gcp_org_id}"
)
[docs]
def get_security_marks(scc_name, gcp_org_id, credentials=None, client=None):
"""Gets security marks on an asset or finding in SCC and returns them as a dict.
.. code:: python
from bibt.gcp import scc
for k, v in scc.get_security_marks(
scc_name="organizations/123123/sources/123123/findings/123123",
os.environ["GCP_ORG_ID"]
).items():
print(k, v)
:type scc_name: :py:class:`str`
:param scc_name: may be either an SCC ``finding.name`` or a GCP ``resourceName`` .
format is: ``organizations/123123/sources/123123/findings/123123`` or
``//storage.googleapis.com/my-bucket``.
**note this does not accept ``asset.name`` format!**
:type gcp_org_id: :py:class:`str`
:param gcp_org_id: the GCP organization ID under which to search.
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call, if
not to use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls.
will generate one if not passed.
:rtype: :py:class:`dict`
:returns: a dictionary containing security marks as key/value pairs.
:raises TypeError: if scc_name is not in a recognizeable format.
""" # noqa: E501
if "/findings/" in scc_name:
_LOGGER.debug(f'Assuming type "finding" from scc_name format: {scc_name}')
f = get_finding(scc_name, gcp_org_id, credentials, client)
if "security_marks" in f.finding:
return dict(f.finding.security_marks.marks)
else:
raise TypeError(f"Unrecognized scc_name type: {scc_name}")
return {}
[docs]
def get_sources(parent_name, credentials=None, client=None):
"""Returns a list of all sources in the parent.
.. code:: python
for source in get_sources("organizations/123456"):
print(source.display_name)
:type parent_name: :py:class:`str`
:param parent_name: the parent name, e.g. "organizations/123456" or
"projects/123456"
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call,
if not to use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls.
will generate one if not passed.
:rtype: :py:class:`list` :py:class:`gcp_scc:google.cloud.securitycenter_v1.types.Sources`
:returns: a list of SCC Source objects
""" # noqa: E501
if not isinstance(client, securitycenter.SecurityCenterClient):
client = securitycenter.SecurityCenterClient(credentials=credentials)
return [source for source in client.list_sources(parent=parent_name)]
[docs]
def parse_notification(notification, ignore_unknown_fields=False):
"""This method takes the notification received from a SCC Notification Pubsub
and returns a Python object.
.. code:: python
import base64
from bibt.gcp import scc
def main(event, context):
raw_notification = base64.b64decode(event["data"]).decode("utf-8")
notification = scc.parse_notification(raw_notification)
print(
notification.finding.name,
notification.finding.category,
notification.resource.name
)
:type notification: :py:class:`str` OR :py:class:`dict`
:param notification: the notification to parse. may be either a dictionary
or a json string.
:type ignore_unknown_fields: :py:class:`bool`
:param ignore_unknown_fields: whether or not unrecognized fields should be
ignored when parsing. fields may be unrecognized if they are added to
the finding category in later releases of google-cloud-securitycenter library.
:rtype: :py:class:`gcp_scc:google.cloud.securitycenter_v1.types.ListFindingsResponse.ListFindingsResult`
:returns: the finding notification as a Python object.
:raises TypeError: if it is passed anything aside from a :py:class:`str`
or :py:class:`dict`, or it has an issue parsing the finding into an object.
""" # noqa: E501
from google.cloud.securitycenter_v1.types import ListFindingsResponse
if isinstance(notification, dict):
import json
notification = json.dumps(notification)
elif not isinstance(notification, str):
raise TypeError(
"Notification must be either a string or a dict! "
f"You passed a {type(notification).__name__}"
)
try:
return ListFindingsResponse.ListFindingsResult.from_json(
notification, ignore_unknown_fields=ignore_unknown_fields
)
except ParseError as e:
raise TypeError(
"Error encountered while attempting to parse into finding object, "
"try setting ignore_unknown_fields=True or updating the "
"google-cloud-securitycenter package: "
f"{type(e).__name__}: {e}"
)
[docs]
def set_finding_state(finding_name, state="INACTIVE", credentials=None, client=None):
"""This method will set the finding to inactive state by default.
.. code:: python
from bibt.gcp import scc
scc.set_finding_state(
finding_name="organizations/123123/sources/123123/findings/123123"
)
:type finding_name: :py:class:`str`
:param finding_name: the finding.name whose state to modify.
:type state: :py:class:`str`
:param state: the state to set the finding to. must be valid according to
:py:class:`gcp_scc:google.cloud.securitycenter_v1.types.Finding.State`.
defaults to "INACTIVE".
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call,
if not to use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls.
will generate one if not passed.
:raises KeyError: if the argument supplied for ``state`` is not a valid name
for :py:class:`gcp_scc:google.cloud.securitycenter_v1.types.Finding.State`.
""" # noqa: E501
try:
state_enum = Finding.State[state]
except KeyError:
raise KeyError(
f"Supplied state ({state}) not recognized. "
f"Must be one of {[s.name for s in Finding.State]}"
)
if not isinstance(client, securitycenter.SecurityCenterClient):
client = securitycenter.SecurityCenterClient(credentials=credentials)
client.set_finding_state(
request={
"name": finding_name,
"state": state_enum,
"start_time": datetime.now(pytz.UTC),
}
)
return
[docs]
def set_security_marks(scc_name, marks, gcp_org_id=None, credentials=None, client=None):
"""Sets security marks on an asset or finding in SCC. Usually, if we're setting
them on a finding, it means we're setting a mark of ``reason`` for setting it
to inactive. if we're setting them on an asset, it is usually to
``allow_{finding.category}=true`` .
.. code:: python
from bibt.gcp import scc
scc.set_security_mark(
scc_name="organizations/123123/sources/123123/findings/123123",
marks={
'reason': 'intentionally public'
}
)
:type scc_name: :py:class:`str`
:param scc_name: may be either an SCC ``finding.name`` or a GCP ``resourceName`` .
format is: ``organizations/123123/sources/123123/findings/123123`` or
``//storage.googleapis.com/my-bucket``. **note this does not accept
``asset.name`` format!**
:type marks: :py:class:`dict`
:param marks: a dictionary of marks to set on the asset or finding. format it:
``marks={"allow_public_bucket_acl": "true", "reason": "intentional"}`` .
**note this must be a dict and not a list!**
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call, if not
to use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls. will generate one
if not passed.
:raises TypeError: if the argument supplied for ``marks`` is not a :py:class:`dict`
""" # noqa: E501
if not isinstance(marks, dict):
raise TypeError(
f"Argument: 'marks' must be a dict! You passed a {type(marks).__name__}."
)
mask_paths = [f"marks.{k}" for k in marks.keys()]
if not isinstance(client, securitycenter.SecurityCenterClient):
client = securitycenter.SecurityCenterClient(credentials=credentials)
client.update_security_marks(
request={
"security_marks": {"name": f"{scc_name}/securityMarks", "marks": marks},
"update_mask": field_mask_pb2.FieldMask(paths=mask_paths),
}
)
return
[docs]
def set_mute_status(finding_name, status="MUTED", credentials=None, client=None):
"""This method will mute the finding by default. May also be used to unmute with
``status="UNMUTED"`` .
.. code:: python
from bibt.gcp import scc
scc.set_mute_status(
finding_name="organizations/123123/sources/123123/findings/123123"
)
:type finding_name: :py:class:`str`
:param finding_name: the finding.name whose state to modify.
:type status: :py:class:`str`
:param status: whether the finding should be muted or unmuted. must be a valid
value of ``MUTED`` or ``UNMUTED`` . defaults to ``MUTED`` .
:type credentials: :py:class:`google_auth:google.oauth2.credentials.Credentials`
:param credentials: the credentials object to use when making the API call, if
not to use the account running the function for authentication.
:type client: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.SecurityCenterClient`
:param client: (optional) the SCC client to use for API calls. will generate one
if not passed.
:raises KeyError: if the argument supplied for ``status`` is not ``MUTED``
or ``UNMUTED`` .
""" # noqa: E501
if not isinstance(client, securitycenter.SecurityCenterClient):
client = securitycenter.SecurityCenterClient(credentials=credentials)
if status in ["MUTED", "UNMUTED"]:
mute_enum = Finding.Mute[status]
else:
raise KeyError(
f"Supplied status ({status}) not recognized. Must be "
"one of ['MUTED','UNMUTED']"
)
client.set_mute(request={"name": finding_name, "mute": mute_enum})
return
def _get_all_findings_iter(request, credentials=None, client=None):
"""A helper method to make a list_findings API call. Expects a valid ``request``
dictionary and can optionally be supplied with a credentials object.
Returns: :py:class:`gcp_scc:google.cloud.securitycenter_v1.services.security_center.pagers.ListFindingsPager`
""" # noqa: E501
if not isinstance(client, securitycenter.SecurityCenterClient):
client = securitycenter.SecurityCenterClient(credentials=credentials)
return client.list_findings(request)
def _get(obj, attr, raise_exception):
"""A helper function to get attributes. Works with objects as well as dictionaries.
Will attempt in this order: 1) exactly what was passed (obj.my_attr) 2)
underscored (obj.my_attr) 3) camelized (obj.myAttr) 4) key (obj[attr])
Returns: whatever the value of the attribute is.
Raises: KeyError if the key could not be found in the object.
"""
try:
return getattr(obj, attr)
except AttributeError:
pass
try:
return getattr(obj, underscore(attr))
except AttributeError:
pass
try:
return getattr(obj, camelize(attr, False))
except AttributeError:
pass
try:
return obj.get(attr)
except (KeyError, AttributeError):
if raise_exception:
raise KeyError(
f"Could not find attribute value [{attr}] in object of type: "
f"{type(obj).__name__}"
)
else:
_LOGGER.warning(
f"Could not find attribute value [{attr}] in object of type: "
f"{type(obj).__name__}; returning None."
)
return None