Compare commits

..

3 Commits

Author SHA1 Message Date
Ventilaar
d42030dcbc Small concurrency and logging fix
All checks were successful
Update worker server / build-and-publish (release) Successful in 10s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-19 13:27:09 +01:00
Ventilaar
5530179558 Small fixup
All checks were successful
Update worker server / build-and-publish (release) Successful in 11s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-18 23:39:32 +01:00
Ventilaar
1186d236f2 Rework video queue download
All checks were successful
Update worker server / build-and-publish (release) Successful in 11s
Generate docker image / build-and-publish (release) Successful in 14s
2025-01-18 23:29:12 +01:00
5 changed files with 110 additions and 89 deletions

View File

@@ -209,6 +209,10 @@ def queue():
get_nosql().queue_emptyQueue() get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied') flash(f'Queue has been emptied')
elif task == 'queue-run-once':
task = video_queue.delay()
flash(f'Task has been started on the oldest queued item: {task.id}')
return redirect(url_for('admin.queue')) return redirect(url_for('admin.queue'))
endpoints = get_nosql().queue_getEndpoints() endpoints = get_nosql().queue_getEndpoints()
@@ -251,8 +255,6 @@ def workers():
task = request.form.get('task', None) task = request.form.get('task', None)
if task == 'test-sleep': if task == 'test-sleep':
test_sleep.delay() test_sleep.delay()
elif task == 'queue-single':
video_queue.delay()
celery = current_app.extensions.get('celery') celery = current_app.extensions.get('celery')

View File

@@ -411,14 +411,18 @@ class Mango:
########################################## ##########################################
def queue_insertQueue(self, videoId, endpointId=None): def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists # if already queued
if not self.download_queue.count_documents({'id': videoId}) >= 1: if self.download_queue.count_documents({'id': videoId}) >= 1:
return False
# if already in archive
if self.check_exists(videoId):
return False
# add to queue
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True return True
# key already in queue
return False
def queue_deleteQueue(self, videoId): def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}): if self.download_queue.delete_one({'id': videoId}):
return True return True
@@ -430,13 +434,19 @@ class Mango:
def queue_emptyQueue(self): def queue_emptyQueue(self):
return self.download_queue.delete_many({}) return self.download_queue.delete_many({})
def queue_getNext(self, newest=False): def queue_setFailed(self, videoId):
sort = [] return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}})
if newest: def queue_getNext(self):
sort = [( 'created_time', pymongo.DESCENDING )] """ Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""
while True:
return self.download_queue.find_one({}, sort=sort) queueItem = self.download_queue.find_one({'status': 'queued'})
if not queueItem:
return None
elif self.check_exists(queueItem['id']):
self.queue_deleteQueue(queueItem['id'])
self.download_queue.update_one({'id': queueItem['id']}, {'$set': {'status': 'working'}})
return queueItem
########################################## ##########################################
# HELPER FUNCTIONS # # HELPER FUNCTIONS #

View File

@@ -42,6 +42,7 @@ def video_queue():
get_nosql().queue_deleteQueue(videoId) get_nosql().queue_deleteQueue(videoId)
return True return True
else: else:
get_nosql().queue_setFailed(videoId)
return False return False
@shared_task() @shared_task()

View File

@@ -29,7 +29,11 @@
<br> <br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span> <span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form> </form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 green" type="submit" name="task" value="queue-run-once">Download oldest queued</button>
<br>
<span class="supporting-text">Will download the oldest queued video ID</span>
</form>
</div> </div>
</div> </div>
</div> </div>
@@ -149,6 +153,11 @@
<input type="text" value="{{ id.get('id') }}" name="value" hidden> <input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') != 'queued' %}disabled{% endif %}>🗑️</button> <button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') != 'queued' %}disabled{% endif %}>🗑️</button>
</form> </form>
<form method="post">
<input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="run-download" title="Run download task" disabled>⏩</button>
<!-- This function fill not work until the download queue and video download process is rewritten -->
</form>
</td> </td>
<td>{{ id.get('id') }}</td> <td>{{ id.get('id') }}</td>
<td>{{ id.get('endpoint') }}</td> <td>{{ id.get('endpoint') }}</td>

View File

@@ -16,7 +16,6 @@
</div> </div>
<form method="POST"> <form method="POST">
<input title="test-sleep" type="submit" value="test-sleep" name="task"> <input title="test-sleep" type="submit" value="test-sleep" name="task">
<input title="test-sleep" type="submit" value="queue-single" name="task">
</form> </form>
<div class="divider"></div> <div class="divider"></div>
<div class="row"> <div class="row">