Compare commits

...

4 Commits

Author SHA1 Message Date
Ventilaar
46e5d8bb02 fuck
All checks were successful
Update worker server / build-and-publish (release) Successful in 19s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-29 22:23:41 +01:00
Ventilaar
89ce9b1c0a Add error reporting and fix channel add
All checks were successful
Update worker server / build-and-publish (release) Successful in 33s
Generate docker image / build-and-publish (release) Successful in 1m4s
2025-01-29 19:36:06 +01:00
Ventilaar
729b24debb Task routing
All checks were successful
Update worker server / build-and-publish (release) Successful in 19s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-24 20:49:17 +01:00
Ventilaar
20e5793cd8 Performance and cleanup
All checks were successful
Update worker server / build-and-publish (release) Successful in 18s
Generate docker image / build-and-publish (release) Successful in 20s
2025-01-23 15:57:36 +01:00
6 changed files with 42 additions and 14 deletions

View File

@@ -26,7 +26,15 @@ def create_app(test_config=None):
config['CELERY']['beat_schedule'] = {} config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000} config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100} config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 5000} config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 50000}
# Celery task routing
# Tasks not defined in this configuration will be routed to the default queue "celery"
config['CELERY']['task_routes'] = {
'ayta.tasks.video_download': {'queue': 'download'},
'ayta.tasks.video_queue': {'queue': 'download'}
}
app = Flask(__name__) app = Flask(__name__)
app.config.from_mapping(config) app.config.from_mapping(config)

View File

@@ -32,7 +32,7 @@ def channels():
if request.method == 'POST': if request.method == 'POST':
task = request.form.get('task', None) task = request.form.get('task', None)
if task == 'subscribe-websub': if task == 'add_channel':
channelId = request.form.get('channel_id', None) channelId = request.form.get('channel_id', None)
originalName = request.form.get('original_name', None) originalName = request.form.get('original_name', None)
addedDate = request.form.get('added_date', None) addedDate = request.form.get('added_date', None)
@@ -51,7 +51,7 @@ def channels():
return redirect(url_for('admin.channel', channelId=channelId)) return redirect(url_for('admin.channel', channelId=channelId))
elif task == 'playlist-queue': elif task == 'playlist-queue':
task = playlist_to_queue.delay() task = playlist_to_queue.delay()
flash(f'Task playlist-queue has been queued: {task.id}') flash(f'Task playlist-queue has been queued: {task.id}')
generic['currentDate'] = datetime.utcnow() generic['currentDate'] = datetime.utcnow()
@@ -163,6 +163,10 @@ def reports():
get_nosql().close_report(value) get_nosql().close_report(value)
flash(f'Report closed {value}') flash(f'Report closed {value}')
return redirect(url_for('admin.reports')) return redirect(url_for('admin.reports'))
elif task == 'clean-closed':
get_nosql().report_clean()
flash(f'Cleaned closed reports older than 30 days')
return redirect(url_for('admin.reports'))
reports = get_nosql().list_reports() reports = get_nosql().list_reports()

View File

@@ -11,7 +11,7 @@ def base():
channelIds = get_nosql().list_all_channels() channelIds = get_nosql().list_all_channels()
for channelId in channelIds: for channelId in channelIds:
channel = get_nosql().get_channel_info(channelId) channel = get_nosql().get_channel_info(channelId, limited=True)
channel['video_count'] = get_nosql().get_channel_videos_count(channelId) channel['video_count'] = get_nosql().get_channel_videos_count(channelId)
channels.append(channel) channels.append(channel)

View File

@@ -247,6 +247,10 @@ class Mango:
def close_report(self, _id): def close_report(self, _id):
_id = ObjectId(_id) _id = ObjectId(_id)
return self.reports.update_one({'_id': _id}, {'$set': {'status': 'closed', 'closing_time': current_time(object=True)}}) return self.reports.update_one({'_id': _id}, {'$set': {'status': 'closed', 'closing_time': current_time(object=True)}})
def report_clean(self, keep=30):
days = self.datetime.utcnow() - self.timedelta(days=keep)
self.reports.delete_many({'status': 'closed', 'closing_time': {'$lt': days}})
########################################## ##########################################
# RUNLOG FUNCTIONS # # RUNLOG FUNCTIONS #
@@ -424,8 +428,7 @@ class Mango:
return False return False
# add to queue # add to queue
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id return self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
def queue_deleteQueue(self, videoId): def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}): if self.download_queue.delete_one({'id': videoId}):
@@ -438,8 +441,8 @@ class Mango:
def queue_emptyQueue(self): def queue_emptyQueue(self):
return self.download_queue.delete_many({}) return self.download_queue.delete_many({})
def queue_setFailed(self, videoId): def queue_setFailed(self, videoId, reason=None):
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}}) return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed', 'fail_reason': reason}})
def queue_getNext(self): def queue_getNext(self):
""" Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet.""" """ Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""

View File

@@ -21,8 +21,8 @@ def video_download(videoId):
process = subprocess.run(['/usr/local/bin/yt-dlp', '--config-location', '/var/www/archive.ventilaar.net/goodstuff/config_video.conf', '--', f'https://www.youtube.com/watch?v={videoId}'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) process = subprocess.run(['/usr/local/bin/yt-dlp', '--config-location', '/var/www/archive.ventilaar.net/goodstuff/config_video.conf', '--', f'https://www.youtube.com/watch?v={videoId}'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if process.returncode != 0: if process.returncode != 0:
return False return (False, process.stdout)
return True return (True, None)
@shared_task() @shared_task()
def video_queue(): def video_queue():
@@ -38,11 +38,13 @@ def video_queue():
else: else:
return None return None
if video_download(videoId): status, reason = video_download(videoId)
if status:
get_nosql().queue_deleteQueue(videoId) get_nosql().queue_deleteQueue(videoId)
return True return True
else: else:
get_nosql().queue_setFailed(videoId) get_nosql().queue_setFailed(videoId, reason)
return False return False
@shared_task() @shared_task()
@@ -165,14 +167,23 @@ def websub_renew_expiring(hours=6):
break break
@shared_task() @shared_task()
def playlist_to_queue(hours=6): def playlist_to_queue():
from .nosql import get_nosql from .nosql import get_nosql
import random
from datetime import datetime, timedelta
channels = get_nosql().list_all_channels(active=True) pivot = datetime.utcnow() - timedelta(days=3) # calculates 3 days before now
channels = list(get_nosql().list_all_channels(active=True))
random.shuffle(channels) # randomize channelId order because otherwise the queue will follow the channel order as well
for channel in channels: for channel in channels:
info = get_nosql().get_channel_info(channel) info = get_nosql().get_channel_info(channel)
# if last_run not set or last_run is older than the pivot (indicating it has not been updated)
if not info.get('last_run') or info.get('last_run') < pivot:
# skip channel
continue
for item in info['playlist']['entries']: for item in info['playlist']['entries']:
videoId = item['id'] videoId = item['id']
get_nosql().queue_insertQueue(videoId, 'Playlist mirroring') get_nosql().queue_insertQueue(videoId, 'Playlist mirroring')

View File

@@ -144,6 +144,7 @@
<th>endpoint</th> <th>endpoint</th>
<th>status</th> <th>status</th>
<th>created_time</th> <th>created_time</th>
<th>fail_reason</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
@@ -164,6 +165,7 @@
<td>{{ id.get('endpoint') }}</td> <td>{{ id.get('endpoint') }}</td>
<td>{{ id.get('status') }}</td> <td>{{ id.get('status') }}</td>
<td>{{ id.get('created_time') }}</td> <td>{{ id.get('created_time') }}</td>
<td><textarea class="info">{{ id.get('fail_reason') }}</textarea></td>
</tr> </tr>
{% endfor %} {% endfor %}
</tbody> </tbody>