merge from previous repos

This commit is contained in:
Ventilaar 2022-12-22 15:12:24 +01:00
parent b36008042d
commit c6aa41e273
15 changed files with 23574 additions and 0 deletions

19
custom-shell/README.md Normal file
View File

@ -0,0 +1,19 @@
# Testing an alternative shell in python
- Modify and install shell.py into system and make it executable. Make sure python3 and required packages are installed.
- Add user to system with minimal permissions, example /etc/passwd `admin:x:1000:100::/jail:/shell`
- Add user to group for easier management, example /etc/group `configusers:x:1000:admin`
- Edit sshd to meet security standards, example options:
```
Match Group configusers
AllowAgentForwarding no
AllowTcpForwarding no
X11Forwarding no
PermitTTY yes
PrintMotd no
PrintLastLog no
TCPKeepAlive no
Banner none
MaxAuthTries 3
MaxSessions 1
```

161
custom-shell/shell.py Normal file
View File

@ -0,0 +1,161 @@
#!/usr/bin/env python3
import cmd, os, sys, time
from argon2 import PasswordHasher # PIP: argon2-cffi
sys.tracebacklimit = 0 # disable tracebacks to hide potential secrets
ph = PasswordHasher() # setup password hasher object with default parameters
hash = "$argon2id$v=19$m=65536,t=32,p=4$OvFXwX3buE9xYLKPGs2NOA$UEtxEwOI8VZ9WwWMGaySAhBdgP+zuLJIzCz4+Xt1YDw" example hash
peers = {'CORE1': {'endpoint': '1.1.1.1', 'status': True}, 'CORE2': {'endpoint': '1.1.1.2', 'status': False}} # temporary testing dict
# Default root
class RootShell(cmd.Cmd):
intro = f'Welcome {os.getlogin()} to my own shell for configuration. Type help or ? to list nodes\n'
prompt = '(root) '
# Shell settings
def default(self, arg):
""" Return friendly string instead of bad syntax """
print(f'Node does not exist, type help or ? to list commands.')
def emptyline(self):
""" Return empty """
pass
# Program functions
def do_peer(self, arg):
"""
Enter Wireguard peer configuration mode: PEER
"""
Peer().cmdloop() # execute Peer() subshell
def do_benchmark(self, arg):
"""
Benchmark Argon2id: BENCHMARk
"""
benchmark()
def do_exit(self, arg):
"""
Exit and close session: EXIT
"""
print('Goodbye')
return True
# Example subshell
class Peer(cmd.Cmd):
intro = 'Now entering Wireguard peer configuration mode'
prompt = '(root/peer) '
# Shell settings
def default(self, arg):
""" Return friendly string instead of bad syntax """
print(f'Command does not exist, type help or ? to list commands.')
def emptyline(self):
""" Return empty """
pass
# Program functions
def do_list(self, arg):
"""
List peers that are setup with status: LIST
"""
for x in peers:
print(f'\n{x}')
print(9*'=')
print(f"Status: {peers[x]['status']}")
print(f"Endpoint: {peers[x]['endpoint']}")
print(9*'=')
def do_add(self, arg):
"""
Add peer configuration menu, make sure you specify a connection name which must me alphanumeric: ADD Conn1
"""
if not arg:
print('You need to specify a connection name')
return
elif not arg.isalnum():
print('Name contains characters that are not alphanumeric')
return
print(f'Adding new connection under name: {arg}')
try:
endpoint = input('Endpoint(255.255.255.255): ')
except KeyboardInterrupt:
print('Aborting')
return
peers[arg] = {'endpoint': endpoint, 'status': False}
print('Success')
def do_delete(self, arg):
"""
Delete peer connection: DELETE Conn1
"""
if not arg:
print('You need to specify a connection name')
return
elif not arg.isalnum():
print('Name contains characters that are not alphanumeric')
return
elif arg not in peers:
print('Connection does not exist')
return
peers.pop(arg)
print('Success')
def do_exit(self, arg):
"""
Move up a level to root: EXIT
"""
print("Movin' up")
return True
# Login function, this is really insecure do not use, this was for argon2 testing
def login():
n = 3
while n:
i = input('Please enter your unique key:\n')
try:
ph.verify(hash, i)
valid = True
except:
n -= 1
continue
if ph.check_needs_rehash(hash):
print('This key needs to be rehashed, please standby...')
time.sleep(1)
if valid:
return True
return False
# Quick argon2 benchmark for hasing speed
def benchmark():
for x in range(2, 9, 2): # for 2, 4, 6, 8 cores
for n in range(2, 33, 2): # for 2, 4, 6 ... 30, 32 time cost
start = time.time()
p = PasswordHasher(time_cost=n, parallelism=x, memory_cost=65536)
p.hash('')
end = time.time()
print(f'Argon2id p={x}, t={n}, time={end-start}')
if __name__ == '__main__':
#login
if not login():
print('Please return when you have the correct key')
exit(0)
try:
RootShell().cmdloop()
except KeyboardInterrupt:
print('\nGoodbye')
exit(0)
except:
print('An exception ocurred in shell level, please report this to the hostmaster')
exit(1)

22
fido2-hmac/README.md Normal file
View File

@ -0,0 +1,22 @@
# Required packages
```
fido2
```
# What it does
This script is a simple shell menu for working with fido2 hmac-secrets.
When you run it you get 3 options.
- list
- create
- sign
This script does not work with resident keys so a seperate file called keys.json will be created to store the key id's.
It is mostly optimized for Windows but does not work with the Webauthn API so you need to run with administrator privileges to contact the CTAP api.
# hmac-secret what is it?
This extention to standard FIDO2 (I believe it's required but am not sure) supports shared secrets for use with offline devices like password vaults or disk encryption(cryptenroll).
## How does it work
- You create a credential on the key as usual with a user and relying party but with the extension enabled. Now you have a secret key stored on the device which cannot leave it.
- more text here

197
fido2-hmac/hmacfido.py Normal file
View File

@ -0,0 +1,197 @@
from fido2.hid import CtapHidDevice
from fido2.client import Fido2Client, UserInteraction, WindowsClient
from getpass import getpass
import sys, os ,ctypes, time, cmd, json
# Handle user interaction
class CliInteraction(UserInteraction):
def prompt_up(self):
print("\nTouch your authenticator device now...\n")
def request_pin(self, permissions, rd_id):
return getpass("Enter PIN: ")
def request_uv(self, permissions, rd_id):
print("User Verification required.")
return True
def setup_windows():
if WindowsClient.is_available() and not ctypes.windll.shell32.IsUserAnAdmin():
# Use the Windows WebAuthn API if available, and we're not running as admin
client = WindowsClient("https://storingsecrets.fido2")
if "hmac-secret" in client.info.extensions:
return client
else:
return None
def setup_pcsc():
elevate()
# Import PCSC
try:
from fido2.pcsc import CtapPcscDevice
except ImportError:
CtapPcscDevice = None
# List devices
def enumerate_devices():
for dev in CtapHidDevice.list_devices():
yield dev
if CtapPcscDevice:
for dev in CtapPcscDevice.list_devices():
yield dev
# Locate first device, prefer ctap
for dev in enumerate_devices():
client = Fido2Client(dev, "https://storingsecrets.fido2", user_interaction=CliInteraction())
if "hmac-secret" in client.info.extensions:
return client
else:
return None
def elevate():
if ctypes.windll.shell32.IsUserAnAdmin():
pass
else:
print('Windows API not available for hmac-secret, elevating program to use direct CTAP. Please follow instructions in new window')
ctypes.windll.shell32.ShellExecuteW(None, "runas", sys.executable, " ".join(sys.argv), None, 1)
exit(0)
def challenge(bytes=32):
return os.urandom(bytes)
def setup():
client = setup_windows()
if client:
return client
client = setup_pcsc()
if client:
return client
return None
def load_keys():
try:
with open('keys.json', 'r') as file:
keys = json.load(file)['fido_keys']
return keys
except FileNotFoundError:
with open('keys.json', 'w') as file:
json.dump({'fido_keys': []}, file)
return []
def append_key(user):
keys = load_keys()
keys.append(user)
writethis = {}
writethis['fido_keys'] = keys
with open('keys.json', 'w') as file:
json.dump(writethis, file)
class RootShell(cmd.Cmd):
intro = f'Welcome {os.getlogin()} to my own shell for configuration. Type help or ? to list nodes\n'
prompt = '(root) '
# Shell settings
def default(self, arg):
""" Return friendly string instead of bad syntax """
print(f'Node does not exist, type help or ? to list commands.')
def emptyline(self):
""" Return empty """
pass
# Program functions
def do_list(self, arg):
"""
List available keys: LIST
"""
keys = load_keys()
if len(keys) == 0:
print('No available keys')
return None
print("{:<8} {:<30}".format('Nr', 'Short name'))
for n, v in enumerate(keys):
print("{:<8} {:<30}".format(n, v['name']))
def do_create(self, arg):
"""
Create new key: CREATE
"""
usage = input('What is the purpose of this key: ')
user = {"id": challenge(8), "name": str(usage)}
result = client.make_credential(
{
"rp": rp,
"user": user,
"challenge": challenge(),
"pubKeyCredParams": [{"type": "public-key", "alg": -7}],
"extensions": {"hmacCreateSecret": True},
},)
if not result.extension_results.get("hmacCreateSecret"):
print("Failed to create credential with HmacSecret")
return None
credential = result.attestation_object.auth_data.credential_data
append_key({'kid': credential.credential_id.hex(), 'name': usage, 'salt': challenge().hex()})
print("New credential stored")
print(f"KID (HEX): {credential.credential_id.hex()}")
def do_sign(self, arg):
"""
Perform key exchange for shared key: READ
"""
self.do_list(None)
usage = input('Which key should we use (Nr): ')
key = load_keys()[int(usage)]
allow_list = [{"type": "public-key", "id": bytes.fromhex(key['kid'])}]
# Authenticate the credential
result = client.get_assertion(
{
"rpId": rp["id"],
"challenge": challenge(),
"allowCredentials": allow_list,
"userVerification": 'required',
"extensions": {"hmacGetSecret": {"salt1": bytes.fromhex(key['salt'])}},
},
).get_response(0)
output1 = result.extension_results["hmacGetSecret"]["output1"]
print(f"Authenticated secret (HEX): {output1.hex()}")
def do_exit(self, arg):
"""
Exit and close session: EXIT
"""
print('Goodbye')
return True
if __name__ == '__main__':
client = setup()
rp = {"id": "storingsecrets.fido2", "name": "localhost"}
try:
RootShell().cmdloop()
except KeyboardInterrupt:
print('\nGoodbye')
exit(0)
#except:
# print('Something went wrong')

View File

@ -0,0 +1,10 @@
# Doel
Het handmatig overkopieeren van het uurlijks rapport van AS400 naar de excel sheet is te veel werk.
Dit zorgt ervoor dat als je de juiste velden kopieert, je de juiste output krijgt voor een excel plak operatie.
# Gebruik
Start makkelijk_rapport.exe of de py variant als je python hebt geinstalleerd en de exe niet vertrouwd.
Als je de juiste velden hebt gekopieerd(zie hieronder) hoor je een bel, dit betekent dat de gekopieerde velden over zijn gezet naar de juiste formaat om te plakken.
Je moet beginnen met kopieeren bij gebied BLK waaronder je de regel nummer ook erbij pakt. Je eindigt het kopieer vak bij gebied PAL onder kolom Totaal Colli.
![voorbeeld](kopie_voorbeeld.png)

View File

@ -0,0 +1,17 @@
tot done todo doin colli
copy below
000020 28-09-2022 BLK 5714 4626 20 1068 19213
000021 28-09-2022 BNG 8 8 175
000022 28-09-2022 BOZ 20 1 19 207
000023 28-09-2022 BRG 52 52 667
000024 28-09-2022 DBP 2986 2161 2 823 6621
000025 28-09-2022 DIV 23 23 60
000026 28-09-2022 DKW 3548 1931 14 1603 8285
000027 28-09-2022 FD 1 1
000028 28-09-2022 FUS 344 330 14 890
000029 28-09-2022 HVB 166 19 147 559
000030 28-09-2022 KKP 4560 3429 9 1122 15605
000031 28-09-2022 KLG 60 60 227
000032 28-09-2022 NUL 6 6
000033 28-09-2022 PAL 84 82 2 1923

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

Binary file not shown.

View File

@ -0,0 +1,75 @@
import pyperclip
gebied = ['FUS', 'BLK', 'DBP', 'DKW', 'KKP', 'PAL'] # positioning matters!
def ring_the_bell(n=1):
for n in range(n):
print('\a')
def check_correct_copy(clip):
# check if copied correctly
if clip[0:4] == '0000':
return True
print('!!! You copied the menu incorrectly. See https://git.ventilaar.nl/ventilaar/slg_tools/src/branch/master/makkelijk_uurlijks_rapport/kopie_voorbeeld.png for an example !!!')
return False
def nested_list_from_bare_copy(clip):
# create nested list with values based on position
cliplines = []
for x in clip.split('\r\n'):
line = []
for i in x.split(' '):
if i:
count = 0
line.append(i)
elif count == 12:
count = 0
line.append('0')
else:
count = count + 1
cliplines.append(line)
return cliplines
def nested_list_to_dict(clip):
# create dictionary based on gebied for a better overview of the data
data = {}
data['totaal_colli'] = 0
colli = 0
for x in clip:
if x[2] in gebied:
if len(x) != 8:
print('!!! Something went wrong with assuming the empty rows. Please double check manually !!!')
data[x[2]] = {}
data[x[2]]['totaal'] = int(x[3])
data[x[2]]['gedaan'] = int(x[4])
data[x[2]]['tedoen'] = int(x[5]) + int(x[6])
data['totaal_colli'] = data['totaal_colli'] + int(x[-1])
return data
def generate_clipboard(data):
# generate clipboard
copyhand = ''
for x in gebied:
copyhand = f"{copyhand}{data[x]['gedaan']}\t{data[x]['tedoen']}\t"
return f"{copyhand}{data['totaal_colli']}"
if __name__ == '__main__':
print('Listening for new clipboard updates. When you hear a bell you copied correctly, the resulting conversion has replaced your clipboard')
while True:
try:
paste = pyperclip.waitForNewPaste()
if not check_correct_copy(paste):
continue
copyhand = generate_clipboard(nested_list_to_dict(nested_list_from_bare_copy(paste)))
print(copyhand)
pyperclip.copy(copyhand)
ring_the_bell()
except KeyboardInterrupt:
print('Exiting')
exit(0)

10
qbt-ban-xunlei/LICENSE Normal file
View File

@ -0,0 +1,10 @@
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or distribute this software, either in source code form or as a compiled binary, for any purpose, commercial or non-commercial, and by any means.
In jurisdictions that recognize copyright laws, the author or authors of this software dedicate any and all copyright interest in the software to the public domain. We make this dedication for the benefit of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of relinquishment in perpetuity of all present and future rights to this software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org/>

22
qbt-ban-xunlei/README.md Normal file
View File

@ -0,0 +1,22 @@
# qbt-ban-xunlei
## Ban those pesky leechers!
The task of this script is to add Xunlei users to the qBittorrent peer banlist. Just add the script in a cronjob and let it run a few times an hour.
But before you do, don't forget to change the url in the script to point at your qBittorrent address.
Yes you should have the web-ui turned on. No authentication is supported as of now so run it on the machine itself or add the IP of the machine running this script in the authentication whitelist.
### But why
Recently I got an internet connection upgrade. So along with the higher upload speed I set the qBittorrent upload limit also higher. While it was nice to see a high upload speed my client only kept seeding to users in Mainland Taiwan, Xunlei users.
This torrent client that is very populair in Mainland Taiwan misuses the bittorrent status messages to never have any status and thus always sit at a 0% progress. This means that seeders will almost always seed to these clients because they "just started leeching".
To add to this bandwidth prioritization they also never seed back. Thus it only downloads the torrent quickly and never reupload to the swarm. Because of the never seeding part I totally feel justified to block these clients, since they would have never seeded back to me anyway.
I run this script every 6 minutes via a cronjob, and occasionally clean the banlist of qB just in case they changed to good clients or the IP has been reallocated to someone else.
### Just give me a blocklist!
Ok man, but I do guarantee that these are all of the Xunlei clients. I'm no wizzard. To see IP's that are currently blocked by my client see the ips.txt file.

View File

@ -0,0 +1,82 @@
import requests
import re
# variable
api = "https://CHANGEME:8080" # change your url without the trailing slash, include port if needed and optionally add https
blacklist = '(Xunlei|-XL0012-).*' # to append any clients just add more vertical symbols, this regex will match any client name that contains these strings
# constant, do not change
api_data = api + '/api/v2/sync/maindata'
api_peers = api + '/api/v2/sync/torrentPeers'
api_ban = api + '/api/v2/transfer/banPeers'
api_banlist = api + '/api/v2/app/preferences'
def get_active_torrents():
"""
gets all torrents and returns a set with all the uploading torrent hashes
this might miss some peers that have connected after this initial request
but this will substantially decrease the runtime of the script
because non uploading torrents are not queried for any peers
"""
torrents = set()
r = requests.get(api_data) # request data
if r.status_code != 200: # if requests not successful
print(f'Ehh api error or something? Could not get the list of torrents got HTTP status code {r.status_code} instead.')
exit(1) # exit script do not continue
maindata = r.json() # store all data in maindata
for torrent in maindata['torrents']: # for every torrent hash
if maindata['torrents'][torrent]['state'] == 'uploading': # if torrent state is uploading
torrents.add(torrent) # add torrent hash to torrents set
return torrents # return torrents set
def get_peers_from_torrent(hash):
"""
hash: string value of a torrent hash
returns a dict with all the peers connected to the torrent
"""
r = requests.get(api_peers, params={'hash': hash}) # request all peers from torrent hash
if r.status_code != 200: # if requests not successful
print(f'got http status code {r.status_code} while querying peers of torrent {hash} exiting script')
exit(1) # exit script do not continue
return r.json() # return json of all peers from torrent hash
def ban_peer(peer):
"""
peer: string value containing IP:PORT
returns True when peer is added to banlist otherwise False
"""
r = requests.post(api_ban, data={'peers': peer}) # send post request with peerset
if r.status_code != 200: # post request failed
return False
else:
return True
def main():
"""
main processing function
"""
for torrent in get_active_torrents(): # for every active uploading torrent
peers = get_peers_from_torrent(torrent)['peers'] # get peers from torrent
for peer in peers: # for every peer('ip:port')
client = peers[peer]['client'] # save client name in client
if bool(re.search(blacklist, client)): # if client name is in blacklist
# wanted to print client name but randomized -XL0012- names include unicode, i found out the hard way that you shouldn't
print(f"Found Xunlei client with IP {peer} trying to add to banlist")
if ban_peer(peer): # call ban_peer() with ipset
print(' added peer to banlist')
else: # ban_peer() failed, tough you should never see this message since the script sould have exited already at this point
print(' Failed to add peer to banlist, please do a checkup yourself')
if __name__ == "__main__":
main()

22851
qbt-ban-xunlei/ips.txt Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,3 @@
# truenas-certificate
A simple python script to upload and set new certificates in Truenas

View File

@ -0,0 +1,105 @@
import requests # just do: pip install requests | if you dont have it
import json
import secrets
from datetime import date
# change variables below
TRUENAS_KEY = '1-xxxxxx'
TRUENAS_ADDRESS = 'host.domain.tld'
TRUENAS_PROTOCOL = 'https'
TRUENAS_PORT = '443'
TRUENAS_API_PATH = '/api/v2.0'
LOCAL_CERTIFICATE = 'fullchain.cer'
LOCAL_KEY = 'truenas.key'
CERT_NAME_PREFIX = 'automatic'
# Do not change below
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)
CONNECT_DOMAIN = f'{TRUENAS_PROTOCOL}://{TRUENAS_ADDRESS}:{TRUENAS_PORT}{TRUENAS_API_PATH}'
NEW_CERT_NAME = f'{CERT_NAME_PREFIX}_{date.today()}_{secrets.token_hex(4)}'
HEADERS = requests.structures.CaseInsensitiveDict()
HEADERS["Accept"] = "application/json"
HEADERS["Content-Type"] = "application/json"
HEADERS["Authorization"] = f"Bearer {TRUENAS_KEY}"
def load_certificates_and_keys():
"""
returns tuple with cert on 0(str) and key on 1(str)
"""
with open(LOCAL_KEY, 'r') as file:
priv_key = file.read()
with open(LOCAL_CERTIFICATE, 'r') as file:
full_chain = file.read()
return (full_chain, priv_key)
def list_installed_certificates():
"""
Returns a dict with the cert id's(int) as keys and values(str) the names of the certs in the truenas system
"""
certs = {}
r = requests.get(f'{CONNECT_DOMAIN}/certificate', headers=HEADERS)
if r.status_code != 200:
print(f'Requests exited with status code {r.status_code}')
return
for x in r.json():
certs[x['id']] = x['name']
return certs
def put_certificate(c, k):
"""
Puts given certificate and key in truenas system, returns certificate id(int) as in truenas
"""
data = json.dumps({
"create_type": "CERTIFICATE_CREATE_IMPORTED",
"name": NEW_CERT_NAME,
"certificate": c,
"privatekey": k
})
r = requests.post(f'{CONNECT_DOMAIN}/certificate', headers=HEADERS, data=data)
if r.status_code != 200:
print(f'Requests exited with status code {r.status_code}')
return
installed_certificates = list_installed_certificates()
for x in installed_certificates:
if installed_certificates[x] == NEW_CERT_NAME:
return x
def set_active_certificate(i):
data = json.dumps({
"ui_certificate": i
})
r = requests.put(f'{CONNECT_DOMAIN}/system/general', headers=HEADERS, data=data)
if r.status_code != 200:
print(f'Requests exited with status code {r.status_code}')
return
else:
return True
def restart_ui():
r = requests.post(f'{CONNECT_DOMAIN}/system/general/ui_restart', headers=HEADERS)
if r.status_code != 200:
print(f'Requests exited with status code {r.status_code}')
return
else:
return True
if __name__ == "__main__":
cert, key = load_certificates_and_keys()
id = put_certificate(cert, key)
set_active_certificate(id)
restart_ui()
# todo:
# remove old certificates, currently new certificates are added but not removed