#!/usr/bin/env python3 """ Fetch AWS VPC-related metadata and emit a Cytoscape-friendly `graph.json` file. Adds support for multiple AWS accounts by letting you specify one or more AWS CLI profiles. Each account is tagged in the output so the front-end can group VPCs per account. """ import argparse import json from collections import defaultdict import boto3 from botocore.exceptions import ClientError RESOURCE_KEYS = ['vpcs', 'subnets', 'sgs', 'enis', 'ec2', 'lbs', 'rds', 'exposures'] ACCOUNT_COLORS = ['#7c3aed', '#0ea5e9', '#f97316', '#10b981', '#facc15', '#f472b6'] def name_tag(tags): return next((t['Value'] for t in (tags or []) if t['Key'] == 'Name'), None) def empty_region(): return {k: [] for k in RESOURCE_KEYS} def resolve_account(session, profile_hint=None): sts = session.client('sts') identity = sts.get_caller_identity() account_id = identity['Account'] alias = None try: iam = session.client('iam') aliases = iam.list_account_aliases().get('AccountAliases', []) alias = aliases[0] if aliases else None except ClientError: alias = None label = alias or profile_hint or account_id return {'id': account_id, 'label': label, 'alias': alias} def collect(session, account): region = session.region_name out = empty_region() ec2 = session.client('ec2') elb = session.client('elbv2') rds = session.client('rds') vpcs = ec2.describe_vpcs()['Vpcs'] subnets = ec2.describe_subnets()['Subnets'] sgs = ec2.describe_security_groups()['SecurityGroups'] enis = ec2.describe_network_interfaces()['NetworkInterfaces'] reservations = ec2.describe_instances()['Reservations'] for v in vpcs: out['vpcs'].append({ 'id': v['VpcId'], 'cidr': v['CidrBlock'], 'name': name_tag(v.get('Tags')) or v['VpcId'], 'accountId': account['id'] }) for sn in subnets: out['subnets'].append({ 'id': sn['SubnetId'], 'vpc': sn['VpcId'], 'cidr': sn['CidrBlock'], 'az': sn['AvailabilityZone'], 'public': sn.get('MapPublicIpOnLaunch', False), 'accountId': account['id'] }) def flatten_rules(perms): rules = [] for perm in perms: proto = perm.get('IpProtocol') frm = perm.get('FromPort') to = perm.get('ToPort') if frm is None: port = 'all' elif frm == to: port = f"{frm}" else: port = f"{frm}-{to}" for ipr in perm.get('IpRanges', []) + perm.get('Ipv6Ranges', []): cidr = ipr.get('CidrIp') or ipr.get('CidrIpv6') rules.append(f"{proto} {port} from {cidr}") for sgr in perm.get('UserIdGroupPairs', []): rules.append(f"{proto} {port} from {sgr['GroupId']}") return rules for g in sgs: out['sgs'].append({ 'id': g['GroupId'], 'name': g.get('GroupName'), 'vpc': g.get('VpcId'), 'accountId': account['id'], 'rules_in': flatten_rules(g.get('IpPermissions', [])), 'rules_out': flatten_rules(g.get('IpPermissionsEgress', [])) }) for ni in enis: assoc = ni.get('Association', {}) out['enis'].append({ 'id': ni['NetworkInterfaceId'], 'subnet': ni['SubnetId'], 'vpc': ni.get('VpcId'), 'sgs': [sg['GroupId'] for sg in ni.get('Groups', [])], 'privateIp': ni.get('PrivateIpAddress'), 'publicIp': assoc.get('PublicIp'), 'accountId': account['id'] }) for reservation in reservations: for inst in reservation['Instances']: inst_name = name_tag(inst.get('Tags')) or inst['InstanceId'] out['ec2'].append({ 'id': inst['InstanceId'], 'name': inst_name, 'type': inst['InstanceType'], 'privateIp': inst.get('PrivateIpAddress'), 'publicIp': inst.get('PublicIpAddress'), 'state': inst.get('State', {}).get('Name', 'unknown'), 'sgs': [g['GroupId'] for g in inst.get('SecurityGroups', [])], 'subnet': inst.get('SubnetId'), 'vpc': inst.get('VpcId'), 'accountId': account['id'] }) try: lbs = elb.describe_load_balancers()['LoadBalancers'] except (ClientError, elb.exceptions.UnsupportedFeatureException): lbs = [] for lb in lbs: lb_id = lb['LoadBalancerName'] try: listeners = elb.describe_listeners(LoadBalancerArn=lb['LoadBalancerArn'])['Listeners'] except ClientError: listeners = [] listener_meta = [{'proto': L['Protocol'], 'port': L['Port']} for L in listeners] try: tgs = elb.describe_target_groups(LoadBalancerArn=lb['LoadBalancerArn'])['TargetGroups'] except ClientError: tgs = [] target_groups = [] all_targets = [] for tg in tgs: try: health = elb.describe_target_health(TargetGroupArn=tg['TargetGroupArn'])['TargetHealthDescriptions'] except ClientError: health = [] targets = [desc['Target']['Id'] for desc in health] target_groups.append({'port': tg.get('Port'), 'targets': targets}) all_targets.extend(targets) out['lbs'].append({ 'id': lb_id, 'scheme': lb['Scheme'], 'type': lb['Type'], 'dns': lb.get('DNSName'), 'subnets': [az['SubnetId'] for az in lb.get('AvailabilityZones', [])], 'securityGroups': lb.get('SecurityGroups', []), 'listeners': listener_meta, 'targetGroups': target_groups, 'accountId': account['id'] }) if lb['Scheme'] == 'internet-facing': for listener in listener_meta: out['exposures'].append({ 'surface': f"{lb_id}:{listener['port']}", 'world_open': True, 'via': lb['Type'].upper(), 'to': [ f"{t}:{next((tg['port'] for tg in target_groups if t in tg['targets']), None)}" for t in all_targets ], 'accountId': account['id'], 'region': region }) for db in rds.describe_db_instances()['DBInstances']: out['rds'].append({ 'id': db['DBInstanceIdentifier'], 'engine': db['Engine'], 'port': db.get('DbInstancePort') or db.get('Endpoint', {}).get('Port'), 'publiclyAccessible': db.get('PubliclyAccessible', False), 'sgs': [sg['VpcSecurityGroupId'] for sg in db.get('VpcSecurityGroups', [])], 'subnetGroup': [s['SubnetIdentifier'] for s in db.get('DBSubnetGroup', {}).get('Subnets', [])], 'accountId': account['id'] }) return out def parse_args(): parser = argparse.ArgumentParser(description='Gather AWS VPC graph data into graph.json') parser.add_argument('regions', nargs='*', help='AWS regions to scan (defaults to your configured region)') parser.add_argument('-r', '--region', dest='regions_opt', action='append', help='Additional region to scan (repeatable)') parser.add_argument('-p', '--profile', dest='profiles', action='append', help='AWS CLI profile to use (repeatable)') parser.add_argument('--account-label', dest='account_labels', action='append', help='Friendly label per profile (matches order of --profile)') parser.add_argument('-o', '--output', default='graph.json', help='Output file (default: graph.json)') return parser def main(): parser = parse_args() args = parser.parse_args() regions = [] if args.regions: regions.extend(args.regions) if args.regions_opt: regions.extend(args.regions_opt) if not regions: default_region = boto3.Session().region_name if not default_region: parser.error('No regions specified and no default region configured.') regions = [default_region] regions = sorted(set(regions)) profiles = args.profiles or [None] account_label_overrides = args.account_labels or [] graph_accounts = [] account_index = {} region_data = defaultdict(empty_region) for idx, profile in enumerate(profiles): profile_label_hint = account_label_overrides[idx] if idx < len(account_label_overrides) else profile for region in regions: session = boto3.Session(profile_name=profile, region_name=region) account = resolve_account(session, profile_label_hint) account_id = account['id'] if account_id not in account_index: color = ACCOUNT_COLORS[len(account_index) % len(ACCOUNT_COLORS)] entry = { 'id': account_id, 'label': account['label'], 'alias': account['alias'], 'profile': profile, 'color': color } graph_accounts.append(entry) account_index[account_id] = entry else: entry = account_index[account_id] data = collect(session, entry) bucket = region_data[region] for key in RESOURCE_KEYS: bucket[key].extend(data[key]) graph = { 'accounts': graph_accounts, 'regions': {region: bucket for region, bucket in region_data.items()} } with open(args.output, 'w') as f: json.dump(graph, f, indent=2) region_list = ', '.join(regions) profile_list = ', '.join(p or 'default' for p in profiles) print(f"Wrote {args.output} for regions: {region_list} (profiles: {profile_list})") if __name__ == '__main__': main()