Module onnconformity.conformity
Expand source code
import os
import sys
import requests
import json
from pprint import pformat
# Version segment appended to the Conformity API base URL.
API_VERSION = 'v1'
# Original (misspelled) name, kept as an alias so existing references keep working.
API_VERION = API_VERSION
# NOTE(review): presumably the AWS account ID owned by Conformity that customer roles trust - confirm.
# It is not referenced anywhere in the visible part of this module.
CONFORMITY_ACCOUNT_ID = '717210094962'
# Regions accepted for the `CC_REGION` environment variable; the region is the prefix of the API hostname.
CC_REGIONS = [
    'eu-west-1',
    'ap-southeast-2',
    'us-west-2',
]
class Conformity:
    def __init__(self, logger):
        """Description:
            Convenience Python module for Cloud One Conformity

            Reads `CC_REGION` and `CC_API_KEY` from the environment, builds the request headers and the API
            base URL, then looks up the organisation's external ID (this performs an HTTP request).

        Args:
            logger (onnlogger.Loggers): An instance of `onnlogger.Loggers`

        Example:
            Example usage:

                from onnlogger import Loggers

                logger = Loggers(logger_name='Conformity', console_logger=True, log_level='INFO', log_file_path='/tmp/log')
                conformity = Conformity(logger)

        Raises:
            SystemExit: If an environment variable is missing, `CC_REGION` is not in `CC_REGIONS`, or the
                API reports an error while the organisation ID is fetched
        """
        self.logger = logger

        try:
            self.logger.entry('info', 'Obtaining required environment variables...')
            self.cc_region = os.environ['CC_REGION'].lower()

            if self.cc_region not in CC_REGIONS:
                sys.exit('Error: Please ensure "CC_REGION" is set to a region which is supported by Cloud Conformity')

            self.api_key = os.environ['CC_API_KEY']

        except KeyError:
            sys.exit('Error: Please ensure all environment variables are set')

        self.headers = {
            'Content-Type': 'application/vnd.api+json',
            'Authorization': 'ApiKey ' + self.api_key
        }

        self.base_url = f'https://{self.cc_region}-api.cloudconformity.com/{API_VERION}'
        self.org_id = self._get_org_id()

    def _call(self, http_method, endpoint, payload=None) -> dict:
        """Description:
            Sends one request with the instance headers, decodes the JSON body and checks it for errors

        Args:
            http_method (callable): One of `requests.get`, `requests.post`, `requests.patch`, `requests.delete`
            endpoint (str): Full URL to call
            payload (str): JSON-encoded request body, or `None` for requests without a body

        Returns:
            dict

        Raises:
            SystemExit: If the decoded response carries an error (see `_check_error`)
        """
        resp = http_method(endpoint, headers=self.headers, data=payload)
        resp_json = json.loads(resp.text)
        self._check_error(resp_json)

        return resp_json

    @staticmethod
    def _check_error(json_payload) -> None:
        """Description:
            Exits the program if the decoded API response contains a truthy `Message` or `errors` entry

        Args:
            json_payload (dict): Decoded API response

        Raises:
            SystemExit: With the message `Error: <details>` when an error entry is present
        """
        error_message = json_payload.get('Message') or json_payload.get('errors')

        if error_message:
            sys.exit(f'Error: {error_message}')

    @staticmethod
    def _normalise_flag(value):
        """Description:
            Normalises a package flag for the account creation payload

            Booleans (the documented type) are passed through. Non-empty strings are upper-cased exactly as
            the previous implementation did, so callers which pass strings see no change. Any falsy value
            becomes `False`.

        Args:
            value (bool|str): Flag supplied by the caller

        Returns:
            bool|str
        """
        if isinstance(value, str):
            return value.upper() if value else False

        return bool(value)

    @staticmethod
    def _access_list(account_id, access_level) -> dict:
        """Description:
            Builds the `accessList` fragment which grants one access level on one Conformity account

        Args:
            account_id (str): Conformity account ID
            access_level (str): Access level; it is upper-cased before being sent

        Returns:
            dict
        """
        return {
            'accessList': [
                {
                    'account': account_id,
                    'level': access_level.upper(),
                }
            ]
        }

    @staticmethod
    def _invite_payload(first_name, last_name, email, account_id, access_level) -> dict:
        """Description:
            Builds the request body used to invite a user with the `USER` role

        Args:
            first_name (str): User's first name
            last_name (str): User's last name
            email (str): User's e-mail address
            account_id (str): Conformity account ID the user is granted access to
            access_level (str): Access level for `account_id`

        Returns:
            dict
        """
        attributes = {
            'firstName': first_name,
            'lastName': last_name,
            'email': email,
            'role': 'USER',
        }
        attributes.update(Conformity._access_list(account_id, access_level))

        return {'data': {'attributes': attributes}}

    def delete_subscription(self, conformity_id) -> dict:
        """Description:
            Delete a Conformity subscription

        Args:
            conformity_id (str): Subscription ID

        Example:
            Example usage:

                deleted_subscription = conformity.delete_subscription('57KARHFRW')
                print(deleted_subscription)
                {'meta': {'status': 'sent'}}

        Returns:
            dict

        Raises:
            SystemExit: If the API reports an error
        """
        self.logger.entry('info', f'Deleting subscription ID {conformity_id}...')
        account_endpoint = f'{self.base_url}/accounts/{conformity_id}'
        resp_json = self._call(requests.delete, account_endpoint)
        self.logger.entry('debug', 'Done')

        return resp_json

    def list_subscriptions(self) -> dict:
        """Description:
            List Conformity subscriptions

        Example:
            Example usage:

                from pprint import pprint

                subscriptions = conformity.list_subscriptions()
                pprint(subscriptions)
                {'data': [{'attributes': {'access': None,
                                          'awsaccount-id': '123456789012',
                                          'cloud-type': 'aws',
                                          'cost-package': True,
                                          'created-date': 1585526686560,
                                          'environment': None,
                                          'has-real-time-monitoring': True,
                                          'last-checked-date': 1585711290751,
                                          'last-monitoring-event-date': None,
                                          'last-notified-date': 1585711291075,
                                          'name': 'Demo',
                                          'security-package': True,
                                          'tags': None},
                           'id': 'dg4Sfsw23',
                           'relationships': {'organisation': {'data': {'id': 'dsaer28hw',
                                                                       'type': 'organisations'}}},
                           'type': 'accounts'}]}

        Returns:
            dict

        Raises:
            SystemExit: If the API reports an error
        """
        self.logger.entry('debug', 'Getting list of Conformity subscriptions...')
        account_endpoint = f'{self.base_url}/accounts/'
        resp_json = self._call(requests.get, account_endpoint)
        self.logger.entry('debug', f'Found subscriptions:\n{pformat(resp_json)}')

        return resp_json

    def _get_org_id(self) -> str:
        """Description:
            Gets the Conformity ID

        Example:
            Example usage:

                org_id = conformity._get_org_id()
                print(org_id)
                g34242f5y-2345-345g-234d-gi473bdlfng7

        Returns:
            str

        Raises:
            SystemExit: If the API reports an error
        """
        self.logger.entry('info', 'Getting Conformity Organization ID...')
        org_endpoint = f'{self.base_url}/organisation/external-id/'
        resp_json = self._call(requests.get, org_endpoint)
        org_id = resp_json['data']['id']
        # Log the ID itself; the previous code wrapped it in a set literal by mistake.
        self.logger.entry('debug', f'{org_id}')

        return org_id

    def check_subscription_exists(self, aws_account_id) -> dict:
        """Description:
            Checks if a Conformity subscription exists for an AWS account

            Note - `list_subscriptions()` is called every time as CC does not prevent users from creating multiple
            subscriptions for the same account/ARN

        Args:
            aws_account_id (str): AWS account ID

        Example:
            Example usage:

                from pprint import pprint

                existing_account = conformity.check_subscription_exists('098765432109')
                pprint(existing_account)
                {'attributes': {'access': None,
                                'awsaccount-id': '098765432109',
                                'cloud-type': 'aws',
                                'cost-package': False,
                                'created-date': 1585780981170,
                                'environment': '',
                                'has-real-time-monitoring': False,
                                'name': 'Lab1',
                                'security-package': True,
                                'settings': {'rules': []},
                                'status': 'ACTIVE',
                                'tags': None},
                 'id': '462FSFGGR',
                 'relationships': {'organisation': {'data': {'id': 'sdes23216',
                                                             'type': 'organisations'}}},
                 'type': 'accounts'}

        Returns:
            dict: The first matching subscription entry, or an empty dict when there is none
        """
        self.logger.entry('info', f'Checking if AWS account ID {aws_account_id} already has a Conformity '
                                  f'subscription...')
        list_of_subscriptions = self.list_subscriptions()

        for entry in list_of_subscriptions['data']:
            # Entries without an AWS account ID can never match, so they are skipped.
            aws_id = entry['attributes'].get('awsaccount-id')

            if aws_id and aws_id == aws_account_id:
                self.logger.entry('info', 'It does exist')
                self.logger.entry('debug', pformat(entry))

                return entry

        self.logger.entry('info', f'AWS account ID {aws_account_id} does not have a Conformity subscription')

        return dict()

    def list_users(self) -> dict:
        """Description:
            Lists Conformity users

        Example:
            Example usage:

                users = conformity.list_users()
                pprint(users)
                {'data': [{'attributes': {'created-date': 1586387549366,
                                          'email': 'demo@example.com',
                                          'first-name': 'Jane',
                                          'has-credentials': True,
                                          'last-login-date': 1586400827844,
                                          'last-name': 'Doe',
                                          'role': 'USER',
                                          'status': 'ACTIVE'},
                           'id': '6s5fg23sf',
                           'relationships': {'organisation': {'data': {'id': 'd3g63dgs1',
                                                                       'type': 'organisations'}}},
                           'type': 'users'}]}

        Returns:
            dict

        Raises:
            SystemExit: If the API reports an error
        """
        self.logger.entry('debug', 'Getting list of Conformity users...')
        users_endpoint = f'{self.base_url}/users/'

        return self._call(requests.get, users_endpoint)

    def update_user(self, user_id, account_id, user_role='USER', access_level='READONLY') -> dict:
        """Description:
            Updates a Conformity user's role and, for the `USER` role, their access level on an account

        Args:
            user_id (str): Conformity user ID
            account_id (str): Conformity account ID
            user_role (str): User role; it is upper-cased before being sent
            access_level (str): User access level on `account_id`; only sent when the role is `USER`

        Example:
            Example usage:

                updated_user = conformity.update_user('6s5fg23sf', '462FSFGGR', access_level='FULL')

        Returns:
            dict

        Raises:
            SystemExit: If the API reports an error
        """
        self.logger.entry('info', f'Updating Conformity account ID {account_id}...')
        users_endpoint = f'{self.base_url}/users/{user_id}'
        user_role = user_role.upper()

        data = {
            'data': {
                'role': user_role
            }
        }

        # Per-account access levels are only attached for the USER role.
        if user_role == 'USER':
            data['data'].update(self._access_list(account_id, access_level))

        data = json.dumps(data)
        self.logger.entry('debug', f'Sending payload:\n{pformat(data)}')
        resp_json = self._call(requests.patch, users_endpoint, data)
        self.logger.entry('debug', f'Done:\n{pformat(resp_json)}')

        return resp_json

    def create_subscription(self, aws_account_id, role_arn, account_name, account_env='', cost_package=False,
                            rtm=False) -> dict:
        """Description:
            Create a Conformity subscription for an AWS account

            Note - `list_subscriptions()` is called every time as CC does not prevent users from creating multiple
            subscriptions for the same account/ARN

            Note - when a subscription already exists, the bare entry from `check_subscription_exists()` is
            returned (no outer `data` key); a newly created subscription is returned as the raw API response.

        Args:
            aws_account_id (str): AWS account ID
            role_arn (str): Role ARN for Conformity's access
            account_name (str): Conformity account name
            account_env (str): Conformity account environment name
            cost_package (bool): Enable cost package
            rtm (bool): Enable RTM package

        Example:
            Example usage:

                new_subscription = conformity.create_subscription('098765432109', 'arn:aws:iam::098765432109:role/CloudConformity', 'Lab1')
                pprint(new_subscription)
                {'data': {'attributes': {'access': None,
                                         'awsaccount-id': '098765432109',
                                         'cloud-type': 'aws',
                                         'cost-package': False,
                                         'created-date': 1585780981170,
                                         'environment': '',
                                         'has-real-time-monitoring': False,
                                         'name': 'Lab1',
                                         'security-package': True,
                                         'settings': {'rules': []},
                                         'status': 'ACTIVE',
                                         'tags': None},
                          'id': '462FSFGGR',
                          'relationships': {'organisation': {'data': {'id': 'sdefgw3r6',
                                                                      'type': 'organisations'}}},
                          'type': 'accounts'}}

        Returns:
            dict

        Raises:
            SystemExit: If the API reports an error
        """
        subscription_exists = self.check_subscription_exists(aws_account_id)

        if subscription_exists:
            self.logger.entry('debug', f'Skipping subscription creation for AWS account ID {aws_account_id} as it '
                                       f'is not required')

            return subscription_exists

        self.logger.entry('info', f'Creating Conformity subscription for AWS account ID {aws_account_id}...')
        account_endpoint = f'{self.base_url}/accounts/'

        data = {
            'data': {
                'type': 'accounts',
                'attributes': {
                    'name': account_name,
                    'environment': account_env,
                    'access': {
                        'keys': {
                            'roleArn': role_arn,
                            'externalId': self.org_id,
                        }
                    },
                    # Booleans used to crash here because `.upper()` was called on them unconditionally.
                    'costPackage': self._normalise_flag(cost_package),
                    'hasRealTimeMonitoring': self._normalise_flag(rtm),
                }
            }
        }

        data = json.dumps(data)
        self.logger.entry('debug', f'Sending payload:\n{pformat(data)}')
        resp_json = self._call(requests.post, account_endpoint, data)
        self.logger.entry('debug', f'Done:\n{pformat(resp_json)}')

        return resp_json

    def invite_user(self, first_name, last_name, email, account_id, access_level='READONLY') -> dict:
        """Description:
            Invites a user to the Conformity organisation with the `USER` role and grants them access to an account

        Args:
            first_name (str): User's first name
            last_name (str): User's last name
            email (str): User's e-mail address
            account_id (str): Conformity account ID the user is granted access to
            access_level (str): User access level on `account_id`; it is upper-cased before being sent

        Example:
            Example usage:

                invited_user = conformity.invite_user('Jane', 'Doe', 'demo@example.com', '462FSFGGR')

        Returns:
            dict

        Raises:
            SystemExit: If the API reports an error
        """
        self.logger.entry('info', f'Inviting {email} to Conformity account ID {account_id}...')
        # Invitations are created by POSTing to the users collection; there is no user ID yet.
        users_endpoint = f'{self.base_url}/users'

        data = json.dumps(self._invite_payload(first_name, last_name, email, account_id, access_level))
        self.logger.entry('debug', f'Sending payload:\n{pformat(data)}')
        resp_json = self._call(requests.post, users_endpoint, data)
        self.logger.entry('debug', f'Done:\n{pformat(resp_json)}')

        return resp_json
Classes
class Conformity (logger)
-
Description
Convenience Python module for Cloud One Conformity
Args
logger
:onnlogger.Loggers
- An instance of
onnlogger.Loggers
Example
Example usage:
from onnlogger import Loggers logger = Loggers(logger_name='Conformity', console_logger=True, log_level='INFO', log_file_path='/tmp/log') conformity = Conformity(logger)
Expand source code
class Conformity: def __init__(self, logger): """Description: Convenience Python module for Cloud One Conformity Args: logger (onnlogger.Loggers): An instance of `onnlogger.Loggers` Example: Example usage: from onnlogger import Loggers logger = Loggers(logger_name='Conformity', console_logger=True, log_level='INFO', log_file_path='/tmp/log') conformity = Conformity(logger) """ self.logger = logger try: self.logger.entry('info', 'Obtaining required environment variables...') self.cc_region = os.environ['CC_REGION'].lower() if self.cc_region not in CC_REGIONS: sys.exit('Error: Please ensure "CC_REGION" is set to a region which is supported by Cloud Conformity') self.api_key = os.environ['CC_API_KEY'] except KeyError: sys.exit('Error: Please ensure all environment variables are set') self.headers = { 'Content-Type': 'application/vnd.api+json', 'Authorization': 'ApiKey ' + self.api_key } self.base_url = f'https://{self.cc_region}-api.cloudconformity.com/{API_VERION}' self.org_id = self._get_org_id() def delete_subscription(self, conformity_id) -> dict: """Description: Delete a Conformity subscription Args: conformity_id (str): Subscription ID Example: Example usage: deleted_subscription = conformity.delete_subscription('57KARHFRW') print(deleted_subscription) {'meta': {'status': 'sent'}} Returns: dict """ self.logger.entry('info', f'Deleting subscription ID {conformity_id}...') account_endpoint = f'{self.base_url}/accounts/{conformity_id}' resp = requests.delete(account_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', 'Done') return resp_json def list_subscriptions(self) -> dict: """Description: List Conformity subscriptions Args: conformity_id (str): Subscription ID Example: Example usage: from pprint import pprint subscriptions = conformity.list_subscriptions() pprint(subscriptions) {'data': [{'attributes': {'access': None, 'awsaccount-id': '123456789012', 'cloud-type': 'aws', 'cost-package': 
True, 'created-date': 1585526686560, 'environment': None, 'has-real-time-monitoring': True, 'last-checked-date': 1585711290751, 'last-monitoring-event-date': None, 'last-notified-date': 1585711291075, 'name': 'Demo', 'security-package': True, 'tags': None}, 'id': 'dg4Sfsw23', 'relationships': {'organisation': {'data': {'id': 'dsaer28hw', 'type': 'organisations'}}}, 'type': 'accounts'}]} Returns: dict """ self.logger.entry('debug', 'Getting list of Conformity subscriptions...') account_endpoint = f'{self.base_url}/accounts/' resp = requests.get(account_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Found subscriptions:\n{pformat(resp_json)}') return resp_json @staticmethod def _check_error(json_payload) -> None: error_message = json_payload.get('Message') or json_payload.get('errors') if error_message: sys.exit(f'Error: {error_message}') def _get_org_id(self) -> str: """Description: Gets the Conformity ID Example: Example usage: org_id = conformity._get_org_id() print(org_id) g34242f5y-2345-345g-234d-gi473bdlfng7 Returns: str """ self.logger.entry('info', 'Getting Conformity Organization ID...') org_endpoint = f'{self.base_url}/organisation/external-id/' resp = requests.get(org_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) org_id = resp_json['data']['id'] self.logger.entry('debug', {org_id}) return org_id def check_subscription_exists(self, aws_account_id) -> dict: """Description: Checks if a Conformity subscription exists for an AWS account Note - `list_subscriptions()` is called every time as CC does not prevent users from creating multiple subscriptions for the same account/ARN Args: aws_account_id (str): AWS account ID Example: Example usage: from pprint import pprint existing_account = conformity.check_subscription_exists('098765432109') pprint(existing_account) {'attributes': {'access': None, 'awsaccount-id': '098765432109', 
'cloud-type': 'aws', 'cost-package': False, 'created-date': 1585780981170, 'environment': '', 'has-real-time-monitoring': False, 'name': 'Lab1', 'security-package': True, 'settings': {'rules': []}, 'status': 'ACTIVE', 'tags': None}, 'id': '462FSFGGR', 'relationships': {'organisation': {'data': {'id': 'sdes23216', 'type': 'organisations'}}}, 'type': 'accounts'}} Returns: dict """ self.logger.entry('info', f'Checking if AWS account ID {aws_account_id} already has a Conformity ' f'subscription...') list_of_subscriptions = self.list_subscriptions() for entry in list_of_subscriptions['data']: aws_id = entry['attributes'].get('awsaccount-id') if not aws_id: continue if aws_id == aws_account_id: self.logger.entry('info', f'It does exist') self.logger.entry('debug', {pformat(entry)}) return entry self.logger.entry('info', f'AWS account ID {aws_account_id} does not have a Conformity subscription') return dict() def list_users(self): """Description: Lists Conformity users Example: Example usage: users = conformity.list_users() pprint(users) {'data': [{'attributes': {'created-date': 1586387549366, 'email': 'demo@example.com', 'first-name': 'Jane', 'has-credentials': True, 'last-login-date': 1586400827844, 'last-name': 'Doe', 'role': 'USER', 'status': 'ACTIVE'}, 'id': '6s5fg23sf', 'relationships': {'organisation': {'data': {'id': 'd3g63dgs1', 'type': 'organisations'}}}, 'type': 'users'}]} Returns: dict """ self.logger.entry('debug', 'Getting list of Conformity users...') users_endpoint = f'{self.base_url}/users/' resp = requests.get(users_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) return resp_json def update_user(self, user_id, account_id, user_role='USER', access_level='READONLY') -> dict: """Description: Lists Conformity users Args: user_id: Conformity user ID account_id: Conformity account ID user_role: User role access_level: User access level Example: Example usage: Returns: """ self.logger.entry('info', f'Updating 
Conformity account ID {account_id}...') users_endpoint = f'{self.base_url}/users/{user_id}' user_role = user_role.upper() data = { 'data': { 'role': user_role } } if user_role == 'USER': access_level = access_level.upper() access_list = { 'accessList': [ { 'account': account_id, 'level': access_level, } ] } data['data'].update(access_list) data = json.dumps(data) self.logger.entry('debug', f'Sending payload:\n{pformat(data)}') resp = requests.patch(users_endpoint, headers=self.headers, data=data) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Done:\n{pformat(resp_json)}') return resp_json def create_subscription(self, aws_account_id, role_arn, account_name, account_env='', cost_package=False, rtm=False) -> dict: """Description: Create a Conformity subscription for an AWS account Note - `list_subscriptions()` is called every time as CC does not prevent users from creating multiple subscriptions for the same account/ARN Args: aws_account_id (str): AWS account ID role_arn (str): Role ARN for Conformity's access account_name (str): Conformity account name account_env (str): Conformity account environment name cost_package (bool): Enable cost package rtm (bool): Enable RTM package Example: Example usage: new_subscription = conformity.create_subscription('098765432109', 'arn:aws:iam::098765432109:role/CloudConformity', 'Lab1') pprint(new_subscription) {'data': {'attributes': {'access': None, 'awsaccount-id': '098765432109', 'cloud-type': 'aws', 'cost-package': False, 'created-date': 1585780981170, 'environment': '', 'has-real-time-monitoring': False, 'name': 'Lab1', 'security-package': True, 'settings': {'rules': []}, 'status': 'ACTIVE', 'tags': None}, 'id': '462FSFGGR', 'relationships': {'organisation': {'data': {'id': 'sdefgw3r6', 'type': 'organisations'}}}, 'type': 'accounts'}} Returns: dict """ subscription_exists = self.check_subscription_exists(aws_account_id) if subscription_exists: self.logger.entry('debug', f'Skipping 
subscription creation for for AWS account ID {aws_account_id} as it ' f'is not required') return subscription_exists self.logger.entry('info', f'Creating Conformity subscription for AWS account ID {aws_account_id}...') account_endpoint = f'{self.base_url}/accounts/' cost_package = cost_package.upper() if cost_package else False rtm = rtm.upper() if rtm else False data = { 'data': { 'type': 'accounts', 'attributes': { 'name': account_name, 'environment': account_env, 'access': { 'keys': { 'roleArn': role_arn, 'externalId': self.org_id, } }, 'costPackage': cost_package, 'hasRealTimeMonitoring': rtm, } } } data = json.dumps(data) self.logger.entry('debug', f'Sending payload:\n{pformat(data)}') resp = requests.post(account_endpoint, headers=self.headers, data=data) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Done:\n{pformat(resp_json)}') return resp_json def invite_user(self, first_name, last_name, email, account_id, access_level='READONLY') -> dict: """Description: Lists Conformity users Args: user_id: Conformity user ID account_id: Conformity account ID user_role: User role access_level: User access level Example: Example usage: Returns: """ self.logger.entry('info', f'Updating Conformity account ID {account_id}...') users_endpoint = f'{self.base_url}/users/{user_id}' user_role = user_role.upper() data = { 'data': { 'role': user_role } } if user_role == 'USER': access_level = access_level.upper() access_list = { 'accessList': [ { 'account': account_id, 'level': access_level, } ] } data['data'].update(access_list) data = json.dumps(data) self.logger.entry('debug', f'Sending payload:\n{pformat(data)}') resp = requests.patch(users_endpoint, headers=self.headers, data=data) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Done:\n{pformat(resp_json)}') return resp_json
Methods
def check_subscription_exists(self, aws_account_id)
-
Description
Checks if a Conformity subscription exists for an AWS account
Note -
list_subscriptions()
is called every time as CC does not prevent users from creating multiple subscriptions for the same account/ARN
Args
aws_account_id
:str
- AWS account ID
Example
Example usage:
from pprint import pprint existing_account = conformity.check_subscription_exists('098765432109') pprint(existing_account) {'attributes': {'access': None, 'awsaccount-id': '098765432109', 'cloud-type': 'aws', 'cost-package': False, 'created-date': 1585780981170, 'environment': '', 'has-real-time-monitoring': False, 'name': 'Lab1', 'security-package': True, 'settings': {'rules': []}, 'status': 'ACTIVE', 'tags': None}, 'id': '462FSFGGR', 'relationships': {'organisation': {'data': {'id': 'sdes23216', 'type': 'organisations'}}}, 'type': 'accounts'}}
Returns
dict
Expand source code
def check_subscription_exists(self, aws_account_id) -> dict: """Description: Checks if a Conformity subscription exists for an AWS account Note - `list_subscriptions()` is called every time as CC does not prevent users from creating multiple subscriptions for the same account/ARN Args: aws_account_id (str): AWS account ID Example: Example usage: from pprint import pprint existing_account = conformity.check_subscription_exists('098765432109') pprint(existing_account) {'attributes': {'access': None, 'awsaccount-id': '098765432109', 'cloud-type': 'aws', 'cost-package': False, 'created-date': 1585780981170, 'environment': '', 'has-real-time-monitoring': False, 'name': 'Lab1', 'security-package': True, 'settings': {'rules': []}, 'status': 'ACTIVE', 'tags': None}, 'id': '462FSFGGR', 'relationships': {'organisation': {'data': {'id': 'sdes23216', 'type': 'organisations'}}}, 'type': 'accounts'}} Returns: dict """ self.logger.entry('info', f'Checking if AWS account ID {aws_account_id} already has a Conformity ' f'subscription...') list_of_subscriptions = self.list_subscriptions() for entry in list_of_subscriptions['data']: aws_id = entry['attributes'].get('awsaccount-id') if not aws_id: continue if aws_id == aws_account_id: self.logger.entry('info', f'It does exist') self.logger.entry('debug', {pformat(entry)}) return entry self.logger.entry('info', f'AWS account ID {aws_account_id} does not have a Conformity subscription') return dict()
def create_subscription(self, aws_account_id, role_arn, account_name, account_env='', cost_package=False, rtm=False)
-
Description
Create a Conformity subscription for an AWS account
Note -
list_subscriptions()
is called every time as CC does not prevent users from creating multiple subscriptions for the same account/ARN
Args
aws_account_id
:str
- AWS account ID
role_arn
:str
- Role ARN for Conformity's access
account_name
:str
- Conformity account name
account_env
:str
- Conformity account environment name
cost_package
:bool
- Enable cost package
rtm
:bool
- Enable RTM package
Example
Example usage:
new_subscription = conformity.create_subscription('098765432109', 'arn:aws:iam::098765432109:role/CloudConformity', 'Lab1') pprint(new_subscription) {'data': {'attributes': {'access': None, 'awsaccount-id': '098765432109', 'cloud-type': 'aws', 'cost-package': False, 'created-date': 1585780981170, 'environment': '', 'has-real-time-monitoring': False, 'name': 'Lab1', 'security-package': True, 'settings': {'rules': []}, 'status': 'ACTIVE', 'tags': None}, 'id': '462FSFGGR', 'relationships': {'organisation': {'data': {'id': 'sdefgw3r6', 'type': 'organisations'}}}, 'type': 'accounts'}}
Returns
dict
Expand source code
def create_subscription(self, aws_account_id, role_arn, account_name, account_env='', cost_package=False, rtm=False) -> dict: """Description: Create a Conformity subscription for an AWS account Note - `list_subscriptions()` is called every time as CC does not prevent users from creating multiple subscriptions for the same account/ARN Args: aws_account_id (str): AWS account ID role_arn (str): Role ARN for Conformity's access account_name (str): Conformity account name account_env (str): Conformity account environment name cost_package (bool): Enable cost package rtm (bool): Enable RTM package Example: Example usage: new_subscription = conformity.create_subscription('098765432109', 'arn:aws:iam::098765432109:role/CloudConformity', 'Lab1') pprint(new_subscription) {'data': {'attributes': {'access': None, 'awsaccount-id': '098765432109', 'cloud-type': 'aws', 'cost-package': False, 'created-date': 1585780981170, 'environment': '', 'has-real-time-monitoring': False, 'name': 'Lab1', 'security-package': True, 'settings': {'rules': []}, 'status': 'ACTIVE', 'tags': None}, 'id': '462FSFGGR', 'relationships': {'organisation': {'data': {'id': 'sdefgw3r6', 'type': 'organisations'}}}, 'type': 'accounts'}} Returns: dict """ subscription_exists = self.check_subscription_exists(aws_account_id) if subscription_exists: self.logger.entry('debug', f'Skipping subscription creation for for AWS account ID {aws_account_id} as it ' f'is not required') return subscription_exists self.logger.entry('info', f'Creating Conformity subscription for AWS account ID {aws_account_id}...') account_endpoint = f'{self.base_url}/accounts/' cost_package = cost_package.upper() if cost_package else False rtm = rtm.upper() if rtm else False data = { 'data': { 'type': 'accounts', 'attributes': { 'name': account_name, 'environment': account_env, 'access': { 'keys': { 'roleArn': role_arn, 'externalId': self.org_id, } }, 'costPackage': cost_package, 'hasRealTimeMonitoring': rtm, } } } data = json.dumps(data) 
self.logger.entry('debug', f'Sending payload:\n{pformat(data)}') resp = requests.post(account_endpoint, headers=self.headers, data=data) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Done:\n{pformat(resp_json)}') return resp_json
def delete_subscription(self, conformity_id)
-
Description
Delete a Conformity subscription
Args
conformity_id
:str
- Subscription ID
Example
Example usage:
deleted_subscription = conformity.delete_subscription('57KARHFRW') print(deleted_subscription) {'meta': {'status': 'sent'}}
Returns
dict
Expand source code
def delete_subscription(self, conformity_id) -> dict: """Description: Delete a Conformity subscription Args: conformity_id (str): Subscription ID Example: Example usage: deleted_subscription = conformity.delete_subscription('57KARHFRW') print(deleted_subscription) {'meta': {'status': 'sent'}} Returns: dict """ self.logger.entry('info', f'Deleting subscription ID {conformity_id}...') account_endpoint = f'{self.base_url}/accounts/{conformity_id}' resp = requests.delete(account_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', 'Done') return resp_json
def invite_user(self, first_name, last_name, email, account_id, access_level='READONLY')
-
Description
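Invites a user to Conformity and grants them access to a Conformity account
Args
first_name
- User's first name
last_name
- User's last name
email
- User's e-mail address
account_id
- Conformity account ID
access_level
- User access level
Example
Example usage:
invited_user = conformity.invite_user('Jane', 'Doe', 'demo@example.com', '462FSFGGR')
Returns
dict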
Expand source code
def invite_user(self, first_name, last_name, email, account_id, access_level='READONLY') -> dict: """Description: Lists Conformity users Args: user_id: Conformity user ID account_id: Conformity account ID user_role: User role access_level: User access level Example: Example usage: Returns: """ self.logger.entry('info', f'Updating Conformity account ID {account_id}...') users_endpoint = f'{self.base_url}/users/{user_id}' user_role = user_role.upper() data = { 'data': { 'role': user_role } } if user_role == 'USER': access_level = access_level.upper() access_list = { 'accessList': [ { 'account': account_id, 'level': access_level, } ] } data['data'].update(access_list) data = json.dumps(data) self.logger.entry('debug', f'Sending payload:\n{pformat(data)}') resp = requests.patch(users_endpoint, headers=self.headers, data=data) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Done:\n{pformat(resp_json)}') return resp_json
def list_subscriptions(self)
-
Description
List Conformity subscriptions
Args
None (this method takes no arguments)
Example
Example usage:
from pprint import pprint subscriptions = conformity.list_subscriptions() pprint(subscriptions) {'data': [{'attributes': {'access': None, 'awsaccount-id': '123456789012', 'cloud-type': 'aws', 'cost-package': True, 'created-date': 1585526686560, 'environment': None, 'has-real-time-monitoring': True, 'last-checked-date': 1585711290751, 'last-monitoring-event-date': None, 'last-notified-date': 1585711291075, 'name': 'Demo', 'security-package': True, 'tags': None}, 'id': 'dg4Sfsw23', 'relationships': {'organisation': {'data': {'id': 'dsaer28hw', 'type': 'organisations'}}}, 'type': 'accounts'}]}
Returns
dict
Expand source code
def list_subscriptions(self) -> dict: """Description: List Conformity subscriptions Args: conformity_id (str): Subscription ID Example: Example usage: from pprint import pprint subscriptions = conformity.list_subscriptions() pprint(subscriptions) {'data': [{'attributes': {'access': None, 'awsaccount-id': '123456789012', 'cloud-type': 'aws', 'cost-package': True, 'created-date': 1585526686560, 'environment': None, 'has-real-time-monitoring': True, 'last-checked-date': 1585711290751, 'last-monitoring-event-date': None, 'last-notified-date': 1585711291075, 'name': 'Demo', 'security-package': True, 'tags': None}, 'id': 'dg4Sfsw23', 'relationships': {'organisation': {'data': {'id': 'dsaer28hw', 'type': 'organisations'}}}, 'type': 'accounts'}]} Returns: dict """ self.logger.entry('debug', 'Getting list of Conformity subscriptions...') account_endpoint = f'{self.base_url}/accounts/' resp = requests.get(account_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Found subscriptions:\n{pformat(resp_json)}') return resp_json
def list_users(self)
-
Description
Lists Conformity users
Example
Example usage:
users = conformity.list_users() pprint(users) {'data': [{'attributes': {'created-date': 1586387549366, 'email': 'demo@example.com', 'first-name': 'Jane', 'has-credentials': True, 'last-login-date': 1586400827844, 'last-name': 'Doe', 'role': 'USER', 'status': 'ACTIVE'}, 'id': '6s5fg23sf', 'relationships': {'organisation': {'data': {'id': 'd3g63dgs1', 'type': 'organisations'}}}, 'type': 'users'}]}
Returns
dict
Expand source code
def list_users(self): """Description: Lists Conformity users Example: Example usage: users = conformity.list_users() pprint(users) {'data': [{'attributes': {'created-date': 1586387549366, 'email': 'demo@example.com', 'first-name': 'Jane', 'has-credentials': True, 'last-login-date': 1586400827844, 'last-name': 'Doe', 'role': 'USER', 'status': 'ACTIVE'}, 'id': '6s5fg23sf', 'relationships': {'organisation': {'data': {'id': 'd3g63dgs1', 'type': 'organisations'}}}, 'type': 'users'}]} Returns: dict """ self.logger.entry('debug', 'Getting list of Conformity users...') users_endpoint = f'{self.base_url}/users/' resp = requests.get(users_endpoint, headers=self.headers) resp_json = json.loads(resp.text) self._check_error(resp_json) return resp_json
def update_user(self, user_id, account_id, user_role='USER', access_level='READONLY')
-
Description
Updates a Conformity user's role and their access level on a Conformity account
Args
user_id
- Conformity user ID
account_id
- Conformity account ID
user_role
- User role
access_level
- User access level
Example
Example usage:
updated_user = conformity.update_user('6s5fg23sf', '462FSFGGR', access_level='FULL')
Returns
dict
Expand source code
def update_user(self, user_id, account_id, user_role='USER', access_level='READONLY') -> dict: """Description: Lists Conformity users Args: user_id: Conformity user ID account_id: Conformity account ID user_role: User role access_level: User access level Example: Example usage: Returns: """ self.logger.entry('info', f'Updating Conformity account ID {account_id}...') users_endpoint = f'{self.base_url}/users/{user_id}' user_role = user_role.upper() data = { 'data': { 'role': user_role } } if user_role == 'USER': access_level = access_level.upper() access_list = { 'accessList': [ { 'account': account_id, 'level': access_level, } ] } data['data'].update(access_list) data = json.dumps(data) self.logger.entry('debug', f'Sending payload:\n{pformat(data)}') resp = requests.patch(users_endpoint, headers=self.headers, data=data) resp_json = json.loads(resp.text) self._check_error(resp_json) self.logger.entry('debug', f'Done:\n{pformat(resp_json)}') return resp_json