Handle mass websub subscriptions with added statistics. General cleanup
This commit is contained in:
parent
8bf8e08af3
commit
72af6b6126
|
@ -1,4 +1,4 @@
|
|||
name: Generate release
|
||||
name: Generate docker image
|
||||
|
||||
on:
|
||||
release:
|
||||
|
@ -22,13 +22,4 @@ jobs:
|
|||
uses: docker/build-push-action@v5
|
||||
with:
|
||||
push: true
|
||||
tags: git.ventilaar.nl/ventilaar/ayta:latest
|
||||
|
||||
- name: Update worker server
|
||||
uses: appleboy/ssh-action@v1.0.3
|
||||
with:
|
||||
host: 192.168.66.109
|
||||
username: root
|
||||
key: ${{ secrets.SERVER_KEY }}
|
||||
port: 22
|
||||
script: /root/update_worker.sh
|
||||
tags: git.ventilaar.nl/ventilaar/ayta:latest
|
|
@ -0,0 +1,18 @@
|
|||
name: Update worker server
|
||||
|
||||
on:
|
||||
release:
|
||||
types: [published]
|
||||
|
||||
jobs:
|
||||
build-and-publish:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Update worker server
|
||||
uses: appleboy/ssh-action@v1.0.3
|
||||
with:
|
||||
host: 192.168.66.109
|
||||
username: root
|
||||
key: ${{ secrets.SERVER_KEY }}
|
||||
port: 22
|
||||
script: /root/update_worker.sh
|
|
@ -76,10 +76,10 @@ def channel(channelId):
|
|||
return redirect(url_for('admin.channel', channelId=channelId))
|
||||
|
||||
if task == 'update-value':
|
||||
if key == 'active':
|
||||
if key in ['active', 'websub']:
|
||||
value = True if value else False
|
||||
|
||||
if key == 'added_date':
|
||||
if key in ['added_date']:
|
||||
value = datetime.strptime(value, '%Y-%m-%d')
|
||||
|
||||
get_nosql().update_channel_key(channelId, key, value)
|
||||
|
@ -109,6 +109,8 @@ def run(runId):
|
|||
@bp.route('/websub', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
def websub():
|
||||
render = {}
|
||||
|
||||
if request.method == 'POST':
|
||||
task = request.form.get('task', None)
|
||||
value = request.form.get('value', None)
|
||||
|
@ -118,18 +120,30 @@ def websub():
|
|||
|
||||
flash(f"Started task {task.id}")
|
||||
return redirect(url_for('admin.websub'))
|
||||
|
||||
elif task == 'clean-retired':
|
||||
get_nosql().websub_cleanRetired()
|
||||
return redirect(url_for('admin.websub'))
|
||||
elif task == 'unsubscribe-callbacks':
|
||||
for callbackId in get_nosql().websub_getCallbacks():
|
||||
websub_unsubscribe_callback.delay(callbackId)
|
||||
flash(f"Started unsubscribe tasks for all callbacks")
|
||||
return redirect(url_for('admin.websub'))
|
||||
elif task == 'subscribe-channels':
|
||||
for channelId in get_nosql().list_all_channels(websub=True):
|
||||
websub_subscribe_callback.delay(channelId)
|
||||
flash(f'Started subscribe tasks for activated channels')
|
||||
return redirect(url_for('admin.websub'))
|
||||
|
||||
|
||||
callbackIds = get_nosql().websub_getCallbacks()
|
||||
callbacks = {}
|
||||
|
||||
render['stats'] = get_nosql().websub_statistics()
|
||||
|
||||
for callbackId in callbackIds:
|
||||
callbacks[callbackId] = get_nosql().websub_getCallback(callbackId)
|
||||
|
||||
return render_template('admin/websub.html', callbacks=callbacks)
|
||||
return render_template('admin/websub.html', callbacks=callbacks, render=render)
|
||||
|
||||
@bp.route('/reports', methods=['GET', 'POST'])
|
||||
@login_required
|
||||
|
|
|
@ -142,12 +142,14 @@ class Mango:
|
|||
# channel operations #
|
||||
##########################################
|
||||
|
||||
def list_all_channels(self, active=False):
|
||||
def list_all_channels(self, active=False, websub=False):
|
||||
""" Returns a SET of YouTube channel ID's; Depending on given positional BOOL only active channels or everything"""
|
||||
search_terms = {}
|
||||
|
||||
if active:
|
||||
search_terms['active'] = True
|
||||
elif websub:
|
||||
search_terms['websub'] = True
|
||||
|
||||
channels = []
|
||||
for channel in self.channels.find(search_terms, {'id': 1}):
|
||||
|
@ -169,9 +171,6 @@ class Mango:
|
|||
def get_channel_info(self, channelId):
|
||||
return self.channels.find_one({'id': channelId})
|
||||
|
||||
def update_channel_state(self, channelId, state):
|
||||
self.channels.update_one({'id': channelId}, {"$set": {"active": bool(state)}})
|
||||
return True
|
||||
|
||||
def update_channel_key(self, channelId, key, value):
|
||||
self.channels.update_one({'id': channelId}, {"$set": {key: value}})
|
||||
|
@ -301,18 +300,15 @@ class Mango:
|
|||
|
||||
def websub_existsCallback(self, callbackId, channel=False):
|
||||
if channel:
|
||||
query = {'channel': callbackId}
|
||||
query = {'channel': callbackId, 'status': {'$in': ['requesting', 'active', 'retiring']}}
|
||||
else:
|
||||
query = {'id': callbackId}
|
||||
query = {'id': callbackId, 'status': {'$in': ['requesting', 'active', 'retiring']}}
|
||||
|
||||
status = self.websub_callbacks.find_one(query, {'id': 1, 'status': 1})
|
||||
|
||||
if not status:
|
||||
return False
|
||||
|
||||
if status.get('status') in ['requesting', 'active', 'retiring']:
|
||||
if status:
|
||||
return status.get('id')
|
||||
|
||||
|
||||
return False
|
||||
|
||||
def websub_retiringCallback(self, callbackId):
|
||||
|
@ -333,7 +329,7 @@ class Mango:
|
|||
def websub_getCallback(self, callbackId):
|
||||
return self.websub_callbacks.find_one({'id': callbackId})
|
||||
|
||||
def websub_getCallbacks(self, channelId=''):
|
||||
def websub_getCallbacks(self, channelId=None):
|
||||
callbacks = []
|
||||
|
||||
if channelId:
|
||||
|
@ -341,7 +337,6 @@ class Mango:
|
|||
else:
|
||||
filter = {}
|
||||
|
||||
|
||||
for callback in self.websub_callbacks.find(filter, {'id': 1}):
|
||||
callbacks.append(callback['id'])
|
||||
|
||||
|
@ -363,13 +358,21 @@ class Mango:
|
|||
def websub_deletePostProcessing(self, _id):
|
||||
self.websub_data.delete_one({'_id': _id})
|
||||
|
||||
def websub_cleanRetired(self, days=3):
|
||||
def websub_cleanRetired(self, days=1):
|
||||
days = self.datetime.utcnow() - self.timedelta(days=days)
|
||||
|
||||
self.websub_callbacks.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
|
||||
self.websub_callbacks.delete_many({'status': 'requesting', 'requesting_time': {'$lt': days}})
|
||||
|
||||
return True
|
||||
|
||||
def websub_statistics(self):
|
||||
stats = {}
|
||||
|
||||
stats['unprocessed_data'] = self.websub_data.count_documents({'state': 'unprocessed'})
|
||||
stats['active_callbacks'] = self.websub_callbacks.count_documents({'status': 'active'})
|
||||
|
||||
return stats
|
||||
|
||||
##########################################
|
||||
# POSTER FUNCTIONS #
|
||||
|
|
19
ayta/oidc.py
19
ayta/oidc.py
|
@ -1,4 +1,10 @@
|
|||
class OIDC():
|
||||
"""
|
||||
This function class is nothing more than a nonce and state store for security in the authentication mechanism.
|
||||
Additionally this class provides the function to generate redirect url's and check bearer tokens on their validity as well as caching jwt signing keys.
|
||||
Fairly barebones and should be 100% secure. (famous last words)
|
||||
This is made for form posted JWT's. While not the most secure it is the most easy way to implement. Moving on to a code based solution might be preferred in the future.
|
||||
"""
|
||||
def __init__(self, app=None):
|
||||
self.states = {}
|
||||
self.nonces = {}
|
||||
|
@ -15,6 +21,7 @@ class OIDC():
|
|||
self.client_id = config['OIDC_ID']
|
||||
self.provider = config['OIDC_PROVIDER']
|
||||
self.domain = config['DOMAIN']
|
||||
self.window = 120 # the time window to allow states and nonces in seconds
|
||||
|
||||
if self.provider[:8] != 'https://' or self.provider[-1] == '/':
|
||||
print('Incorrect OIDC provider URI', flush=True)
|
||||
|
@ -27,12 +34,12 @@ class OIDC():
|
|||
|
||||
self.jwks_manager = jwt.PyJWKClient(jwks_uri)
|
||||
|
||||
#################################
|
||||
#######################################################
|
||||
|
||||
def state_maintenance(self):
|
||||
from datetime import datetime
|
||||
|
||||
pivot = datetime.now().timestamp() - 120
|
||||
pivot = datetime.now().timestamp() - self.window
|
||||
|
||||
expired_states = [state for state, timestamp in self.states.items() if timestamp <= pivot]
|
||||
|
||||
|
@ -61,12 +68,14 @@ class OIDC():
|
|||
|
||||
return False
|
||||
|
||||
#################################
|
||||
#######################################################
|
||||
# Same code as above but a different store for nonces #
|
||||
#######################################################
|
||||
|
||||
def nonce_maintenance(self):
|
||||
from datetime import datetime
|
||||
|
||||
pivot = datetime.now().timestamp() - 120
|
||||
pivot = datetime.now().timestamp() - self.window
|
||||
|
||||
expired_nonces = [nonce for nonce, timestamp in self.nonces.items() if timestamp <= pivot]
|
||||
|
||||
|
@ -95,7 +104,7 @@ class OIDC():
|
|||
|
||||
return False
|
||||
|
||||
#################################
|
||||
#######################################################
|
||||
|
||||
def generate_redirect(self):
|
||||
return str(f'{self.authorize_uri}'
|
||||
|
|
|
@ -33,7 +33,9 @@ def websub_subscribe_callback(channelId):
|
|||
response = requests.post(url, data=data)
|
||||
if response.status_code == 202:
|
||||
return True
|
||||
|
||||
|
||||
# maybe handle errors?
|
||||
|
||||
return False
|
||||
|
||||
@shared_task()
|
||||
|
@ -61,6 +63,8 @@ def websub_unsubscribe_callback(callbackId):
|
|||
if response.status_code == 202:
|
||||
return True
|
||||
|
||||
# maybe handle errors?
|
||||
|
||||
return False
|
||||
|
||||
@shared_task()
|
||||
|
@ -79,6 +83,8 @@ def websub_process_data():
|
|||
get_nosql().websub_deletePostProcessing(_id)
|
||||
|
||||
state, channelId, videoId = parsed
|
||||
|
||||
# do things
|
||||
|
||||
get_nosql().websub_deletePostProcessing(_id)
|
||||
|
||||
|
@ -86,19 +92,28 @@ def websub_process_data():
|
|||
def websub_renew_expiring(hours=6):
|
||||
from .nosql import get_nosql
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
count = 0
|
||||
|
||||
for callbackId in get_nosql().websub_getCallbacks():
|
||||
data = get_nosql().websub_getCallback(callbackId)
|
||||
|
||||
pivot = datetime.utcnow() - timedelta(hours=hours)
|
||||
expires = data.get('activation_time') + timedelta(seconds=data.get('lease'))
|
||||
|
||||
if pivot <= expires: # if expiration happens after the calculation time pass the loop
|
||||
if data.get('status') not in ['active']: # callback not active
|
||||
continue
|
||||
|
||||
print(f'{callbackId} should be renewed')
|
||||
pivot = datetime.utcnow() + timedelta(hours=hours) # hours past now
|
||||
expires = data.get('activation_time') + timedelta(seconds=data.get('lease')) # callback expires at
|
||||
|
||||
if pivot <= expires: # expiration happens after n hours fron now
|
||||
continue # skip callback
|
||||
|
||||
# expiration happens within n hours
|
||||
websub_subscribe_callback.delay(data.get('channel'))
|
||||
|
||||
# limit amount of subscribe requests to spread out the requests over time
|
||||
count = count + 1
|
||||
if count >= 16:
|
||||
break
|
||||
|
||||
##########################################
|
||||
# TASK MODULES #
|
||||
|
|
|
@ -19,7 +19,7 @@
|
|||
{% for item in channelInfo %}
|
||||
<form method="POST">
|
||||
<div class="input-field">
|
||||
<span class="supporting-text">{{ item }}</span>
|
||||
<span class="supporting-text mb-2">{{ item }}</span>
|
||||
<input class="validate" type="text" value="{{ item }}" name="key" hidden>
|
||||
</div>
|
||||
|
||||
|
|
|
@ -4,14 +4,9 @@
|
|||
|
||||
{% block content %}
|
||||
<div class="row">
|
||||
<div class="col s12 l11">
|
||||
<div class="col s12">
|
||||
<h4>WebSub administration page</h4>
|
||||
</div>
|
||||
<div class="col s12 l1 m-5">
|
||||
<form method="POST">
|
||||
<input title="Prunes all retired callbacks, but keeps last 3 days" type="submit" value="clean-retired" name="task">
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
<div class="divider"></div>
|
||||
<div class="row">
|
||||
|
@ -19,6 +14,43 @@
|
|||
<h5>WebSub options</h5>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col s12 l4 m-4">
|
||||
<div class="card">
|
||||
<div class="card-content">
|
||||
<span class="card-title">Direct actions</span>
|
||||
<form method="post" onsubmit="return confirm('Are you sure?');">
|
||||
<button class="btn mb-2 green" type="submit" name="task" value="subscribe-channels">Subscribe channels</button>
|
||||
<br>
|
||||
<span class="supporting-text">Send WebSub subscription request for all activated channels. (This will renew existing ones as well)</span>
|
||||
</form>
|
||||
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
|
||||
<button class="btn mb-2 red" type="submit" name="task" value="unsubscribe-callbacks">Unsubscribe channels</button>
|
||||
<br>
|
||||
<span class="supporting-text">Send WebSub unsubscription request for all activated endpoints. (This will only unsubscribe, not disable)</span>
|
||||
</form>
|
||||
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
|
||||
<button class="btn mb-2" type="submit" name="task" value="clean-retired">Clean retired</button>
|
||||
<br>
|
||||
<span class="supporting-text">Prunes all retired callbacks, but keeps until last day</span>
|
||||
</form>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col s12 l4 m-4">
|
||||
<div class="card">
|
||||
<div class="card-content">
|
||||
<span class="card-title">Statistics</span>
|
||||
<h6>Unprocessed callback datapoints</h6>
|
||||
<p>{{ render['stats']['unprocessed_data'] }}</p>
|
||||
<h6>Active callbacks</h6>
|
||||
<p>{{ render['stats']['active_callbacks'] }}</p>
|
||||
<h6>Something</h6>
|
||||
<p>Blah</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="divider"></div>
|
||||
<div class="row">
|
||||
<div class="col s6 l9">
|
||||
|
|
Loading…
Reference in New Issue