From 1d5934275c0a2d2b4fdd1020faa29564615f21be Mon Sep 17 00:00:00 2001 From: Ventilaar <34913684+ventilaar@users.noreply.github.com> Date: Sun, 21 Apr 2024 00:26:00 +0200 Subject: [PATCH] Handle websub added messages to queue --- ayta/__init__.py | 2 +- ayta/blueprints/admin.py | 29 ++++--- ayta/blueprints/api.py | 6 +- ayta/blueprints/watch.py | 2 +- ayta/nosql.py | 76 ++++++++++--------- ayta/tasks.py | 21 ++--- ayta/templates/admin/index.html | 6 +- .../admin/{posters.html => queue.html} | 33 +++++--- 8 files changed, 99 insertions(+), 76 deletions(-) rename ayta/templates/admin/{posters.html => queue.html} (81%) diff --git a/ayta/__init__.py b/ayta/__init__.py index 16c1db6..816ecb3 100644 --- a/ayta/__init__.py +++ b/ayta/__init__.py @@ -25,7 +25,7 @@ def create_app(test_config=None): config['CELERY']['beat_schedule'] = {} config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000} - #config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 6} + config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100} app = Flask(__name__) app.config.from_mapping(config) diff --git a/ayta/blueprints/admin.py b/ayta/blueprints/admin.py index 30c237c..beca85c 100644 --- a/ayta/blueprints/admin.py +++ b/ayta/blueprints/admin.py @@ -161,9 +161,9 @@ def reports(): return render_template('admin/reports.html', reports=reports) -@bp.route('/posters', methods=['GET', 'POST']) +@bp.route('/queue', methods=['GET', 'POST']) @login_required -def posters(): +def queue(): if request.method == 'POST': task = request.form.get('task', None) value = request.form.get('value', None) @@ -174,34 +174,39 @@ def posters(): flash('Description must be at least 8 characters long') if value and len(value) >= 12: - get_nosql().poster_newEndpoint(value, description) + get_nosql().queue_newEndpoint(value, description) flash(f'Created endpoint ID: {value}') else: value = token_urlsafe(16) - get_nosql().poster_newEndpoint(value, description) + get_nosql().queue_newEndpoint(value, description) flash(f'Created endpoint ID: {value}') + elif task == 'retire': - get_nosql().poster_retireEndpoint(value) + get_nosql().queue_retireEndpoint(value) flash(f'Endpoint retired: {value}') elif task == 'clean-retired': - get_nosql().poster_cleanRetired() + get_nosql().queue_cleanRetired() flash(f'Cleaned retired endpoints') elif task == 'manual-queue': - get_nosql().poster_insertQueue('manual', value) + get_nosql().queue_insertQueue(value, 'webui') flash(f'Added to queue: {value}') elif task == 'delete-queue': - get_nosql().poster_deleteQueue(value) + get_nosql().queue_deleteQueue(value) flash(f'Deleted from queue: {value}') + + elif task == 'empty-queue': + get_nosql().queue_emptyQueue() + flash(f'Queue has been emptied') - return redirect(url_for('admin.posters')) + return redirect(url_for('admin.queue')) - endpoints = get_nosql().poster_getEndpoints() - queue = get_nosql().poster_getQueue() + endpoints = get_nosql().queue_getEndpoints() + queue = get_nosql().queue_getQueue() - return render_template('admin/posters.html', endpoints=endpoints, queue=queue) + return render_template('admin/queue.html', endpoints=endpoints, queue=queue) @bp.route('/users', methods=['GET', 'POST']) @login_required diff --git a/ayta/blueprints/api.py b/ayta/blueprints/api.py index 92a0848..fe5beaa 100644 --- a/ayta/blueprints/api.py +++ b/ayta/blueprints/api.py @@ -39,8 +39,8 @@ def websub(cap): return abort(404) -@bp.route('/poster/', methods=['POST']) -def poster(cap): +@bp.route('/queue/', methods=['POST']) +def queue(cap): # if endpoint does not exist if not get_nosql().poster_isActive(cap): return abort(404) @@ -60,7 +60,7 @@ def poster(cap): return abort(409) # try to insert - if get_nosql().poster_insertQueue(cap, videoId): + if get_nosql().poster_insertQueue(videoId, cap): return '', 202 else: return abort(409) \ No newline at end of file diff --git a/ayta/blueprints/watch.py b/ayta/blueprints/watch.py index 2e3d030..6157788 100644 --- a/ayta/blueprints/watch.py +++ b/ayta/blueprints/watch.py @@ -37,7 +37,7 @@ def base(): render['info'] = get_nosql().get_video_info(vGet) render['params'] = request.args.get('v') - if render['info']['_status'] != 'available': + if render['info'].get('_status') != 'available': flash(render['info'].get('_status_description', 'Video unavailable because of technical errors. Come back later.')) return redirect(url_for('index.base')) diff --git a/ayta/nosql.py b/ayta/nosql.py index 1446e27..5f3e211 100644 --- a/ayta/nosql.py +++ b/ayta/nosql.py @@ -36,13 +36,13 @@ class Mango: self.db = self.client['ayta'] self.channels = self.db['channels'] self.info_json = self.db['info_json'] - self.posters_queue = self.db['posters_queue'] + self.download_queue = self.db['download_queue'] self.run_log = self.db['run_log'] self.channel_log = self.db['channel_log'] self.websub_callbacks = self.db['websub_callbacks'] self.websub_data = self.db['websub_data'] self.reports = self.db['reports'] - self.posters_endpoints = self.db['posters_endpoints'] + self.queue_endpoints = self.db['queue_endpoints'] self.users = self.db['users'] self.ensure_indexes() @@ -375,55 +375,57 @@ class Mango: return stats ########################################## - # POSTER FUNCTIONS # + # QUEUE FUNCTIONS # ########################################## - def poster_newEndpoint(self, endpointId, description=''): - self.posters_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)}) + def queue_newEndpoint(self, endpointId, description=''): + self.queue_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)}) return endpointId + + def queue_retireEndpoint(self, endpointId): + return self.queue_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}}) + + def queue_isActive(self, endpointId): + status = self.queue_endpoints.find_one({'id': endpointId}, {'status': 1}) + + if not status: + return False + elif status.get('status') == 'active': + return True + else: + return False + + def queue_getEndpoints(self): + return self.queue_endpoints.find({}) - def poster_insertQueue(self, endpointId, videoId): + def queue_cleanRetired(self, days=3): + days = self.datetime.utcnow() - self.timedelta(days=days) + + self.queue_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}}) + + return True + + ########################################## + + def queue_insertQueue(self, videoId, endpointId=None): # if no document exists - if not self.posters_queue.count_documents({'id': videoId}) >= 1: - self.posters_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id + if not self.download_queue.count_documents({'id': videoId}) >= 1: + self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id return True # key already in queue return False - def poster_deleteQueue(self, videoId): - if self.posters_queue.delete_one({'id': videoId}): + def queue_deleteQueue(self, videoId): + if self.download_queue.delete_one({'id': videoId}): return True return False - def poster_retireEndpoint(self, endpointId): - return self.posters_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}}) + def queue_getQueue(self): + return self.download_queue.find({}) - def poster_isActive(self, endpointId): - status = self.posters_endpoints.find_one({'id': endpointId}, {'status': 1}) - - if not status: - return False - - status = status.get('status') - - if status == 'active': - return True - - return False - - def poster_getEndpoints(self): - return self.posters_endpoints.find({}) - - def poster_getQueue(self): - return self.posters_queue.find({}) - - def poster_cleanRetired(self, days=3): - days = self.datetime.utcnow() - self.timedelta(days=days) - - self.posters_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}}) - - return True + def queue_emptyQueue(self): + return self.download_queue.delete_many({}) ########################################## # HELPER FUNCTIONS # diff --git a/ayta/tasks.py b/ayta/tasks.py index f694f56..0348f5c 100644 --- a/ayta/tasks.py +++ b/ayta/tasks.py @@ -72,20 +72,23 @@ def websub_process_data(): from .nosql import get_nosql while True: - data = get_nosql().websub_getFirstPostData() - if not data: + blob = get_nosql().websub_getFirstPostData() + if not blob: break - _id, data = data + _id, data = blob parsed = do_parse_data(data) - if not parsed: - get_nosql().websub_deletePostProcessing(_id) - - state, channelId, videoId = parsed + if parsed: + state, channelId, videoId = parsed - # do things + if state == 'added': + if not get_nosql().check_exists(videoId): # if video not exists + get_nosql().queue_insertQueue(videoId, 'WebSub') + elif state == 'removed': + pass + get_nosql().websub_deletePostProcessing(_id) @shared_task() @@ -112,7 +115,7 @@ def websub_renew_expiring(hours=6): # limit amount of subscribe requests to spread out the requests over time count = count + 1 - if count >= 16: + if count >= 100: break ########################################## diff --git a/ayta/templates/admin/index.html b/ayta/templates/admin/index.html index 5523d6b..55c9271 100644 --- a/ayta/templates/admin/index.html +++ b/ayta/templates/admin/index.html @@ -66,11 +66,11 @@
- +
- Posters -

Extension posters

+ Queue +

Video download queue and API access

diff --git a/ayta/templates/admin/posters.html b/ayta/templates/admin/queue.html similarity index 81% rename from ayta/templates/admin/posters.html rename to ayta/templates/admin/queue.html index ff57530..eb0276b 100644 --- a/ayta/templates/admin/posters.html +++ b/ayta/templates/admin/queue.html @@ -1,25 +1,38 @@ {% extends 'material_base.html' %} -{% block title %}Posters administration page{% endblock %} -{% block description %}Posters administration page of the AYTA system{% endblock %} +{% block title %}Queue administration page{% endblock %} +{% block description %}Queue administration page of the AYTA system{% endblock %} {% block content %}
-
-

Posters administration page

-
-
-
- -
+
+

Queue administration page

-
Poster options
+
Options
+
+
+
+ Direct actions +
+ +
+ Removes all queued ids +
+
+ +
+ Prunes all deactivated endpoints, but keeps last 3 days +
+ +
+
+