Compare commits

...

4 Commits

Author SHA1 Message Date
Ventilaar
d42030dcbc Small concurrency and logging fix
All checks were successful
Update worker server / build-and-publish (release) Successful in 10s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-19 13:27:09 +01:00
Ventilaar
5530179558 Small fixup
All checks were successful
Update worker server / build-and-publish (release) Successful in 11s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-18 23:39:32 +01:00
Ventilaar
1186d236f2 Rework video queue download
All checks were successful
Update worker server / build-and-publish (release) Successful in 11s
Generate docker image / build-and-publish (release) Successful in 14s
2025-01-18 23:29:12 +01:00
Ventilaar
5a4726ac10 Add queue download function
All checks were successful
Update worker server / build-and-publish (release) Successful in 9s
Generate docker image / build-and-publish (release) Successful in 1m3s
2025-01-18 22:20:17 +01:00
4 changed files with 134 additions and 86 deletions

View File

@@ -2,7 +2,7 @@ from flask import Blueprint, render_template, request, redirect, url_for, flash,
from ..nosql import get_nosql
from ..dlp import checkChannelId, getChannelInfo
from ..decorators import login_required
from ..tasks import test_sleep, websub_subscribe_callback, websub_unsubscribe_callback, video_download
from ..tasks import test_sleep, websub_subscribe_callback, websub_unsubscribe_callback, video_download, video_queue
from datetime import datetime
from secrets import token_urlsafe
@@ -209,6 +209,10 @@ def queue():
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
elif task == 'queue-run-once':
task = video_queue.delay()
flash(f'Task has been started on the oldest queued item: {task.id}')
return redirect(url_for('admin.queue'))
endpoints = get_nosql().queue_getEndpoints()

View File

@@ -411,14 +411,18 @@ class Mango:
##########################################
def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists
if not self.download_queue.count_documents({'id': videoId}) >= 1:
# if already queued
if self.download_queue.count_documents({'id': videoId}) >= 1:
return False
# if already in archive
if self.check_exists(videoId):
return False
# add to queue
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
# key already in queue
return False
def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}):
return True
@@ -430,6 +434,20 @@ class Mango:
def queue_emptyQueue(self):
return self.download_queue.delete_many({})
def queue_setFailed(self, videoId):
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}})
def queue_getNext(self):
""" Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""
while True:
queueItem = self.download_queue.find_one({'status': 'queued'})
if not queueItem:
return None
elif self.check_exists(queueItem['id']):
self.queue_deleteQueue(queueItem['id'])
self.download_queue.update_one({'id': queueItem['id']}, {'$set': {'status': 'working'}})
return queueItem
##########################################
# HELPER FUNCTIONS #
##########################################
@@ -454,6 +472,3 @@ def clean_info_json(originalInfo, format='dict'):
else:
print('The requested output format is not supported!')
if __name__ == '__main__':
mango = Mango('mongodb://root:example@192.168.66.140:27017')

View File

@@ -24,6 +24,26 @@ def video_download(videoId):
return False
return True
@shared_task()
def video_queue():
"""
Gets the oldest video ID from the queue and runs video_download() on it.
"""
from .nosql import get_nosql
videoId = get_nosql().queue_getNext()
if videoId:
videoId = videoId['id']
else:
return None
if video_download(videoId):
get_nosql().queue_deleteQueue(videoId)
return True
else:
get_nosql().queue_setFailed(videoId)
return False
@shared_task()
def websub_subscribe_callback(channelId):

View File

@@ -29,7 +29,11 @@
<br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 green" type="submit" name="task" value="queue-run-once">Download oldest queued</button>
<br>
<span class="supporting-text">Will download the oldest queued video ID</span>
</form>
</div>
</div>
</div>
@@ -149,6 +153,11 @@
<input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') != 'queued' %}disabled{% endif %}>🗑️</button>
</form>
<form method="post">
<input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="run-download" title="Run download task" disabled>⏩</button>
<!-- This function fill not work until the download queue and video download process is rewritten -->
</form>
</td>
<td>{{ id.get('id') }}</td>
<td>{{ id.get('endpoint') }}</td>