Compare commits

...

2 Commits

Author SHA1 Message Date
Ventilaar
729b24debb Task routing
All checks were successful
Update worker server / build-and-publish (release) Successful in 19s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-24 20:49:17 +01:00
Ventilaar
20e5793cd8 Performance and cleanup
All checks were successful
Update worker server / build-and-publish (release) Successful in 18s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-23 15:57:36 +01:00
5 changed files with 23 additions and 7 deletions

View File

@@ -26,7 +26,15 @@ def create_app(test_config=None):
config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 5000}
config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 50000}
# Celery task routing
# Tasks not defined in this configuration will be routed to the default queue "celery"
config['CELERY']['task_routes'] = {
'ayta.tasks.video_download': {'queue': 'download'},
'ayta.tasks.video_queue': {'queue': 'download'}
}
app = Flask(__name__)
app.config.from_mapping(config)

View File

@@ -51,7 +51,7 @@ def channels():
return redirect(url_for('admin.channel', channelId=channelId))
elif task == 'playlist-queue':
task = playlist_to_queue.delay()
task = playlist_to_queue.delay()
flash(f'Task playlist-queue has been queued: {task.id}')
generic['currentDate'] = datetime.utcnow()

View File

@@ -11,7 +11,7 @@ def base():
channelIds = get_nosql().list_all_channels()
for channelId in channelIds:
channel = get_nosql().get_channel_info(channelId)
channel = get_nosql().get_channel_info(channelId, limited=True)
channel['video_count'] = get_nosql().get_channel_videos_count(channelId)
channels.append(channel)

View File

@@ -424,8 +424,7 @@ class Mango:
return False
# add to queue
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
return self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}):

View File

@@ -165,14 +165,23 @@ def websub_renew_expiring(hours=6):
break
@shared_task()
def playlist_to_queue(hours=6):
def playlist_to_queue():
from .nosql import get_nosql
import random
from datetime import datetime, timedelta
channels = get_nosql().list_all_channels(active=True)
pivot = datetime.utcnow() - timedelta(days=3) # calculates 3 days before now
channels = list(get_nosql().list_all_channels(active=True))
random.shuffle(channels) # randomize channelId order because otherwise the queue will follow the channel order as well
for channel in channels:
info = get_nosql().get_channel_info(channel)
# if last_run not set or last_run is older than the pivot (indicating it has not been updated)
if not info.get('last_run') or info.get('last_run') < pivot:
# skip channel
continue
for item in info['playlist']['entries']:
videoId = item['id']
get_nosql().queue_insertQueue(videoId, 'Playlist mirroring')