Compare commits

18 Commits

Author SHA1 Message Date
Ventilaar
f2b01033ea compact even more
Some checks failed
Generate docker image / build-and-publish (release) Has been cancelled
Update worker server / build-and-publish (release) Has been cancelled
2024-10-15 16:08:05 +02:00
Ventilaar
49f0ea7481 whyyyy
Some checks failed
Generate docker image / build-and-publish (release) Has been cancelled
Update worker server / build-and-publish (release) Has been cancelled
2024-10-15 16:06:17 +02:00
Ventilaar
f1287a4212 pymongo requires gcc now?
Some checks failed
Generate docker image / build-and-publish (release) Failing after 3m26s
Update worker server / build-and-publish (release) Successful in 9s
2024-10-15 15:59:24 +02:00
Ventilaar
30ea647ca9 Ok, long time no commit. I dont know what ive changed, pray it works
Some checks failed
Update worker server / build-and-publish (release) Successful in 15s
Generate docker image / build-and-publish (release) Failing after 25s
2024-10-15 15:48:09 +02:00
Ventilaar
a7c640a8cf Fix search error, add tombstone 2024-05-04 22:49:50 +02:00
Ventilaar
f6da232164 Rename functions 2024-04-21 00:31:25 +02:00
Ventilaar
1d5934275c Handle websub added messages to queue 2024-04-21 00:26:00 +02:00
Ventilaar
72af6b6126 Handle mass websub subscriptions with added statistics. General cleanup 2024-04-18 23:36:45 +02:00
Ventilaar
8bf8e08af3 Forgot admin imports 2024-04-18 00:59:46 +02:00
Ventilaar
236b56915b Handle WebSub endpoint renewing. Basic code for XML parsing (not implemented yet) 2024-04-18 00:56:22 +02:00
Ventilaar
ac0243a783 Quick key rename title_slug 2024-04-17 12:24:14 +02:00
Ventilaar
bb78c97d52 Do not store websub posted raw data as str 2024-04-10 11:25:05 +02:00
Ventilaar
7ccb827a9c hotfix the hotfix of the hotfix 2024-04-09 13:01:23 +02:00
Ventilaar
9c0e4fb63c Hotfix the websub hotfix. Add button to easily monitor websub callbacks. Clean stuck websub requests after 3 days 2024-04-09 12:56:57 +02:00
Ventilaar
75d42ad3cd Websub callback domain hotfix 2024-04-09 12:16:47 +02:00
Ventilaar
4fa0ee2c68 Hotfix channel sorting 2024-04-09 12:11:14 +02:00
Ventilaar
7e06c8673b Update PyJWT requirement 2024-04-06 23:27:18 +02:00
Ventilaar
96565e9e2b Add small time difference leeway 2024-04-06 23:23:32 +02:00
24 changed files with 563 additions and 191 deletions

View File

@@ -1,4 +1,4 @@
name: Generate release
name: Generate docker image
on:
release:
@@ -22,13 +22,4 @@ jobs:
uses: docker/build-push-action@v5
with:
push: true
tags: git.ventilaar.nl/ventilaar/ayta:latest
- name: Update worker server
uses: appleboy/ssh-action@v1.0.3
with:
host: 192.168.66.109
username: root
key: ${{ secrets.SERVER_KEY }}
port: 22
script: /root/update_worker.sh
tags: git.ventilaar.nl/ventilaar/ayta:latest

View File

@@ -0,0 +1,18 @@
name: Update worker server
on:
release:
types: [published]
jobs:
build-and-publish:
runs-on: ubuntu-latest
steps:
- name: Update worker server
uses: appleboy/ssh-action@v1.0.3
with:
host: 192.168.66.109
username: root
key: ${{ secrets.SERVER_KEY }}
port: 22
script: /root/update_worker.sh

View File

@@ -1,7 +1,9 @@
FROM python:3-alpine
RUN apk update && apk add python3-dev gcc libc-dev libffi-dev && rm -rf /var/cache/apk/*
WORKDIR /app
COPY requirements.txt /app
RUN pip install --no-cache-dir -r requirements.txt
RUN apk del python3-dev gcc libc-dev libffi-dev && rm -rf /var/cache/apk/*
COPY . /app
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "ayta:create_app()"]

View File

@@ -48,17 +48,17 @@ Extra functionality for further development of features.
- [x] Video reporting functionality
- [x] Ability (for external applications) to queue up video ids for download
- [x] Add websub requesting and receiving ability. (not fully usable yet without celery tasks)
- [] OIDC or Webauthn logins instead of static argon2 passwords
- [x] OIDC or Webauthn logins instead of static argon2 passwords
### Stage 3
Mainly focused on retiring the cronjob based scripts and moving it to celery based tasks
- [] manage videos by ID's instead of per channel basis
- [] download videos from queue
- [] Manage websub callbacks
- [ ] manage videos by ID's instead of per channel basis
- [ ] download videos from queue
- [ ] Manage websub callbacks
### Stage 4
MongoDB finally has its limitations.
- [] Migrate to postgresql
- [ ] Migrate to postgresql
### Stage ...
Since this is the flagship software I have developed, more features will be added.

View File

@@ -9,15 +9,24 @@ def create_app(test_config=None):
config = {'MONGO_CONNECTION': os.environ.get('AYTA_MONGOCONNECTION', 'mongodb://root:example@192.168.66.140:27017'),
'OIDC_PROVIDER': os.environ.get('AYTA_OIDC_PROVIDER', 'https://auth.ventilaar.nl'),
'OIDC_ID': os.environ.get('AYTA_OIDC_ID', 'ayta'),
'CACHE_TYPE': os.environ.get('AYTA_CACHETYPE', 'SimpleCache'),
'CACHE_DEFAULT_TIMEOUT': int(os.environ.get('AYTA_CACHETIMEOUT', 6)),
'SECRET_KEY': os.environ.get('AYTA_SECRETKEY', secrets.token_hex(32)),
'DEBUG': bool(os.environ.get('AYTA_DEBUG', False)),
'DOMAIN': os.environ.get('AYTA_DOMAIN', 'https://testing.mashallah.nl'),
'CELERY': dict(broker_url=str(os.environ.get('AYTA_CELERYBROKER', 'amqp://guest:guest@192.168.66.140:5672/')),
task_ignore_result=True,)
'CELERY': {'broker_url': str(os.environ.get('AYTA_CELERYBROKER', 'amqp://guest:guest@192.168.66.140:5672/'))}
}
# Static Flask configuration options
config['CELERY']['task_ignore_result'] = True
config['CACHE_TYPE'] = 'SimpleCache'
config['SECRET_KEY'] = secrets.token_bytes(32)
# Celery Periodic tasks
config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 100}
app = Flask(__name__)
app.config.from_mapping(config)
@@ -32,7 +41,8 @@ def create_app(test_config=None):
app.jinja_env.filters['pretty_time'] = filters.pretty_time
app.jinja_env.filters['current_time'] = filters.current_time
app.jinja_env.filters['epoch_time'] = filters.epoch_time
app.jinja_env.filters['epoch_date'] = filters.epoch_date
from .blueprints import watch
from .blueprints import index
from .blueprints import admin
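Reviewer note on the `DEBUG` entry in `create_app`: `bool(os.environ.get('AYTA_DEBUG', False))` is true for any non-empty string, so setting `AYTA_DEBUG=False` would still enable debug mode. A minimal sketch of a stricter parser follows. The helper name `env_flag` and the list of accepted spellings are assumptions for illustration, not part of this change.

```python
import os

# Hypothetical helper, not part of the commit.
# The accepted truthy spellings below are an assumption.
_TRUTHY = {'1', 'true', 'yes', 'on'}

def env_flag(name, default=False, environ=None):
    """Return True only when the variable holds a recognised truthy spelling."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        # Variable not set at all: fall back to the caller's default
        return default
    return raw.strip().lower() in _TRUTHY
```

With this, `'DEBUG': env_flag('AYTA_DEBUG')` would treat an unset variable, an empty string, and `False` alike.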

View File

@@ -1,8 +1,8 @@
from flask import Blueprint, render_template, request, redirect, url_for, flash
from flask import Blueprint, render_template, request, redirect, url_for, flash, current_app
from ..nosql import get_nosql
from ..dlp import checkChannelId, getChannelInfo
from ..decorators import login_required
from ..tasks import subscribe_websub_callback, unsubscribe_websub_callback
from ..tasks import test_sleep, websub_subscribe_callback, websub_unsubscribe_callback, video_download
from datetime import datetime
from secrets import token_urlsafe
@@ -71,15 +71,15 @@ def channel(channelId):
value = request.form.get('value', None)
if task == 'subscribe-websub':
task = subscribe_websub_callback.delay(channelId)
task = websub_subscribe_callback.delay(channelId)
flash(f"Started task {task.id}")
return redirect(url_for('admin.channel', channelId=channelId))
if task == 'update-value':
if key == 'active':
if key in ['active', 'websub']:
value = True if value else False
if key == 'added_date':
if key in ['added_date']:
value = datetime.strptime(value, '%Y-%m-%d')
get_nosql().update_channel_key(channelId, key, value)
@@ -109,29 +109,41 @@ def run(runId):
@bp.route('/websub', methods=['GET', 'POST'])
@login_required
def websub():
render = {}
if request.method == 'POST':
task = request.form.get('task', None)
value = request.form.get('value', None)
if task == 'unsubscribe':
channelId = get_nosql().websub_getCallback(value).get('channel')
task = unsubscribe_websub_callback.delay(value, channelId)
task = websub_unsubscribe_callback.delay(value)
flash(f"Started task {task.id}")
return redirect(url_for('admin.websub'))
elif task == 'clean-retired':
get_nosql().websub_cleanRetired()
return redirect(url_for('admin.websub'))
elif task == 'unsubscribe-callbacks':
for callbackId in get_nosql().websub_getCallbacks():
websub_unsubscribe_callback.delay(callbackId)
flash(f"Started unsubscribe tasks for all callbacks")
return redirect(url_for('admin.websub'))
elif task == 'subscribe-channels':
for channelId in get_nosql().list_all_channels(websub=True):
websub_subscribe_callback.delay(channelId)
flash(f'Started subscribe tasks for activated channels')
return redirect(url_for('admin.websub'))
callbackIds = get_nosql().websub_getCallbacks()
callbacks = {}
render['stats'] = get_nosql().websub_statistics()
for callbackId in callbackIds:
callbacks[callbackId] = get_nosql().websub_getCallback(callbackId)
return render_template('admin/websub.html', callbacks=callbacks)
return render_template('admin/websub.html', callbacks=callbacks, render=render)
@bp.route('/reports', methods=['GET', 'POST'])
@login_required
@@ -149,9 +161,9 @@ def reports():
return render_template('admin/reports.html', reports=reports)
@bp.route('/posters', methods=['GET', 'POST'])
@bp.route('/queue', methods=['GET', 'POST'])
@login_required
def posters():
def queue():
if request.method == 'POST':
task = request.form.get('task', None)
value = request.form.get('value', None)
@@ -162,34 +174,45 @@ def posters():
flash('Description must be at least 8 characters long')
if value and len(value) >= 12:
get_nosql().poster_newEndpoint(value, description)
get_nosql().queue_newEndpoint(value, description)
flash(f'Created endpoint ID: {value}')
else:
value = token_urlsafe(16)
get_nosql().poster_newEndpoint(value, description)
get_nosql().queue_newEndpoint(value, description)
flash(f'Created endpoint ID: {value}')
elif task == 'retire':
get_nosql().poster_retireEndpoint(value)
get_nosql().queue_retireEndpoint(value)
flash(f'Endpoint retired: {value}')
elif task == 'clean-retired':
get_nosql().poster_cleanRetired()
get_nosql().queue_cleanRetired()
flash(f'Cleaned retired endpoints')
elif task == 'manual-queue':
get_nosql().poster_insertQueue('manual', value)
flash(f'Added to queue: {value}')
direct = request.form.get('direct', None)
if direct:
task = video_download.delay(value)
flash(f"Started task {task.id}")
else:
get_nosql().queue_insertQueue(value, 'webui')
flash(f'Added to queue: {value}')
elif task == 'delete-queue':
get_nosql().poster_deleteQueue(value)
get_nosql().queue_deleteQueue(value)
flash(f'Deleted from queue: {value}')
elif task == 'empty-queue':
get_nosql().queue_emptyQueue()
flash(f'Queue has been emptied')
return redirect(url_for('admin.posters'))
return redirect(url_for('admin.queue'))
endpoints = get_nosql().poster_getEndpoints()
queue = get_nosql().poster_getQueue()
endpoints = get_nosql().queue_getEndpoints()
queue = get_nosql().queue_getQueue()
return render_template('admin/posters.html', endpoints=endpoints, queue=queue)
return render_template('admin/queue.html', endpoints=endpoints, queue=queue)
@bp.route('/users', methods=['GET', 'POST'])
@login_required
@@ -217,4 +240,17 @@ def users():
users = get_nosql().list_all_users()
return render_template('admin/users.html', users=users)
return render_template('admin/users.html', users=users)
@bp.route('/workers', methods=['GET', 'POST'])
#@login_required
def workers():
if request.method == 'POST':
task = request.form.get('task', None)
if task == 'test-sleep':
test_sleep.delay()
celery = current_app.extensions.get('celery')
tasks = celery.control.inspect().active()
return render_template('admin/workers.html', tasks=tasks)

View File

@@ -33,16 +33,16 @@ def websub(cap):
return challenge
if get_nosql().websub_existsCallback(cap):
if not get_nosql().websub_savePost(cap, str(request.data)):
if not get_nosql().websub_savePost(cap, request.data):
return abort(500)
return '', 202
return abort(404)
@bp.route('/poster/<cap>', methods=['POST'])
def poster(cap):
@bp.route('/queue/<cap>', methods=['POST'])
def queue(cap):
# if endpoint does not exist
if not get_nosql().poster_isActive(cap):
if not get_nosql().queue_isActive(cap):
return abort(404)
videoId = request.form.get('v')
@@ -60,7 +60,7 @@ def poster(cap):
return abort(409)
# try to insert
if get_nosql().poster_insertQueue(cap, videoId):
if get_nosql().queue_insertQueue(videoId, cap):
return '', 202
else:
return abort(409)

View File

@@ -34,7 +34,7 @@ def channel(channelId):
for videoId in videoIds:
videos.append(get_nosql().get_video_info(videoId, limited=True))
videos = sorted(videos, key=lambda x: x.get('upload_date'), reverse=True)
videos = sorted(videos, key=lambda x: x.get('upload_date', '19700101'), reverse=True)
return render_template('channel/channel.html', channel=channelInfo, videos=videos)
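Reviewer note: the `'19700101'` fallback keeps `sorted` from comparing `None` with `str` when a video document has no `upload_date` key. A small sketch of the effect, using made-up records (the IDs and dates are illustrative only, and `sort_by_upload_date` is a hypothetical name):

```python
def sort_by_upload_date(videos):
    """Newest first; records without an upload_date key sort last."""
    return sorted(videos, key=lambda v: v.get('upload_date', '19700101'), reverse=True)

# Illustrative records, not real data
videos = [
    {'id': 'a', 'upload_date': '20240101'},
    {'id': 'b'},                              # no upload_date stored
    {'id': 'c', 'upload_date': '20240415'},
]
ordered = [v['id'] for v in sort_by_upload_date(videos)]
```

A key that is present but set to `None` would still raise `TypeError`; `v.get('upload_date') or '19700101'` would cover that case as well.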

View File

@@ -36,4 +36,9 @@ def base():
render['info'] = get_nosql().get_video_info(vGet)
render['params'] = request.args.get('v')
if render['info'].get('_status') != 'available':
flash(render['info'].get('_status_description', 'Video unavailable because of technical errors. Come back later.'))
return redirect(url_for('index.base'))
return render_template('watch/index.html', render=render)

View File

@@ -16,9 +16,15 @@ def pretty_time(time):
except:
return time # return given time
def epoch_time(time):
def epoch_date(epoch):
try:
return datetime.fromtimestamp(time).strftime('%d %b %Y')
return datetime.fromtimestamp(epoch).strftime('%d %b %Y')
except:
return None
def epoch_time(epoch):
try:
return datetime.fromtimestamp(epoch).strftime('%d %b %Y %H:%M:%S')
except:
return None
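Reviewer note: both filters use a bare `except:`, which also catches `KeyboardInterrupt` and `SystemExit`. A hedged sketch of the same two filters sharing one helper and catching only the conversion errors `datetime.fromtimestamp` is documented to raise; the helper name `_format_epoch` is an assumption.

```python
from datetime import datetime

def _format_epoch(epoch, fmt):
    """Format a Unix timestamp in local time; return None when it cannot be converted."""
    try:
        return datetime.fromtimestamp(epoch).strftime(fmt)
    except (TypeError, ValueError, OverflowError, OSError):
        # Wrong type (None, str) or a timestamp outside the platform's range
        return None

def epoch_date(epoch):
    return _format_epoch(epoch, '%d %b %Y')

def epoch_time(epoch):
    return _format_epoch(epoch, '%d %b %Y %H:%M:%S')
```

The output is rendered in the server's local timezone, as in the original filters.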

View File

@@ -36,13 +36,13 @@ class Mango:
self.db = self.client['ayta']
self.channels = self.db['channels']
self.info_json = self.db['info_json']
self.posters_queue = self.db['posters_queue']
self.download_queue = self.db['download_queue']
self.run_log = self.db['run_log']
self.channel_log = self.db['channel_log']
self.websub_callbacks = self.db['websub_callbacks']
self.websub_data = self.db['websub_data']
self.reports = self.db['reports']
self.posters_endpoints = self.db['posters_endpoints']
self.queue_endpoints = self.db['queue_endpoints']
self.users = self.db['users']
self.ensure_indexes()
@@ -98,7 +98,7 @@ class Mango:
stats['videos'] = self.info_json.count_documents({})
stats['channels'] = self.channels.count_documents({})
stats['queue'] = self.posters_queue.count_documents({})
stats['queue'] = self.download_queue.count_documents({})
return stats
@@ -142,12 +142,14 @@ class Mango:
# channel operations #
##########################################
def list_all_channels(self, active=False):
def list_all_channels(self, active=False, websub=False):
""" Returns a SET of YouTube channel ID's; Depending on given positional BOOL only active channels or everything"""
search_terms = {}
if active:
search_terms['active'] = True
elif websub:
search_terms['websub'] = True
channels = []
for channel in self.channels.find(search_terms, {'id': 1}):
@@ -169,9 +171,6 @@ class Mango:
def get_channel_info(self, channelId):
return self.channels.find_one({'id': channelId})
def update_channel_state(self, channelId, state):
self.channels.update_one({'id': channelId}, {"$set": {"active": bool(state)}})
return True
def update_channel_key(self, channelId, key, value):
self.channels.update_one({'id': channelId}, {"$set": {key: value}})
@@ -196,16 +195,19 @@ class Mango:
def get_orphaned_videos(self):
""" Returns a SET of YouTube video ID's which have info_jsons in the collection but no permanent channel is defined. SLOW OPERATION """
# Ok lemme explain. Perform inner join from channel collection on channel_id key. match only the fields which are empty. return video id
pipeline = [{'$lookup': {'from': 'channels', 'localField': 'channel_id', 'foreignField': 'id', 'as': 'channel'}}, {'$match': {'channel': {'$size': 0}}},{'$project': {'id': 1}}]
pipeline = [{'$match': {'_status': 'available'}},
{'$lookup': {'from': 'channels', 'localField': 'channel_id', 'foreignField': 'id', 'as': 'channel'}},
{'$match': {'channel': {'$size': 0}}},
{'$project': {'id': 1}}]
results = self.info_json.aggregate(pipeline)
ids = [result['id'] for result in results]
return tuple(ids)
def get_recent_videos(self, count=99):
""" Returns a SET of YouTube video ID's which have been added last to the info_json collection """
result = self.info_json.find({}, {'_id': 0, 'id': 1}, sort=[('_id', pymongo.DESCENDING)]).limit(count)
result = self.info_json.find({'_status': 'available'}, {'_id': 0, 'id': 1}, sort=[('_id', pymongo.DESCENDING)]).limit(count)
ids = []
@@ -216,7 +218,7 @@ class Mango:
def get_video_info(self, videoId, limited=False):
if limited:
projection = {'_id': 1, 'id': 1, 'title': 1, 'upload_date': 1, 'description': 1, 'channel_id': 1, 'uploader': 1, 'epoch': 1, 'title_slug': 1}
projection = {'_id': 1, 'id': 1, 'title': 1, 'upload_date': 1, 'description': 1, 'channel_id': 1, 'uploader': 1, 'epoch': 1, '_title_slug': 1}
else:
projection = {}
@@ -294,22 +296,22 @@ class Mango:
status = status.get('status')
if status in ['requesting']:
self.websub_callbacks.update_one({'id': callbackId}, {'$set': {'status': 'active', 'activation_time': current_time(object=True), 'lease': lease}})
self.websub_callbacks.update_one({'id': callbackId}, {'$set': {'status': 'active', 'activation_time': current_time(object=True), 'lease': int(lease)}})
return True
return False
def websub_existsCallback(self, callbackId):
status = self.websub_callbacks.find_one({'id': callbackId}, {'status': 1})
def websub_existsCallback(self, callbackId, channel=False):
if channel:
query = {'channel': callbackId, 'status': {'$in': ['requesting', 'active', 'retiring']}}
else:
query = {'id': callbackId, 'status': {'$in': ['requesting', 'active', 'retiring']}}
status = self.websub_callbacks.find_one(query, {'id': 1, 'status': 1})
if not status:
return False
status = status.get('status')
if status:
return status.get('id')
if status in ['requesting', 'active', 'retiring']:
return True
return False
def websub_retiringCallback(self, callbackId):
@@ -330,7 +332,7 @@ class Mango:
def websub_getCallback(self, callbackId):
return self.websub_callbacks.find_one({'id': callbackId})
def websub_getCallbacks(self, channelId=''):
def websub_getCallbacks(self, channelId=None):
callbacks = []
if channelId:
@@ -338,7 +340,6 @@ class Mango:
else:
filter = {}
for callback in self.websub_callbacks.find(filter, {'id': 1}):
callbacks.append(callback['id'])
@@ -347,63 +348,87 @@ class Mango:
def websub_savePost(self, callbackId, data):
return self.websub_data.insert_one({'callback_id': callbackId, 'state': 'unprocessed', 'received_time': current_time(object=True), 'raw_data': data}).inserted_id
def websub_cleanRetired(self, days=3):
def websub_getFirstPostData(self):
data = self.websub_data.find_one({'state': 'unprocessed'}, {'_id': 1, 'raw_data': 1}, sort=[('received_time', 1)])
if not data:
return None
self.websub_data.update_one({'_id': data['_id']}, {'$set': {'state': 'processing'}})
return (data.get('_id'), data.get('raw_data'))
def websub_deletePostProcessing(self, _id):
self.websub_data.delete_one({'_id': _id})
def websub_cleanRetired(self, days=1):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.websub_callbacks.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
self.websub_callbacks.delete_many({'status': 'requesting', 'requesting_time': {'$lt': days}})
return True
def websub_statistics(self):
stats = {}
stats['unprocessed_data'] = self.websub_data.count_documents({'state': 'unprocessed'})
stats['active_callbacks'] = self.websub_callbacks.count_documents({'status': 'active'})
return stats
##########################################
# POSTER FUNCTIONS #
# QUEUE FUNCTIONS #
##########################################
def poster_newEndpoint(self, endpointId, description=''):
self.posters_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)})
def queue_newEndpoint(self, endpointId, description=''):
self.queue_endpoints.insert_one({'id': endpointId, 'description': description, 'status': 'active', 'created_time': current_time(object=True)})
return endpointId
def queue_retireEndpoint(self, endpointId):
return self.queue_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}})
def queue_isActive(self, endpointId):
status = self.queue_endpoints.find_one({'id': endpointId}, {'status': 1})
if not status:
return False
elif status.get('status') == 'active':
return True
else:
return False
def queue_getEndpoints(self):
return self.queue_endpoints.find({})
def poster_insertQueue(self, endpointId, videoId):
def queue_cleanRetired(self, days=3):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.queue_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
return True
##########################################
def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists
if not self.posters_queue.count_documents({'id': videoId}) >= 1:
self.posters_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
if not self.download_queue.count_documents({'id': videoId}) >= 1:
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
# key already in queue
return False
def poster_deleteQueue(self, videoId):
if self.posters_queue.delete_one({'id': videoId}):
def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}):
return True
return False
def poster_retireEndpoint(self, endpointId):
return self.posters_endpoints.update_one({'id': endpointId}, {'$set': {'status': 'retired', 'retired_time': current_time(object=True)}})
def queue_getQueue(self):
return self.download_queue.find({})
def poster_isActive(self, endpointId):
status = self.posters_endpoints.find_one({'id': endpointId}, {'status': 1})
if not status:
return False
status = status.get('status')
if status == 'active':
return True
return False
def poster_getEndpoints(self):
return self.posters_endpoints.find({})
def poster_getQueue(self):
return self.posters_queue.find({})
def poster_cleanRetired(self, days=3):
days = self.datetime.utcnow() - self.timedelta(days=days)
self.posters_endpoints.delete_many({'status': 'retired', 'retired_time': {'$lt': days}})
return True
def queue_emptyQueue(self):
return self.download_queue.delete_many({})
##########################################
# HELPER FUNCTIONS #
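Reviewer note: `websub_getFirstPostData` reads a document and marks it `processing` in two separate calls, so two workers running `websub_process_data` at the same time could both pick up the same document. A hedged sketch of a single-call alternative using PyMongo's `Collection.find_one_and_update`. The function name `claim_first_post` is hypothetical, and the collection is passed in as an argument so the sketch does not assume a connection.

```python
def claim_first_post(collection):
    """Atomically mark the oldest unprocessed document as processing.

    Returns (_id, raw_data), or None when nothing is waiting.
    """
    doc = collection.find_one_and_update(
        {'state': 'unprocessed'},               # only documents nobody has claimed yet
        {'$set': {'state': 'processing'}},      # claim it in the same operation
        projection={'_id': 1, 'raw_data': 1},
        sort=[('received_time', 1)],            # oldest first, as in the original query
    )
    if doc is None:
        return None
    return (doc['_id'], doc['raw_data'])
```

Inside the `Mango` class this would be called as `claim_first_post(self.websub_data)`.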

View File

@@ -1,4 +1,10 @@
class OIDC():
"""
This class is nothing more than a nonce and state store for security in the authentication mechanism.
Additionally, it provides functions to generate redirect URLs, check the validity of bearer tokens, and cache JWT signing keys.
Fairly barebones and should be 100% secure. (famous last words)
This is made for form-posted JWTs. While not the most secure, it is the easiest way to implement. Moving to a code-based solution might be preferred in the future.
"""
def __init__(self, app=None):
self.states = {}
self.nonces = {}
@@ -15,27 +21,34 @@ class OIDC():
self.client_id = config['OIDC_ID']
self.provider = config['OIDC_PROVIDER']
self.domain = config['DOMAIN']
self.window = 120 # the time window to allow states and nonces in seconds
# Authentication provider url must be HTTPS and end on a TLD
if self.provider[:8] != 'https://' or self.provider[-1] == '/':
print('Incorrect OIDC provider URI', flush=True)
exit()
# Get the provider configuration endpoints
configuration = requests.get(f'{self.provider}/.well-known/openid-configuration').json()
jwks_uri = configuration.get('jwks_uri')
self.authorize_uri = configuration.get('authorization_endpoint')
# Start the JWKS management client, it will load the keys and maintain them
self.jwks_manager = jwt.PyJWKClient(jwks_uri)
#################################
#######################################################
def state_maintenance(self):
from datetime import datetime
pivot = datetime.now().timestamp() - 120
# Current time minus the acceptable window
pivot = datetime.now().timestamp() - self.window
# List with expired states
expired_states = [state for state, timestamp in self.states.items() if timestamp <= pivot]
# Remove expired states from store
for state in expired_states:
del self.states[state]
@@ -43,30 +56,40 @@ class OIDC():
import secrets
from datetime import datetime
# Clean state store first
self.state_maintenance()
# Generate token and paired timestamp
state = secrets.token_urlsafe(8)
timestamp = datetime.now().timestamp()
# Add token to the state store
self.states[state] = timestamp
# Return the state
return state
def state_check(self, state):
# Clean state store first
self.state_maintenance()
# If given state is actively stored
if state in self.states:
# Delete state and return True
del self.states[state]
return True
# Given state is not stored
return False
#################################
#######################################################
# Same code as above but a different store for nonces #
#######################################################
def nonce_maintenance(self):
from datetime import datetime
pivot = datetime.now().timestamp() - 120
pivot = datetime.now().timestamp() - self.window
expired_nonces = [nonce for nonce, timestamp in self.nonces.items() if timestamp <= pivot]
@@ -95,7 +118,7 @@ class OIDC():
return False
#################################
#######################################################
def generate_redirect(self):
return str(f'{self.authorize_uri}'
@@ -107,19 +130,33 @@ class OIDC():
def check_bearer(self, token):
import jwt
# Test given JWT
try:
# Get the signed public key from the token
signing_key = self.jwks_manager.get_signing_key_from_jwt(token).key
# Try to decode the token, this will also check the validity in these points:
# 1. Token is signed by expected keys
# 2. Token is issued by the expected provider
# 3. Expected parameters are really in the token
# 4. Token is really intended for us
# 5. Token is still valid (with 5 sec margin)
decoded = jwt.decode(token, signing_key,
algorithms=jwt.algorithms.get_default_algorithms(),
issuer=self.provider,
require=['aud', 'client_id', 'exp', 'iat', 'iss', 'rat', 'sub'],
audience=self.client_id)
except:
audience=self.client_id,
leeway=5)
# Any exception (invalid JWT, invalid formatting etc...) must return False
except Exception as e:
print(e, flush=True)
return False
# double check if given token is really requested by us
# Double check if given token is really requested by us by matching the nonce in the signed key
if not self.nonce_check(decoded.get('nonce', None)):
return False
# Return the unique user identifier
return decoded.get('sub', False)
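Reviewer note: the state store and the nonce store follow the same issue, expire, and check-once logic. A minimal sketch of how both could share one helper; the class name `ExpiringTokenStore` and the injectable `clock` argument are assumptions for illustration, not part of this change.

```python
import secrets
import time

class ExpiringTokenStore:
    """Single-use tokens that expire after `window` seconds."""

    def __init__(self, window=120, clock=time.time):
        self.window = window
        self._clock = clock      # injectable so expiry can be tested without sleeping
        self._issued = {}        # token -> issue timestamp

    def _purge(self):
        # Drop every token issued at or before the start of the window
        pivot = self._clock() - self.window
        for token in [t for t, ts in self._issued.items() if ts <= pivot]:
            del self._issued[token]

    def issue(self):
        self._purge()
        token = secrets.token_urlsafe(8)
        self._issued[token] = self._clock()
        return token

    def check(self, token):
        # A token is valid once: it is removed as soon as it is checked
        self._purge()
        return self._issued.pop(token, None) is not None
```

`OIDC` could then hold `self.states = ExpiringTokenStore(self.window)` and `self.nonces = ExpiringTokenStore(self.window)`. As in the original, the store lives in process memory, so if gunicorn is run with more than one worker process, a state issued by one worker would not be known to another.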

View File

@@ -1,38 +1,77 @@
from celery import shared_task
from flask import current_app
##########################################
# CELERY TASKS #
##########################################
@shared_task()
def subscribe_websub_callback(channelId):
def test_sleep(time=60):
from time import sleep
sleep(time)
return True
@shared_task()
def video_download(videoId):
"""
I do not want to deal with the quirks of native yt-dlp in python, hence the subprocess.
"""
import subprocess
process = subprocess.run(['/usr/local/bin/yt-dlp', '--config-location', '/var/www/archive.ventilaar.net/goodstuff/config_video.conf', '--', f'https://www.youtube.com/watch?v={videoId}'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
if process.returncode != 0:
return False
return True
@shared_task()
def websub_subscribe_callback(channelId):
import requests
from .nosql import get_nosql
callbackId = get_nosql().websub_newCallback(channelId)
# check if a callback already exists for channel
answer = get_nosql().websub_existsCallback(channelId, channel=True)
if not answer:
callbackId = get_nosql().websub_newCallback(channelId)
else:
callbackId = answer
url = 'https://pubsubhubbub.appspot.com/subscribe'
data = {
'hub.callback': f'https://{current_app.config["DOMAIN"]}/api/websub//{callbackId}',
'hub.callback': f'{current_app.config["DOMAIN"]}/api/websub/{callbackId}',
'hub.topic': f'https://www.youtube.com/xml/feeds/videos.xml?channel_id={channelId}',
'hub.verify': 'async',
'hub.mode': 'subscribe',
'hub.verify_token': '',
'hub.secret': '',
'hub.lease_seconds': '86400',
'hub.lease_seconds': '432000',
}
get_nosql().websub_requestingCallback(callbackId)
response = requests.post(url, data=data)
if response.status_code == 202:
return True
# maybe handle errors?
return False
@shared_task()
def unsubscribe_websub_callback(callbackId, channelId):
def websub_unsubscribe_callback(callbackId):
import requests
from .nosql import get_nosql
answer = get_nosql().websub_existsCallback(callbackId)
if not answer:
return False
channelId = get_nosql().websub_getCallback(callbackId).get('channel')
url = 'https://pubsubhubbub.appspot.com/subscribe'
data = {'hub.callback': f'https://{current_app.config["DOMAIN"]}/api/websub/{callbackId}',
data = {'hub.callback': f'{current_app.config["DOMAIN"]}/api/websub/{callbackId}',
'hub.topic': f'https://www.youtube.com/xml/feeds/videos.xml?channel_id={channelId}',
'hub.verify': 'async',
'hub.mode': 'unsubscribe'
@@ -44,4 +83,101 @@ def unsubscribe_websub_callback(callbackId, channelId):
if response.status_code == 202:
return True
return False
# maybe handle errors?
return False
@shared_task()
def websub_process_data():
from .nosql import get_nosql
while True:
blob = get_nosql().websub_getFirstPostData()
if not blob:
break
_id, data = blob
parsed = do_parse_data(data)
if parsed:
state, channelId, videoId = parsed
if state == 'added':
if not get_nosql().check_exists(videoId): # if video not exists
get_nosql().queue_insertQueue(videoId, 'WebSub')
# note for future me
# the websub notifications report ALL videos, including shorts and livestreams
# so if you are going to work on individual video downloading make sure you filter them!
elif state == 'removed':
# we currently do not do anything with removed videos
# but the idea is to trigger a full channel mirror in case a creator started to mass delete videos
pass
get_nosql().websub_deletePostProcessing(_id)
@shared_task()
def websub_renew_expiring(hours=6):
from .nosql import get_nosql
from datetime import datetime, timedelta
count = 0
for callbackId in get_nosql().websub_getCallbacks():
data = get_nosql().websub_getCallback(callbackId)
if data.get('status') not in ['active']: # callback not active
continue
pivot = datetime.utcnow() + timedelta(hours=hours) # hours past now
expires = data.get('activation_time') + timedelta(seconds=data.get('lease')) # callback expires at
if pivot <= expires: # expiration happens after n hours from now
continue # skip callback
# expiration happens within n hours
websub_subscribe_callback.delay(data.get('channel'))
# limit amount of subscribe requests to spread out the requests over time
# with an expiration pivot of 6h and a maximum validity of 5 days we can currently handle 3072 channels
count = count + 1
if count >= 256:
break
##########################################
# TASK MODULES #
##########################################
def do_parse_data(data):
    import xml.etree.ElementTree as ET
    data = data.decode('utf-8')
    try:
        root = ET.fromstring(data)
    except ET.ParseError:
        print('Not XML')
        return False
    yt = any(child.tag.startswith('{http://www.youtube.com/xml/schemas/2015}') for child in root.iter())
    at = any(child.tag.startswith('{http://purl.org/atompub/tombstones/1.0}') for child in root.iter())
    if yt and not at:
        # Video published
        state = 'added'
        ns = {'yt': 'http://www.youtube.com/xml/schemas/2015', '': 'http://www.w3.org/2005/Atom'}
        entry = root.find('.//{http://www.w3.org/2005/Atom}entry')
        videoId = entry.find('./yt:videoId', ns).text
        channelId = entry.find('./yt:channelId', ns).text
    elif not yt and at:
        # Video hidden
        state = 'removed'
        ns = {'at': 'http://purl.org/atompub/tombstones/1.0', '': 'http://www.w3.org/2005/Atom'}
        deleted_entry = root.find('.//{http://purl.org/atompub/tombstones/1.0}deleted-entry')
        videoId = deleted_entry.attrib['ref'].split(':')[-1]
        channelId = deleted_entry.find('./at:by/uri', ns).text.split('/')[-1]
    else:
        print('Unknown xml')
        return False
    return (state, channelId, videoId)
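
To see what `do_parse_data` distinguishes, here is a small standalone sketch run against two sample payloads. Everything in it is an assumption for illustration: the payloads are trimmed, hand-written feeds with placeholder ids, and `classify` is a hypothetical stand-in that uses fully qualified tag names instead of the namespace maps in the commit.

```python
import xml.etree.ElementTree as ET

ATOM = 'http://www.w3.org/2005/Atom'
YT = 'http://www.youtube.com/xml/schemas/2015'
AT = 'http://purl.org/atompub/tombstones/1.0'

# Hand-written sample payloads with placeholder ids (not captured from a real hub).
PUBLISHED = b"""<feed xmlns:yt="http://www.youtube.com/xml/schemas/2015" xmlns="http://www.w3.org/2005/Atom">
  <entry>
    <yt:videoId>VIDEO_ID_1</yt:videoId>
    <yt:channelId>CHANNEL_ID_1</yt:channelId>
  </entry>
</feed>"""

REMOVED = b"""<feed xmlns:at="http://purl.org/atompub/tombstones/1.0" xmlns="http://www.w3.org/2005/Atom">
  <at:deleted-entry ref="yt:video:VIDEO_ID_2" when="2024-05-04T20:00:00+00:00">
    <at:by>
      <name>Example</name>
      <uri>https://www.youtube.com/channel/CHANNEL_ID_2</uri>
    </at:by>
  </at:deleted-entry>
</feed>"""


def classify(payload):
    """Hypothetical stand-in: return (state, channelId, videoId) or False."""
    root = ET.fromstring(payload.decode('utf-8'))
    entry = root.find(f'{{{ATOM}}}entry')
    if entry is not None:
        # Published video: ids are child elements in the yt namespace.
        return ('added',
                entry.find(f'{{{YT}}}channelId').text,
                entry.find(f'{{{YT}}}videoId').text)
    deleted = root.find(f'{{{AT}}}deleted-entry')
    if deleted is not None:
        # Tombstone: video id is the tail of `ref`, channel id the tail of the uri.
        uri = deleted.find(f'{{{AT}}}by/{{{ATOM}}}uri').text
        return ('removed', uri.split('/')[-1], deleted.attrib['ref'].split(':')[-1])
    return False


print(classify(PUBLISHED))
print(classify(REMOVED))
```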

View File

@@ -19,7 +19,7 @@
{% for item in channelInfo %}
<form method="POST">
<div class="input-field">
<span class="supporting-text">{{ item }}</span>
<span class="supporting-text mb-2">{{ item }}</span>
<input class="validate" type="text" value="{{ item }}" name="key" hidden>
</div>

View File

@@ -11,79 +11,89 @@
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Global channel options</h5>
<h5>Global channel options</h5>
</div>
</div>
<div class="row">
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.system') }}">
<div class="card black-text">
<a href="{{ url_for('admin.system') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">System</span>
<p class="grey-text">Internal system settings</p>
<p class="grey-text">Internal system settings</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.channels') }}">
<div class="card black-text">
<a href="{{ url_for('admin.channels') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Channels</span>
<p class="grey-text">Manage channels in the system</p>
<p class="grey-text">Manage channels in the system</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.runs') }}">
<div class="card black-text">
<a href="{{ url_for('admin.runs') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Archive runs</span>
<p class="grey-text">Look at the cron run logs</p>
<p class="grey-text">Look at the cron run logs</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.websub') }}">
<div class="card black-text">
<a href="{{ url_for('admin.websub') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">WebSub</span>
<p class="grey-text">Edit WebSub YouTube links</p>
<p class="grey-text">Edit WebSub YouTube links</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.reports') }}">
<div class="card black-text">
<a href="{{ url_for('admin.reports') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Reports</span>
<p class="grey-text">View user reports</p>
<p class="grey-text">View user reports</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.posters') }}">
<div class="card black-text">
<a href="{{ url_for('admin.queue') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Posters</span>
<p class="grey-text">Extension posters</p>
<span class="card-title">Queue</span>
<p class="grey-text">Video download queue and API access</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.users') }}">
<div class="card black-text">
<a href="{{ url_for('admin.users') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Users</span>
<p class="grey-text">Authenticated users</p>
<p class="grey-text">Authenticated users</p>
</div>
</div>
</a>
</a>
</div>
<div class="col s6 l4 m-4">
<a href="{{ url_for('admin.workers') }}">
<div class="card black-text">
<div class="card-content">
<span class="card-title">Workers</span>
<p class="grey-text">Worker and task management</p>
</div>
</div>
</a>
</div>
</div>
{% endblock %}

View File

@@ -1,25 +1,38 @@
{% extends 'material_base.html' %}
{% block title %}Posters administration page{% endblock %}
{% block description %}Posters administration page of the AYTA system{% endblock %}
{% block title %}Queue administration page{% endblock %}
{% block description %}Queue administration page of the AYTA system{% endblock %}
{% block content %}
<div class="row">
<div class="col s12 l11">
<h4>Posters administration page</h4>
</div>
<div class="col s12 l1 m-5">
<form method="POST">
<input title="Prunes all deleted endpoints, but keeps last 3 days" type="submit" value="clean-retired" name="task">
</form>
<div class="col s12">
<h4>Queue administration page</h4>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Poster options</h5>
<h5>Options</h5>
</div>
</div>
<div class="row">
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Direct actions</span>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 red" type="submit" name="task" value="empty-queue">Empty Queue</button>
<br>
<span class="supporting-text">Removes all queued ids</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2" type="submit" name="task" value="clean-retired">Clean retired</button>
<br>
<span class="supporting-text">Prunes all deactivated endpoints, but keeps last 3 days</span>
</form>
</div>
</div>
</div>
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
@@ -52,7 +65,7 @@
</div>
<div class="col s12 mt-5 input-field">
<div class="switch">
<label>Queue<input type="checkbox" value="direct" name="value" disabled><span class="lever"></span>Direct</label>
<label>Queue<input type="checkbox" value="direct" name="direct"><span class="lever"></span>Direct</label>
<span class="supporting-text">Queue up or start directly</span>
</div>
</div>

View File

@@ -4,14 +4,9 @@
{% block content %}
<div class="row">
<div class="col s12 l11">
<div class="col s12">
<h4>WebSub administration page</h4>
</div>
<div class="col s12 l1 m-5">
<form method="POST">
<input title="Prunes all retired callbacks, but keeps last 3 days" type="submit" value="clean-retired" name="task">
</form>
</div>
</div>
<div class="divider"></div>
<div class="row">
@@ -19,6 +14,43 @@
<h5>WebSub options</h5>
</div>
</div>
<div class="row">
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Direct actions</span>
<form method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 green" type="submit" name="task" value="subscribe-channels">Subscribe channels</button>
<br>
<span class="supporting-text">Send WebSub subscription request for all activated channels. (This will renew existing ones as well)</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2 red" type="submit" name="task" value="unsubscribe-callbacks">Unsubscribe channels</button>
<br>
<span class="supporting-text">Send WebSub unsubscription request for all activated endpoints. (This will only unsubscribe, not disable)</span>
</form>
<form class="mt-4" method="post" onsubmit="return confirm('Are you sure?');">
<button class="btn mb-2" type="submit" name="task" value="clean-retired">Clean retired</button>
<br>
<span class="supporting-text">Prunes all retired callbacks, but keeps until last day</span>
</form>
</div>
</div>
</div>
<div class="col s12 l4 m-4">
<div class="card">
<div class="card-content">
<span class="card-title">Statistics</span>
<h6>Unprocessed callback datapoints</h6>
<p>{{ render['stats']['unprocessed_data'] }}</p>
<h6>Active callbacks</h6>
<p>{{ render['stats']['active_callbacks'] }}</p>
<h6>Something</h6>
<p>Blah</p>
</div>
</div>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s6 l9">
@@ -50,6 +82,7 @@
{% for callback in callbacks %}
<tr class="filterable">
<td>
<a target="_blank" rel="noopener noreferrer" href="https://pubsubhubbub.appspot.com/subscription-details?hub.callback={{ config['DOMAIN'] }}/api/websub/{{ callbacks[callback].get('id') }}&hub.topic=https://www.youtube.com/xml/feeds/videos.xml?channel_id={{ callbacks[callback].get('channel') }}"><button class="btn-small waves-effect waves-light" title="Information on Pubsubhubbub (external link)"></button></a>
<form method="post">
<input type="text" value="{{ callbacks[callback].get('id') }}" name="value" hidden>
<button class="btn-small waves-effect waves-light" type="submit" name="task" value="unsubscribe" title="Send unsubscribe request to hub" {% if callbacks[callback].get('status') != 'active' %}disabled{% endif %}>🗑️</button>
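
The subscription-details link above places the topic URL, which has its own `?channel_id=` query, inside another query string without encoding. As a hedged sketch of one way to build the same link with encoded parameters: `subscription_details_url` is a hypothetical helper, and the sample domain assumes `config['DOMAIN']` includes the scheme, which this diff does not show.

```python
from urllib.parse import urlencode, urlparse, parse_qs


def subscription_details_url(domain, callback_id, channel_id):
    """Hypothetical helper: build the hub's subscription-details link."""
    params = {
        'hub.callback': f'{domain}/api/websub/{callback_id}',
        'hub.topic': f'https://www.youtube.com/xml/feeds/videos.xml?channel_id={channel_id}',
    }
    # urlencode percent-encodes ':', '/', '?' and '=' inside each value.
    return 'https://pubsubhubbub.appspot.com/subscription-details?' + urlencode(params)


# Sample values are placeholders, not taken from the project.
url = subscription_details_url('https://example.invalid', 'abc123', 'CHANNEL_ID')

# Round trip: decoding the query returns the two original values intact.
decoded = parse_qs(urlparse(url).query)
print(decoded['hub.callback'][0])
print(decoded['hub.topic'][0])
```

In a template the same result could be reached by computing the link in the view and passing it to the render context.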

View File

@@ -0,0 +1,47 @@
{% extends 'material_base.html' %}
{% block title %}Workers administration page{% endblock %}
{% block description %}Workers administration page of the AYTA system{% endblock %}
{% block content %}
<div class="row">
<div class="col s12">
<h4>Workers administration page</h4>
</div>
</div>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h5>Options</h5>
</div>
</div>
<form method="POST">
<input title="test-sleep" type="submit" value="test-sleep" name="task">
</form>
<div class="divider"></div>
<div class="row">
<div class="col s12">
<h6>Current workers</h6>
{% for worker in tasks %}
<span>{{ worker }}</span>
<table class="striped highlight responsive-table" style=" border: 1px solid black;">
<thead>
<tr>
<th>ID</th>
<th>Task</th>
<th>Time started</th>
</tr>
</thead>
<tbody>
{% for task in tasks[worker] %}
<tr>
<td>{{ task.get('id') }}</td>
<td>{{ task.get('type') }}</td>
<td>{{ task.get('time_start')|epoch_time }}</td>
</tr>
{% endfor %}
</tbody>
</table>
{% endfor %}
</div>
</div>
{% endblock %}

View File

@@ -25,7 +25,7 @@
<div class="card medium black-text">
<a href="{{ url_for('watch.base') }}?v={{ video.get('id') }}">
<div class="card-image">
<img loading="lazy" src="https://archive.ventilaar.net/videos/automatic/{{ video.get('channel_id') }}/{{ video.get('id') }}/{{ video.get('title_slug') }}.jpg">
<img loading="lazy" src="https://archive.ventilaar.net/videos/automatic/{{ video.get('channel_id') }}/{{ video.get('id') }}/{{ video.get('_title_slug') }}.jpg">
</div>
</a>
<div class="card-content activator">

View File

@@ -25,7 +25,7 @@
<div class="card medium black-text">
<a href="{{ url_for('watch.base') }}?v={{ video.get('id') }}">
<div class="card-image">
<img loading="lazy" src="https://archive.ventilaar.net/videos/automatic/{{ video.get('channel_id') }}/{{ video.get('id') }}/{{ video.get('title_slug') }}.jpg">
<img loading="lazy" src="https://archive.ventilaar.net/videos/automatic/{{ video.get('channel_id') }}/{{ video.get('id') }}/{{ video.get('_title_slug') }}.jpg">
</div>
</a>
<div class="card-content activator">

View File

@@ -25,7 +25,7 @@
<div class="card medium black-text">
<a href="{{ url_for('watch.base') }}?v={{ video.get('id') }}">
<div class="card-image">
<img loading="lazy" src="https://archive.ventilaar.net/videos/automatic/{{ video.get('channel_id') }}/{{ video.get('id') }}/{{ video.get('title_slug') }}.jpg">
<img loading="lazy" src="https://archive.ventilaar.net/videos/automatic/{{ video.get('channel_id') }}/{{ video.get('id') }}/{{ video.get('_title_slug') }}.jpg">
</div>
</a>
<div class="card-content activator">

View File

@@ -43,6 +43,10 @@
<a href="{{ url_for('channel.channel', channelId='UCzGdxkzULCa9RlD-Q2EZPXQ') }}"><span class="title">Kalashnikov Group</span></a>
<p>Reason: This account has been terminated for a violation of YouTube's Terms of Service.</p>
</li>
<li class="collection-item">
<a href="{{ url_for('channel.channel', channelId='UCtfg1tENiu3SgGMZVduFmTg') }}"><span class="title">FiberNinja</span></a>
<p>Reason: This channel was removed because it violated our Community Guidelines.</p>
</li>
</ul>
</div>
</div>

View File

@@ -14,8 +14,8 @@
<div class="row">
<div class="col s12 mt-4 center-align">
<video controls class="responsive-video">
<source src="https://archive.ventilaar.net/videos/automatic/{{ render.get('info').get('channel_id') }}/{{ render.get('info').get('id') }}/{{ render.get('info').get('title_slug') }}.mp4">
<source src="https://archive.ventilaar.net/videos/automatic/{{ render.get('info').get('channel_id') }}/{{ render.get('info').get('id') }}/{{ render.get('info').get('title_slug') }}.webm">
<source src="https://archive.ventilaar.net/videos/automatic/{{ render.get('info').get('channel_id') }}/{{ render.get('info').get('id') }}/{{ render.get('info').get('_title_slug') }}.mp4">
<source src="https://archive.ventilaar.net/videos/automatic/{{ render.get('info').get('channel_id') }}/{{ render.get('info').get('id') }}/{{ render.get('info').get('_title_slug') }}.webm">
Your browser does not support the video tag.
</video>
</div>
@@ -27,7 +27,7 @@
<div class="col s12 l3">
<p><b>Video by:</b> <a href="{{ url_for('channel.channel', channelId=render.get('info').get('channel_id')) }}">{{ render.get('info').get('uploader') }}</a></p>
<p><b>Upload date:</b> {{ render.get('info').get('upload_date')|pretty_time }}</p>
<p><b>Archive date:</b> {{ render.get('info').get('epoch')|epoch_time }}</p>
<p><b>Archive date:</b> {{ render.get('info').get('epoch')|epoch_date }}</p>
<p><b>Video length:</b> {{ render.get('info').get('duration')|pretty_duration }}</p>
</div>
<div class="col s4 l3 center-align">

View File

@@ -5,8 +5,7 @@ flask-caching
flask-limiter
pymongo
yt-dlp
argon2-cffi
gunicorn
celery
sqlalchemy
pyjwt
pyjwt[crypto]