aws-vis/gather.py

115 lines
5.3 KiB
Python
Executable File

#!/usr/bin/env python3
import boto3, json, os, sys
def name_tag(tags):
    """Return the value of the first tag whose key is ``Name``.

    ``tags`` is a list of ``{'Key': ..., 'Value': ...}`` dicts, or a falsy
    value (``None`` / empty list) when the resource has no tags. Returns
    ``None`` if no ``Name`` tag is present.
    """
    for tag in tags or []:
        if tag['Key'] == 'Name':
            return tag['Value']
    return None
def _paginate(client, operation, result_key, **kwargs):
    """Return the items under ``result_key`` from every page of ``operation``.

    Uses the client's paginator so results are not cut off at the first page.
    """
    items = []
    for page in client.get_paginator(operation).paginate(**kwargs):
        items.extend(page.get(result_key, []))
    return items


def _port_label(from_port, to_port):
    """Render a rule's port range as ``"all"``, ``"443"`` or ``"8000-8100"``."""
    # Check for a missing FromPort first: when both ports are absent they
    # compare equal, which would otherwise render as the string "None".
    if from_port is None:
        return "all"
    if from_port == to_port:
        return f"{from_port}"
    return f"{from_port}-{to_port}"


def _flatten_permissions(permissions):
    """Turn a list of IpPermission dicts into ``"proto port from source"`` strings."""
    rules = []
    for perm in permissions:
        proto = perm.get('IpProtocol')
        port = _port_label(perm.get('FromPort'), perm.get('ToPort'))
        for ip_range in perm.get('IpRanges', []) + perm.get('Ipv6Ranges', []):
            cidr = ip_range.get('CidrIp') or ip_range.get('CidrIpv6')
            rules.append(f"{proto} {port} from {cidr}")
        for pair in perm.get('UserIdGroupPairs', []):
            rules.append(f"{proto} {port} from {pair['GroupId']}")
    return rules


def collect(region, session=None):
    """Gather a network inventory for one AWS region.

    Args:
        region: Region name the ec2, elbv2 and rds clients are created for.
        session: Optional object with a boto3-style ``client(name, region_name=...)``
            method. When omitted, ``boto3.Session(region_name=region)`` is used.

    Returns:
        A dict of lists keyed by ``'vpcs'``, ``'subnets'``, ``'sgs'``, ``'enis'``,
        ``'ec2'``, ``'lbs'``, ``'rds'`` and ``'exposures'``; every entry is a
        plain dict built from the corresponding describe call.
    """
    if session is None:
        session = boto3.Session(region_name=region)
    ec2 = session.client('ec2', region_name=region)
    elb = session.client('elbv2', region_name=region)
    rds = session.client('rds', region_name=region)
    out = {k: [] for k in ['vpcs', 'subnets', 'sgs', 'enis', 'ec2', 'lbs', 'rds', 'exposures']}

    # VPCs, Subnets, SGs, ENIs, Instances
    vpcs = _paginate(ec2, 'describe_vpcs', 'Vpcs')
    subnets = _paginate(ec2, 'describe_subnets', 'Subnets')
    sgs = _paginate(ec2, 'describe_security_groups', 'SecurityGroups')
    enis = _paginate(ec2, 'describe_network_interfaces', 'NetworkInterfaces')
    reservations = _paginate(ec2, 'describe_instances', 'Reservations')

    for vpc in vpcs:
        out['vpcs'].append({
            'id': vpc['VpcId'],
            'cidr': vpc['CidrBlock'],
            # Fall back to the VPC id when there is no Name tag.
            'name': name_tag(vpc.get('Tags')) or vpc['VpcId'],
        })

    for subnet in subnets:
        out['subnets'].append({
            'id': subnet['SubnetId'],
            'vpc': subnet['VpcId'],
            'cidr': subnet['CidrBlock'],
            'az': subnet['AvailabilityZone'],
            'public': subnet.get('MapPublicIpOnLaunch', False),
        })

    for group in sgs:
        out['sgs'].append({
            'id': group['GroupId'],
            'name': group.get('GroupName'),
            'rules_in': _flatten_permissions(group.get('IpPermissions', [])),
            'rules_out': _flatten_permissions(group.get('IpPermissionsEgress', [])),
        })

    for eni in enis:
        association = eni.get('Association', {})
        out['enis'].append({
            'id': eni['NetworkInterfaceId'],
            'subnet': eni['SubnetId'],
            'sgs': [sg['GroupId'] for sg in eni.get('Groups', [])],
            'privateIp': eni.get('PrivateIpAddress'),
            'publicIp': association.get('PublicIp'),
        })

    for reservation in reservations:
        for instance in reservation['Instances']:
            out['ec2'].append({
                'id': instance['InstanceId'],
                'name': name_tag(instance.get('Tags')) or instance['InstanceId'],
                'type': instance['InstanceType'],
                'privateIp': instance.get('PrivateIpAddress'),
                'publicIp': instance.get('PublicIpAddress'),
                'state': instance.get('State', {}).get('Name', 'unknown'),
                'sgs': [g['GroupId'] for g in instance.get('SecurityGroups', [])],
                'subnet': instance.get('SubnetId'),
            })

    # Load balancers (ALB/NLB), listeners, target groups/targets.
    # The exception class is looked up defensively: if the client does not
    # expose it, naming it directly in the ``except`` clause would raise
    # AttributeError while handling and mask the real error.
    unsupported = getattr(elb.exceptions, 'UnsupportedFeatureException', None)
    handled = (unsupported,) if unsupported is not None else ()
    try:
        lbs = _paginate(elb, 'describe_load_balancers', 'LoadBalancers')
    except handled:
        lbs = []

    for lb in lbs:
        lb_name = lb['LoadBalancerName']
        lb_arn = lb['LoadBalancerArn']
        listeners = [
            {'proto': listener['Protocol'], 'port': listener['Port']}
            for listener in _paginate(elb, 'describe_listeners', 'Listeners',
                                      LoadBalancerArn=lb_arn)
        ]
        target_groups = []
        for tg in _paginate(elb, 'describe_target_groups', 'TargetGroups',
                            LoadBalancerArn=lb_arn):
            health = elb.describe_target_health(
                TargetGroupArn=tg['TargetGroupArn'])['TargetHealthDescriptions']
            target_groups.append({
                'port': tg.get('Port'),
                'targets': [desc['Target']['Id'] for desc in health],
            })
        out['lbs'].append({
            'id': lb_name,
            'scheme': lb['Scheme'],
            'type': lb['Type'],
            'dns': lb.get('DNSName'),
            'subnets': [z['SubnetId'] for z in lb.get('AvailabilityZones', [])],
            'securityGroups': lb.get('SecurityGroups', []),
            'listeners': listeners,
            'targetGroups': target_groups,
        })

        # Simple exposure: every listener of an internet-facing LB is recorded
        # as a public surface pointing at all of the LB's targets.
        if lb['Scheme'] == 'internet-facing':
            # Pair each target with the port of the group it is listed in, so a
            # target registered in several groups shows each group's port.
            backends = [
                f"{target}:{tg['port']}"
                for tg in target_groups
                for target in tg['targets']
            ]
            for listener in listeners:
                out['exposures'].append({
                    'surface': f"{lb_name}:{listener['port']}",
                    'world_open': True,
                    'via': lb['Type'].upper(),
                    'to': list(backends),  # separate list per exposure
                })

    # RDS
    for db in _paginate(rds, 'describe_db_instances', 'DBInstances'):
        out['rds'].append({
            'id': db['DBInstanceIdentifier'],
            'engine': db['Engine'],
            # A DbInstancePort of 0 is not a usable port, so a missing or zero
            # value falls back to the endpoint's port.
            'port': db.get('DbInstancePort') or db.get('Endpoint', {}).get('Port'),
            'publiclyAccessible': db.get('PubliclyAccessible', False),
            'sgs': [vsg['VpcSecurityGroupId'] for vsg in db.get('VpcSecurityGroups', [])],
            'subnetGroup': [sn['SubnetIdentifier']
                            for sn in db.get('DBSubnetGroup', {}).get('Subnets', [])],
        })
    return out
def main():
    """Collect every requested region and write the result to ``graph.json``.

    Regions come from the command-line arguments; with none given, the
    default session's region is used, or ``us-west-1`` if that is unset.
    """
    regions = sys.argv[1:]
    if not regions:
        regions = [boto3.Session().region_name or 'us-west-1']

    graph = {'regions': {region: collect(region) for region in regions}}

    with open('graph.json', 'w') as handle:
        json.dump(graph, handle, indent=2)
    print('Wrote graph.json for regions:', ', '.join(regions))


if __name__ == '__main__':
    main()