Compare commits

..

3 Commits

Author SHA1 Message Date
Ventilaar
43e6c00787 idk
All checks were successful
Update worker server / build-and-publish (release) Successful in 18s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-21 20:49:40 +01:00
Ventilaar
d42030dcbc Small concurrency and logging fix
All checks were successful
Update worker server / build-and-publish (release) Successful in 10s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-19 13:27:09 +01:00
Ventilaar
5530179558 Small fixup
All checks were successful
Update worker server / build-and-publish (release) Successful in 11s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-18 23:39:32 +01:00
6 changed files with 58 additions and 25 deletions

View File

@@ -209,12 +209,11 @@ def queue():
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
elif task == 'run-download':
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
elif task == 'queue-run-once':
video_queue.delay()
value = int(value) if value.isdigit() else 1
for x in range(value):
task = video_queue.delay()
flash(f'Task has been started on the oldest queued item: {task.id}')
return redirect(url_for('admin.queue'))
@@ -252,7 +251,7 @@ def users():
return render_template('admin/users.html', users=users)
@bp.route('/workers', methods=['GET', 'POST'])
#@login_required
@login_required
def workers():
if request.method == 'POST':
task = request.form.get('task', None)
@@ -262,4 +261,5 @@ def workers():
celery = current_app.extensions.get('celery')
tasks = celery.control.inspect().active()
return render_template('admin/workers.html', tasks=tasks)
reserved = celery.control.inspect().reserved()
return render_template('admin/workers.html', tasks=tasks, reserved=reserved)

View File

@@ -411,13 +411,17 @@ class Mango:
##########################################
def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists
if not self.download_queue.count_documents({'id': videoId}) >= 1:
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
# key already in queue
return False
# if already queued
if self.download_queue.count_documents({'id': videoId}) >= 1:
return False
# if already in archive
if self.check_exists(videoId):
return False
# add to queue
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}):
@@ -430,18 +434,20 @@ class Mango:
def queue_emptyQueue(self):
return self.download_queue.delete_many({})
def queue_setFailed(self, videoId):
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}})
def queue_getNext(self):
""" Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""
while True:
queueItem = self.download_queue.find_one({})
queueItem = self.download_queue.find_one({'status': 'queued'})
if not queueItem:
return None
elif self.check_exists(self, queueItem['id']):
elif self.check_exists(queueItem['id']):
self.queue_deleteQueue(queueItem['id'])
self.download_queue.update_one({'id': queueItem['id']}, {'$set': {'status': 'working'}})
return queueItem
##########################################
# HELPER FUNCTIONS #
##########################################

View File

@@ -4,6 +4,7 @@ class OIDC():
Additionally this class provides the function to generate redirect url's and check bearer tokens on their validity as well as caching jwt signing keys.
Fairly barebones and should be 100% secure. (famous last words)
This is made for form posted JWT's. While not the most secure it is the most easy way to implement. Moving on to a code based solution might be preferred in the future.
The nonce and state store is in memory, so only one instance can be used at a time until central key caching is implemented.
"""
def __init__(self, app=None):
self.states = {}
@@ -151,7 +152,6 @@ class OIDC():
# Any exception (invalid JWT, invalid formatting etc...) must return False
except Exception as e:
print(e, flush=True)
return False
# Double check if given token is really requested by us by matching the nonce in the signed key

View File

@@ -42,6 +42,7 @@ def video_queue():
get_nosql().queue_deleteQueue(videoId)
return True
else:
get_nosql().queue_setFailed(videoId)
return False
@shared_task()

View File

@@ -29,7 +29,8 @@
<br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<form class="mt-4 input-field" method="post" onsubmit="return confirm('Are you sure?');">
<input type="number" style="width: 80px" value="1" name="value" min="1" max="99">
<button class="btn mb-2 green" type="submit" name="task" value="queue-run-once">Download oldest queued</button>
<br>
<span class="supporting-text">Will download the oldest queued video ID</span>
@@ -151,11 +152,11 @@
<td>
<form method="post">
<input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') != 'queued' %}disabled{% endif %}>🗑️</button>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') == 'working' %}disabled{% endif %}>🗑️</button>
</form>
<form method="post">
<input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="run-download" title="Run download task" disabled}>⏩</button>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="run-download" title="Run download task" disabled>⏩</button>
<!-- This function fill not work until the download queue and video download process is rewritten -->
</form>
</td>

View File

@@ -19,8 +19,33 @@
</form>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h6>Current workers</h6>
<div class="col s12 m-4">
<h5>Reserved tasks per worker</h5>
<p>Usually 4 tasks per worker</p>
{% for worker in reserved %}
<span>{{ worker }}</span>
<table class="striped highlight responsive-table" style=" border: 1px solid black;">
<thead>
<tr>
<th>ID</th>
<th>Task</th>
<th>Arguments</th>
</tr>
</thead>
<tbody>
{% for task in reserved[worker] %}
<tr>
<td>{{ task.get('id') }}</td>
<td>{{ task.get('name') }}</td>
<td>{{ task.get('args') }} {{ task.get('kwargs') }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endfor %}
</div>
<div class="col s12 m-4">
<h5>Current workers and processing tasks</h5>
{% for worker in tasks %}
<span>{{ worker }}</span>
<table class="striped highlight responsive-table" style=" border: 1px solid black;">
@@ -35,7 +60,7 @@
{% for task in tasks[worker] %}
<tr>
<td>{{ task.get('id') }}</td>
<td>{{ task.get('type') }}</td>
<td>{{ task.get('name') }}</td>
<td>{{ task.get('time_start')|epoch_time }}</td>
</tr>
{% endfor %}