Compare commits

...

2 Commits

Author SHA1 Message Date
Ventilaar
1d5934275c Handle websub added messages to queue 2024-04-21 00:26:00 +02:00
Ventilaar
72af6b6126 Handle mass websub subscriptions with added statistics. General cleanup 2024-04-18 23:36:45 +02:00
13 changed files with 226 additions and 121 deletions

View File

@@ -1,4 +1,4 @@
name: Generate release
name: Generate docker image
on:
release:
@@ -22,13 +22,4 @@ jobs:
uses: docker/build-push-action@v5
with:
push: true
tags: git.ventilaar.nl/ventilaar/ayta:latest
- name: Update worker server
uses: appleboy/ssh-action@v1.0.3
with:
host: 192.168.66.109
username: root
key: ${{ secrets.SERVER_KEY }}
port: 22
script: /root/update_worker.sh
tags: git.ventilaar.nl/ventilaar/ayta:latest

View File

@@ -0,0 +1,18 @@
name: Update worker server
on:
release:
types: [published]
jobs:
build-and-publish:
runs-on: ubuntu-latest
steps:
- name: Update worker server
uses: appleboy/ssh-action@v1.0.3
with:
host: 192.168.66.109
username: root
key: ${{ secrets.SERVER_KEY }}
port: 22
script: /root/update_worker.sh

View File

@@ -25,7 +25,7 @@ def create_app(test_config=None):
config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
#config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 6}
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
app = Flask(__name__)
app.config.from_mapping(config)

View File

@@ -76,10 +76,10 @@ def channel(channelId):
return redirect(url_for('admin.channel', channelId=channelId))
if task == 'update-value':
if key == 'active':
if key in ['active', 'websub']:
value = True if value else False
if key == 'added_date':
if key in ['added_date']:
value = datetime.strptime(value, '%Y-%m-%d')
get_nosql().update_channel_key(channelId, key, value)
@@ -109,6 +109,8 @@ def run(runId):
@bp.route('/websub', methods=['GET', 'POST'])
@login_required
def websub():
render = {}
if request.method == 'POST':
task = request.form.get('task', None)
value = request.form.get('value', None)
@@ -118,18 +120,30 @@ def websub():
flash(f"Started task {task.id}")
return redirect(url_for('admin.websub'))
elif task == 'clean-retired':
get_nosql().websub_cleanRetired()
return redirect(url_for('admin.websub'))
elif task == 'unsubscribe-callbacks':
for callbackId in get_nosql().websub_getCallbacks():
websub_unsubscribe_callback.delay(callbackId)
flash(f"Started unsubscribe tasks for all callbacks")
return redirect(url_for('admin.websub'))
elif task == 'subscribe-channels':
for channelId in get_nosql().list_all_channels(websub=True):
websub_subscribe_callback.delay(channelId)
flash(f'Started subscribe tasks for activated channels')
return redirect(url_for('admin.websub'))
callbackIds = get_nosql().websub_getCallbacks()
callbacks = {}
render['stats'] = get_nosql().websub_statistics()
for callbackId in callbackIds:
callbacks[callbackId] = get_nosql().websub_getCallback(callbackId)
return render_template('admin/websub.html', callbacks=callbacks)
return render_template('admin/websub.html', callbacks=callbacks, render=render)
@bp.route('/reports', methods=['GET', 'POST'])
@login_required
@@ -147,9 +161,9 @@ def reports():
return render_template('admin/reports.html', reports=reports)
@bp.route('/posters', methods=['GET', 'POST'])
@bp.route('/queue', methods=['GET', 'POST'])
@login_required
def posters():
def queue():
if request.method == 'POST':
task = request.form.get('task', None)
value = request.form.get('value', None)
@@ -160,34 +174,39 @@ def posters():
flash('Description must be at least 8 characters long')
if value and len(value) >= 12:
get_nosql().poster_newEndpoint(value, description)
get_nosql().queue_newEndpoint(value, description)
flash(f'Created endpoint ID: {value}')
else:
value = token_urlsafe(16)
get_nosql().poster_newEndpoint(value, description)
get_nosql().queue_newEndpoint(value, description)
flash(f'Created endpoint ID: {value}')
elif task == 'retire':
get_nosql().poster_retireEndpoint(value)
get_nosql().queue_retireEndpoint(value)
flash(f'Endpoint retired: {value}')
elif task == 'clean-retired':
get_nosql().poster_cleanRetired()
get_nosql().queue_cleanRetired()
flash(f'Cleaned retired endpoints')
elif task == 'manual-queue':
get_nosql().poster_insertQueue('manual', value)
get_nosql().queue_insertQueue(value, 'webui')
flash(f'Added to queue: {value}')
elif task == 'delete-queue':
get_nosql().poster_deleteQueue(value)
get_nosql().queue_deleteQueue(value)
flash(f'Deleted from queue: {value}')
elif task == 'empty-queue':
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
return redirect(url_for('admin.posters'))
return redirect(url_for('admin.queue'))
endpoints = get_nosql().poster_getEndpoints()
queue = get_nosql().poster_getQueue()
endpoints = get_nosql().queue_getEndpoints()
queue = get_nosql().queue_getQueue()
return render_template('admin/posters.html', endpoints=endpoints, queue=queue)
return render_template('admin/queue.html', endpoints=endpoints, queue=queue)
@bp.route('/users', methods=['GET', 'POST'])
@login_required

View File

@@ -39,8 +39,8 @@ def websub(cap):
return abort(404)
@bp.route('/poster/<cap>', methods=['POST'])
def poster(cap):
@bp.route('/queue/<cap>', methods=['POST'])
def queue(cap):
# if endpoint does not exist
if not get_nosql().poster_isActive(cap):
return abort(404)
@@ -60,7 +60,7 @@ def poster(cap):
return abort(409)
# try to insert
if get_nosql().poster_insertQueue(cap, videoId):
if get_nosql().poster_insertQueue(videoId, cap):
return '', 202
else:
return abort(409)

View File

@@ -37,7 +37,7 @@ def base():
render['info'] = get_nosql().get_video_info(vGet)
render['params'] = request.args.get('v')
if render['info']['_status'] != 'available':
if render['info'].get('_status') != 'available':
flash(render['info'].get('_status_description', 'Video unavailable because of technical errors. Come back later.'))
return redirect(url_for('index.base'))

View File

@@ -36,13 +36,13 @@ class Mango:
self.db = self.client['ayta']
self.channels = self.db['channels']
self.info_json = self.db['info_json']
self.posters_queue = self.db['posters_queue']
self.download_queue = self.db['download_queue']
self.run_log = self.db['run_log']
self.channel_log = self.db['channel_log']
self.websub_callbacks = self.db['websub_callbacks']
self.websub_data = self.db['websub_data']
self.reports = self.db['reports']
self.posters_endpoints = self.db['posters_endpoints']
self.queue_endpoints = self.db['queue_endpoints']
self.users = self.db['users']
self.ensure_indexes()
@@ -142,12 +142,14 @@ class Mango:
# channel operations #
##########################################
def list_all_channels(self, active=False):
def list_all_channels(self, active=False, websub=False):
""" Returns a SET of YouTube channel IDs; depending on the given BOOL flags only active channels, only WebSub-enabled channels, or everything"""
search_terms = {}
if active:
search_terms['active'] = True
elif websub:
search_terms['websub'] = True
channels = []
for channel in self.channels.find(search_terms, {'id': 1}):
@@ -169,9 +171,6 @@ class Mango:
def get_channel_info(self, channelId):
return self.channels.find_one({'id': channelId})
def update_channel_state(self, channelId, state):
self.channels.update_one({'id': channelId}, {"$set": {"active": bool(state)}})
return True
def update_channel_key(self, channelId, key, value):
self.channels.update_one({'id': channelId}, {"$set": {key: value}})
@@ -301,18 +300,15 @@ class Mango:
def websub_existsCallback(self, callbackId, channel=False):
if channel:
query = {'channel': callbackId}
query = {'channel': callbackId, 'status': {'$in': ['requesting', 'active', 'retiring']}}
else:
query = {'id': callbackId}
query = {'id': callbackId, 'status': {'$in': ['requesting', 'active', 'retiring']}}
status = self.websub_callbacks.find_one(query, {'id': 1, 'status': 1})
if not status:
return False
if status.get('status') in ['requesting', 'active', 'retiring']:
if status:
return status.get('id')
return False
def websub_retiringCallback(self, callbackId):
@@ -333,7 +329,7 @@ class Mango:
def websub_getCallback(self, callbackId):
return self.websub_callbacks.find_one({'id': callbackId})
def websub_getCallbacks(self, channelId=''):
def websub_getCallbacks(self, channelId=None):
callbacks = []
if channelId:
@@ -341,7 +337,6 @@ class Mango:
else:
filter = {}
for callback in self.websub_callbacks.find(filter, {'id': 1}):
callbacks.append(callback['id'])
@@ -363,64 +358,74 @@ class Mango:
def websub_deletePostProcessing(self, _id):
self.websub_data.delete_one({'_id': _id})
def websub_cleanRetired(self, days=3):
def websub_cleanRetired(self, days=1):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.websub_callbacks.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
self.websub_callbacks.delete_many({'status': 'requesting', 'requesting_time': {'$lt': days}})
return True
def websub_statistics(self):
stats = {}
stats['unprocessed_data'] = self.websub_data.count_documents({'state': 'unprocessed'})
stats['active_callbacks'] = self.websub_callbacks.count_documents({'status': 'active'})
return stats
##########################################
# POSTER FUNCTIONS #
# QUEUE FUNCTIONS #
##########################################
def poster_newEndpoint(self, endpointId, description=''):
self.posters_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)})
def queue_newEndpoint(self, endpointId, description=''):
self.queue_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)})
return endpointId
def queue_retireEndpoint(self, endpointId):
return self.queue_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}})
def queue_isActive(self, endpointId):
status = self.queue_endpoints.find_one({'id': endpointId}, {'status': 1})
if not status:
return False
elif status.get('status') == 'active':
return True
else:
return False
def queue_getEndpoints(self):
return self.queue_endpoints.find({})
def poster_insertQueue(self, endpointId, videoId):
def queue_cleanRetired(self, days=3):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.queue_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
return True
##########################################
def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists
if not self.posters_queue.count_documents({'id': videoId}) >= 1:
self.posters_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
if not self.download_queue.count_documents({'id': videoId}) >= 1:
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
# key already in queue
return False
def poster_deleteQueue(self, videoId):
if self.posters_queue.delete_one({'id': videoId}):
def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}):
return True
return False
def poster_retireEndpoint(self, endpointId):
return self.posters_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}})
def queue_getQueue(self):
return self.download_queue.find({})
def poster_isActive(self, endpointId):
status = self.posters_endpoints.find_one({'id': endpointId}, {'status': 1})
if not status:
return False
status = status.get('status')
if status == 'active':
return True
return False
def poster_getEndpoints(self):
return self.posters_endpoints.find({})
def poster_getQueue(self):
return self.posters_queue.find({})
def poster_cleanRetired(self, days=3):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.posters_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
return True
def queue_emptyQueue(self):
return self.download_queue.delete_many({})
##########################################
# HELPER FUNCTIONS #

View File

@@ -1,4 +1,10 @@
class OIDC():
"""
This class is nothing more than a nonce and state store for security in the authentication mechanism.
Additionally, this class provides functions to generate redirect URLs and to check bearer tokens for validity, as well as caching JWT signing keys.
Fairly barebones and should be 100% secure. (famous last words)
This is made for form-posted JWTs. While not the most secure, it is the easiest way to implement. Moving on to a code-based solution might be preferred in the future.
"""
def __init__(self, app=None):
self.states = {}
self.nonces = {}
@@ -15,6 +21,7 @@ class OIDC():
self.client_id = config['OIDC_ID']
self.provider = config['OIDC_PROVIDER']
self.domain = config['DOMAIN']
self.window = 120 # the time window to allow states and nonces in seconds
if self.provider[:8] != 'https://' or self.provider[-1] == '/':
print('Incorrect OIDC provider URI', flush=True)
@@ -27,12 +34,12 @@ class OIDC():
self.jwks_manager = jwt.PyJWKClient(jwks_uri)
#################################
#######################################################
def state_maintenance(self):
from datetime import datetime
pivot = datetime.now().timestamp() - 120
pivot = datetime.now().timestamp() - self.window
expired_states = [state for state, timestamp in self.states.items() if timestamp <= pivot]
@@ -61,12 +68,14 @@ class OIDC():
return False
#################################
#######################################################
# Same code as above but a different store for nonces #
#######################################################
def nonce_maintenance(self):
from datetime import datetime
pivot = datetime.now().timestamp() - 120
pivot = datetime.now().timestamp() - self.window
expired_nonces = [nonce for nonce, timestamp in self.nonces.items() if timestamp <= pivot]
@@ -95,7 +104,7 @@ class OIDC():
return False
#################################
#######################################################
def generate_redirect(self):
return str(f'{self.authorize_uri}'

View File

@@ -33,7 +33,9 @@ def websub_subscribe_callback(channelId):
response = requests.post(url, data=data)
if response.status_code == 202:
return True
# maybe handle errors?
return False
@shared_task()
@@ -61,6 +63,8 @@ def websub_unsubscribe_callback(callbackId):
if response.status_code == 202:
return True
# maybe handle errors?
return False
@shared_task()
@@ -68,37 +72,51 @@ def websub_process_data():
from .nosql import get_nosql
while True:
data = get_nosql().websub_getFirstPostData()
if not data:
blob = get_nosql().websub_getFirstPostData()
if not blob:
break
_id, data = data
_id, data = blob
parsed = do_parse_data(data)
if not parsed:
get_nosql().websub_deletePostProcessing(_id)
state, channelId, videoId = parsed
if parsed:
state, channelId, videoId = parsed
if state == 'added':
if not get_nosql().check_exists(videoId): # if video not exists
get_nosql().queue_insertQueue(videoId, 'WebSub')
elif state == 'removed':
pass
get_nosql().websub_deletePostProcessing(_id)
@shared_task()
def websub_renew_expiring(hours=6):
from .nosql import get_nosql
from datetime import datetime, timedelta
count = 0
for callbackId in get_nosql().websub_getCallbacks():
data = get_nosql().websub_getCallback(callbackId)
pivot = datetime.utcnow() - timedelta(hours=hours)
expires = data.get('activation_time') + timedelta(seconds=data.get('lease'))
if pivot <= expires: # if expiration happens after the calculation time pass the loop
if data.get('status') not in ['active']: # callback not active
continue
print(f'{callbackId} should be renewed')
pivot = datetime.utcnow() + timedelta(hours=hours) # hours past now
expires = data.get('activation_time') + timedelta(seconds=data.get('lease')) # callback expires at
if pivot <= expires: # expiration happens after n hours from now
continue # skip callback
# expiration happens within n hours
websub_subscribe_callback.delay(data.get('channel'))
# limit amount of subscribe requests to spread out the requests over time
count = count + 1
if count >= 100:
break
##########################################
# TASK MODULES #

View File

@@ -19,7 +19,7 @@
{% for item in channelInfo %}
<form method="POST">
<div class="input-field">
<span class="supporting-text">{{ item }}</span>
<span class="supporting-text mb-2">{{ item }}</span>
<input class="validate" type="text" value="{{ item }}" name="key" hidden>
</div>

View File

@@ -66,11 +66,11 @@
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.posters') }}">
<a href="{{ url_for('admin.queue') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Posters</span>
<p class="grey-text">Extension posters</p>
<span class="card-title">Queue</span>
<p class="grey-text">Video download queue and API access</p>
</div>
</div>
</a>

View File

@@ -1,25 +1,38 @@
{% extends 'material_base.html' %}
{% block title %}Posters administration page{% endblock %}
{% block description %}Posters administration page of the AYTA system{% endblock %}
{% block title %}Queue administration page{% endblock %}
{% block description %}Queue administration page of the AYTA system{% endblock %}
{% block content %}
<div class="row">
<div class="col s12 l11">
<h4>Posters administration page</h4>
</div>
<div class="col s12 l1 m-5">
<form method="POST">
<input title="Prunes all deleted endpoints, but keeps last 3 days" type="submit" value="clean-retired" name="task">
</form>
<div class="col s12">
<h4>Queue administration page</h4>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Poster options</h5>
<h5>Options</h5>
</div>
</div>
<div class="row">
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Direct actions</span>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 red" type="submit" name="task" value="empty-queue">Empty Queue</button>
<br>
<span class="supporting-text">Removes all queued ids</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2" type="submit" name="task" value="clean-retired">Clean retired</button>
<br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form>
</div>
</div>
</div>
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">

View File

@@ -4,14 +4,9 @@
{% block content %}
<div class="row">
<div class="col s12 l11">
<div class="col s12">
<h4>WebSub administration page</h4>
</div>
<div class="col s12 l1 m-5">
<form method="POST">
<input title="Prunes all retired callbacks, but keeps last 3 days" type="submit" value="clean-retired" name="task">
</form>
</div>
</div>
<div class="divider"></div>
<div class="row">
@@ -19,6 +14,43 @@
<h5>WebSub options</h5>
</div>
</div>
<div class="row">
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Direct actions</span>
<form method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 green" type="submit" name="task" value="subscribe-channels">Subscribe channels</button>
<br>
<span class="supporting-text">Send WebSub subscription request for all activated channels. (This will renew existing ones as well)</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 red" type="submit" name="task" value="unsubscribe-callbacks">Unsubscribe channels</button>
<br>
<span class="supporting-text">Send WebSub unsubscription request for all activated endpoints. (This will only unsubscribe, not disable)</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2" type="submit" name="task" value="clean-retired">Clean retired</button>
<br>
<span class="supporting-text">Prunes all retired callbacks, but keeps until last day</span>
</form>
</div>
</div>
</div>
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Statistics</span>
<h6>Unprocessed callback datapoints</h6>
<p>{{ render['stats']['unprocessed_data'] }}</p>
<h6>Active callbacks</h6>
<p>{{ render['stats']['active_callbacks'] }}</p>
<h6>Something</h6>
<p>Blah</p>
</div>
</div>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s6 l9">