Handle websub added messages to queue

This commit is contained in:
Ventilaar 2024-04-21 00:26:00 +02:00
parent 72af6b6126
commit 1d5934275c
No known key found for this signature in database
8 changed files with 99 additions and 76 deletions

View File

@ -25,7 +25,7 @@ def create_app(test_config=None):
config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
#config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 6}
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
app = Flask(__name__)
app.config.from_mapping(config)

View File

@ -161,9 +161,9 @@ def reports():
return render_template('admin/reports.html', reports=reports)
@bp.route('/posters', methods=['GET', 'POST'])
@bp.route('/queue', methods=['GET', 'POST'])
@login_required
def posters():
def queue():
if request.method == 'POST':
task = request.form.get('task', None)
value = request.form.get('value', None)
@ -174,34 +174,39 @@ def posters():
flash('Description must be at least 8 characters long')
if value and len(value) >= 12:
get_nosql().poster_newEndpoint(value, description)
get_nosql().queue_newEndpoint(value, description)
flash(f'Created endpoint ID: {value}')
else:
value = token_urlsafe(16)
get_nosql().poster_newEndpoint(value, description)
get_nosql().queue_newEndpoint(value, description)
flash(f'Created endpoint ID: {value}')
elif task == 'retire':
get_nosql().poster_retireEndpoint(value)
get_nosql().queue_retireEndpoint(value)
flash(f'Endpoint retired: {value}')
elif task == 'clean-retired':
get_nosql().poster_cleanRetired()
get_nosql().queue_cleanRetired()
flash(f'Cleaned retired endpoints')
elif task == 'manual-queue':
get_nosql().poster_insertQueue('manual', value)
get_nosql().queue_insertQueue(value, 'webui')
flash(f'Added to queue: {value}')
elif task == 'delete-queue':
get_nosql().poster_deleteQueue(value)
get_nosql().queue_deleteQueue(value)
flash(f'Deleted from queue: {value}')
elif task == 'empty-queue':
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
return redirect(url_for('admin.posters'))
return redirect(url_for('admin.queue'))
endpoints = get_nosql().poster_getEndpoints()
queue = get_nosql().poster_getQueue()
endpoints = get_nosql().queue_getEndpoints()
queue = get_nosql().queue_getQueue()
return render_template('admin/posters.html', endpoints=endpoints, queue=queue)
return render_template('admin/queue.html', endpoints=endpoints, queue=queue)
@bp.route('/users', methods=['GET', 'POST'])
@login_required

View File

@ -39,8 +39,8 @@ def websub(cap):
return abort(404)
@bp.route('/poster/<cap>', methods=['POST'])
def poster(cap):
@bp.route('/queue/<cap>', methods=['POST'])
def queue(cap):
# if endpoint does not exist
if not get_nosql().poster_isActive(cap):
return abort(404)
@ -60,7 +60,7 @@ def poster(cap):
return abort(409)
# try to insert
if get_nosql().poster_insertQueue(cap, videoId):
if get_nosql().poster_insertQueue(videoId, cap):
return '', 202
else:
return abort(409)

View File

@ -37,7 +37,7 @@ def base():
render['info'] = get_nosql().get_video_info(vGet)
render['params'] = request.args.get('v')
if render['info']['_status'] != 'available':
if render['info'].get('_status') != 'available':
flash(render['info'].get('_status_description', 'Video unavailable because of technical errors. Come back later.'))
return redirect(url_for('index.base'))

View File

@ -36,13 +36,13 @@ class Mango:
self.db = self.client['ayta']
self.channels = self.db['channels']
self.info_json = self.db['info_json']
self.posters_queue = self.db['posters_queue']
self.download_queue = self.db['download_queue']
self.run_log = self.db['run_log']
self.channel_log = self.db['channel_log']
self.websub_callbacks = self.db['websub_callbacks']
self.websub_data = self.db['websub_data']
self.reports = self.db['reports']
self.posters_endpoints = self.db['posters_endpoints']
self.queue_endpoints = self.db['queue_endpoints']
self.users = self.db['users']
self.ensure_indexes()
@ -375,55 +375,57 @@ class Mango:
return stats
##########################################
# POSTER FUNCTIONS #
# QUEUE FUNCTIONS #
##########################################
def poster_newEndpoint(self, endpointId, description=''):
self.posters_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)})
def queue_newEndpoint(self, endpointId, description=''):
self.queue_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)})
return endpointId
def queue_retireEndpoint(self, endpointId):
return self.queue_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}})
def queue_isActive(self, endpointId):
status = self.queue_endpoints.find_one({'id': endpointId}, {'status': 1})
if not status:
return False
elif status.get('status') == 'active':
return True
else:
return False
def queue_getEndpoints(self):
return self.queue_endpoints.find({})
def poster_insertQueue(self, endpointId, videoId):
def queue_cleanRetired(self, days=3):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.queue_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
return True
##########################################
def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists
if not self.posters_queue.count_documents({'id': videoId}) >= 1:
self.posters_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
if not self.download_queue.count_documents({'id': videoId}) >= 1:
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
# key already in queue
return False
def poster_deleteQueue(self, videoId):
if self.posters_queue.delete_one({'id': videoId}):
def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}):
return True
return False
def poster_retireEndpoint(self, endpointId):
return self.posters_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}})
def queue_getQueue(self):
return self.download_queue.find({})
def poster_isActive(self, endpointId):
status = self.posters_endpoints.find_one({'id': endpointId}, {'status': 1})
if not status:
return False
status = status.get('status')
if status == 'active':
return True
return False
def poster_getEndpoints(self):
return self.posters_endpoints.find({})
def poster_getQueue(self):
return self.posters_queue.find({})
def poster_cleanRetired(self, days=3):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.posters_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
return True
def queue_emptyQueue(self):
return self.download_queue.delete_many({})
##########################################
# HELPER FUNCTIONS #

View File

@ -72,20 +72,23 @@ def websub_process_data():
from .nosql import get_nosql
while True:
data = get_nosql().websub_getFirstPostData()
if not data:
blob = get_nosql().websub_getFirstPostData()
if not blob:
break
_id, data = data
_id, data = blob
parsed = do_parse_data(data)
if not parsed:
get_nosql().websub_deletePostProcessing(_id)
state, channelId, videoId = parsed
if parsed:
state, channelId, videoId = parsed
# do things
if state == 'added':
if not get_nosql().check_exists(videoId): # if video not exists
get_nosql().queue_insertQueue(videoId, 'WebSub')
elif state == 'removed':
pass
get_nosql().websub_deletePostProcessing(_id)
@shared_task()
@ -112,7 +115,7 @@ def websub_renew_expiring(hours=6):
# limit amount of subscribe requests to spread out the requests over time
count = count + 1
if count >= 16:
if count >= 100:
break
##########################################

View File

@ -66,11 +66,11 @@
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.posters') }}">
<a href="{{ url_for('admin.queue') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Posters</span>
<p class="grey-text">Extension posters</p>
<span class="card-title">Queue</span>
<p class="grey-text">Video download queue and API access</p>
</div>
</div>
</a>

View File

@ -1,25 +1,38 @@
{% extends 'material_base.html' %}
{% block title %}Posters administration page{% endblock %}
{% block description %}Posters administration page of the AYTA system{% endblock %}
{% block title %}Queue administration page{% endblock %}
{% block description %}Queue administration page of the AYTA system{% endblock %}
{% block content %}
<div class="row">
<div class="col s12 l11">
<h4>Posters administration page</h4>
</div>
<div class="col s12 l1 m-5">
<form method="POST">
<input title="Prunes all deleted endpoints, but keeps last 3 days" type="submit" value="clean-retired" name="task">
</form>
<div class="col s12">
<h4>Queue administration page</h4>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Poster options</h5>
<h5>Options</h5>
</div>
</div>
<div class="row">
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Direct actions</span>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 red" type="submit" name="task" value="empty-queue">Empty Queue</button>
<br>
<span class="supporting-text">Removes all queued ids</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2" type="submit" name="task" value="clean-retired">Clean retired</button>
<br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form>
</div>
</div>
</div>
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">