Compare commits

..

7 Commits

Author SHA1 Message Date
Ventilaar
729b24debb Task routing
All checks were successful
Update worker server / build-and-publish (release) Successful in 19s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-24 20:49:17 +01:00
Ventilaar
20e5793cd8 Performance and cleanup
All checks were successful
Update worker server / build-and-publish (release) Successful in 18s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-23 15:57:36 +01:00
Ventilaar
282b895170 Bug and performance fix
All checks were successful
Generate docker image / build-and-publish (release) Successful in 19s
Update worker server / build-and-publish (release) Successful in 10s
2025-01-23 14:52:15 +01:00
Ventilaar
38f6f04260 Fix None iterable and add new background task
All checks were successful
Update worker server / build-and-publish (release) Successful in 22s
Generate docker image / build-and-publish (release) Successful in 57s
2025-01-23 14:21:01 +01:00
Ventilaar
43e6c00787 idk
All checks were successful
Update worker server / build-and-publish (release) Successful in 18s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-21 20:49:40 +01:00
Ventilaar
d42030dcbc Small concurrency and logging fix
All checks were successful
Update worker server / build-and-publish (release) Successful in 10s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-19 13:27:09 +01:00
Ventilaar
5530179558 Small fixup
All checks were successful
Update worker server / build-and-publish (release) Successful in 11s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-18 23:39:32 +01:00
10 changed files with 142 additions and 45 deletions

View File

@@ -26,6 +26,15 @@ def create_app(test_config=None):
config['CELERY']['beat_schedule'] = {} config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000} config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100} config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 50000}
# Celery task routing
# Tasks not defined in this configuration will be routed to the default queue "celery"
config['CELERY']['task_routes'] = {
'ayta.tasks.video_download': {'queue': 'download'},
'ayta.tasks.video_queue': {'queue': 'download'}
}
app = Flask(__name__) app = Flask(__name__)
app.config.from_mapping(config) app.config.from_mapping(config)

View File

@@ -2,7 +2,7 @@ from flask import Blueprint, render_template, request, redirect, url_for, flash,
from ..nosql import get_nosql from ..nosql import get_nosql
from ..dlp import checkChannelId, getChannelInfo from ..dlp import checkChannelId, getChannelInfo
from ..decorators import login_required from ..decorators import login_required
from ..tasks import test_sleep, websub_subscribe_callback, websub_unsubscribe_callback, video_download, video_queue from ..tasks import test_sleep, websub_subscribe_callback, websub_unsubscribe_callback, video_download, video_queue, playlist_to_queue
from datetime import datetime from datetime import datetime
from secrets import token_urlsafe from secrets import token_urlsafe
@@ -30,28 +30,35 @@ def channels():
generic = {} generic = {}
if request.method == 'POST': if request.method == 'POST':
channelId = request.form.get('channel_id', None) task = request.form.get('task', None)
originalName = request.form.get('original_name', None)
addedDate = request.form.get('added_date', None)
### add some validation if task == 'subscribe-websub':
channelId = request.form.get('channel_id', None)
originalName = request.form.get('original_name', None)
addedDate = request.form.get('added_date', None)
addedDate = datetime.strptime(addedDate, '%Y-%m-%d') ### add some validation
if checkChannelId(channelId) is False: addedDate = datetime.strptime(addedDate, '%Y-%m-%d')
channelId, originalName = getChannelInfo(channelId, ('channel_id', 'uploader'))
if not get_nosql().insert_new_channel(channelId, originalName, addedDate): if checkChannelId(channelId) is False:
flash('Error inserting new channel, you probably made a mistake somewhere') channelId, originalName = getChannelInfo(channelId, ('channel_id', 'uploader'))
return redirect(url_for('admin.channels'))
return redirect(url_for('admin.channel', channelId=channelId)) if not get_nosql().insert_new_channel(channelId, originalName, addedDate):
flash('Error inserting new channel, you probably made a mistake somewhere')
return redirect(url_for('admin.channels'))
return redirect(url_for('admin.channel', channelId=channelId))
elif task == 'playlist-queue':
task = playlist_to_queue.delay()
flash(f'Task playlist-queue has been queued: {task.id}')
generic['currentDate'] = datetime.utcnow() generic['currentDate'] = datetime.utcnow()
channelIds = get_nosql().list_all_channels() channelIds = get_nosql().list_all_channels()
for channelId in channelIds: for channelId in channelIds:
channels[channelId] = get_nosql().get_channel_info(channelId) channels[channelId] = get_nosql().get_channel_info(channelId, limited=True)
channels[channelId]['video_count'] = get_nosql().get_channel_videos_count(channelId) channels[channelId]['video_count'] = get_nosql().get_channel_videos_count(channelId)
return render_template('admin/channels.html', channels=channels, generic=generic) return render_template('admin/channels.html', channels=channels, generic=generic)
@@ -209,12 +216,11 @@ def queue():
get_nosql().queue_emptyQueue() get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied') flash(f'Queue has been emptied')
elif task == 'run-download':
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
elif task == 'queue-run-once': elif task == 'queue-run-once':
video_queue.delay() value = int(value) if value.isdigit() else 1
for x in range(value):
task = video_queue.delay()
flash(f'Task has been started on the oldest queued item: {task.id}')
return redirect(url_for('admin.queue')) return redirect(url_for('admin.queue'))
@@ -252,7 +258,7 @@ def users():
return render_template('admin/users.html', users=users) return render_template('admin/users.html', users=users)
@bp.route('/workers', methods=['GET', 'POST']) @bp.route('/workers', methods=['GET', 'POST'])
#@login_required @login_required
def workers(): def workers():
if request.method == 'POST': if request.method == 'POST':
task = request.form.get('task', None) task = request.form.get('task', None)
@@ -262,4 +268,5 @@ def workers():
celery = current_app.extensions.get('celery') celery = current_app.extensions.get('celery')
tasks = celery.control.inspect().active() tasks = celery.control.inspect().active()
return render_template('admin/workers.html', tasks=tasks) reserved = celery.control.inspect().reserved()
return render_template('admin/workers.html', tasks=tasks, reserved=reserved)

View File

@@ -11,7 +11,7 @@ def base():
channelIds = get_nosql().list_all_channels() channelIds = get_nosql().list_all_channels()
for channelId in channelIds: for channelId in channelIds:
channel = get_nosql().get_channel_info(channelId) channel = get_nosql().get_channel_info(channelId, limited=True)
channel['video_count'] = get_nosql().get_channel_videos_count(channelId) channel['video_count'] = get_nosql().get_channel_videos_count(channelId)
channels.append(channel) channels.append(channel)

View File

@@ -168,8 +168,12 @@ class Mango:
ids.append(video['id']) ids.append(video['id'])
return tuple(ids) return tuple(ids)
def get_channel_info(self, channelId): def get_channel_info(self, channelId, limited=False):
return self.channels.find_one({'id': channelId}) projection = {}
if limited:
projection['playlist'] = 0
return self.channels.find_one({'id': channelId}, projection)
def update_channel_key(self, channelId, key, value): def update_channel_key(self, channelId, key, value):
@@ -411,13 +415,16 @@ class Mango:
########################################## ##########################################
def queue_insertQueue(self, videoId, endpointId=None): def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists # if already queued
if not self.download_queue.count_documents({'id': videoId}) >= 1: if self.download_queue.count_documents({'id': videoId}) >= 1:
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id return False
return True
# key already in queue # if already in archive
return False if self.check_exists(videoId):
return False
# add to queue
return self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
def queue_deleteQueue(self, videoId): def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}): if self.download_queue.delete_one({'id': videoId}):
@@ -430,18 +437,21 @@ class Mango:
def queue_emptyQueue(self): def queue_emptyQueue(self):
return self.download_queue.delete_many({}) return self.download_queue.delete_many({})
def queue_setFailed(self, videoId):
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}})
def queue_getNext(self): def queue_getNext(self):
""" Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet.""" """ Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""
while True: while True:
queueItem = self.download_queue.find_one({}) queueItem = self.download_queue.find_one({'status': 'queued'})
if not queueItem: if not queueItem:
return None return None
elif self.check_exists(self, queueItem['id']): elif self.check_exists(queueItem['id']):
self.queue_deleteQueue(queueItem['id']) self.queue_deleteQueue(queueItem['id'])
continue
self.download_queue.update_one({'id': queueItem['id']}, {'$set': {'status': 'working'}})
return queueItem return queueItem
########################################## ##########################################
# HELPER FUNCTIONS # # HELPER FUNCTIONS #
########################################## ##########################################

View File

@@ -4,6 +4,7 @@ class OIDC():
Additionally this class provides the function to generate redirect url's and check bearer tokens on their validity as well as caching jwt signing keys. Additionally this class provides the function to generate redirect url's and check bearer tokens on their validity as well as caching jwt signing keys.
Fairly barebones and should be 100% secure. (famous last words) Fairly barebones and should be 100% secure. (famous last words)
This is made for form posted JWT's. While not the most secure it is the most easy way to implement. Moving on to a code based solution might be preferred in the future. This is made for form posted JWT's. While not the most secure it is the most easy way to implement. Moving on to a code based solution might be preferred in the future.
The nonce and state store is in memory, so only one instance can be used at a time until central key caching is implemented.
""" """
def __init__(self, app=None): def __init__(self, app=None):
self.states = {} self.states = {}
@@ -151,7 +152,6 @@ class OIDC():
# Any exception (invalid JWT, invalid formatting etc...) must return False # Any exception (invalid JWT, invalid formatting etc...) must return False
except Exception as e: except Exception as e:
print(e, flush=True)
return False return False
# Double check if given token is really requested by us by matching the nonce in the signed key # Double check if given token is really requested by us by matching the nonce in the signed key

View File

@@ -42,6 +42,7 @@ def video_queue():
get_nosql().queue_deleteQueue(videoId) get_nosql().queue_deleteQueue(videoId)
return True return True
else: else:
get_nosql().queue_setFailed(videoId)
return False return False
@shared_task() @shared_task()
@@ -163,6 +164,28 @@ def websub_renew_expiring(hours=6):
if count >= 256: if count >= 256:
break break
@shared_task()
def playlist_to_queue():
from .nosql import get_nosql
import random
from datetime import datetime, timedelta
pivot = datetime.utcnow() - timedelta(days=3) # calculates 3 days before now
channels = list(get_nosql().list_all_channels(active=True))
random.shuffle(channels) # randomize channelId order because otherwise the queue will follow the channel order as well
for channel in channels:
info = get_nosql().get_channel_info(channel)
# if last_run not set or last_run is older than the pivot (indicating it has not been updated)
if not info.get('last_run') or info.get('last_run') < pivot:
# skip channel
continue
for item in info['playlist']['entries']:
videoId = item['id']
get_nosql().queue_insertQueue(videoId, 'Playlist mirroring')
########################################## ##########################################
# TASK MODULES # # TASK MODULES #
########################################## ##########################################

View File

@@ -15,6 +15,18 @@
</div> </div>
</div> </div>
<div class="row"> <div class="row">
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Direct actions</span>
<form class="mt-4" method="post">
<button class="btn mb-2 green" type="submit" name="task" value="playlist-queue">Playlist to Queue</button>
<br>
<span class="supporting-text">Forcerun playlist to queue task</span>
</form>
</div>
</div>
</div>
<div class="col s12 l4 m-4"> <div class="col s12 l4 m-4">
<div class="card"> <div class="card">
<div class="card-content"> <div class="card-content">
@@ -38,7 +50,7 @@
}); });
</script> </script>
</div> </div>
<button class="btn mt-4" type="submit" name="action" value="add_channel">Add</button> <button class="btn mt-4" type="submit" name="task" value="add_channel">Add</button>
</form> </form>
</div> </div>
</div> </div>

View File

@@ -29,7 +29,8 @@
<br> <br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span> <span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form> </form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');"> <form class="mt-4 input-field" method="post" onsubmit="return confirm('Are you sure?');">
<input type="number" style="width: 80px" value="1" name="value" min="1" max="99">
<button class="btn mb-2 green" type="submit" name="task" value="queue-run-once">Download oldest queued</button> <button class="btn mb-2 green" type="submit" name="task" value="queue-run-once">Download oldest queued</button>
<br> <br>
<span class="supporting-text">Will download the oldest queued video ID</span> <span class="supporting-text">Will download the oldest queued video ID</span>
@@ -151,11 +152,11 @@
<td> <td>
<form method="post"> <form method="post">
<input type="text" value="{{ id.get('id') }}" name="value" hidden> <input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') != 'queued' %}disabled{% endif %}>🗑️</button> <button class="btn-small waves-effect waves-light" type="submit" name="task" value="delete-queue" title="Delete from queue" {% if id.get('status') == 'working' %}disabled{% endif %}>🗑️</button>
</form> </form>
<form method="post"> <form method="post">
<input type="text" value="{{ id.get('id') }}" name="value" hidden> <input type="text" value="{{ id.get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="run-download" title="Run download task" disabled}>⏩</button> <button class="btn-small waves-effect waves-light" type="submit" name="task" value="run-download" title="Run download task" disabled>⏩</button>
<!-- This function fill not work until the download queue and video download process is rewritten --> <!-- This function fill not work until the download queue and video download process is rewritten -->
</form> </form>
</td> </td>

View File

@@ -19,8 +19,40 @@
</form> </form>
<div class="divider"></div> <div class="divider"></div>
<div class="row"> <div class="row">
<div class="col s12"> <div class="col s12 m-4">
<h6>Current workers</h6> <h5>Reserved tasks per worker</h5>
<p>Usually 4 tasks per worker</p>
{% if reserved is none %}
<h6>No workers with reserved tasks, are there any workers with stuck tasks or are they even online?</h6>
{% else %}
{% for worker in reserved %}
<span>{{ worker }}</span>
<table class="striped highlight responsive-table" style=" border: 1px solid black;">
<thead>
<tr>
<th>ID</th>
<th>Task</th>
<th>Arguments</th>
</tr>
</thead>
<tbody>
{% for task in reserved[worker] %}
<tr>
<td>{{ task.get('id') }}</td>
<td>{{ task.get('name') }}</td>
<td>{{ task.get('args') }} {{ task.get('kwargs') }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endfor %}
{% endif %}
</div>
<div class="col s12 m-4">
<h5>Current workers and processing tasks</h5>
{% if tasks is none %}
<h6>No workers with running tasks, are there any workers with stuck tasks or are they even online?</h6>
{% else %}
{% for worker in tasks %} {% for worker in tasks %}
<span>{{ worker }}</span> <span>{{ worker }}</span>
<table class="striped highlight responsive-table" style=" border: 1px solid black;"> <table class="striped highlight responsive-table" style=" border: 1px solid black;">
@@ -35,13 +67,14 @@
{% for task in tasks[worker] %} {% for task in tasks[worker] %}
<tr> <tr>
<td>{{ task.get('id') }}</td> <td>{{ task.get('id') }}</td>
<td>{{ task.get('type') }}</td> <td>{{ task.get('name') }}</td>
<td>{{ task.get('time_start')|epoch_time }}</td> <td>{{ task.get('time_start')|epoch_time }}</td>
</tr> </tr>
{% endfor %} {% endfor %}
</tbody> </tbody>
</table> </table>
{% endfor %} {% endfor %}
{% endif %}
</div> </div>
</div> </div>
{% endblock %} {% endblock %}

View File

@@ -3,6 +3,8 @@
flask flask
flask-caching flask-caching
flask-limiter flask-limiter
flask-sqlalchemy
flask-migrate
pymongo pymongo
yt-dlp yt-dlp
gunicorn gunicorn