1
mirror of https://git.dn42.dev/dn42/registry.git synced 2025-01-14 10:27:27 +01:00

add policy checks

This commit is contained in:
xuu 2017-11-06 09:52:54 -07:00
parent e407b0195c
commit 0689c8bb82
No known key found for this signature in database
GPG Key ID: 8B3B0604F164E04F

View File

@ -8,6 +8,8 @@ import sys
import argparse
import log
import glob
import random
from pprint import pprint
SCHEMA_NAMESPACE = "dn42."
@ -335,6 +337,218 @@ def index_files(path):
print("%s\t%s\t%s\t%s" % i)
def http_get(server, url, query=None, headers=None):
import urllib.parse
import http.client
import json
if headers is None:
headers = {}
if 'User-Agent' not in headers:
headers['User-Agent'] = "curl"
if 'Accept' not in headers:
headers['Accept'] = 'application/json'
if query is None:
query = {}
http_client = http.client.HTTPSConnection(server)
full_url = url + '?' + urllib.parse.urlencode(query)
log.debug("GET " + full_url)
http_client.request('GET', full_url, headers=headers)
req = http_client.getresponse()
log.debug("HTTP Response: %d %s" % (req.status, req.reason))
if "application/json" in req.getheader("Content-Type", "application/json"):
if req.status > 299:
return {}
return json.loads(req.read())
if req.status > 299:
return ""
return req.read()
def find(fields=None, filter=None):
server = "util.sour.is"
url = "/v1/reg/reg.objects"
if fields is None:
fields = []
if filter is None:
filter = {}
query = {"fields": ",".join(fields), "filter": ",".join([k + "=" + v for k, v in filter.items()])}
return http_get(server, url, query)
def test_policy(obj_type, name, mntner):
log.debug([obj_type, name, mntner])
if obj_type in ["organisation", "mntner", "person", "role", "as-set", "schema", "dns"]:
if obj_type == "organisation" and not name.startswith("ORG-"):
log.error("%s does not start with 'ORG-'" %(name))
return "FAIL"
elif obj_type == "mntner" and not name.endswith("-MNT"):
log.error("%s does not end with '-MNT'" %(name))
return "FAIL"
elif obj_type == "dns" and not name.endswith(".dn42"):
log.error("%s does not end with '.dn42'" %(name))
return "FAIL"
elif obj_type == "dns" and len(name.strip(".").split(".")) != 2:
log.error("%s is not a second level domain" %(name))
return "FAIL"
elif obj_type in ["person", "role"] and not name.endswith("-DN42"):
log.error("%s does not end with '-DN42'" %(name))
return "FAIL"
lis = find(["mnt-by"], {"@type": obj_type, "@name": name})
if len(lis) == 0:
log.notice("%s does not currently exist" %(name))
return "PASS"
status = 'FAIL'
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
log.error("%s does not have mnt for object" %(mntner))
return status
return status
elif args["type"] in ["inetnum","inet6num"]:
lis = find(["mnt-by"], {"@type": "aut-num", "@name": name})
log.info(lis)
pass
elif args["type"] in ["route","route6"]:
lis = find(["mnt-by"], {"@type": "aut-num", "@name": name})
log.info(lis)
pass
elif args["type"] == "aut-num":
if not name.startswith("AS"):
log.error("%s does not start with AS" %(name))
return "FAIL"
# 1. Check if they already have an object
lis = find(["mnt-by"], {"@type": "aut-num", "@name": name})
log.info(lis)
if len(lis) > 0:
status = 'FAIL'
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
log.notice("%s has mnt for current object" %(mntner))
return status
log.error("%s does not have mnt for current object" %(mntner))
return status
# 2. Check if the as-block has an open policy
asn = "AS{:0>9}".format(name[2:])
lis = find(["as-block","policy","@as-min","@as-max","mnt-by","mnt-lower"], {"@type": "as-block","@as-min":"le=" + asn,"@as-max": "ge=" + asn})
log.info(lis)
policy = {}
select = None
mntners = []
for n in lis:
obj = {}
for o in n:
obj[o[0]] = o[1]
if o[0].startswith("mnt-"):
mntners.append(o[1])
k = (obj["@as-min"],obj["@as-max"])
policy[k] = obj
if select is None:
select = k
elif select[0]<=k[0] or select[1]>=k[1]:
select = k
if policy[select]["policy"] == "open":
log.notice("Policy is open for parent object")
return "PASS"
# 3. Check if mntner or mnt-lower for any as-block in the tree.
elif mntner in mntners:
log.notice("%s has mnt in parent object" %(mntner))
return "PASS"
elif args["type"] == "as-block":
Lname, Hname = name.split("-")
Lname, Hname = Lname.strip(), Hname.strip()
if not Lname.startswith("AS") or not Hname.startswith("AS"):
log.error("%s does not start with AS for min and max" %(name))
return "FAIL"
# 1. Check if they already have an object
lis = find(["mnt-by"], {"@type": "as-block", "@name": name})
log.info(lis)
if len(lis) > 0:
status = 'FAIL'
for o in lis:
for n in o:
if n[0] == "mnt-by" and n[1] == mntner:
status = 'PASS'
log.notice("%s has mnt for current object" %(mntner))
return status
log.notice("%s does not have mnt for current object" %(mntner))
return status
# 2. Check if the parent as-blocks have an open policy
Lasn = "AS{:0>9}".format(Lname[2:])
Hasn = "AS{:0>9}".format(Hname[2:])
if Lasn > Hasn:
log.error("%s should come before %s" %(Lname, Hname))
lis = find(["as-block","policy","@as-min","@as-max","mnt-by","mnt-lower"], {"@type": "as-block","@as-min":"le=" + Lasn,"@as-max": "ge=" + Hasn})
log.info(lis)
policy = {}
select = None
mntners = []
for n in lis:
obj = {}
for o in n:
obj[o[0]] = o[1]
if o[0].startswith("mnt-"):
mntners.append(o[1])
k = (obj["@as-min"],obj["@as-max"])
policy[k] = obj
if select is None:
select = k
elif select[0]<=k[0] or select[1]>=k[1]:
select = k
# Policy Open only applies to aut-nums. as-blocks must be defined by parent mntners only.
#
# if policy[select]["policy"] == "open":
# log.notice("Policy is open for parent object")
# return "PASS"
# 3. Check if mntner or mnt-lower for any as-block in the tree.
if mntner in mntners:
log.notice("%s has mnt in parent object" %(mntner))
return "PASS"
pass
log.error("%s does not pass checks for %s %s" %(mntner, obj_type, name))
return "FAIL"
def get_args():
"""Get and parse command line arguments"""
@ -384,6 +598,13 @@ def get_args():
parser_fmt.add_argument('-i', '--in-place',
help="Format file in place", action="store_true")
parser_pol = subparsers.add_parser('policy', help='Format file')
parser_pol.add_argument('type', nargs="?", type=str, help="dn42 object type")
parser_pol.add_argument('name', nargs="?", type=str, help="dn42 object name")
parser_pol.add_argument('mntner', nargs="?", type=str, help="dn42 object mntner")
parser_mroute = subparsers.add_parser('match-routes', help='Match routes to inetnums')
return vars(parser.parse_args())
@ -437,3 +658,93 @@ if __name__ == '__main__':
f.write(str(dom))
print(str(dom))
elif args["command"] == "policy":
if args["type"] is None:
log.fatal("Type should be provided")
if args["name"] is None:
log.fatal("Name should be provided")
if args["mntner"] is None:
log.fatal("Mntner should be provided")
status = test_policy(args["type"], args["name"], args["mntner"])
print(status)
if status != "PASS":
sys.exit(1)
elif args["command"] == "match-routes":
lis = find(["mnt-by","cidr","route","@netlevel", "@netmin", "@netmax", "@uri"], {"@family":"ipv4"})
def field(x, field):
for i in x:
if i[0] == field:
return i[1]
return None
def lvl(x):
for i in x:
if i[0] == "@netlevel":
return i[1]
def net(x):
for i in x:
if i[0] == "@netmin":
return i[1]
def is_net(x):
i = field(x, "cidr")
if i is not None:
return True
return False
def obj(x):
d = {}
for k,v in x:
if k in d:
d[k].append(v)
else:
d[k] = [v]
return d
inet = None
first = True
for n in sorted(sorted(lis, key=lvl), key=net):
o = obj(n)
if is_net(n):
if not first:
print()
first = True
inet = o
continue
ilvl = int(inet["@netlevel"][0])
rlvl = int(o["@netlevel"][0])
if ilvl + 1 != rlvl:
print("\nNo Parent > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]), \
"Nearest INET ", inet["cidr"][0], " ", ilvl, " ", ",".join(inet["mnt-by"]))
first = True
continue
if inet["@netmin"][0] > o["@netmin"][0] or inet["@netmax"][0] < o["@netmax"][0]:
print("\nNo Parent > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]), \
"Nearest INET ", inet["cidr"][0], " ", ilvl, " ", ",".join(inet["mnt-by"]))
first = True
continue
continue
if first:
first = False
print(inet["cidr"]," ", ilvl, ",".join(inet["mnt-by"]))
print(" > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]))
else:
print(" > ", o["route"][0], " ", rlvl, " ", ",".join(o["mnt-by"]))