Merge pull request 'delete items removed from project' (#55) from xuu-20200716/remove-rspl into master

Reviewed-on: https://git.dn42.dev/dn42/registry/pulls/55
Reviewed-by: schema-checker <schema-checker@noreply.dn42.us>
This commit is contained in:
Xuu 2020-07-19 00:58:52 +00:00
commit d31c987488
25 changed files with 0 additions and 2779 deletions

View File

@ -1,2 +0,0 @@
test:
python3 -m unittest discover

View File

@ -1,19 +0,0 @@
"DN42 RSPL Library"
__version__ = "0.3.0"
from .file import FileDOM, Row, Value, index_files
from .schema import SchemaDOM, Level, State
from .transact import TransactDOM
from .config import Config
from .nettree import NetTree, NetRecord, NetList, as_net6
from .rspl import RPSL
__all__ = [
"FileDOM", "Row", "Value", "index_files",
"SchemaDOM", "Level", "State",
"TransactDOM",
"Config",
"NetTree", "NetRecord", "NetList", "as_net6",
"RPSL",
]

View File

@ -1,148 +0,0 @@
"RSPL Config"
import os
import os.path
from dataclasses import dataclass
from typing import Dict, Set, Tuple, Optional, TypeVar
from .file import FileDOM
C = TypeVar('C', bound='Config')
@dataclass
class Config:
"RSPL Config"
path: str
_dom: FileDOM
@property
def namespace(self) -> str:
"Get namespace"
return self._dom.get("namespace", default="dn42").value
@property
def schema(self) -> str:
"Get schema type name"
return str(self._dom.get("schema", default="schema"))
@property
def owners(self) -> str:
"Get owner type name"
return str(self._dom.get("owner", default="mntner"))
@property
def source(self) -> str:
"Get source"
return self._dom.get("source", default="DN42").value
@property
def default_owner(self) -> str:
"Get default onwer"
return self._dom.get("default-owner", default=self._dom.mntner).value
@property
def network_owners(self) -> Dict[str, str]:
"Get network owners"
network_owner = {} # type: Dict[str, str]
for (parent, child) in [
i.fields for i in self._dom.get_all("network-owner")]:
network_owner[child] = parent
return network_owner
@property
def primary_keys(self) -> Dict[str, str]:
"Get primary keys"
primary_keys = {} # type: Dict[str, str]
for (parent, key) in [
i.fields for i in self._dom.get_all("primary-key")]:
primary_keys[parent] = key
return primary_keys
@property
def network_parents(self) -> Set[str]:
"return network parents"
return set(self.network_owners.values())
@property
def network_children(self) -> Set[str]:
"return network children"
return set(self.network_owners.keys()) - self.network_parents
@property
def schema_dir(self) -> str:
"get schema directory"
return os.path.join(self.path, self.schema)
@property
def owner_dir(self) -> str:
"get owner directory"
return os.path.join(self.path, self.owners)
@property
def config_file(self) -> str:
"get config file"
return os.path.join(self.path, ".rpsl/config")
@property
def index_file(self) -> str:
"get index file"
return os.path.join(self.path, ".rpsl/index")
@property
def links_file(self) -> str:
"get links file"
return os.path.join(self.path, ".rpsl/links")
@property
def schema_file(self) -> str:
"get schema file"
return os.path.join(self.path, ".rpsl/schema")
@property
def nettree_file(self) -> str:
"get nettree file"
return os.path.join(self.path, ".rpsl/nettree")
@classmethod
def from_path(cls, path: str) -> C:
"create from path"
src = os.path.join(path, ".rpsl/config")
return cls(FileDOM.from_file(src))
@classmethod
def build(cls, # pylint: disable=too-many-arguments
path: str,
namespace: str = "dn42",
owners: str = "mntner",
schema: str = "schema",
source: str = "DN42",
dir_name: Optional[Set[Tuple[str, str]]] = None,
primary_keys: Optional[Set[Tuple[str, str]]] = None,
network_owners: Optional[Set[Tuple[str, str]]] = None,
) -> FileDOM:
"Build config from parameters"
FileDOM.namespace = namespace
dom = FileDOM()
dom.src = os.path.join(path, ".rpsl/config")
dom.put("namespace", namespace)
dom.put("owners", owners)
dom.put("schema", schema)
dom.put("source", source)
for (k, v) in dir_name:
dom.put("dir-name", f"{k} {v}", append=True)
for (k, v) in primary_keys:
dom.put("primary-key", f"{k} {v}", append=True)
for (k, v) in network_owners:
dom.put("network-owner", f"{v} {k}", append=True)
return cls(dom)
def __init__(self, dom: FileDOM):
self._dom = dom
self.path = os.path.dirname(os.path.dirname(dom.src))
def __str__(self):
return self._dom.__str__()

View File

@ -1,275 +0,0 @@
"""FileDOM parse and formating"""
import re
import os
from dataclasses import dataclass
from typing import Sequence, NamedTuple, List, \
Dict, Optional, Tuple, Union, Generator, TypeVar
from ipaddress import ip_network, IPv4Network, IPv6Network
import log
DOM = TypeVar("DOM", bound="FileDOM")
@dataclass(frozen=True)
class Value:
"""Dom Value"""
value: str
def __eq__(self, other: str) -> bool:
return self.value == other
def __str__(self) -> str:
return self.value
@property
def lines(self) -> List[str]:
"""return value split into lines"""
return self.value.splitlines()
@property
def fields(self) -> List[str]:
"""return value split into fields"""
return self.value.split()
@property
def as_net(self) -> Union[IPv4Network, IPv6Network]:
"""return value as an ip network"""
return ip_network(self.value)
@property
def as_net6(self) -> IPv6Network:
"""return value as an ip network"""
net = ip_network(self.value)
if isinstance(net, IPv6Network):
return net
n = net
return ip_network(
f"::FFFF:{n.network_address}/{n.prefixlen + 96}")
@property
def as_key(self) -> str:
"""Format as key name"""
return self.value.replace("/", "_").replace(" ", "")
@property
def as_spec(self) -> List[str]:
"get the spec definition"
fields = self.fields
i = fields.index(">")
if i is None:
return []
return fields[i:]
class Row(NamedTuple):
"""DOM Row"""
key: str
value: Value
lineno: int
src: str = None
@property
def loc(self) -> str:
"""format as location"""
s = f"{self.src} Line {self.lineno} "
s += "" if self.key == "" else f"Key [{self.key}]:"
return s
class FileDOM:
"""Parses a reg file"""
def __init__(self,
text: Optional[Sequence[str]] = None,
src: Optional[str] = None):
self.valid = False
self.dom = [] # type: List[Row]
self.keys = {} # type: Dict[str, int]
self.multi = {} # type: Dict[str, int]
self.mntner = [] # type: List[str]
self.namespace = ""
self.primary_key = ""
self.src = src
if text is not None:
self.parse(text, src=src)
def parse(self, text: Sequence[str], src: Optional[str] = None):
"""Parse an input string generator"""
dom = []
keys = {}
multi = {}
mntner = []
last_multi = None
self.valid = False
self.src = self.src if src is None else src
for lineno, i in enumerate(text, 1):
# print(lineno, i)
if re.match(r'[ \t]', i):
if len(dom) == 0:
log.error(f"File {src} does not parse properly")
return
dom[-1][1] += "\n" + i.strip()
if dom[-1][0] not in multi:
multi[dom[-1][0]] = []
if last_multi is None:
multi[dom[-1][0]].append(lineno)
last_multi = dom[-1][0]
else:
if i[0] == '+':
dom[-1][1] += "\n"
if dom[-1][0] not in multi:
multi[dom[-1][0]] = []
if last_multi is None:
multi[dom[-1][0]].append(lineno)
last_multi = dom[-1][0]
i = i.split(":")
if len(i) < 2:
continue
dom.append([i[0].strip(), ':'.join(
i[1:]).strip(), lineno - 1])
if i[0].strip() not in keys:
keys[i[0].strip()] = []
keys[i[0].strip()].append(len(dom) - 1)
last_multi = None
if dom[-1][0] == 'mnt-by':
mntner.append(dom[-1][1])
self.dom = [Row(k, Value(v), n, self.src) for k, v, n in dom]
if len(self.dom) > 1:
self.primary_key = self.dom[0].key
self.keys = keys
self.multi = multi
self.mntner = mntner
if self.src is None:
self.src = f"{self.schema}/{self.name}"
self.valid = True
@property
def schema(self) -> str:
"""return the schema name for file"""
if len(self.dom) < 1:
return None
return self.dom[0].key
@property
def name(self) -> str:
"""return the friendly name for file"""
if self.primary_key != "":
return self.get(self.primary_key).value
if len(self.dom) < 1:
return "none"
fields = self.dom[0].value.fields
if len(fields) < 1:
return "none"
return fields[0]
@property
def rel(self) -> str:
"generate rel for schema ref"
return f"{self.namespace}.{self.schema}"
@property
def index(self) -> Tuple[str, str]:
"""generate index key/value pair"""
name = self.src.split("/")[-1].replace("_", "/")
return f"{self.namespace}.{self.schema}", name
def __str__(self):
length = 19
for i in self.dom:
if len(i.key) > length:
length = len(i.key) + 2
s = ""
for i in self.dom:
sp = i.value.lines
if len(sp) == 0:
s += i.key + ":" + " " * (length - len(i.key)) + "\n"
continue
s += i.key + ":" + " " * (length - len(i.key)) + sp[0] + "\n"
for m in sp[1:]:
if m == "":
s += "+\n"
continue
s += " " * (length + 1) + m + "\n"
return s
def get(self, key, index=0, default=None):
"""Get a key value"""
if key not in self.keys:
return default
if index >= len(self.keys[key]) or index <= -len(self.keys[key]):
return default
return self.dom[self.keys[key][index]].value
def get_all(self, key) -> Generator[str, None, None]:
"Get all values for a key"
if key not in self.keys:
return
for i in self.keys[key]:
yield self.dom[i].value
def put(self, key, value, index=0, append=False):
"""Put a value"""
if key not in self.keys:
self.keys[key] = []
i = (self.keys[key][index:index+1] or (None,))[0]
if i is None or append:
i = len(self.dom)
self.dom.append(Row(key, Value(value), i))
elif i is not None:
self.dom[i] = Row(key, Value(value), i)
if index not in self.keys[key]:
self.keys[key].append(i)
@classmethod
def from_file(cls, fn: str) -> DOM:
"""Parses FileDOM from file"""
with open(fn, mode='r', encoding='utf-8') as f:
dom = cls(src=fn, text=f.readlines())
return dom
def index_files(path: str,
namespace: str,
primary_keys: Dict[str, str]) -> FileDOM:
"""generate list of dom files"""
for root, _, files in os.walk(path):
if root == path:
continue
if root.endswith(".rpsl"):
continue
for f in files:
dom = FileDOM.from_file(os.path.join(root, f))
dom.namespace = namespace
if dom.schema in primary_keys:
dom.primary_key = primary_keys[dom.schema]
yield dom

View File

@ -1,23 +0,0 @@
"Metafile"
from dataclasses import dataclass
from typing import Sequence, Generator
from .rspl import RPSL
from .file import Value
@dataclass
class MetaFile:
"file"
obj_type: str
obj_name: str
class MetaDOM:
"metafile dom"
def __init__(self, lis: Sequence[MetaFile], rpsl: RPSL):
self.lis = lis
self.rpsl = rpsl
def get(self, name: str) -> Generator[Value, None, None]:
"get values"

View File

@ -1,232 +0,0 @@
"Net Tree"
from ipaddress import ip_network, IPv6Network
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Optional, Generator, TypeVar
NET = IPv6Network
V6_NET = ip_network("::/0")
V4_NET = ip_network("::ffff:0.0.0.0/96")
NT = TypeVar("NT", bound="NetTree")
def as_net6(value: str) -> IPv6Network:
"""return value as an ip network"""
net = ip_network(value)
if isinstance(net, IPv6Network):
return net
n = net
return ip_network(
f"::FFFF:{n.network_address}/{n.prefixlen + 96}")
@dataclass
class NetRecord:
"Network Record"
network: NET
policy: str
status: str
is_leaf: bool = False
@property
def object_type(self) -> str:
"""object type"""
if self.is_leaf:
return "route" if V4_NET.supernet_of(self.network) \
else "route6"
return "inetnum" if V4_NET.supernet_of(self.network) \
else "inet6num"
@property
def object_name(self) -> str:
"""object name"""
if V4_NET.supernet_of(self.network):
n = self.network.network_address.exploded.replace(":", "")[-8:]
return ip_network((
bytes.fromhex(n),
self.network.prefixlen - 96,
)).with_prefixlen.replace("/", "_")
return self.network.with_prefixlen.replace("/", "_")
def __str__(self) -> str:
return f"{self.object_type}/{self.object_name}"
@dataclass
class NetList:
"Network List"
index: int
parent: Optional[int]
level: int
net: Optional[NetRecord]
nets: List[NET]
routes: List[NetRecord] = field(default_factory=list)
def in_net(self, i: NET) -> Tuple[bool, NET]:
"find a network within a list of networks"
found = False
net = None
for n in self.nets:
if n.supernet_of(i):
found = True
net = n
break
return found, net
def in_routes(self, i: NET) -> Tuple[bool, NET]:
"find a network within a list of networks"
found = False
net = None
for n in self.routes:
if n.network.supernet_of(i):
found = True
net = n
break
return found, net
class NetTree:
"Network Tree"
def __init__(self,
nets: Optional[List[NetRecord]] = None,
routes: Optional[List[NetRecord]] = None):
self.tree = {} # type: Dict[NET, NetList]
if routes is None:
routes = []
if nets is not None:
self.make_tree(nets, routes)
def __getitem__(self, key):
return self.tree[key]
def find_tree(self, ip: str) -> Generator[NetList, None, None]:
"""Find net in tree"""
net = V6_NET
current = self.tree[net]
needle = as_net6(ip)
yield current
while True:
found, net = current.in_net(needle)
if found:
current = self.tree[net]
yield current
continue
break
def make_tree(self,
nets: List[NetRecord],
routes: List[NetRecord]):
"""build a network tree index"""
root = V6_NET
self.tree = {root: NetList(0, None, -1, None, [])}
index = 0
for index, net in enumerate(sorted(
sorted(nets, key=lambda x: x.network),
key=lambda x: x.network.prefixlen)):
current = self.tree[root]
while True:
found, n = current.in_net(net.network)
if found:
current = self.tree[n]
continue
if current.level >= 0:
current.nets.append(net.network)
self.tree[net.network] = NetList(
index, current.index, current.level + 1, net, [])
break
for index, net in enumerate(sorted(
sorted(routes, key=lambda x: x.network),
key=lambda x: x.network.prefixlen), index):
current = self.tree[root]
while True:
found, n = current.in_net(net.network)
if found:
current = self.tree[n]
continue
rec = NetRecord(net.network, "-", "-", True)
current.routes.append(rec)
break
def write_csv(self, fn: str = ".netindex"):
"write tree to csv"
with open(fn, "w") as f:
f.writelines(self._lines())
def __str__(self) -> str:
return "".join(self._lines())
def _lines(self) -> Generator[str, None, None]:
for v in sorted(
sorted(self.tree.values(), key=lambda x: x.index),
key=lambda x: x.level):
net_addr = v.net.network.network_address.exploded
net_pfx = v.net.network.prefixlen
yield (
"|".join([str(i) for i in (
f"{v.index:04d}|{v.parent:04d}|{v.level:04d}",
net_addr,
net_pfx,
v.net.policy,
v.net.status,
v.net.object_type,
v.net.object_name,
)]) + "\n")
for route in v.routes:
net_addr = route.network.network_address.exploded
net_pfx = route.network.prefixlen
yield (
"|".join([str(i) for i in (
f"{0:04d}|{v.index:04d}|{v.level+1:04d}",
net_addr,
net_pfx,
route.policy,
route.status,
route.object_type,
route.object_name,
)]) + "\n")
@classmethod
def read_csv(cls, fn) -> NT:
"read tree from csv"
inttree = {} # type: Dict[int, NetRecord]
with open(fn) as fd:
for line in fd.readlines():
sp = line.split(sep="|")
if len(sp) != 9:
continue
net = ip_network(f"{sp[3]}/{sp[4]}")
is_leaf = sp[7] in ("route", "route6")
rec = NetRecord(net, sp[5], sp[6], is_leaf)
if is_leaf:
inttree[sp[1]].routes.append(rec)
else:
lis = NetList(sp[0], sp[1], sp[2], rec, [])
inttree[sp[0]] = lis
if sp[0] != sp[1]:
inttree[sp[1]].nets.append(net)
nettree = {}
for v in inttree.values():
nettree[v.net.network] = v
c = cls()
c.tree = nettree
return c

View File

@ -1,119 +0,0 @@
"RPSL"
import os.path
from typing import Dict, List, Tuple, TypeVar, Optional, Sequence
from .file import FileDOM
from .nettree import NetTree, NetList
from .schema import SchemaDOM, State
from .transact import TransactDOM
from .config import Config
R = TypeVar('R', bound="RPSL")
class RPSL:
"RSPL"
def __init__(self, config: Config):
self._config = config
self._files = {} # type: Dict[Tuple[str, str], str]
self._lookup = {} # type: Dict[str, List[Tuple[str, str]]]
self._links = {} \
# type: Dict[Tuple[str, str], List[Tuple[str, str, str]]]
self._nettree = None # type: NetTree
self._schema = {} # type: Dict[str, SchemaDOM]
self._load_index()
def _load_index(self):
with open(self._config.index_file) as fd:
for line in fd.readlines():
sp = line.strip().split(sep="|")
self._files[(sp[0], sp[1])] = sp[2]
self._lookup[sp[1]] = self._lookup.get(sp[1], [])
self._lookup[sp[1]].append((sp[0], sp[1]))
with open(self._config.links_file) as fd:
for line in fd.readlines():
sp = line.strip().split(sep="|")
key = (sp[0], sp[1])
arr = self._links.get(key, [])
arr.append((sp[2], sp[3], sp[4]))
self._links[key] = arr
self._nettree = NetTree.read_csv(self._config.nettree_file)
files = TransactDOM.from_file(self._config.schema_file)
for schema in files.schemas:
self._schema[schema.ref] = schema
def append_index(self, dom: FileDOM):
"append files to index"
key, value = dom.index, (dom.src, ",".join(dom.mntner))
self._lookup[key] = value
def scan_files(self, files: List[FileDOM]) -> State:
"scan files for schema errors"
state = State()
for dom in files:
s = self._schema.get(dom.rel)
if s is None:
state.warning(dom.dom[0],
f"{dom.src} schema not found for {dom.rel}")
continue
state = s.check_file(dom, lookups=self._files, state=state)
return state
def find(self,
text: str,
schema: Optional[str] = None) -> Sequence[str]:
"Find files that match text and schema"
keys = [(schema, text)]
if schema is None:
keys = self._lookup.get(text, [])
return [self._files[i] for i in keys]
def related(
self,
key: Tuple[str, str]) -> Sequence[str]:
"Get files related to file"
related = set()
for link in self.links(key):
key = (link[1], link[2])
related.add(key)
return [self._files[i] for i in related]
def find_network(self, ip: str) -> Sequence[NetList]:
"""Find Network in index
Args:
ip (str): ip address
Returns:
Generator[NetList, None, None]: generator of netlists
"""
return self._nettree.find_tree(ip)
def load_file(self, fn: str) -> FileDOM:
"load file"
fn = os.path.join(self._config.path, fn)
fo = FileDOM.from_file(fn)
fo.namespace = self._config.namespace
fo.primary_keys = self._config.primary_keys
return fo
def load_files(self, fns: Sequence[str]) -> Sequence[NetList]:
for fn in fns:
yield self.load_file(fn)
def links(self, key: Tuple[str, str]) -> List[Tuple[str, str]]:
"get links"
return self._links.get(key, [])
def schema(self, name: str) -> SchemaDOM:
"get schema"
return self._schema.get(name)

View File

@ -1,279 +0,0 @@
"""Schema DOM"""
import re
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Optional, List, Tuple, Dict, Set, TypeVar
import log
from .file import FileDOM, Row
DOM = TypeVar("DOM", bound="FileDOM")
STATE = TypeVar("STATE", bound="State")
class Level(Enum):
"""State error level"""
info = auto()
warning = auto()
error = auto()
@dataclass
class State:
"""State of schema check
"""
state: bool = True
msgs: List[Tuple[Level, Row, str]] = field(default_factory=list)
def __eq__(self, other: bool) -> bool:
return self.state == other
def __bool__(self):
return self.state
def __str__(self) -> str:
return "PASS" if self.state else "FAIL"
def extend(self, state: STATE):
"apply state to state"
self.msgs.extend(state.msgs)
self.state = state.state
def print_msgs(self):
"""print out state info"""
for (level, row, msg) in self.msgs:
if level == Level.info:
log.info(f"{row.loc} {msg}")
elif level == Level.warning:
log.warning(f"{row.loc} {msg}")
elif level == Level.error:
log.error(f"{row.loc} {msg}")
def info(self, r: Row, s: str):
"""Add warning"""
self.msgs.append((Level.info, r, s))
def warning(self, r: Row, s: str):
"""Add warning"""
self.msgs.append((Level.warning, r, s))
def error(self, r: Row, s: str):
"""Add error"""
self.state = False
self.msgs.append((Level.error, r, s))
class SchemaDOM:
"""Schema DOM"""
def __init__(self,
dom: FileDOM,
src: Optional[str] = None):
self.valid = False
self.name = None
self.ref = None
self.primary = None
self.type = None
self.src = src
self._schema = {} # type: Dict[str, Set[str]]
self._spec = {} # type: Dict[str, str]
self._links = {} # type: Dict[str, List[str]]
self.dom = dom
self.parse(dom)
@property
def links(self) -> Dict[str, List[str]]:
"return schema links"
return self._links
@property
def namespace(self) -> str:
"get namespace"
ns = "default"
ref = self._dom.get("ref")
if ref is not None:
ns = ref.value.split(".")[0]
return ns
def parse(self, f: FileDOM):
"""Parse a FileDOM into a SchemaDOM"""
self.src = self.src if f.src is None else f.src
self._dom = f
schema = {}
for row in f.dom:
if row.key == 'ref':
self.ref = str(row.value)
elif row.key == 'schema':
self.name = str(row.value)
if row.key != 'key':
continue
lines = row.value.fields
key = lines.pop(0)
schema[key] = set()
for i in lines:
if i == ">":
break
schema[key].add(i)
if i.startswith("lookup="):
self._links[key] = i.split("=", 2)[1].split(",")
schema = self._process_schema(schema)
self.valid = True
self._schema = schema
return schema
def _process_schema(self, schema):
for k, v in schema.items():
if 'schema' in v:
self.type = k
if 'primary' in v:
self.primary = k
schema[k].add("oneline")
if "multiline" in v:
schema[k].remove("multiline")
schema[k].add("single")
if "multiple" in v:
schema[k].remove("multiple")
schema[k].add("required")
if "optional" in v:
schema[k].remove("optional")
if "recommend" in v:
schema[k].remove("recommend")
if "deprecate" in v:
schema[k].remove("deprecate")
if 'oneline' not in v:
schema[k].add("multiline")
if 'single' not in v:
schema[k].add("multiple")
return schema
def check_file(self,
f: FileDOM,
lookups=None,
state: Optional[State] = None) -> State:
"""Check a FileDOM for correctness(tm)"""
if state is None:
state = State()
file_state = State()
if not f.valid:
file_state.error(Row("", "", 0, f.src), "file does not parse")
file_state = self._check_file_structure(file_state, f)
file_state = self._check_file_values(file_state, f, lookups)
file_state = inetnum_check(file_state, f)
print("CHECK\t%-10s\t%-44s\t%s\tMNTNERS: %s" %
(f.schema, f.src.split("/")[-1], file_state, ','.join(f.mntner)))
state.extend(file_state)
return state
def _check_file_structure(self, state: State, f: FileDOM) -> State:
for k, v in self._schema.items():
row = Row(k, "", 0, f.src)
if 'required' in v and k not in f.keys:
state.error(row, "not found and is required")
elif 'recommend' in v and k not in f.keys:
state.info(row, "not found and is recommended")
if 'schema' in v and f"{f.namespace}.{f.dom[0].key}" != self.ref:
state.error(row, "not found and is required as the first line")
if 'single' in v and k in f.keys and len(f.keys[k]) > 1:
state.warning(row, "first defined here and has repeated keys")
for i in f.keys[k][1:]:
state.error(row, f"repeated on {i} can only appear once")
if 'oneline' in v and k in f.multi:
for i in f.keys[k]:
state.error(row, "can not have multiple lines")
return state
def _check_file_values(self,
state: State,
f: FileDOM,
lookups: Optional[List[Tuple[str, str]]] = None
) -> State:
for row in f.dom:
c = row.value.as_key
src = "None" if f.src is None else f.src
if row.key == self.primary and not src.endswith(c):
state.error(row,
f"primary [{row.value}]" +
f" does not match filename [{src}].")
if row.key.startswith("x-"):
state.info(row, "is user defined")
continue
if row.key not in self._schema:
state.error(row, "not in schema")
continue
if 'deprecate' in self._schema[row.key]:
state.info(row, "was found and is deprecated")
if lookups is not None:
state = self._check_file_lookups(state, row, lookups)
return state
def _check_file_lookups(self,
state: State,
row: Row,
lookups: List[Tuple[str, str]] = None
) -> State:
if row.key not in self._links:
return state
refs = self._links[row.key]
val = row.value.fields[0]
found = False
for ref in refs:
if (ref, val) in lookups:
found = True
if not found:
state.error(row,
f"{row.key} references object {val} " +
f"in {refs} but does not exist.")
return state
def __str__(self) -> str:
return self._dom.__str__()
@staticmethod
def from_file(src: str) -> DOM:
"""Parses SchemaDOM from file"""
with open(src, mode='r', encoding='utf-8') as f:
dom = FileDOM(src=src, text=f.readlines())
return SchemaDOM(dom=dom)
def inetnum_check(state: State, dom: FileDOM) -> State:
"""Sanity Check for checking the inet[6]num value"""
if dom.schema == "inetnum" or dom.schema == "inet6num":
cidr = dom.get("cidr").as_net
Lnet = cidr.network_address.exploded
Hnet = cidr.broadcast_address.exploded
cidr_range = f"{Lnet}-{Hnet}"
file_range = dom.get(dom.schema)
file_range = re.sub(r"\s+", "", str(file_range), flags=re.UNICODE)
if cidr_range != file_range:
state.error(Row("", "", 0, dom.src),
f"inetnum range [{file_range}] " +
f"does not match: [{cidr_range}]")
return state

View File

@ -1,21 +0,0 @@
"spec"
from dataclasses import dataclass
from typing import Dict, List, Enum
class Rule:
pass
@dataclass
class LabelRule(Rule):
name: str
def parse(self, fields: Sequence[str]) -> Optional[Tuple[str, str]]:
@dataclass
class Spec:
keys: Dict[str, SpecRule]
@classmethod
def from_dom(cls, dom: file.FileDOM):
for key in

View File

@ -1,134 +0,0 @@
#!/usr/bin/env python3
"""Test FileDOM"""
import unittest
import inspect
from pprint import pprint
from .filedom import FileDOM
class TestFileDOM(unittest.TestCase):
"""Test FileDOM"""
def test_parse(self):
"""Test Parsing"""
s = """
person: Xuu
remarks: test
+
Multi-Line
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
self.assertTrue(dom.valid)
self.assertEqual(dom.schema, "person")
self.assertEqual(dom.get("person"), "Xuu")
self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertIsNone(dom.get("xxx"))
self.assertEqual(dom.get("xxx", default="default"), "default")
self.assertEqual(str(dom), s)
def test_put_values(self):
"""Test putting values"""
s = """
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
dom.put("source", "SOURIS")
self.assertEqual(dom.get("source"), "SOURIS")
dom.put("contact", "mail:me@sour.is", append=True)
self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is")
def test_parse_ip6address(self):
"""Test network ip address parsing"""
s = """
inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
cidr: fd00::/8
netname: ROOT-DN42-ULA
descr: DN42 ULA Address Space
status: ALLOCATED
policy: open
org: ORG-DN42
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
s = inspect.cleandoc(s)+"\n"
dom = FileDOM(text=s.splitlines())
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "fd00::/8")
self.assertEqual(
cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inet6num"), f"{start} - {end}")
def test_parse_ip4address(self):
"""Test network ip address parsing"""
s = """
inetnum: 172.20.0.0 - 172.23.255.255
cidr: 172.20.0.0/14
netname: ROOT-DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM(text=s.splitlines())
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "172.20.0.0/14")
self.assertEqual(
cidr.exploded, "172.20.0.0/14")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inetnum"), f"{start} - {end}")
@unittest.skip
def test_bad_parse(self):
"""bad parse stuff"""
s = """
person: Xuu
EXTRA
:
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
pprint(dom.dom)
self.assertEqual(str(dom), s)
if __name__ == '__main__':
unittest.main()

View File

@ -1,134 +0,0 @@
#!/usr/bin/env python3
"""Test FileDOM"""
import unittest
import inspect
from pprint import pprint
from .filedom import FileDOM
class TestFileDOM(unittest.TestCase):
"""Test FileDOM"""
def test_parse(self):
"""Test Parsing"""
s = """
person: Xuu
remarks: test
+
Multi-Line
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
self.assertTrue(dom.valid)
self.assertEqual(dom.schema, "person")
self.assertEqual(dom.get("person"), "Xuu")
self.assertEqual(dom.get("contact"), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertIsNone(dom.get("xxx"))
self.assertEqual(dom.get("xxx", default="default"), "default")
self.assertEqual(str(dom), s)
def test_put_values(self):
"""Test putting values"""
s = """
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
dom.put("source", "SOURIS")
self.assertEqual(dom.get("source"), "SOURIS")
dom.put("contact", "mail:me@sour.is", append=True)
self.assertEqual(str(dom.get("contact")), "xmpp:xuu@xmpp.dn42")
self.assertEqual(dom.get("contact", index=1), "mail:xuu@dn42.us")
self.assertEqual(dom.get("contact", index=2), "mail:me@sour.is")
def test_parse_ip6address(self):
"""Test network ip address parsing"""
s = """
inet6num: fd00:0000:0000:0000:0000:0000:0000:0000 - fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff
cidr: fd00::/8
netname: ROOT-DN42-ULA
descr: DN42 ULA Address Space
status: ALLOCATED
policy: open
org: ORG-DN42
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
s = inspect.cleandoc(s)+"\n"
dom = FileDOM(text=s.splitlines())
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "fd00::/8")
self.assertEqual(
cidr.exploded, "fd00:0000:0000:0000:0000:0000:0000:0000/8")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inet6num"), f"{start} - {end}")
def test_parse_ip4address(self):
"""Test network ip address parsing"""
s = """
inetnum: 172.20.0.0 - 172.23.255.255
cidr: 172.20.0.0/14
netname: ROOT-DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM(text=s.splitlines())
cidr = dom.get("cidr").as_net
self.assertEqual(cidr.compressed, "172.20.0.0/14")
self.assertEqual(
cidr.exploded, "172.20.0.0/14")
end = cidr.broadcast_address.exploded
start = cidr.network_address.exploded
self.assertEqual(dom.get("inetnum"), f"{start} - {end}")
@unittest.skip
def test_bad_parse(self):
"""bad parse stuff"""
s = """
person: Xuu
EXTRA
:
source: DN42
"""
s = inspect.cleandoc(s)+"\n"
dom = FileDOM()
dom.parse(s.splitlines())
pprint(dom.dom)
self.assertEqual(str(dom), s)
if __name__ == '__main__':
unittest.main()

View File

@ -1,62 +0,0 @@
"Testing NetTree"
import unittest
from ipaddress import ip_network
from .nettree import NetTree, NetRecord
records = [
NetRecord(
ip_network("::/0"),
["DN42-MNT"],
"closed",
"ALLOCATED"),
NetRecord(
ip_network("::ffff:0.0.0.0/96"),
["DN42-MNT"],
"closed",
"ALLOCATED"),
NetRecord(
ip_network("::ffff:172.21.64.0/125"),
["XUU-MNT"],
"closed",
"ALLOCATED"),
NetRecord(
ip_network("fdea:a15a:77b9::/48"),
["XUU-MNT"],
"closed",
"ALLOCATED"),
]
text = [
"0|0|0|0000:0000:0000:0000:0000:0000:0000:0000|0|inet6num|::_0|closed|ALLOCATED|DN42-MNT", # noqa: E501
"1|0|1|fdea:a15a:77b9:0000:0000:0000:0000:0000|48|inet6num|fdea:a15a:77b9::_48|closed|ALLOCATED|XUU-MNT", # noqa: E501
"2|0|1|0000:0000:0000:0000:0000:ffff:0000:0000|96|inetnum|0.0.0.0_0|closed|ALLOCATED|DN42-MNT", # noqa: E501
"3|2|2|0000:0000:0000:0000:0000:ffff:ac15:4000|125|inetnum|172.21.64.0_29|closed|ALLOCATED|XUU-MNT" # noqa: E501
]
class TestNetTree(unittest.TestCase):
"testing NetTree"
def test_nettree(self):
"test NetTree"
tree = NetTree(records)
for (left, right) in zip(str(tree).splitlines(), text):
self.assertEqual(left, right)
def test_find(self):
"test NetTree"
tree = NetTree(records)
tt = [
("fdea:a15a:77b9:ffff::/64", (True, 2)),
("fdea:a15a:77ba:ffff::/64", (True, 1)),
("::ffff:172.21.64.0/126", (True, 3)),
("::ffff:172.21.64.4/126", (True, 3)),
("::ffff:172.21.64.8/126", (True, 2)),
]
for (net, expect) in tt:
self.assertEqual(
tree.find_tree(ip_network(net)),
expect,
msg="network "+net)

View File

@ -1,285 +0,0 @@
"""Test SchemaDOM"""
import inspect
import unittest
from .schema import SchemaDOM
from .filedom import FileDOM
def clean(s: str) -> str:
"Clean input for use"
return inspect.cleandoc(s) + "\n"
test_files = [
("SCHEMA-SCHEMA", clean(
r"""
schema: SCHEMA-SCHEMA
ref: dn42.schema
key: schema required single primary schema > [name]
key: ref required single > [schema]
key: key required multiple > [key-name]
{required|optional|recommend|deprecate}
{single|multiple} {primary|} {schema|}
lookup=str '>' [spec]...
key: mnt-by required multiple lookup=dn42.mntner > [mntner]
key: remarks optional multiple > [text]...
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
remarks: # option descriptions
Attribute names must match /[a-zA-Z]([a-zA-Z0-9_\-]*[a-zA-Z0-9])?/.
+
required
: object required to have at least one
optional
: object not required to have at least one
+
single
: only one of this type allowed
multiple
: more than one of this type allowed
+
primary
: use field as lookup key for lookup
* only one allowed per schema
* does not allow newlines
+
schema
: use field name as the name of the schema
* only one allowed per schema
* does not allow newlines
+
lookup
: schema match to use for related record
""" # noqa: E501
)),
("INETNUM-SCHEMA", clean(
r"""
schema: INETNUM-SCHEMA
ref: dn42.inetnum
key: inetnum required single schema
key: cidr required single primary
key: netname required single
key: nserver optional multiple > [domain-name]
key: country optional multiple
key: descr optional single
key: status optional single > {ALLOCATED|ASSIGNED} {PI|PA|}
key: policy optional single > {open|closed|ask|reserved}
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: zone-c optional multiple lookup=dn42.person,dn42.role
key: mnt-by optional multiple lookup=dn42.mntner
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("ROLE-SCHEMA", clean(
r"""
schema: ROLE-SCHEMA
ref: dn42.role
key: role required single schema
key: nic-hdl required single primary
key: mnt-by required multiple lookup=dn42.mntner
key: org optional multiple lookup=dn42.organisation
key: admin-c optional multiple lookup=dn42.person
key: tech-c optional multiple lookup=dn42.person
key: abuse-c optional multiple lookup=dn42.person
key: abuse-mailbox optional multiple
key: descr optional single
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("PERSON-SCHEMA", clean(
r"""
schema: PERSON-SCHEMA
ref: dn42.person
key: person required single schema
key: nic-hdl required single primary
key: mnt-by required multiple lookup=dn42.mntner
key: org optional multiple lookup=dn42.organisation
key: nick optional multiple
key: pgp-fingerprint optional multiple
key: www optional multiple
key: e-mail optional multiple
key: contact optional multiple
key: abuse-mailbox optional multiple
key: phone optional multiple
key: fax-no optional multiple
key: address optional multiple
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("MNTNER-SCHEMA", clean(
r"""
schema: MNTNER-SCHEMA
ref: dn42.mntner
key: mntner required single primary schema
key: descr optional single
key: mnt-by required multiple lookup=dn42.mntner
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: auth optional multiple > [method] [value]...
key: org optional multiple lookup=dn42.organisation
key: abuse-mailbox optional single
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("REGISTRY-SCHEMA", clean(
r"""
schema: REGISTRY-SCHEMA
ref: dn42.registry
key: registry required single primary schema
key: url required multiple
key: descr optional multiple
key: mnt-by required multiple lookup=dn42.mntner
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("172.21.64.0_29", clean(
r"""
inetnum: 172.21.64.0 - 172.21.64.7
cidr: 172.21.64.0/29
netname: XUU-TEST-NET
descr: Xuu TestNet
country: US
admin-c: SOURIS-DN42
tech-c: SOURIS-DN42
mnt-by: XUU-MNT
nserver: lavana.sjc.xuu.dn42
nserver: kapha.mtr.xuu.dn42
nserver: rishi.bre.xuu.dn42
status: ALLOCATED
remarks: This is a transfernet.
source: DN42
"""
)),
("SOURIS-DN42", clean(
r"""
role: Souris Organization Role
abuse-mailbox: abuse@sour.is
admin-c: XUU-DN42
tech-c: XUU-DN42
nic-hdl: SOURIS-DN42
mnt-by: XUU-MNT
source: DN42
"""
)),
("XUU-DN42", clean(
r"""
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
"""
)),
("XUU-MNT", clean(
r"""
mntner: XUU-MNT
descr: Xuu Maintenance Object
admin-c: SOURIS-DN42
tech-c: SOURIS-DN42
mnt-by: XUU-MNT
source: DN42
"""
)),
("DN42-MNT", clean(
r"""
mntner: DN42-MNT
descr: mntner for owning objects in the name of whole dn42.
mnt-by: DN42-MNT
source: DN42
""" # noqa: E501
)),
("DN42", clean(
r"""
registry: DN42
url: https://git.dn42.us/dn42/registry
mnt-by: DN42-MNT
source: DN42
"""
)),
]
class TestSchema(unittest.TestCase):
"""Test SchemaDOM
"""
def test_schema_parse(self):
"""Test schema parsing
"""
d = FileDOM(src="schema/SCHEMA-SCHEMA")
d.parse(test_files[0][1].splitlines())
self.assertEqual(str(d), test_files[0][1])
self.assertTrue(d.valid)
s = SchemaDOM()
s.parse(d)
self.assertTrue(s.valid)
state = s.check_file(d)
self.assertTrue(state)
state.print()
def test_schema_all(self):
"""Test schema failures
"""
schemas = {}
for (fname, text) in {
row for row in test_files if row[0].endswith("-SCHEMA")}:
dom = FileDOM(src=fname)
dom.parse(text.splitlines())
schema = SchemaDOM()
schema.parse(dom)
self.assertTrue(schema.valid)
schemas[schema.ref] = schema
files = []
idx = {}
for (fname, text) in test_files:
dom = FileDOM(src=fname)
dom.parse(text.splitlines())
self.assertTrue(dom.valid)
self.assertEqual(str(dom), text)
files.append(dom)
key, value = dom.index
idx[key] = value
for dom in files:
s = schemas[f"{dom.ns}.{dom.schema}"]
state = s.check_file(dom, idx)
self.assertTrue(state)
state.print()

View File

@ -1,193 +0,0 @@
"Test TransactDOM"
import unittest
import inspect
from .transact import TransactDOM
def clean(s: str) -> str:
"Clean input for use"
return inspect.cleandoc(s) + "\n"
dn42_mnt_file = clean("""
.BEGIN DN42-MNT
schema: SCHEMA-SCHEMA
ref: dn42.schema
key: schema required single primary schema > [name]
key: ref required single > [schema]
key: key required multiple > [key-name]
{required|optional|recommend|deprecate}
{single|multiple} {primary|} {schema|}
lookup=str '>' [spec]...
key: mnt-by required multiple lookup=dn42.mntner > [mntner]
key: remarks optional multiple > [text]...
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
...
schema: INETNUM-SCHEMA
ref: dn42.inetnum
key: inetnum required single schema
key: cidr required single primary
key: netname required single
key: nserver optional multiple > [domain-name]
key: country optional multiple
key: descr optional single
key: status optional single > {ALLOCATED|ASSIGNED} {PI|PA|}
key: policy optional single > {open|closed|ask|reserved}
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: zone-c optional multiple lookup=dn42.person,dn42.role
key: mnt-by optional multiple lookup=dn42.mntner
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
...
schema: ROLE-SCHEMA
ref: dn42.role
key: role required single schema
key: nic-hdl required single primary
key: mnt-by required multiple lookup=dn42.mntner
key: org optional multiple lookup=dn42.organisation
key: admin-c optional multiple lookup=dn42.person
key: tech-c optional multiple lookup=dn42.person
key: abuse-c optional multiple lookup=dn42.person
key: abuse-mailbox optional multiple
key: descr optional single
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
...
schema: PERSON-SCHEMA
ref: dn42.person
key: person required single schema
key: nic-hdl required single primary
key: mnt-by required multiple lookup=dn42.mntner
key: org optional multiple lookup=dn42.organisation
key: nick optional multiple
key: pgp-fingerprint optional multiple
key: www optional multiple
key: e-mail optional multiple
key: contact optional multiple
key: abuse-mailbox optional multiple
key: phone optional multiple
key: fax-no optional multiple
key: address optional multiple
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
...
schema: MNTNER-SCHEMA
ref: dn42.mntner
key: mntner required single primary schema
key: descr optional single
key: mnt-by required multiple lookup=dn42.mntner
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: auth optional multiple > [method] [value]...
key: org optional multiple lookup=dn42.organisation
key: abuse-mailbox optional single
key: remarks optional multiple
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
...
schema: REGISTRY-SCHEMA
ref: dn42.registry
key: registry required single primary schema
key: url required multiple
key: descr optional multiple
key: mnt-by required multiple lookup=dn42.mntner
key: admin-c optional multiple lookup=dn42.person,dn42.role
key: tech-c optional multiple lookup=dn42.person,dn42.role
key: source required single lookup=dn42.registry
mnt-by: DN42-MNT
source: DN42
...
mntner: DN42-MNT
descr: mntner for owning objects in the name of whole dn42.
mnt-by: DN42-MNT
source: DN42
...
inetnum: 0.0.0.0 - 255.255.255.255
cidr: 0.0.0.0/0
netname: NET-BLK0-DN42
policy: open
descr: * The entire IPv4 address space
mnt-by: DN42-MNT
status: ALLOCATED
source: DN42
...
registry: DN42
url: https://git.dn42.us/dn42/registry
mnt-by: DN42-MNT
source: DN42
.END
""") # noqa E501
xuu_mnt_file = clean("""
.BEGIN XUU-MNT
.DELETE person XUU-DN42
inetnum: 172.21.64.0 - 172.21.64.7
cidr: 172.21.64.0/29
netname: XUU-TEST-NET
descr: Xuu TestNet
country: US
admin-c: SOURIS-DN42
tech-c: SOURIS-DN42
mnt-by: XUU-MNT
nserver: lavana.sjc.xuu.dn42
nserver: kapha.mtr.xuu.dn42
nserver: rishi.bre.xuu.dn42
status: ALLOCATED
remarks: This is a transfernet.
source: DN42
...
role: Souris Organization Role
abuse-mailbox: abuse@sour.is
admin-c: XUU-DN42
tech-c: XUU-DN42
nic-hdl: SOURIS-DN42
mnt-by: XUU-MNT
source: DN42
...
person: Xuu
remarks: test
contact: xmpp:xuu@xmpp.dn42
contact: mail:xuu@dn42.us
pgp-fingerprint: 20AE2F310A74EA7CEC3AE69F8B3B0604F164E04F
nic-hdl: XUU-DN42
mnt-by: XUU-MNT
source: DN42
.
mntner: XUU-MNT
descr: Xuu Maintenance Object
admin-c: SOURIS-DN42
tech-c: SOURIS-DN42
mnt-by: XUU-MNT
source: DN42
.END
""") # noqa E501
class TestTransactDOM(unittest.TestCase):
"test TransactDOM"
def test_transact_parse(self):
"test tranact parsing"
ts = TransactDOM(text=dn42_mnt_file.splitlines())
assert ts.mntner == "DN42-MNT"
assert len(ts.files) == 9
ts = TransactDOM(text=xuu_mnt_file.splitlines())
assert ts.mntner == "XUU-MNT"
assert len(ts.files) == 4
assert len(ts.delete) == 1
assert ts.delete[0] == ("person", "XUU-DN42")

View File

@ -1,74 +0,0 @@
"TransactDOM"
from typing import Sequence, List, Optional, Tuple, TypeVar
from .file import FileDOM
from .schema import SchemaDOM
DOM = TypeVar("DOM", bound="TransactDOM")
class TransactDOM():
"""Parses a transaction file"""
def __init__(self,
text: Optional[Sequence[str]] = None):
self.valid = False
self.files = [] # type: List[FileDOM]
self.schemas = []
self.delete = [] # type: List[Tuple[str, str]]
self.mntner = None # type: Optional[str]
if text is not None:
self.parse(text)
def parse(self, text: Sequence[str]):
"parse text"
buffer = [] # type: List[str]
for (i, line) in enumerate(text, 1):
_ = i
if self.mntner is None:
if not line.startswith(".BEGIN"):
continue
fields = line.split()
if len(fields) < 2:
continue
self.mntner = fields[1]
continue
if line.startswith("."):
if len(buffer) > 0:
dom = FileDOM(text=buffer)
buffer = []
if dom.valid:
self.files.append(dom)
if dom.schema == 'schema':
self.schemas.append(SchemaDOM(dom))
if line.startswith(".DELETE"):
sp = line.split()
if len(sp) > 2:
self.delete.append((sp[1], sp[2]))
continue
buffer.append(line)
def __str__(self) -> str:
s = f".BEGIN {self.mntner}\n"
s += "\n".join({f"DELETE {i}" for i in self.delete})
s += "...\n".join({str(record) for record in self.files})
s += ".END"
return s
@staticmethod
def from_file(src: str) -> DOM:
"Read transact from files"
with open(src) as f:
return TransactDOM(f.readlines())

View File

@ -1,45 +0,0 @@
"DN42 Utils"
import os.path
from typing import List, Tuple
def remove_prefix(text, prefix):
"remove the prefix"
if text.startswith(prefix):
return text[len(prefix):]
return text
def shift(args: List[str]) -> Tuple[str, List[str]]:
"shift off first arg + rest"
if len(args) == 0:
return None, []
if len(args) == 1:
return args[0], []
return args[0], args[1:]
def find_rpsl(path: str) -> str:
"Find the root directory for RPSL"
path = os.path.abspath(path)
rpsl = os.path.join(path, ".rpsl")
while not os.path.exists(rpsl):
if path == "/":
break
path = os.path.dirname(path)
rpsl = os.path.join(path, ".rpsl")
if not os.path.exists(rpsl):
return None
return path
def exists(*args: str) -> bool:
"check if files exist"
for i in args:
if not os.path.exists(i):
return False
return True

View File

@ -1,206 +0,0 @@
"""Simple Logger"""
from __future__ import print_function
import os
import sys
import inspect
import datetime
import traceback
from enum import IntEnum
OUTPUT = sys.stderr
LEVEL = ["CRIT", "ERR ", "WARN", "NOTE", "INFO", "DBUG", "...."]
CLEVEL = ["\x1B[41mCRIT\x1B[0m",
"\x1B[31mERR \x1B[0m",
"\x1B[33mWARN\x1B[0m",
"\x1B[32mNOTE\x1B[0m",
"\x1B[34mINFO\x1B[0m",
"\x1B[90mDBUG\x1B[0m",
"\x1B[90m....\x1B[0m"]
MSG = "{0} {1} {2} {3} {4} {5} :: {6}"
CMSG = "[{1}]\x1B[90m {2} {3}:{5} [{4}]\x1B[0m {6}\x1B[0m"
CMULTI = "[{1}]\x1B[90m {2}\x1B[0m"
class Level(IntEnum):
"""Log Level enumeration"""
VERB_CRITICAL = 0
VERB_ERROR = 1
VERB_WARN = 2
VERB_NOTICE = 3
VERB_INFO = 4
VERB_DEBUG = 5
VERB_NONE = -1
class Log:
"""Logger"""
log_dir = ""
log_pfx = "main"
level_console = Level.VERB_ERROR
level_file = Level.VERB_NONE
level_full = False
count = [0, 0, 0, 0, 0, 0]
def __init__(self):
self.prog_name = sys.argv[0].rsplit("/", 1)[-1]
self.prog_name = self.prog_name.split(".", 1)[0]
self.log_pfx = self.prog_name
def __del__(self):
if self.level_console >= 5:
crit, err, warn, note, inf, dbug = tuple(self.count)
os.write(1, "[\x1B[90m\x1B[90mDBUG\x1B[90m] Log Counters" +
f" crit:{crit}" +
f" err:{err}" +
f" warn: {warn}" +
f" note: {note}" +
f" info: {inf}" +
f" dbug: {dbug}\x1B[0m\n")
def set_dir(self, name: str):
"""Set output directory"""
if not os.path.isdir(name):
os.makedirs(name)
self.log_dir = name
def output(self, level: Level, message: str, frame=1):
"""Write a message to console or log, conditionally."""
if level < 0 or level > 5:
level = 5
self.count[level] += 1
# function_name = inspect.stack()[1][3]
cur_date = datetime.datetime.now()
(frame, file, ln, fn, _, _) = inspect.getouterframes(
inspect.currentframe())[frame]
message = str(message).split("\n")
cmsg = CMSG if self.level_full else CMULTI
if self.level_console >= level:
if len(message) == 1:
if self.level_full:
arg = (str(cur_date),
CLEVEL[level],
self.prog_name,
file, fn, ln, message[0])
else:
arg = str(cur_date), CLEVEL[level], message[0]
print(cmsg.format(*arg), file=OUTPUT)
else:
if self.level_full:
arg = str(cur_date), CLEVEL[
level], self.prog_name, file, fn, ln, ""
print(cmsg.format(*arg), file=OUTPUT)
for line in message:
print(CMULTI.format(str(cur_date),
CLEVEL[Level.VERB_NONE], line),
file=OUTPUT)
if self.level_file >= level:
self.set_dir("./logs")
log_file_name = os.path.join(
self.log_dir,
self.log_pfx + str(cur_date.strftime('%Y-%m-%d')) + ".txt")
with open(log_file_name, "a") as logger:
logger.write(MSG.format(str(cur_date),
LEVEL[level],
self.prog_name,
file, fn, ln, message[0]) + "\n")
for line in message[1:]:
logger.write(MSG.format(str(cur_date),
LEVEL[Level.VERB_NONE],
self.prog_name,
file, fn, ln, line) + "\n")
def fatal(self, message: str):
"""Log a fatal error"""
self.output(Level.VERB_CRITICAL, message, 2)
sys.exit(1)
def critical(self, message: str):
"""Log a critical error"""
self.output(Level.VERB_CRITICAL, message, 2)
def error(self, message: str):
"""Log a normal error"""
self.output(Level.VERB_ERROR, message, 2)
def warning(self, message: str):
"""Log a warning"""
self.output(Level.VERB_WARN, message, 2)
def notice(self, message: str):
"""Log a notice"""
self.output(Level.VERB_NOTICE, message, 2)
def info(self, message: str):
"""Log an informational"""
self.output(Level.VERB_INFO, message, 2)
def debug(self, message: str):
"""Log a debug"""
self.output(Level.VERB_DEBUG, message, 2)
default = Log()
fatal = default.fatal
critical = default.critical
error = default.error
warning = default.warning
notice = default.notice
info = default.info
debug = default.debug
class LogException:
"""Catches an exception to log it"""
stop = None
def __init__(self, stop: bool = True):
self.stop = stop
def __enter__(self, stop: bool = True):
pass
def __exit__(self, exc_type, value, trace) -> bool:
if exc_type is None:
return True
if exc_type is SystemExit and value.args == (0,):
return True
log_string, _ = fmt_exception(exc_type, value, trace)
default.output(Level.VERB_CRITICAL, 'Failure\n\n' + log_string, 2)
if self.stop is False:
return False
fatal("ABORTING EXECUTION")
return False
def fmt_exception(exc_type, exc_value, exc_traceback):
"""format exception to string"""
lines = traceback.format_exception(exc_type, exc_value, exc_traceback)
log_string = ''.join(line for line in lines)
email_string = ''.join('<br />' + line for line in lines)
return log_string, email_string
exception = LogException

View File

@ -1,82 +0,0 @@
"""rpsl a tool for managing RPSL databases
==========================================
Usage: rpsl [command] [options]
rpsl help [command]
"""
import os
import sys
from typing import Optional
import importlib
import pkgutil
from dn42.utils import find_rpsl, remove_prefix, shift
discovered_plugins = {
remove_prefix(name, "rpsl_"): importlib.import_module(name)
for finder, name, ispkg
in pkgutil.iter_modules()
if name.startswith("rpsl_")
}
def do_help(cmd: Optional[str] = None):
"Print Help and exit"
print(__doc__, file=sys.stderr)
if cmd is None:
print("Available commands:", file=sys.stderr)
for pkg in discovered_plugins.keys():
print(f" - {pkg}", file=sys.stderr)
return 0
if cmd not in discovered_plugins:
print(f"Command not found: {cmd}", file=sys.stderr)
return 1
print(discovered_plugins[cmd].__doc__, file=sys.stderr)
return 0
def run() -> int:
"run application command"
_, args = shift(sys.argv) # drop exec name
cmd, args = shift(args)
working_dir = os.getcwd()
working_dir = os.environ.get("WORKING_DIR", working_dir)
prog_dir = os.path.dirname(os.path.realpath(__file__))
rpsl_dir = os.environ.get("RPSL_DIR", working_dir)
rpsl_dir = find_rpsl(rpsl_dir)
if cmd is None or cmd == 'help':
cmd, _ = shift(args)
return do_help(cmd)
if cmd not in discovered_plugins:
print(f"Unsupported Command: {cmd}")
return 1
pkg = discovered_plugins[cmd]
if 'run' not in dir(pkg):
print(f"Command {cmd} is not compatible with rspl.", file=sys.stderr)
return 1
return pkg.run(args, {
"WORKING_DIR": working_dir,
"BIN_DIR": prog_dir,
"RPSL_DIR": rpsl_dir,
})
if __name__ == '__main__':
code = run()
sys.exit(code)

View File

@ -1,9 +0,0 @@
#!/usr/bin/env python3
"run main code"
import sys
from main import run
if __name__ == '__main__':
code = run()
sys.exit(code)

View File

@ -1,170 +0,0 @@
"""RSPL Build Indexes
=====================
Usage: rspl index
"""
import os
import sys
from typing import Dict, Generator, List, Set, Tuple, Sequence
from dn42.rpsl import FileDOM, SchemaDOM, TransactDOM, NetTree, \
NetRecord, Config, index_files
from dn42.utils import remove_prefix
def run(args: List[str], env: Dict[str, str]) -> int:
"rspl index"
_ = args
path = env.get("RPSL_DIR")
if path is None:
print("RPSL directory not found. do `rpsl init` or set RPSL_DIR",
file=sys.stderr)
return 1
config = Config.from_path(path)
if not os.path.exists(config.config_file):
print("RPSL config files not found. do `rpsl init`?", file=sys.stderr)
return 1
if not os.path.isdir(config.schema_dir):
print("schema directory not found in path", file=sys.stderr)
sys.exit(1)
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
idx = index_files(path,
namespace=config.namespace,
primary_keys=config.primary_keys)
lookup, schemas, files, nets, routes = build_index(idx, rspl=config)
print(
f"Reading Files: done! files: {len(files)}" +
f" schemas: {len(schemas)}" +
f" networks: {len(nets)}",
f" routes: {len(routes)}",
file=sys.stderr)
print("Writing .rpsl/index", file=sys.stderr)
with open(".rpsl/index", 'w') as out:
print("Writing .rpsl/links", file=sys.stderr)
with open(".rpsl/links", 'w') as link_out:
for dom in files:
s = schemas.get(dom.rel)
if s is None:
print(
f"{dom.src} schema not found for {dom.rel}",
file=sys.stderr)
continue
primary, mntner = dom.get(s.primary), ",".join(dom.mntner)
_ = mntner
src = remove_prefix(dom.src, path+os.sep)
print(dom.rel, primary, src, # mntner,
sep="|", file=out)
for (link, rel, d) in generate_links(dom, s.links, lookup):
print(f"{dom.rel}|{dom.name}|{link}|{rel}|{d}",
file=link_out)
print("Generate .rpsl/nettree", file=sys.stderr)
tree = NetTree(nets, routes)
print("Writing .rpsl/nettree", file=sys.stderr)
tree.write_csv(".rpsl/nettree")
print("Writing .rpsl/schema", file=sys.stderr)
s = TransactDOM()
s.mntner = "DN42-MNT"
s.files = schemas.values()
with open(".rpsl/schema", "w") as out:
print(s, file=out)
print("done.", file=sys.stderr)
return 0
class NotRPSLPath(Exception):
"error raised if unable to determine RPSL root"
def build_index(
idx: Sequence[FileDOM],
rspl: Config,
) -> Tuple[
Set[Tuple[str, str]],
Dict[str, SchemaDOM],
List[FileDOM],
List[NetRecord]]:
"build index for files"
lookup = set() # type: Set[Tuple[str, str]]
schemas = {} # type: Dict[str, SchemaDOM]
files = [] # type: List[FileDOM]
nets = [] # type: List[NetRecord]
routes = [] # type: List[NetRecord]
print(r"Reading Files...", end="\r", flush=True, file=sys.stderr)
net_types = rspl.network_parents
net_leafs = rspl.network_children
for (i, dom) in enumerate(idx):
if not dom.valid:
print("E", end="", flush=True)
continue
key = dom.index
lookup.add(key)
files.append(dom)
if dom.schema == rspl.schema:
schema = SchemaDOM(dom)
schemas[schema.ref] = schema
if dom.schema in net_types:
nets.append(NetRecord(
dom.get("cidr").as_net6,
dom.get("policy", default="closed"),
dom.get("status", default="ASSIGNED"),
))
if dom.schema in net_leafs:
routes.append(NetRecord(
dom.get(dom.primary_key).as_net6,
dom.get("policy", default="none"),
dom.get("status", default="none"),
True,
))
if i % 120 == 0:
print(
f"Reading Files: files: {len(files)}" +
f" schemas: {len(schemas)} " +
f" networks: {len(nets)}",
f" routes: {len(routes)}",
end="\r", flush=True, file=sys.stderr)
return (lookup, schemas, files, nets, routes)
def generate_links(
dom: FileDOM,
links: Dict[str, List[str]],
lookup: Set[Tuple[str, str]]
) -> Generator[Tuple[str, str, str], None, None]:
"print file links out to file"
for (link, refs) in links.items():
for d in dom.get_all(link):
found = False
for ref in refs:
if (ref, d.value) in lookup:
found = True
yield (link, ref, d)
if not found:
print(f"{dom.name} missing link {link} {d.value}")

View File

@ -1,103 +0,0 @@
"""RSPL Initialize data store
=============================
Usage: rspl init [options]
Options:
--namespace=<ns> Namespace (default: current working dir name)
--schema=<schema> Schema (default: schema)
--owners=<mntner> Owner (default: mntner)
--default-owner=<mnt> Default Owner (default: DN42-MNT)
--source=<src> Source (default: DN42)
--force Force creation of config
"""
import sys
import os.path
import argparse
from typing import List, Dict, Generator, Tuple, Set, TypeVar
from dn42.rpsl import Config, FileDOM, SchemaDOM
import rpsl_index
Group = TypeVar("Group", set, tuple)
parser = argparse.ArgumentParser()
parser.add_argument("--namespace", type=str, default=None)
parser.add_argument("--owners", type=str, default="mntner")
parser.add_argument("--schema", type=str, default="schema")
parser.add_argument("--force", action='store_true')
def run(args: List[str], env: Dict[str, str]) -> int:
"rspl init"
opts = parser.parse_args(args)
if opts.namespace is None:
opts.namespace = os.path.basename(env.get("WORKING_DIR"))
rpsl_dir = env.get("RPSL_DIR")
if rpsl_dir is not None and not opts.force:
print(f"RPSL database already initialized! Found in: {rpsl_dir}")
return 1
rpsl_dir = env.get("WORKING_DIR")
schema_dir = os.path.join(rpsl_dir, opts.schema)
network_owners, primary_keys, dir_name = {}, {}, {}
if os.path.exists(schema_dir):
ns, network_owners, primary_keys, dir_name = \
_parse_schema(schema_dir, opts.namespace)
rpsl = Config.build(path=rpsl_dir,
namespace=ns,
schema=opts.schema,
owners=opts.owners,
dir_name=dir_name,
network_owners=network_owners,
primary_keys=primary_keys)
os.makedirs(os.path.dirname(rpsl.config_file), exist_ok=True)
with open(rpsl.config_file, "w") as f:
print(rpsl, file=f)
print(f"Created: {rpsl.config_file}", file=sys.stderr)
env["RPSL_DIR"] = rpsl_dir
rpsl_index.run(args, env)
return 0
def _read_schemas(path: str) -> Generator[SchemaDOM, None, None]:
for root, _, files in os.walk(path):
for f in files:
dom = FileDOM.from_file(os.path.join(root, f))
schema = SchemaDOM(dom)
yield schema
def _parse_schema(path: str, ns: str) -> Tuple[str, Group, Group, Group]:
schemas = _read_schemas(path)
namespace = ns
network_owner = set() # type: Set[str, str]
primary_key = set() # type: Set[str, str]
dir_name = set() # type: Set[str, str]
for s in schemas:
if s.type == "schema":
if s.namespace != namespace:
namespace = s.namespace
for i in s.dom.get_all("network-owner"):
network_owner.add((s.type, i.value))
d = s.dom.get("dir-name")
if d is not None:
dir_name.add((s.type, d.value))
if s.primary != s.type:
primary_key.add((s.type, s.primary))
return namespace, network_owner, primary_key, dir_name

View File

@ -1,66 +0,0 @@
"""RSPL Scan
============
Usage: rspl scan [options]
Options:
--scan-dir=<dir> Scan given directory
--scan-file=<file> Scan given file
--add-index Add scanned items to lookup table
"""
import os
import sys
import argparse
from typing import List, Dict
from dn42.rpsl import RPSL, Config, TransactDOM, index_files
parser = argparse.ArgumentParser()
parser.add_argument("--add-index", action='store_true')
parser.add_argument("--scan-dir", type=str, default=None)
parser.add_argument("--scan-file", type=str, default=None)
def run(args: List[str], env: Dict[str, str]) -> int:
"""run scan script"""
opts = parser.parse_args(args)
path = env.get("RPSL_DIR")
if path is None:
print("RPSL directory not found. do `rpsl init` or set RPSL_DIR",
file=sys.stderr)
return 1
config = Config.from_path(path)
if not os.path.exists(config.index_file) or \
not os.path.exists(config.schema_file):
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
rpsl = RPSL(config)
files = _file_gen(path, opts, wd=env.get("WORKING_DIR"), config=config)
if opts.add_index:
files, g = [], files
print("Add scanned items to lookup index...", file=sys.stderr)
for dom in g:
files.append(dom)
rpsl.append_index(dom)
print("Scanning files...", file=sys.stderr)
status = rpsl.scan_files(files)
status.print_msgs()
print(status)
return 0 if status else 1
def _file_gen(path, opts: argparse.Namespace, wd: str, config: Config):
if opts.scan_dir is not None:
path = os.path.join(wd, opts.scan_dir)
elif opts.scan_file is not None:
path = os.path.join(wd, opts.scan_file)
return TransactDOM.from_file(path).files
return index_files(path, config.namespace, config.primary_keys)

View File

@ -1,12 +0,0 @@
"""RSPL Status
==============
"""
from typing import List, Dict
def run(args: List[str], env: Dict[str, str]) -> int:
"do run"
print("RUN STATUS", args, env)
return 0

View File

@ -1,86 +0,0 @@
"""RSPL Whois Search
====================
Usage: rpsl whois [text]
"""
import sys
from itertools import chain
from typing import List, Dict, Optional, Set, Tuple
from dn42.rpsl import RPSL, Config, FileDOM, as_net6
from dn42.utils import shift, exists
def run(args: List[str], env: Dict[str, str]) -> int:
"do whois search"
if len(args) == 0:
print("Usage: rpsl whois [text]")
rpsl_dir = env.get("RPSL_DIR")
if rpsl_dir is None:
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
config = Config.from_path(rpsl_dir)
if not exists(config.index_file,
config.schema_file,
config.links_file):
print("RPSL index files not found. do `rpsl index`?", file=sys.stderr)
return 1
rpsl = RPSL(config)
schema = None # type: Optional[str]
text, args = shift(args)
if len(args) > 0:
schema = text
text, args = shift(args)
ip = None
try:
ip = as_net6(text)
except ValueError:
pass
principle = [] # type: List[FileDOM]
related_nets = [] # type: List[FileDOM]
related_idx = set() # type: Set[Tuple[str, str]]
if ip is not None:
print(f"# Searching network {text}...")
nets = list(rpsl.find_network(text))
last_net = nets[-1]
dom = rpsl.load_file(str(last_net.net))
principle.append(dom)
related_idx.add(dom.index)
ok, route = last_net.in_routes(ip)
if ok:
dom = rpsl.load_file(str(route))
principle.append(dom)
related_idx.add(dom.index)
for net in nets[:-1]:
dom = rpsl.load_file(str(net.net))
related_nets.append(dom)
else:
for dom in rpsl.find(text, schema):
principle.append(dom)
related_idx.add(dom.index)
print("# Found objects")
for dom in principle:
print(dom)
if len(related_nets) > 0:
print("# Related Networks")
for dom in related_nets:
print(dom)
print("# Related objects")
lis = set(chain.from_iterable(rpsl.related(i) for i in related_idx))
for dom in rpsl.load_files(sorted(lis)):
print(dom)
return 0