aws-vis/gather.py

278 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Fetch AWS VPC-related metadata and emit a Cytoscape-friendly `graph.json` file.
Adds support for multiple AWS accounts by letting you specify one or more AWS CLI
profiles. Each account is tagged in the output so the front-end can group VPCs
per account.
"""
import argparse
import json
from collections import defaultdict
import boto3
from botocore.exceptions import ClientError
# Resource categories gathered for every region; each one becomes a list in
# the per-region bucket of the output graph.
RESOURCE_KEYS = [
    'vpcs',
    'subnets',
    'sgs',
    'enis',
    'ec2',
    'lbs',
    'rds',
    'exposures',
]

# Colour palette handed out to accounts in the order they are discovered.
ACCOUNT_COLORS = [
    '#7c3aed',
    '#0ea5e9',
    '#f97316',
    '#10b981',
    '#facc15',
    '#f472b6',
]
def name_tag(tags):
    """Return the value of the first ``Name`` tag, or ``None`` if there is none.

    ``tags`` is an iterable of ``{'Key': ..., 'Value': ...}`` dicts; ``None``
    or an empty list is treated as "no tags".
    """
    for tag in tags or []:
        if tag['Key'] == 'Name':
            return tag['Value']
    return None
def empty_region():
    """Return a fresh per-region bucket with a new empty list for every key in RESOURCE_KEYS."""
    bucket = {}
    for key in RESOURCE_KEYS:
        bucket[key] = []
    return bucket
def resolve_account(session, profile_hint=None):
    """Identify the AWS account behind ``session``.

    The account id comes from STS ``get_caller_identity``. The alias is the
    first entry returned by IAM ``list_account_aliases``; if that lookup
    raises ``ClientError`` or returns no aliases, the alias is ``None``.

    Args:
        session: Session object used to create the ``sts`` and ``iam`` clients.
        profile_hint: Optional label used when the account has no alias.

    Returns:
        A dict with keys ``id``, ``label`` and ``alias``. ``label`` is the
        first truthy value among the alias and ``profile_hint``, falling back
        to the account id.
    """
    account_id = session.client('sts').get_caller_identity()['Account']

    try:
        response = session.client('iam').list_account_aliases()
    except ClientError:
        # Alias lookup is best-effort; fall back to the hint or the id.
        alias = None
    else:
        found = response.get('AccountAliases', [])
        alias = found[0] if found else None

    label = next(
        (candidate for candidate in (alias, profile_hint) if candidate),
        account_id,
    )
    return {'id': account_id, 'label': label, 'alias': alias}
def _paginate(client, operation, result_key, **kwargs):
    """Return every item stored under ``result_key`` for a ``describe_*`` call.

    Uses the client's paginator when the operation has one, so results past
    the first page are not silently dropped; otherwise makes a single direct
    call.
    """
    if client.can_paginate(operation):
        items = []
        for page in client.get_paginator(operation).paginate(**kwargs):
            items.extend(page.get(result_key, []))
        return items
    return getattr(client, operation)(**kwargs)[result_key]


def _flatten_rules(perms):
    """Render security-group permissions as ``"<proto> <port> from <source>"`` strings.

    One string is produced per CIDR range (IPv4 and IPv6), per referenced
    security group and per referenced prefix list.
    """
    rules = []
    for perm in perms:
        proto = perm.get('IpProtocol')
        frm = perm.get('FromPort')
        to = perm.get('ToPort')
        if frm is None:
            port = 'all'
        elif frm == to:
            port = f"{frm}"
        else:
            port = f"{frm}-{to}"
        for ipr in perm.get('IpRanges', []) + perm.get('Ipv6Ranges', []):
            cidr = ipr.get('CidrIp') or ipr.get('CidrIpv6')
            rules.append(f"{proto} {port} from {cidr}")
        for sgr in perm.get('UserIdGroupPairs', []):
            rules.append(f"{proto} {port} from {sgr['GroupId']}")
        # Rules that reference a managed prefix list would otherwise be
        # dropped from the output entirely.
        for pl in perm.get('PrefixListIds', []):
            rules.append(f"{proto} {port} from {pl.get('PrefixListId')}")
    return rules


def _collect_load_balancers(elb, account_id, region):
    """Return ``(lb_records, exposures)`` for the load balancers visible to ``elb``.

    Every ELBv2 lookup is best-effort: a ``ClientError`` yields an empty
    result for that lookup instead of aborting the scan.
    """
    # ClientError alone is caught here: service-specific exceptions exposed on
    # ``client.exceptions`` derive from it, and naming one that the service
    # does not model would raise AttributeError while handling the real error.
    try:
        load_balancers = _paginate(elb, 'describe_load_balancers', 'LoadBalancers')
    except ClientError:
        load_balancers = []

    lb_records = []
    exposures = []
    for lb in load_balancers:
        lb_id = lb['LoadBalancerName']
        lb_arn = lb['LoadBalancerArn']
        # Scheme, Protocol and Port are read with .get() because they may be
        # missing from the response (e.g. for Gateway Load Balancers).
        scheme = lb.get('Scheme')

        try:
            listeners = _paginate(elb, 'describe_listeners', 'Listeners', LoadBalancerArn=lb_arn)
        except ClientError:
            listeners = []
        listener_meta = [
            {'proto': listener.get('Protocol'), 'port': listener.get('Port')}
            for listener in listeners
        ]

        try:
            tgs = _paginate(elb, 'describe_target_groups', 'TargetGroups', LoadBalancerArn=lb_arn)
        except ClientError:
            tgs = []

        target_groups = []
        all_targets = []
        for tg in tgs:
            try:
                health = elb.describe_target_health(
                    TargetGroupArn=tg['TargetGroupArn']
                )['TargetHealthDescriptions']
            except ClientError:
                health = []
            targets = [desc['Target']['Id'] for desc in health]
            target_groups.append({'port': tg.get('Port'), 'targets': targets})
            all_targets.extend(targets)

        lb_records.append({
            'id': lb_id,
            'scheme': scheme,
            'type': lb['Type'],
            'dns': lb.get('DNSName'),
            'subnets': [az['SubnetId'] for az in lb.get('AvailabilityZones', [])],
            'securityGroups': lb.get('SecurityGroups', []),
            'listeners': listener_meta,
            'targetGroups': target_groups,
            'accountId': account_id
        })

        if scheme != 'internet-facing':
            continue

        # Port of the first target group containing each target, computed once
        # per load balancer instead of once per listener.
        first_port = {}
        for tg in target_groups:
            for target in tg['targets']:
                first_port.setdefault(target, tg['port'])
        reachable = [f"{t}:{first_port.get(t)}" for t in all_targets]

        for listener in listener_meta:
            exposures.append({
                'surface': f"{lb_id}:{listener['port']}",
                'world_open': True,
                'via': lb['Type'].upper(),
                'to': list(reachable),
                'accountId': account_id,
                'region': region
            })
    return lb_records, exposures


def collect(session, account):
    """Gather VPC-related resources for one account in the session's region.

    Args:
        session: boto3 session bound to the profile and region to scan.
        account: Account entry dict; only its ``'id'`` is read and stamped on
            every record as ``accountId``.

    Returns:
        A dict with one list per key in ``RESOURCE_KEYS`` (``vpcs``,
        ``subnets``, ``sgs``, ``enis``, ``ec2``, ``lbs``, ``rds``,
        ``exposures``).

    Raises:
        botocore.exceptions.ClientError: If an EC2 or RDS describe call fails.
            ELBv2 failures are swallowed and produce empty load-balancer data.
    """
    region = session.region_name
    account_id = account['id']
    out = empty_region()

    ec2 = session.client('ec2')
    elb = session.client('elbv2')
    rds = session.client('rds')

    # All listings go through paginators so large accounts are not truncated
    # to the first page of results.
    vpcs = _paginate(ec2, 'describe_vpcs', 'Vpcs')
    subnets = _paginate(ec2, 'describe_subnets', 'Subnets')
    sgs = _paginate(ec2, 'describe_security_groups', 'SecurityGroups')
    enis = _paginate(ec2, 'describe_network_interfaces', 'NetworkInterfaces')
    reservations = _paginate(ec2, 'describe_instances', 'Reservations')

    for v in vpcs:
        out['vpcs'].append({
            'id': v['VpcId'],
            'cidr': v['CidrBlock'],
            'name': name_tag(v.get('Tags')) or v['VpcId'],
            'accountId': account_id
        })

    for sn in subnets:
        out['subnets'].append({
            'id': sn['SubnetId'],
            'vpc': sn['VpcId'],
            # .get(): a subnet without an IPv4 block should not abort the scan.
            'cidr': sn.get('CidrBlock'),
            'az': sn['AvailabilityZone'],
            'public': sn.get('MapPublicIpOnLaunch', False),
            'accountId': account_id
        })

    for g in sgs:
        out['sgs'].append({
            'id': g['GroupId'],
            'name': g.get('GroupName'),
            'vpc': g.get('VpcId'),
            'accountId': account_id,
            'rules_in': _flatten_rules(g.get('IpPermissions', [])),
            'rules_out': _flatten_rules(g.get('IpPermissionsEgress', []))
        })

    for ni in enis:
        assoc = ni.get('Association', {})
        out['enis'].append({
            'id': ni['NetworkInterfaceId'],
            'subnet': ni['SubnetId'],
            'vpc': ni.get('VpcId'),
            'sgs': [sg['GroupId'] for sg in ni.get('Groups', [])],
            'privateIp': ni.get('PrivateIpAddress'),
            'publicIp': assoc.get('PublicIp'),
            'accountId': account_id
        })

    for reservation in reservations:
        for inst in reservation['Instances']:
            out['ec2'].append({
                'id': inst['InstanceId'],
                'name': name_tag(inst.get('Tags')) or inst['InstanceId'],
                'type': inst['InstanceType'],
                'privateIp': inst.get('PrivateIpAddress'),
                'publicIp': inst.get('PublicIpAddress'),
                'state': inst.get('State', {}).get('Name', 'unknown'),
                'sgs': [g['GroupId'] for g in inst.get('SecurityGroups', [])],
                'subnet': inst.get('SubnetId'),
                'vpc': inst.get('VpcId'),
                'accountId': account_id
            })

    lb_records, exposures = _collect_load_balancers(elb, account_id, region)
    out['lbs'].extend(lb_records)
    out['exposures'].extend(exposures)

    for db in _paginate(rds, 'describe_db_instances', 'DBInstances'):
        out['rds'].append({
            'id': db['DBInstanceIdentifier'],
            'engine': db['Engine'],
            # Fall back to the endpoint port when DbInstancePort is missing or 0.
            'port': db.get('DbInstancePort') or db.get('Endpoint', {}).get('Port'),
            'publiclyAccessible': db.get('PubliclyAccessible', False),
            'sgs': [sg['VpcSecurityGroupId'] for sg in db.get('VpcSecurityGroups', [])],
            'subnetGroup': [
                s['SubnetIdentifier']
                for s in db.get('DBSubnetGroup', {}).get('Subnets', [])
            ],
            'accountId': account_id
        })

    return out
def parse_args():
    """Build and return the command-line parser.

    Despite the name, this returns the ``ArgumentParser`` itself rather than
    parsed arguments; the caller invokes ``.parse_args()`` on the result.
    """
    cli = argparse.ArgumentParser(description='Gather AWS VPC graph data into graph.json')
    cli.add_argument(
        'regions',
        nargs='*',
        help='AWS regions to scan (defaults to your configured region)',
    )

    # Repeatable options, registered in the order they appear in --help.
    repeatable_options = (
        (('-r', '--region'), 'regions_opt', 'Additional region to scan (repeatable)'),
        (('-p', '--profile'), 'profiles', 'AWS CLI profile to use (repeatable)'),
        (('--account-label',), 'account_labels', 'Friendly label per profile (matches order of --profile)'),
    )
    for flags, dest, help_text in repeatable_options:
        cli.add_argument(*flags, dest=dest, action='append', help=help_text)

    cli.add_argument(
        '-o',
        '--output',
        default='graph.json',
        help='Output file (default: graph.json)',
    )
    return cli
def main():
    """Scan the requested profiles and regions and write the graph JSON file.

    Regions come from the positional arguments plus any ``-r/--region``
    options, de-duplicated and sorted; with none given, the default region of
    the ambient boto3 session is used, and the parser exits with an error if
    there is none. Each profile is resolved to an account once, then scanned
    in every region. The output has two top-level keys: ``accounts`` (one
    entry per distinct account) and ``regions`` (resource buckets per region).
    """
    parser = parse_args()
    args = parser.parse_args()

    regions = list(args.regions or []) + list(args.regions_opt or [])
    if not regions:
        default_region = boto3.Session().region_name
        if not default_region:
            parser.error('No regions specified and no default region configured.')
        regions = [default_region]
    regions = sorted(set(regions))

    profiles = args.profiles or [None]
    account_label_overrides = args.account_labels or []

    graph_accounts = []
    account_index = {}
    region_data = defaultdict(empty_region)
    # (account id, region) pairs already collected, so two profiles that map
    # to the same account do not duplicate every resource in the output.
    scanned = set()

    for idx, profile in enumerate(profiles):
        if idx < len(account_label_overrides):
            profile_label_hint = account_label_overrides[idx]
        else:
            profile_label_hint = profile

        # The caller identity does not depend on the region, so resolve it
        # once per profile instead of repeating the STS/IAM calls per region.
        identity_session = boto3.Session(profile_name=profile, region_name=regions[0])
        account = resolve_account(identity_session, profile_label_hint)
        account_id = account['id']

        entry = account_index.get(account_id)
        if entry is None:
            color = ACCOUNT_COLORS[len(account_index) % len(ACCOUNT_COLORS)]
            entry = {
                'id': account_id,
                'label': account['label'],
                'alias': account['alias'],
                'profile': profile,
                'color': color
            }
            graph_accounts.append(entry)
            account_index[account_id] = entry

        for region in regions:
            if (account_id, region) in scanned:
                print(
                    f"Skipping profile {profile or 'default'} in {region}: "
                    f"account {account_id} already scanned"
                )
                continue
            scanned.add((account_id, region))

            session = boto3.Session(profile_name=profile, region_name=region)
            data = collect(session, entry)
            bucket = region_data[region]
            for key in RESOURCE_KEYS:
                bucket[key].extend(data[key])

    graph = {
        'accounts': graph_accounts,
        'regions': dict(region_data)
    }
    # Explicit encoding keeps the output independent of the platform default.
    with open(args.output, 'w', encoding='utf-8') as f:
        json.dump(graph, f, indent=2)

    region_list = ', '.join(regions)
    profile_list = ', '.join(p or 'default' for p in profiles)
    print(f"Wrote {args.output} for regions: {region_list} (profiles: {profile_list})")
# Script entry point: run the gatherer only when this file is executed
# directly, not when it is imported as a module.
if __name__ == '__main__':
    main()