278 lines
9.8 KiB
Python
Executable File
278 lines
9.8 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Fetch AWS VPC-related metadata and emit a Cytoscape-friendly `graph.json` file.
|
|
|
|
Adds support for multiple AWS accounts by letting you specify one or more AWS CLI
|
|
profiles. Each account is tagged in the output so the front-end can group VPCs
|
|
per account.
|
|
"""
|
|
import argparse
|
|
import json
|
|
from collections import defaultdict
|
|
|
|
import boto3
|
|
from botocore.exceptions import ClientError
|
|
|
|
RESOURCE_KEYS = ['vpcs', 'subnets', 'sgs', 'enis', 'ec2', 'lbs', 'rds', 'exposures']
|
|
ACCOUNT_COLORS = ['#7c3aed', '#0ea5e9', '#f97316', '#10b981', '#facc15', '#f472b6']
|
|
|
|
|
|
def name_tag(tags):
|
|
return next((t['Value'] for t in (tags or []) if t['Key'] == 'Name'), None)
|
|
|
|
|
|
def empty_region():
|
|
return {k: [] for k in RESOURCE_KEYS}
|
|
|
|
|
|
def resolve_account(session, profile_hint=None):
|
|
sts = session.client('sts')
|
|
identity = sts.get_caller_identity()
|
|
account_id = identity['Account']
|
|
|
|
alias = None
|
|
try:
|
|
iam = session.client('iam')
|
|
aliases = iam.list_account_aliases().get('AccountAliases', [])
|
|
alias = aliases[0] if aliases else None
|
|
except ClientError:
|
|
alias = None
|
|
|
|
label = alias or profile_hint or account_id
|
|
return {'id': account_id, 'label': label, 'alias': alias}
|
|
|
|
|
|
def collect(session, account):
|
|
region = session.region_name
|
|
out = empty_region()
|
|
|
|
ec2 = session.client('ec2')
|
|
elb = session.client('elbv2')
|
|
rds = session.client('rds')
|
|
|
|
vpcs = ec2.describe_vpcs()['Vpcs']
|
|
subnets = ec2.describe_subnets()['Subnets']
|
|
sgs = ec2.describe_security_groups()['SecurityGroups']
|
|
enis = ec2.describe_network_interfaces()['NetworkInterfaces']
|
|
reservations = ec2.describe_instances()['Reservations']
|
|
|
|
for v in vpcs:
|
|
out['vpcs'].append({
|
|
'id': v['VpcId'],
|
|
'cidr': v['CidrBlock'],
|
|
'name': name_tag(v.get('Tags')) or v['VpcId'],
|
|
'accountId': account['id']
|
|
})
|
|
|
|
for sn in subnets:
|
|
out['subnets'].append({
|
|
'id': sn['SubnetId'],
|
|
'vpc': sn['VpcId'],
|
|
'cidr': sn['CidrBlock'],
|
|
'az': sn['AvailabilityZone'],
|
|
'public': sn.get('MapPublicIpOnLaunch', False),
|
|
'accountId': account['id']
|
|
})
|
|
|
|
def flatten_rules(perms):
|
|
rules = []
|
|
for perm in perms:
|
|
proto = perm.get('IpProtocol')
|
|
frm = perm.get('FromPort')
|
|
to = perm.get('ToPort')
|
|
if frm is None:
|
|
port = 'all'
|
|
elif frm == to:
|
|
port = f"{frm}"
|
|
else:
|
|
port = f"{frm}-{to}"
|
|
for ipr in perm.get('IpRanges', []) + perm.get('Ipv6Ranges', []):
|
|
cidr = ipr.get('CidrIp') or ipr.get('CidrIpv6')
|
|
rules.append(f"{proto} {port} from {cidr}")
|
|
for sgr in perm.get('UserIdGroupPairs', []):
|
|
rules.append(f"{proto} {port} from {sgr['GroupId']}")
|
|
return rules
|
|
|
|
for g in sgs:
|
|
out['sgs'].append({
|
|
'id': g['GroupId'],
|
|
'name': g.get('GroupName'),
|
|
'vpc': g.get('VpcId'),
|
|
'accountId': account['id'],
|
|
'rules_in': flatten_rules(g.get('IpPermissions', [])),
|
|
'rules_out': flatten_rules(g.get('IpPermissionsEgress', []))
|
|
})
|
|
|
|
for ni in enis:
|
|
assoc = ni.get('Association', {})
|
|
out['enis'].append({
|
|
'id': ni['NetworkInterfaceId'],
|
|
'subnet': ni['SubnetId'],
|
|
'vpc': ni.get('VpcId'),
|
|
'sgs': [sg['GroupId'] for sg in ni.get('Groups', [])],
|
|
'privateIp': ni.get('PrivateIpAddress'),
|
|
'publicIp': assoc.get('PublicIp'),
|
|
'accountId': account['id']
|
|
})
|
|
|
|
for reservation in reservations:
|
|
for inst in reservation['Instances']:
|
|
inst_name = name_tag(inst.get('Tags')) or inst['InstanceId']
|
|
out['ec2'].append({
|
|
'id': inst['InstanceId'],
|
|
'name': inst_name,
|
|
'type': inst['InstanceType'],
|
|
'privateIp': inst.get('PrivateIpAddress'),
|
|
'publicIp': inst.get('PublicIpAddress'),
|
|
'state': inst.get('State', {}).get('Name', 'unknown'),
|
|
'sgs': [g['GroupId'] for g in inst.get('SecurityGroups', [])],
|
|
'subnet': inst.get('SubnetId'),
|
|
'vpc': inst.get('VpcId'),
|
|
'accountId': account['id']
|
|
})
|
|
|
|
try:
|
|
lbs = elb.describe_load_balancers()['LoadBalancers']
|
|
except (ClientError, elb.exceptions.UnsupportedFeatureException):
|
|
lbs = []
|
|
|
|
for lb in lbs:
|
|
lb_id = lb['LoadBalancerName']
|
|
try:
|
|
listeners = elb.describe_listeners(LoadBalancerArn=lb['LoadBalancerArn'])['Listeners']
|
|
except ClientError:
|
|
listeners = []
|
|
listener_meta = [{'proto': L['Protocol'], 'port': L['Port']} for L in listeners]
|
|
|
|
try:
|
|
tgs = elb.describe_target_groups(LoadBalancerArn=lb['LoadBalancerArn'])['TargetGroups']
|
|
except ClientError:
|
|
tgs = []
|
|
|
|
target_groups = []
|
|
all_targets = []
|
|
for tg in tgs:
|
|
try:
|
|
health = elb.describe_target_health(TargetGroupArn=tg['TargetGroupArn'])['TargetHealthDescriptions']
|
|
except ClientError:
|
|
health = []
|
|
targets = [desc['Target']['Id'] for desc in health]
|
|
target_groups.append({'port': tg.get('Port'), 'targets': targets})
|
|
all_targets.extend(targets)
|
|
|
|
out['lbs'].append({
|
|
'id': lb_id,
|
|
'scheme': lb['Scheme'],
|
|
'type': lb['Type'],
|
|
'dns': lb.get('DNSName'),
|
|
'subnets': [az['SubnetId'] for az in lb.get('AvailabilityZones', [])],
|
|
'securityGroups': lb.get('SecurityGroups', []),
|
|
'listeners': listener_meta,
|
|
'targetGroups': target_groups,
|
|
'accountId': account['id']
|
|
})
|
|
|
|
if lb['Scheme'] == 'internet-facing':
|
|
for listener in listener_meta:
|
|
out['exposures'].append({
|
|
'surface': f"{lb_id}:{listener['port']}",
|
|
'world_open': True,
|
|
'via': lb['Type'].upper(),
|
|
'to': [
|
|
f"{t}:{next((tg['port'] for tg in target_groups if t in tg['targets']), None)}"
|
|
for t in all_targets
|
|
],
|
|
'accountId': account['id'],
|
|
'region': region
|
|
})
|
|
|
|
for db in rds.describe_db_instances()['DBInstances']:
|
|
out['rds'].append({
|
|
'id': db['DBInstanceIdentifier'],
|
|
'engine': db['Engine'],
|
|
'port': db.get('DbInstancePort') or db.get('Endpoint', {}).get('Port'),
|
|
'publiclyAccessible': db.get('PubliclyAccessible', False),
|
|
'sgs': [sg['VpcSecurityGroupId'] for sg in db.get('VpcSecurityGroups', [])],
|
|
'subnetGroup': [s['SubnetIdentifier'] for s in db.get('DBSubnetGroup', {}).get('Subnets', [])],
|
|
'accountId': account['id']
|
|
})
|
|
|
|
return out
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description='Gather AWS VPC graph data into graph.json')
|
|
parser.add_argument('regions', nargs='*', help='AWS regions to scan (defaults to your configured region)')
|
|
parser.add_argument('-r', '--region', dest='regions_opt', action='append', help='Additional region to scan (repeatable)')
|
|
parser.add_argument('-p', '--profile', dest='profiles', action='append', help='AWS CLI profile to use (repeatable)')
|
|
parser.add_argument('--account-label', dest='account_labels', action='append', help='Friendly label per profile (matches order of --profile)')
|
|
parser.add_argument('-o', '--output', default='graph.json', help='Output file (default: graph.json)')
|
|
return parser
|
|
|
|
|
|
def main():
|
|
parser = parse_args()
|
|
args = parser.parse_args()
|
|
|
|
regions = []
|
|
if args.regions:
|
|
regions.extend(args.regions)
|
|
if args.regions_opt:
|
|
regions.extend(args.regions_opt)
|
|
|
|
if not regions:
|
|
default_region = boto3.Session().region_name
|
|
if not default_region:
|
|
parser.error('No regions specified and no default region configured.')
|
|
regions = [default_region]
|
|
|
|
regions = sorted(set(regions))
|
|
|
|
profiles = args.profiles or [None]
|
|
account_label_overrides = args.account_labels or []
|
|
|
|
graph_accounts = []
|
|
account_index = {}
|
|
region_data = defaultdict(empty_region)
|
|
|
|
for idx, profile in enumerate(profiles):
|
|
profile_label_hint = account_label_overrides[idx] if idx < len(account_label_overrides) else profile
|
|
for region in regions:
|
|
session = boto3.Session(profile_name=profile, region_name=region)
|
|
account = resolve_account(session, profile_label_hint)
|
|
account_id = account['id']
|
|
if account_id not in account_index:
|
|
color = ACCOUNT_COLORS[len(account_index) % len(ACCOUNT_COLORS)]
|
|
entry = {
|
|
'id': account_id,
|
|
'label': account['label'],
|
|
'alias': account['alias'],
|
|
'profile': profile,
|
|
'color': color
|
|
}
|
|
graph_accounts.append(entry)
|
|
account_index[account_id] = entry
|
|
else:
|
|
entry = account_index[account_id]
|
|
|
|
data = collect(session, entry)
|
|
bucket = region_data[region]
|
|
for key in RESOURCE_KEYS:
|
|
bucket[key].extend(data[key])
|
|
|
|
graph = {
|
|
'accounts': graph_accounts,
|
|
'regions': {region: bucket for region, bucket in region_data.items()}
|
|
}
|
|
|
|
with open(args.output, 'w') as f:
|
|
json.dump(graph, f, indent=2)
|
|
|
|
region_list = ', '.join(regions)
|
|
profile_list = ', '.join(p or 'default' for p in profiles)
|
|
print(f"Wrote {args.output} for regions: {region_list} (profiles: {profile_list})")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|