Compare commits

...

11 Commits

Author SHA1 Message Date
Ventilaar
5a4726ac10 Add queue download function
All checks were successful
Update worker server / build-and-publish (release) Successful in 9s
Generate docker image / build-and-publish (release) Successful in 1m3s
2025-01-18 22:20:17 +01:00
Ventilaar
46bde82d32 Hotfix shared state issue
All checks were successful
Update worker server / build-and-publish (release) Successful in 12s
Generate docker image / build-and-publish (release) Successful in 19s
2024-12-07 14:58:52 +01:00
Ventilaar
6c681d6b07 Uhhh
All checks were successful
Update worker server / build-and-publish (release) Successful in 9s
Generate docker image / build-and-publish (release) Successful in 49s
2024-12-05 22:20:55 +01:00
Ventilaar
0d5d233e90 Cleanup and documentation
All checks were successful
Generate docker image / build-and-publish (release) Successful in 19s
Update worker server / build-and-publish (release) Successful in 20s
2024-12-05 22:15:42 +01:00
Ventilaar
548a4860fc it was google!
All checks were successful
Generate docker image / build-and-publish (release) Successful in 55s
Update worker server / build-and-publish (release) Successful in 10s
2024-10-15 16:23:43 +02:00
Ventilaar
da333ab4f6 lets hope it was a fluke
Some checks failed
Generate docker image / build-and-publish (release) Failing after 27s
Update worker server / build-and-publish (release) Successful in 11s
2024-10-15 16:20:44 +02:00
Ventilaar
f2b01033ea compact even more
Some checks failed
Generate docker image / build-and-publish (release) Has been cancelled
Update worker server / build-and-publish (release) Has been cancelled
2024-10-15 16:08:05 +02:00
Ventilaar
49f0ea7481 whyyyy
Some checks failed
Generate docker image / build-and-publish (release) Has been cancelled
Update worker server / build-and-publish (release) Has been cancelled
2024-10-15 16:06:17 +02:00
Ventilaar
f1287a4212 pymongo requires gcc now?
Some checks failed
Generate docker image / build-and-publish (release) Failing after 3m26s
Update worker server / build-and-publish (release) Successful in 9s
2024-10-15 15:59:24 +02:00
Ventilaar
30ea647ca9 Ok, long time no commit. I dont know what ive changed, pray it works
Some checks failed
Update worker server / build-and-publish (release) Successful in 15s
Generate docker image / build-and-publish (release) Failing after 25s
2024-10-15 15:48:09 +02:00
Ventilaar
a7c640a8cf Fix search error, add tombstone 2024-05-04 22:49:50 +02:00
14 changed files with 258 additions and 62 deletions

View File

@@ -1,7 +1,7 @@
FROM python:3-alpine
FROM python:3.12-alpine
WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
COPY . /app
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "ayta:create_app()"]
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "1", "ayta:create_app()"]

View File

@@ -6,7 +6,7 @@ current cronjob yt-dlp archive service.
Partially inspired by [hobune](https://github.com/rebane2001/hobune). While that project is amazingby it's own, it's just not scaleable.
## The idea
Having over 250k videos, scaling the current cronjob yt-dlp archive task is just really hard. Filetypes change, things get partially downloaded and such.
Having over 350k videos, scaling the current cronjob yt-dlp archive task is just really hard. Filetypes change, things get partially downloaded and such.
Partially yt-dlp is to blame because it's a package that needs to change all the time. But with this some changes are not accounted for.
yt-dlp will still do the downloads. But a flask frontend will be developed to make all downloaded videos easily indexable.
For it to be quick (unlike hobune) a database has to be implemented. This could get solved by a static site generator type of software, but that is not my choice.
@@ -54,7 +54,7 @@ Extra functionality for further development of features.
Mainly focused on retiring the cronjob based scripts and moving it to celery based tasks
- [ ] manage videos by ID's instead of per channel basis
- [ ] download videos from queue
- [ ] Manage websub callbacks
- [x] Manage websub callbacks
### Stage 4
Mongodb finally has it's limitations.

View File

@@ -41,7 +41,8 @@ def create_app(test_config=None):
app.jinja_env.filters['pretty_time'] = filters.pretty_time
app.jinja_env.filters['current_time'] = filters.current_time
app.jinja_env.filters['epoch_time'] = filters.epoch_time
app.jinja_env.filters['epoch_date'] = filters.epoch_date
from .blueprints import watch
from .blueprints import index
from .blueprints import admin

View File

@@ -1,8 +1,8 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app
from ..nosql import get_nosql
from ..dlp import checkChannelId, getChannelInfo
from ..decorators import login_required
from ..tasks import websub_subscribe_callback, websub_unsubscribe_callback
from ..tasks import test_sleep, websub_subscribe_callback, websub_unsubscribe_callback, video_download, video_queue
from datetime import datetime
from secrets import token_urlsafe
@@ -190,8 +190,16 @@ def queue():
flash(f'Cleaned retired endpoints')
elif task == 'manual-queue':
get_nosql().queue_insertQueue(value, 'webui')
flash(f'Added to queue: {value}')
if not get_nosql().check_exists(value):
direct = request.form.get('direct', None)
if direct:
task = video_download.delay(value)
flash(f"Started task {task.id}")
else:
get_nosql().queue_insertQueue(value, 'webui')
flash(f'Added to queue: {value}')
else:
flash(f'This video ID already exists in the archive: {value}')
elif task == 'delete-queue':
get_nosql().queue_deleteQueue(value)
@@ -199,7 +207,7 @@ def queue():
elif task == 'empty-queue':
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
flash(f'Queue has been emptied')
return redirect(url_for('admin.queue'))
@@ -234,4 +242,19 @@ def users():
users = get_nosql().list_all_users()
return render_template('admin/users.html', users=users)
return render_template('admin/users.html', users=users)
@bp.route('/workers', methods=['GET', 'POST'])
#@login_required
def workers():
if request.method == 'POST':
task = request.form.get('task', None)
if task == 'test-sleep':
test_sleep.delay()
elif task == 'queue-single':
video_queue.delay()
celery = current_app.extensions.get('celery')
tasks = celery.control.inspect().active()
return render_template('admin/workers.html', tasks=tasks)

View File

@@ -16,9 +16,15 @@ def pretty_time(time):
except:
return time # return given time
def epoch_time(time):
def epoch_date(epoch):
try:
return datetime.fromtimestamp(time).strftime('%d %b %Y')
return datetime.fromtimestamp(epoch).strftime('%d %b %Y')
except:
return None
def epoch_time(epoch):
try:
return datetime.fromtimestamp(epoch).strftime('%d %b %Y %H:%M:%S')
except:
return None

View File

@@ -98,7 +98,7 @@ class Mango:
stats['videos'] = self.info_json.count_documents({})
stats['channels'] = self.channels.count_documents({})
stats['queue'] = self.posters_queue.count_documents({})
stats['queue'] = self.download_queue.count_documents({})
return stats
@@ -195,11 +195,14 @@ class Mango:
def get_orphaned_videos(self):
""" Returns a SET of YouTube video ID's which have info_jsons in the collection but no permanent channel is defined. SLOW OPERATION """
# Ok lemme explain. Perform inner join from channel collection on channel_id key. match only the fields which are empty. return video id
pipeline = [{'$lookup': {'from': 'channels', 'localField': 'channel_id', 'foreignField': 'id', 'as': 'channel'}}, {'$match': {'channel': {'$size': 0}}},{'$project': {'id': 1}}]
pipeline = [{'$match': {'_status': 'available'}},
{'$lookup': {'from': 'channels', 'localField': 'channel_id', 'foreignField': 'id', 'as': 'channel'}},
{'$match': {'channel': {'$size': 0}}},{'$project': {'id': 1}},
{'$project': {'id': 1}}]
results = self.info_json.aggregate(pipeline)
ids = [result['id'] for result in results]
return tuple(ids)
def get_recent_videos(self, count=99):
@@ -426,7 +429,15 @@ class Mango:
def queue_emptyQueue(self):
return self.download_queue.delete_many({})
def queue_getNext(self, newest=False):
sort = []
if newest:
sort = [( 'created_time', pymongo.DESCENDING )]
return self.download_queue.find_one({}, sort=sort)
##########################################
# HELPER FUNCTIONS #
##########################################
@@ -450,7 +461,4 @@ def clean_info_json(originalInfo, format='dict'):
return json.dumps(originalInfo)
else:
print('The requested output format is not supported!')
if __name__ == '__main__':
mango = Mango('mongodb://root:example@192.168.66.140:27017')

View File

@@ -23,15 +23,18 @@ class OIDC():
self.domain = config['DOMAIN']
self.window = 120 # the time window to allow states and nonces in seconds
# Authentication provider url must be HTTPS and end on a TLD
if self.provider[:8] != 'https://' or self.provider[-1] == '/':
print('Incorrect OIDC provider URI', flush=True)
exit()
# Get the provider configuration endpoints
configuration = requests.get(f'{self.provider}/.well-known/openid-configuration').json()
jwks_uri = configuration.get('jwks_uri')
self.authorize_uri = configuration.get('authorization_endpoint')
# Start the JWKS management client, it will load the keys and maintain them
self.jwks_manager = jwt.PyJWKClient(jwks_uri)
#######################################################
@@ -39,10 +42,13 @@ class OIDC():
def state_maintenance(self):
from datetime import datetime
# Current time minus the acceptable window
pivot = datetime.now().timestamp() - self.window
# List with expired states
expired_states = [state for state, timestamp in self.states.items() if timestamp <= pivot]
# Remove expired states from store
for state in expired_states:
del self.states[state]
@@ -50,22 +56,30 @@ class OIDC():
import secrets
from datetime import datetime
# Clean state store first
self.state_maintenance()
# Generate token and paired timestamp
state = secrets.token_urlsafe(8)
timestamp = datetime.now().timestamp()
# Add token to the state store
self.states[state] = timestamp
# Return the state
return state
def state_check(self, state):
# Clean state store first
self.state_maintenance()
# If given state is actively stored
if state in self.states:
# Delete state and return True
del self.states[state]
return True
# Given state is not stored
return False
#######################################################
@@ -116,21 +130,33 @@ class OIDC():
def check_bearer(self, token):
import jwt
# Test given JWT
try:
# Get the signed public key from the token
signing_key = self.jwks_manager.get_signing_key_from_jwt(token).key
# Try to decode the token, this will also check the validity in these points:
# 1. Token is signed by expected keys
# 2. Token is issued by the expected provider
# 3. Expected parameters are really in the token
# 4. Token is really intended for us
# 5. Token is still valid (with 5 sec margin)
decoded = jwt.decode(token, signing_key,
algorithms=jwt.algorithms.get_default_algorithms(),
issuer=self.provider,
require=['aud', 'client_id', 'exp', 'iat', 'iss', 'rat', 'sub'],
audience=self.client_id,
leeway=5)
# Any exception (invalid JWT, invalid formatting etc...) must return False
except Exception as e:
print(e, flush=True)
return False
# double check if given token is really requested by us
# Double check if given token is really requested by us by matching the nonce in the signed key
if not self.nonce_check(decoded.get('nonce', None)):
return False
# Return the unique user identifier
return decoded.get('sub', False)

View File

@@ -5,6 +5,45 @@ from flask import current_app
# CELERY TASKS #
##########################################
@shared_task()
def test_sleep(time=60):
from time import sleep
sleep(time)
return True
@shared_task()
def video_download(videoId):
"""
I do not want to deal with the quirks of native yt-dlp in python, hence the subprocess.
"""
import subprocess
process = subprocess.run(['/usr/local/bin/yt-dlp', '--config-location', '/var/www/archive.ventilaar.net/goodstuff/config_video.conf', '--', f'https://www.youtube.com/watch?v={videoId}'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if process.returncode != 0:
return False
return True
@shared_task()
def video_queue():
"""
Gets the oldest video ID from the queue and runs video_download() on it.
"""
from .nosql import get_nosql
videoId = get_nosql().queue_getNext()
if videoId:
videoId = videoId['id']
else:
return None
if video_download(videoId):
get_nosql().queue_deleteQueue(videoId)
return True
else:
return False
@shared_task()
def websub_subscribe_callback(channelId):
import requests
@@ -85,8 +124,13 @@ def websub_process_data():
if state == 'added':
if not get_nosql().check_exists(videoId): # if video not exists
get_nosql().queue_insertQueue(videoId, 'WebSub')
# note for future me
# the websub notifications report ALL videos, including shorts and livestreams
# so if you are going to work on individual video downloading make sure you filter them!
elif state == 'removed':
# we currently do not do anything with removed videos
# but the idea is to trigger a full channel mirror in case a creator started to mass delete videos
pass
get_nosql().websub_deletePostProcessing(_id)
@@ -114,8 +158,9 @@ def websub_renew_expiring(hours=6):
websub_subscribe_callback.delay(data.get('channel'))
# limit amount of subscribe requests to spread out the requests over time
# with an expiration pivot of 6h and a maximum validity of 5 days we can currently handle 3072 channels
count = count + 1
if count >= 100:
if count >= 256:
break
##########################################

View File

@@ -11,79 +11,89 @@
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Global channel options</h5>
<h5>Global channel options</h5>
</div>
</div>
<div class="row">
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.system') }}">
<div class="card black-text">
<a href="{{ url_for('admin.system') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">System</span>
<p class="grey-text">Internal system settings</p>
<p class="grey-text">Internal system settings</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.channels') }}">
<div class="card black-text">
<a href="{{ url_for('admin.channels') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Channels</span>
<p class="grey-text">Manage channels in the system</p>
<p class="grey-text">Manage channels in the system</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.runs') }}">
<div class="card black-text">
<a href="{{ url_for('admin.runs') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Archive runs</span>
<p class="grey-text">Look at the cron run logs</p>
<p class="grey-text">Look at the cron run logs</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.websub') }}">
<div class="card black-text">
<a href="{{ url_for('admin.websub') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">WebSub</span>
<p class="grey-text">Edit WebSub YouTube links</p>
<p class="grey-text">Edit WebSub YouTube links</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.reports') }}">
<div class="card black-text">
<a href="{{ url_for('admin.reports') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Reports</span>
<p class="grey-text">View user reports</p>
<p class="grey-text">View user reports</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.queue') }}">
<div class="card black-text">
<a href="{{ url_for('admin.queue') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Queue</span>
<p class="grey-text">Video download queue and API access</p>
<p class="grey-text">Video download queue and API access</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.users') }}">
<div class="card black-text">
<a href="{{ url_for('admin.users') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Users</span>
<p class="grey-text">Authenticated users</p>
<p class="grey-text">Authenticated users</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.workers') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Workers</span>
<p class="grey-text">Worker and task management</p>
</div>
</div>
</a>
</div>
</div>
{% endblock %}

View File

@@ -65,7 +65,7 @@
</div>
<div class="col s12 mt-5 input-field">
<div class="switch">
<label>Queue<input type="checkbox" value="direct" name="value" disabled><span class="lever"></span>Direct</label>
<label>Queue<input type="checkbox" value="direct" name="direct"><span class="lever"></span>Direct</label>
<span class="supporting-text">Queue up or start directly</span>
</div>
</div>

View File

@@ -0,0 +1,48 @@
{% extends 'material_base.html' %}
{% block title %}Workers administration page{% endblock %}
{% block description %}Workers administration page of the AYTA system{% endblock %}
{% block content %}
<div class="row">
<div class="col s12">
<h4>Workers administration page</h4>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Options</h5>
</div>
</div>
<form method="POST">
<input title="test-sleep" type="submit" value="test-sleep" name="task">
<input title="test-sleep" type="submit" value="queue-single" name="task">
</form>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h6>Current workers</h6>
{% for worker in tasks %}
<span>{{ worker }}</span>
<table class="striped highlight responsive-table" style=" border: 1px solid black;">
<thead>
<tr>
<th>ID</th>
<th>Task</th>
<th>Time started</th>
</tr>
</thead>
<tbody>
{% for task in tasks[worker] %}
<tr>
<td>{{ task.get('id') }}</td>
<td>{{ task.get('type') }}</td>
<td>{{ task.get('time_start')|epoch_time }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endfor %}
</div>
</div>
{% endblock %}

View File

@@ -25,12 +25,24 @@
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCIcgBZ9hEJxHv6r_jDYOMqg') }}"><span class="title">Unus Annus</span></a>
<p>Reason: This channel does not exist. (Self removed)</p>
<p>Reason: This channel does not exist.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCz1s8aJYSQuaXJCtEi-VWRA') }}"><span class="title">Dutch Legion</span></a>
<p>Reason: This account has been terminated due to multiple or severe violations of YouTube's policy prohibiting hate speech.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UC91-8aNaRbp71UMEb_34ryg') }}"><span class="title">RBMK5000</span></a>
<p>Reason: This channel does not exist.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCoPSAT64vfXlulyWd_dPE3Q') }}"><span class="title">Evilfisher2</span></a>
<p>Reason: This channel was removed because it violated our Community Guidelines.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCZXkvavD2YKnFCzCkZ-bNPw') }}"><span class="title">mrabhy</span></a>
<p>Reason: This channel was removed because it violated our Community Guidelines.</p>
</li>
</ul>
</div>
<div class="col s12 l6 center-align">
@@ -43,6 +55,22 @@
<a href="{{ url_for('channel.channel', channelId='UCzGdxkzULCa9RlD-Q2EZPXQ') }}"><span class="title">Kalashnikov Group</span></a>
<p>Reason: This account has been terminated for a violation of YouTube's Terms of Service.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCtfg1tENiu3SgGMZVduFmTg') }}"><span class="title">FiberNinja</span></a>
<p>Reason: This channel was removed because it violated our Community Guidelines.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCv4VkfbX8YfqodF-4coEEfQ') }}"><span class="title">James Somerton</span></a>
<p>Reason: This channel does not exist.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UC8XH9kpilkuss4bVeRZD1kw') }}"><span class="title">Plagued Moth</span></a>
<p>Reason: This channel was removed because it violated our Community Guidelines.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCxZTTWP0QN7-ch2wW1QeFwg') }}"><span class="title">CowOfTheSea</span></a>
<p>Reason: This channel was removed because it violated our Community Guidelines.</p>
</li>
</ul>
</div>
</div>

View File

@@ -27,7 +27,7 @@
<div class="col s12 l3">
<p><b>Video by:</b> <a href="{{ url_for('channel.channel', channelId=render.get('info').get('channel_id')) }}">{{ render.get('info').get('uploader') }}</a></p>
<p><b>Upload date:</b> {{ render.get('info').get('upload_date')|pretty_time }}</p>
<p><b>Archive date:</b> {{ render.get('info').get('epoch')|epoch_time }}</p>
<p><b>Archive date:</b> {{ render.get('info').get('epoch')|epoch_date }}</p>
<p><b>Video length:</b> {{ render.get('info').get('duration')|pretty_duration }}</p>
</div>
<div class="col s4 l3 center-align">

View File

@@ -8,4 +8,5 @@ yt-dlp
gunicorn
celery
sqlalchemy
requests
pyjwt[crypto]