Compare commits

...

1 Commits

Author SHA1 Message Date
Ventilaar
d42030dcbc Small concurrency and logging fix
All checks were successful
Update worker server / build-and-publish (release) Successful in 10s
Generate docker image / build-and-publish (release) Successful in 19s
2025-01-19 13:27:09 +01:00
2 changed files with 17 additions and 10 deletions

View File

@@ -411,13 +411,17 @@ class Mango:
########################################## ##########################################
def queue_insertQueue(self, videoId, endpointId=None): def queue_insertQueue(self, videoId, endpointId=None):
# if no document exists # if already queued
if not self.download_queue.count_documents({'id': videoId}) >= 1: if self.download_queue.count_documents({'id': videoId}) >= 1:
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id return False
return True
# if already in archive
# key already in queue if self.check_exists(videoId):
return False return False
# add to queue
self.download_queue.insert_one({'id': videoId, 'endpoint': endpointId, 'created_time': current_time(object=True), 'status': 'queued'}).inserted_id
return True
def queue_deleteQueue(self, videoId): def queue_deleteQueue(self, videoId):
if self.download_queue.delete_one({'id': videoId}): if self.download_queue.delete_one({'id': videoId}):
@@ -430,18 +434,20 @@ class Mango:
def queue_emptyQueue(self): def queue_emptyQueue(self):
return self.download_queue.delete_many({}) return self.download_queue.delete_many({})
def queue_setFailed(self, videoId):
return self.download_queue.update_one({'id': videoId}, {'$set': {'status': 'failed'}})
def queue_getNext(self): def queue_getNext(self):
""" Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet.""" """ Returns a LIST of queue parameters. Function first checks if ID exists, if so deletes and then checks the next queued until queue is empty (None) or queued id does not exist yet."""
while True: while True:
queueItem = self.download_queue.find_one({}) queueItem = self.download_queue.find_one({'status': 'queued'})
if not queueItem: if not queueItem:
return None return None
elif self.check_exists(queueItem['id']): elif self.check_exists(queueItem['id']):
self.queue_deleteQueue(queueItem['id']) self.queue_deleteQueue(queueItem['id'])
self.download_queue.update_one({'id': queueItem['id']}, {'$set': {'status': 'working'}})
return queueItem return queueItem
########################################## ##########################################
# HELPER FUNCTIONS # # HELPER FUNCTIONS #
########################################## ##########################################

View File

@@ -42,6 +42,7 @@ def video_queue():
get_nosql().queue_deleteQueue(videoId) get_nosql().queue_deleteQueue(videoId)
return True return True
else: else:
get_nosql().queue_setFailed(videoId)
return False return False
@shared_task() @shared_task()