1
mirror of https://github.com/yt-dlp/yt-dlp synced 2024-12-25 04:13:46 +01:00
yt-dlp/yt_dlp/minicurses.py
The Hatsune Daishi adbc4ec4bb
[dash,youtube] Download live from start to end (#888)
* Add option `--live-from-start` to enable downloading live videos from start
* Add key `is_from_start` in formats to identify formats (of live videos) that downloads from start
* [dash] Create protocol `http_dash_segments_generator` that allows a function to be passed instead of fragments
* [fragment] Allow multiple live dash formats to download simultaneously
* [youtube] Implement fragment re-fetching for the live dash formats
* [youtube] Re-extract dash manifest every 5 hours (manifest expires in 6hrs)
* [postprocessor/ffmpeg] Add `FFmpegFixupDuplicateMoovPP` to fixup duplicated moov atoms

Known issue: Ctrl+C doesn't work on Windows when downloading multiple formats

Closes #1521
Authored by: nao20010128nao, pukkandan
2021-12-20 11:36:46 +05:30

182 lines
5.1 KiB
Python

import functools
from threading import Lock
from .utils import supports_terminal_sequences, write_string
CONTROL_SEQUENCES = {
'DOWN': '\n',
'UP': '\033[A',
'ERASE_LINE': '\033[K',
'RESET': '\033[0m',
}
_COLORS = {
'BLACK': '0',
'RED': '1',
'GREEN': '2',
'YELLOW': '3',
'BLUE': '4',
'PURPLE': '5',
'CYAN': '6',
'WHITE': '7',
}
_TEXT_STYLES = {
'NORMAL': '0',
'BOLD': '1',
'UNDERLINED': '4',
}
def format_text(text, f):
'''
@param f String representation of formatting to apply in the form:
[style] [light] font_color [on [light] bg_color]
Eg: "red", "bold green on light blue"
'''
f = f.upper()
tokens = f.strip().split()
bg_color = ''
if 'ON' in tokens:
if tokens[-1] == 'ON':
raise SyntaxError(f'Empty background format specified in {f!r}')
if tokens[-1] not in _COLORS:
raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
bg_color = f'4{_COLORS[tokens.pop()]}'
if tokens[-1] == 'LIGHT':
bg_color = f'0;10{bg_color[1:]}'
tokens.pop()
if tokens[-1] != 'ON':
raise SyntaxError(f'Invalid format {f.split(" ON ", 1)[1]!r} in {f!r}')
bg_color = f'\033[{bg_color}m'
tokens.pop()
if not tokens:
fg_color = ''
elif tokens[-1] not in _COLORS:
raise SyntaxError(f'{tokens[-1]} in {f!r} must be a color')
else:
fg_color = f'3{_COLORS[tokens.pop()]}'
if tokens and tokens[-1] == 'LIGHT':
fg_color = f'9{fg_color[1:]}'
tokens.pop()
fg_style = tokens.pop() if tokens and tokens[-1] in _TEXT_STYLES else 'NORMAL'
fg_color = f'\033[{_TEXT_STYLES[fg_style]};{fg_color}m'
if tokens:
raise SyntaxError(f'Invalid format {" ".join(tokens)!r} in {f!r}')
if fg_color or bg_color:
return f'{fg_color}{bg_color}{text}{CONTROL_SEQUENCES["RESET"]}'
else:
return text
class MultilinePrinterBase:
def __init__(self, stream=None, lines=1):
self.stream = stream
self.maximum = lines - 1
self._HAVE_FULLCAP = supports_terminal_sequences(stream)
def __enter__(self):
return self
def __exit__(self, *args):
self.end()
def print_at_line(self, text, pos):
pass
def end(self):
pass
def _add_line_number(self, text, line):
if self.maximum:
return f'{line + 1}: {text}'
return text
def write(self, *text):
write_string(''.join(text), self.stream)
class QuietMultilinePrinter(MultilinePrinterBase):
pass
class MultilineLogger(MultilinePrinterBase):
def write(self, *text):
self.stream.debug(''.join(text))
def print_at_line(self, text, pos):
# stream is the logger object, not an actual stream
self.write(self._add_line_number(text, pos))
class BreaklineStatusPrinter(MultilinePrinterBase):
def print_at_line(self, text, pos):
self.write(self._add_line_number(text, pos), '\n')
class MultilinePrinter(MultilinePrinterBase):
def __init__(self, stream=None, lines=1, preserve_output=True):
super().__init__(stream, lines)
self.preserve_output = preserve_output
self._lastline = self._lastlength = 0
self._movelock = Lock()
def lock(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self._movelock:
return func(self, *args, **kwargs)
return wrapper
def _move_cursor(self, dest):
current = min(self._lastline, self.maximum)
yield '\r'
distance = dest - current
if distance < 0:
yield CONTROL_SEQUENCES['UP'] * -distance
elif distance > 0:
yield CONTROL_SEQUENCES['DOWN'] * distance
self._lastline = dest
@lock
def print_at_line(self, text, pos):
if self._HAVE_FULLCAP:
self.write(*self._move_cursor(pos), CONTROL_SEQUENCES['ERASE_LINE'], text)
return
text = self._add_line_number(text, pos)
textlen = len(text)
if self._lastline == pos:
# move cursor at the start of progress when writing to same line
prefix = '\r'
if self._lastlength > textlen:
text += ' ' * (self._lastlength - textlen)
self._lastlength = textlen
else:
# otherwise, break the line
prefix = '\n'
self._lastlength = textlen
self.write(prefix, text)
self._lastline = pos
@lock
def end(self):
# move cursor to the end of the last line, and write line break
# so that other to_screen calls can precede
text = self._move_cursor(self.maximum) if self._HAVE_FULLCAP else []
if self.preserve_output:
self.write(*text, '\n')
return
if self._HAVE_FULLCAP:
self.write(
*text, CONTROL_SEQUENCES['ERASE_LINE'],
f'{CONTROL_SEQUENCES["UP"]}{CONTROL_SEQUENCES["ERASE_LINE"]}' * self.maximum)
else:
self.write(*text, ' ' * self._lastlength)