Handle WebSub endpoint renewing. Basic code for XML parsing (not implemented yet)

This commit is contained in:
Ventilaar 2024-04-18 00:56:22 +02:00
parent ac0243a783
commit 236b56915b
No known key found for this signature in database
4 changed files with 126 additions and 17 deletions

View File

@ -9,15 +9,24 @@ def create_app(test_config=None):
config = {'MONGO_CONNECTION': os.environ.get('AYTA_MONGOCONNECTION', 'mongodb://root:example@192.168.66.140:27017'),
'OIDC_PROVIDER': os.environ.get('AYTA_OIDC_PROVIDER', 'https://auth.ventilaar.nl'),
'OIDC_ID': os.environ.get('AYTA_OIDC_ID', 'ayta'),
'CACHE_TYPE': os.environ.get('AYTA_CACHETYPE', 'SimpleCache'),
'CACHE_DEFAULT_TIMEOUT': int(os.environ.get('AYTA_CACHETIMEOUT', 6)),
'SECRET_KEY': os.environ.get('AYTA_SECRETKEY', secrets.token_hex(32)),
'DEBUG': bool(os.environ.get('AYTA_DEBUG', False)),
'DOMAIN': os.environ.get('AYTA_DOMAIN', 'https://testing.mashallah.nl'),
'CELERY': dict(broker_url=str(os.environ.get('AYTA_CELERYBROKER', 'amqp://guest:guest@192.168.66.140:5672/')),
task_ignore_result=True,)
'CELERY': {'broker_url': str(os.environ.get('AYTA_CELERYBROKER', 'amqp://guest:guest@192.168.66.140:5672/'))}
}
# Static Flask configuration options
config['CELERY']['task_ignore_result'] = True
config['CACHE_TYPE'] = 'SimpleCache'
config['SECRET_KEY'] = secrets.token_bytes(32)
# Celery Periodic tasks
config['CELERY']['beat_schedule'] = {}
config['CELERY']['beat_schedule']['Renew WebSub endpoints'] = {'task': 'ayta.tasks.websub_renew_expiring', 'schedule': 4000}
#config['CELERY']['beat_schedule']['Process WebSub data'] = {'task': 'ayta.tasks.websub_process_data', 'schedule': 6}
app = Flask(__name__)
app.config.from_mapping(config)

View File

@ -36,4 +36,9 @@ def base():
render['info'] = get_nosql().get_video_info(vGet)
render['params'] = request.args.get('v')
if render['info']['_status'] != 'available':
flash(render['info'].get('_status_description', 'Video unavailable because of technical errors. Come back later.'))
return redirect(url_for('index.base'))
return render_template('watch/index.html', render=render)

View File

@ -205,7 +205,7 @@ class Mango:
def get_recent_videos(self, count=99):
""" Returns a SET of YouTube video ID's which have been added last to the info_json collection """
result = self.info_json.find({}, {'_id': 0, 'id': 1}, sort=[('_id', pymongo.DESCENDING)]).limit(count)
result = self.info_json.find({'_status': 'available'}, {'_id': 0, 'id': 1}, sort=[('_id', pymongo.DESCENDING)]).limit(count)
ids = []
@ -294,21 +294,24 @@ class Mango:
status = status.get('status')
if status in ['requesting']:
self.websub_callbacks.update_one({'id': callbackId}, {'$set': {'status': 'active', 'activation_time': current_time(object=True), 'lease': lease}})
self.websub_callbacks.update_one({'id': callbackId}, {'$set': {'status': 'active', 'activation_time': current_time(object=True), 'lease': int(lease)}})
return True
return False
def websub_existsCallback(self, callbackId):
status = self.websub_callbacks.find_one({'id': callbackId}, {'status': 1})
def websub_existsCallback(self, callbackId, channel=False):
if channel:
query = {'channel': callbackId}
else:
query = {'id': callbackId}
status = self.websub_callbacks.find_one(query, {'id': 1, 'status': 1})
if not status:
return False
status = status.get('status')
if status in ['requesting', 'active', 'retiring']:
return True
if status.get('status') in ['requesting', 'active', 'retiring']:
return status.get('id')
return False

View File

@ -1,12 +1,22 @@
from celery import shared_task
from flask import current_app
##########################################
# CELERY TASKS #
##########################################
@shared_task()
def subscribe_websub_callback(channelId):
def websub_subscribe_callback(channelId):
import requests
from .nosql import get_nosql
callbackId = get_nosql().websub_newCallback(channelId)
# check if a callback already exists for channel
answer = get_nosql().websub_existsCallback(channelId, channel=True)
if not answer:
callbackId = get_nosql().websub_newCallback(channelId)
else:
callbackId = answer
url = 'https://pubsubhubbub.appspot.com/subscribe'
data = {
@ -16,7 +26,7 @@ def subscribe_websub_callback(channelId):
'hub.mode': 'subscribe',
'hub.verify_token': '',
'hub.secret': '',
'hub.lease_numbers': '86400',
'hub.lease_numbers': '432000',
}
get_nosql().websub_requestingCallback(callbackId)
@ -27,10 +37,17 @@ def subscribe_websub_callback(channelId):
return False
@shared_task()
def unsubscribe_websub_callback(callbackId, channelId):
def websub_unsubscribe_callback(callbackId):
import requests
from .nosql import get_nosql
answer = get_nosql().websub_existsCallback(callbackId)
if not answer:
return False
channelId = get_nosql().websub_getCallback(callbackId).get('channel')
url = 'https://pubsubhubbub.appspot.com/subscribe'
data = {'hub.callback': f'{current_app.config["DOMAIN"]}/api/websub/{callbackId}',
'hub.topic': f'https://www.youtube.com/xml/feeds/videos.xml?channel_id={channelId}',
@ -44,4 +61,79 @@ def unsubscribe_websub_callback(callbackId, channelId):
if response.status_code == 202:
return True
return False
return False
@shared_task()
def websub_process_data():
from .nosql import get_nosql
while True:
data = get_nosql().websub_getFirstPostData()
if not data:
break
_id, data = data
parsed = do_parse_data(data)
if not parsed:
get_nosql().websub_deletePostProcessing(_id)
state, channelId, videoId = parsed
get_nosql().websub_deletePostProcessing(_id)
@shared_task()
def websub_renew_expiring(hours=6):
from .nosql import get_nosql
from datetime import datetime, timedelta
for callbackId in get_nosql().websub_getCallbacks():
data = get_nosql().websub_getCallback(callbackId)
pivot = datetime.utcnow() - timedelta(hours=hours)
expires = data.get('activation_time') + timedelta(seconds=data.get('lease'))
if pivot <= expires: # if expiration happens after the calculation time pass the loop
continue
print(f'{callbackId} should be renewed')
websub_subscribe_callback.delay(data.get('channel'))
##########################################
# TASK MODULES #
##########################################
def do_parse_data(data):
import xml.etree.ElementTree as ET
data = data.decode('utf-8')
try:
root = ET.fromstring(data)
except ET.ParseError:
print('Not XML')
return False
yt = any(child.tag.startswith('{http://www.youtube.com/xml/schemas/2015}') for child in root.iter())
at = any(child.tag.startswith('{http://purl.org/atompub/tombstones/1.0}') for child in root.iter())
if yt and not at:
# Video published
state = 'added'
ns = {'yt': 'http://www.youtube.com/xml/schemas/2015', '': 'http://www.w3.org/2005/Atom'}
entry = root.find('.//{http://www.w3.org/2005/Atom}entry')
videoId = entry.find('./yt:videoId', ns).text
channelId = entry.find('./yt:channelId', ns).text
elif not yt and at:
# Video hidden
state = 'removed'
ns = {'at': 'http://purl.org/atompub/tombstones/1.0', '': 'http://www.w3.org/2005/Atom'}
deleted_entry = root.find('.//{http://purl.org/atompub/tombstones/1.0}deleted-entry')
videoId = deleted_entry.attrib['ref'].split(':')[-1]
channelId = deleted_entry.find('./at:by/uri', ns).text.split('/')[-1]
else:
print('Unknown xml')
return False
return (state, channelId, videoId)