You've already forked amazing-ytdlp-archive
Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
89ce9b1c0a | ||
![]() |
729b24debb | ||
![]() |
20e5793cd8 | ||
![]() |
282b895170 |
@@ -26,7 +26,15 @@ def create_app(test_config=None):
|
||||
config['CELERY']['beat_schedule'] = {}
|
||||
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
|
||||
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
|
||||
config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 5000}
|
||||
config['CELERY']['beat_schedule']['Queue up new videos in static channel playlists'] = {'task': 'ayta.tasks.playlist_to_queue', 'schedule': 50000}
|
||||
|
||||
# Celery task routing
|
||||
# Tasks not defined in this configuration will be routed to the default queue "celery"
|
||||
|
||||
config['CELERY']['task_routes'] = {
|
||||
'ayta.tasks.video_download': {'queue': 'download'},
|
||||
'ayta.tasks.video_queue': {'queue': 'download'}
|
||||
}
|
||||
|
||||
app = Flask(__name__)
|
||||
app.config.from_mapping(config)
|
||||
|
@@ -32,7 +32,7 @@ def channels():
|
||||
if request.method == 'POST':
|
||||
task = request.form.get('task', None)
|
||||
|
||||
if task == 'subscribe-websub':
|
||||
if task == 'add_channel':
|
||||
channelId = request.form.get('channel_id', None)
|
||||
originalName = request.form.get('original_name', None)
|
||||
addedDate = request.form.get('added_date', None)
|
||||
@@ -51,14 +51,14 @@ def channels():
|
||||
return redirect(url_for('admin.channel', channelId=channelId))
|
||||
|
||||
elif task == 'playlist-queue':
|
||||
task = playlist_to_queue.delay()
|
||||
task = playlist_to_queue.delay()
|
||||
flash(f'Task playlist-queue has been queued: {task.id}')
|
||||
|
||||
generic['currentDate'] = datetime.utcnow()
|
||||
channelIds = get_nosql().list_all_channels()
|
||||
|
||||
for channelId in channelIds:
|
||||
channels[channelId] = get_nosql().get_channel_info(channelId)
|
||||
channels[channelId] = get_nosql().get_channel_info(channelId, limited=True)
|
||||
channels[channelId]['video_count'] = get_nosql().get_channel_videos_count(channelId)
|
||||
|
||||
return render_template('admin/channels.html', channels=channels, generic=generic)
|
||||
@@ -163,6 +163,10 @@ def reports():
|
||||
get_nosql().close_report(value)
|
||||
flash(f'Report closed {value}')
|
||||
return redirect(url_for('admin.reports'))
|
||||
elif task == 'clean-closed':
|
||||
get_nosql().report_clean()
|
||||
flash(f'Cleaned closed reports older than 30 days')
|
||||
return redirect(url_for('admin.reports'))
|
||||
|
||||
reports = get_nosql().list_reports()
|
||||
|
||||
|
@@ -11,7 +11,7 @@ def base():
|
||||
channelIds = get_nosql().list_all_channels()
|
||||
|
||||
for channelId in channelIds:
|
||||
channel = get_nosql().get_channel_info(channelId)
|
||||
channel = get_nosql().get_channel_info(channelId, limited=True)
|
||||
channel['video_count'] = get_nosql().get_channel_videos_count(channelId)
|
||||
channels.append(channel)
|
||||
|
||||
|
@@ -168,8 +168,12 @@ class Mango:
|
||||
ids.append(video['id'])
|
||||
return tuple(ids)
|
||||
|
||||
def get_channel_info(self, channelId):
|
||||
return self.channels.find_one({'id': channelId})
|
||||
def get_channel_info(self, channelId, limited=False):
|
||||
projection = {}
|
||||
if limited:
|
||||
projection['playlist'] = 0
|
||||
|
||||
return self.channels.find_one({'id': channelId}, projection)
|
||||
|
||||
|
||||
def update_channel_key(self, channelId, key, value):
|
||||
@@ -243,6 +247,10 @@ class Mango:
|
||||
def close_report(self, _id):
|
||||
_id = ObjectId(_id)
|
||||
return self.reports.update_one({'_id': _id}, {'$set': {'status': 'closed', 'closing_time': current_time(object=True)}})
|
||||
|
||||
def report_clean(self, keep=30):
|
||||
days = self.datetime.utcnow() - self.timedelta(days=keep)
|
||||
self.reports.delete_many({'status': 'closed', 'closing_time': {'$lt': days}})
|
||||
|
||||
##########################################
|
||||
# RUNLOG FUNCTIONS #
|
||||
@@ -420,8 +428,7 @@ class Mango:
|
||||
return False
|
||||
|
||||
# add to queue
|
||||
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
|
||||
return True
|
||||
return self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
|
||||
|
||||
def queue_deleteQueue(self, videoId):
|
||||
if self.download_queue.delete_one({'id': videoId}):
|
||||
@@ -434,8 +441,8 @@ class Mango:
|
||||
def queue_emptyQueue(self):
|
||||
return self.download_queue.delete_many({})
|
||||
|
||||
def queue_setFailed(self, videoId):
|
||||
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}})
|
||||
def queue_setFailed(self, videoId, reason=None):
|
||||
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed', 'fail_reason': reason}})
|
||||
|
||||
def queue_getNext(self):
|
||||
""" Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""
|
||||
@@ -445,6 +452,7 @@ class Mango:
|
||||
return None
|
||||
elif self.check_exists(queueItem['id']):
|
||||
self.queue_deleteQueue(queueItem['id'])
|
||||
continue
|
||||
self.download_queue.update_one({'id': queueItem['id']}, {'$set': {'status': 'working'}})
|
||||
return queueItem
|
||||
|
||||
|
@@ -21,8 +21,8 @@ def video_download(videoId):
|
||||
process = subprocess.run(['/usr/local/bin/yt-dlp', '--config-location', '/var/www/archive.ventilaar.net/goodstuff/config_video.conf', '--', f'https://www.youtube.com/watch?v={videoId}'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
|
||||
|
||||
if process.returncode != 0:
|
||||
return False
|
||||
return True
|
||||
return (False, process.stdout)
|
||||
return (True, None)
|
||||
|
||||
@shared_task()
|
||||
def video_queue():
|
||||
@@ -38,11 +38,13 @@ def video_queue():
|
||||
else:
|
||||
return None
|
||||
|
||||
if video_download(videoId):
|
||||
status, reason = video_download(videoId)
|
||||
|
||||
if status:
|
||||
get_nosql().queue_deleteQueue(videoId)
|
||||
return True
|
||||
else:
|
||||
get_nosql().queue_setFailed(videoId)
|
||||
get_nosql().queue_setFailed(videoId, reason)
|
||||
return False
|
||||
|
||||
@shared_task()
|
||||
@@ -165,13 +167,22 @@ def websub_renew_expiring(hours=6):
|
||||
break
|
||||
|
||||
@shared_task()
|
||||
def playlist_to_queue(hours=6):
|
||||
def playlist_to_queue():
|
||||
from .nosql import get_nosql
|
||||
import random
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
channels = get_nosql().list_all_channels(active=True)
|
||||
pivot = datetime.utcnow() - timedelta(days=3) # calculates 3 days before now
|
||||
channels = list(get_nosql().list_all_channels(active=True))
|
||||
random.shuffle(channels) # randomize channelId order because otherwise the queue will follow the channel order as well
|
||||
|
||||
for channel in channels:
|
||||
info = get_nosql().get_channel_info()
|
||||
info = get_nosql().get_channel_info(channel)
|
||||
|
||||
# if last_run not set or last_run is older than the pivot (indicating it has not been updated)
|
||||
if not info.get('last_run') or info.get('last_run') < pivot:
|
||||
# skip channel
|
||||
continue
|
||||
|
||||
for item in info['playlist']['entries']:
|
||||
videoId = item['id']
|
||||
|
@@ -144,6 +144,7 @@
|
||||
<th>endpoint</th>
|
||||
<th>status</th>
|
||||
<th>created_time</th>
|
||||
<th>fail_reason</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
@@ -164,6 +165,7 @@
|
||||
<td>{{ id.get('endpoint') }}</td>
|
||||
<td>{{ id.get('status') }}</td>
|
||||
<td>{{ id.get('created_time') }}</td>
|
||||
<td>{{ id.get('faiL_reason') }}</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
|
Reference in New Issue
Block a user