From 2a260e385d498fdb0b4260a3544ee11f62b838e6 Mon Sep 17 00:00:00 2001 From: Akshat Mardia Date: Fri, 28 Mar 2025 02:16:53 -0700 Subject: [PATCH] moved utils fns --- youtube_dl/traversal.py | 486 +++++++++++++++++++++++++++++++++++++++- youtube_dl/utils.py | 480 +-------------------------------------- 2 files changed, 489 insertions(+), 477 deletions(-) diff --git a/youtube_dl/traversal.py b/youtube_dl/traversal.py index 834cfef7f..b11469752 100644 --- a/youtube_dl/traversal.py +++ b/youtube_dl/traversal.py @@ -1,10 +1,482 @@ # coding: utf-8 -# TODO: move these utils.fns here and move import to utils -# flake8: noqa -from .utils import ( - dict_get, - get_first, - T, - traverse_obj, +import itertools +import inspect + +from .compat import ( + compat_casefold, + compat_etree_iterfind, + compat_re_Match, + compat_str, + compat_contextlib_suppress, + compat_http_cookies, + compat_collections_abc, + compat_etree_Element, + ) + +NO_DEFAULT = object() +IDENTITY = lambda x: x + + +class LazyList(compat_collections_abc.Iterable): + """Lazy immutable list from an iterable + Note that slices of a LazyList are lists and not LazyList""" + + class IndexError(IndexError): + def __init__(self, cause=None): + if cause: + # reproduce `raise from` + self.__cause__ = cause + super(IndexError, self).__init__() + + def __init__(self, iterable, **kwargs): + # kwarg-only + reverse = kwargs.get('reverse', False) + _cache = kwargs.get('_cache') + + self._iterable = iter(iterable) + self._cache = [] if _cache is None else _cache + self._reversed = reverse + + def __iter__(self): + if self._reversed: + # We need to consume the entire iterable to iterate in reverse + for item in self.exhaust(): + yield item + return + for item in self._cache: + yield item + for item in self._iterable: + self._cache.append(item) + yield item + + def _exhaust(self): + self._cache.extend(self._iterable) + self._iterable = [] # Discard the emptied iterable to make it pickle-able + return self._cache + + def exhaust(self): + """Evaluate the entire iterable""" + return self._exhaust()[::-1 if self._reversed else 1] + + @staticmethod + def _reverse_index(x): + return None if x is None else ~x + + def __getitem__(self, idx): + if isinstance(idx, slice): + if self._reversed: + idx = slice(self._reverse_index(idx.start), self._reverse_index(idx.stop), -(idx.step or 1)) + start, stop, step = idx.start, idx.stop, idx.step or 1 + elif isinstance(idx, int): + if self._reversed: + idx = self._reverse_index(idx) + start, stop, step = idx, idx, 0 + else: + raise TypeError('indices must be integers or slices') + if ((start or 0) < 0 or (stop or 0) < 0 + or (start is None and step < 0) + or (stop is None and step > 0)): + # We need to consume the entire iterable to be able to slice from the end + # Obviously, never use this with infinite iterables + self._exhaust() + try: + return self._cache[idx] + except IndexError as e: + raise self.IndexError(e) + n = max(start or 0, stop or 0) - len(self._cache) + 1 + if n > 0: + self._cache.extend(itertools.islice(self._iterable, n)) + try: + return self._cache[idx] + except IndexError as e: + raise self.IndexError(e) + + def __bool__(self): + try: + self[-1] if self._reversed else self[0] + except self.IndexError: + return False + return True + + def __len__(self): + self._exhaust() + return len(self._cache) + + def __reversed__(self): + return type(self)(self._iterable, reverse=not self._reversed, _cache=self._cache) + + def __copy__(self): + return type(self)(self._iterable, reverse=self._reversed, _cache=self._cache) + + def __repr__(self): + # repr and str should mimic a list. So we exhaust the iterable + return repr(self.exhaust()) + + def __str__(self): + return repr(self.exhaust()) + + +def is_iterable_like(x, allowed_types=compat_collections_abc.Iterable, blocked_types=NO_DEFAULT): + if blocked_types is NO_DEFAULT: + blocked_types = (compat_str, bytes, compat_collections_abc.Mapping) + return isinstance(x, allowed_types) and not isinstance(x, blocked_types) + + +def variadic(x, allowed_types=NO_DEFAULT): + if isinstance(allowed_types, compat_collections_abc.Iterable): + allowed_types = tuple(allowed_types) + return x if is_iterable_like(x, blocked_types=allowed_types) else (x,) + + +def try_call(*funcs, **kwargs): + + # parameter defaults + expected_type = kwargs.get('expected_type') + fargs = kwargs.get('args', []) + fkwargs = kwargs.get('kwargs', {}) + + for f in funcs: + try: + val = f(*fargs, **fkwargs) + except (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError): + pass + else: + if expected_type is None or isinstance(val, expected_type): + return val + + +if __debug__: + # Raise TypeError if args can't be bound + # needs compat owing to unstable inspect API, thanks PSF :-( + try: + inspect.signature + + def _try_bind_args(fn, *args, **kwargs): + inspect.signature(fn).bind(*args, **kwargs) + except AttributeError: + # Py < 3.3 + def _try_bind_args(fn, *args, **kwargs): + fn_args = inspect.getargspec(fn) + # Py2: ArgInfo(args, varargs, keywords, defaults) + # Py3: ArgSpec(args, varargs, keywords, defaults) + if not fn_args.keywords: + for k in kwargs: + if k not in (fn_args.args or []): + raise TypeError("got an unexpected keyword argument: '{0}'".format(k)) + if not fn_args.varargs: + args_to_bind = len(args) + bindable = len(fn_args.args or []) + if args_to_bind > bindable: + raise TypeError('too many positional arguments') + bindable -= len(fn_args.defaults or []) + if args_to_bind < bindable: + if kwargs: + bindable -= len(set(fn_args.args or []) & set(kwargs)) + if bindable > args_to_bind: + raise TypeError("missing a required argument: '{0}'".format(fn_args.args[args_to_bind])) + + +def traverse_obj(obj, *paths, **kwargs): + """ + Safely traverse nested `dict`s and `Iterable`s, etc + + >>> obj = [{}, {"key": "value"}] + >>> traverse_obj(obj, (1, "key")) + 'value' + + Each of the provided `paths` is tested and the first producing a valid result will be returned. + The next path will also be tested if the path branched but no results could be found. + Supported values for traversal are `Mapping`, `Iterable`, `re.Match`, `xml.etree.ElementTree` + (xpath) and `http.cookies.Morsel`. + Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded. + + The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`. + + The keys in the path can be one of: + - `None`: Return the current object. + - `set`: Requires the only item in the set to be a type or function, + like `{type}`/`{type, type, ...}`/`{func}`. If one or more `type`s, + return only values that have one of the types. If a function, + return `func(obj)`. + - `str`/`int`: Return `obj[key]`. For `re.Match`, return `obj.group(key)`. + - `slice`: Branch out and return all values in `obj[key]`. + - `Ellipsis`: Branch out and return a list of all values. + - `tuple`/`list`: Branch out and return a list of all matching values. + Read as: `[traverse_obj(obj, branch) for branch in branches]`. + - `function`: Branch out and return values filtered by the function. + Read as: `[value for key, value in obj if function(key, value)]`. + For `Sequence`s, `key` is the index of the value. + For `Iterable`s, `key` is the enumeration count of the value. + For `re.Match`es, `key` is the group number (0 = full match) + as well as additionally any group names, if given. + - `dict`: Transform the current object and return a matching dict. + Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`. + - `any`-builtin: Take the first matching object and return it, resetting branching. + - `all`-builtin: Take all matching objects and return them as a list, resetting branching. + + `tuple`, `list`, and `dict` all support nested paths and branches. + + @params paths Paths which to traverse by. + Keyword arguments: + @param default Value to return if the paths do not match. + If the last key in the path is a `dict`, it will apply to each value inside + the dict instead, depth first. Try to avoid if using nested `dict` keys. + @param expected_type If a `type`, only accept final values of this type. + If any other callable, try to call the function on each result. + If the last key in the path is a `dict`, it will apply to each value inside + the dict instead, recursively. This does respect branching paths. + @param get_all If `False`, return the first matching result, otherwise all matching ones. + @param casesense If `False`, consider string dictionary keys as case insensitive. + + The following is only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API + + @param _traverse_string Whether to traverse into objects as strings. + If `True`, any non-compatible object will first be + converted into a string and then traversed into. + The return value of that path will be a string instead, + not respecting any further branching. + + + @returns The result of the object traversal. + If successful, `get_all=True`, and the path branches at least once, + then a list of results is returned instead. + A list is always returned if the last path branches and no `default` is given. + If a path ends on a `dict` that result will always be a `dict`. + """ + + # parameter defaults + default = kwargs.get('default', NO_DEFAULT) + expected_type = kwargs.get('expected_type') + get_all = kwargs.get('get_all', True) + casesense = kwargs.get('casesense', True) + _traverse_string = kwargs.get('_traverse_string', False) + + # instant compat + str = compat_str + + casefold = lambda k: compat_casefold(k) if isinstance(k, str) else k + + if isinstance(expected_type, type): + type_test = lambda val: val if isinstance(val, expected_type) else None + else: + type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,)) + + def lookup_or_none(v, k, getter=None): + with compat_contextlib_suppress(LookupError): + return getter(v, k) if getter else v[k] + + def from_iterable(iterables): + # chain.from_iterable(['ABC', 'DEF']) --> A B C D E F + for it in iterables: + for item in it: + yield item + + def apply_key(key, obj, is_last): + branching = False + + if obj is None and _traverse_string: + if key is Ellipsis or callable(key) or isinstance(key, slice): + branching = True + result = () + else: + result = None + + elif key is None: + result = obj + + elif isinstance(key, set): + assert len(key) >= 1, 'At least one item is required in a `set` key' + if all(isinstance(item, type) for item in key): + result = obj if isinstance(obj, tuple(key)) else None + else: + item = next(iter(key)) + assert len(key) == 1, 'Multiple items in a `set` key must all be types' + result = try_call(item, args=(obj,)) if not isinstance(item, type) else None + + elif isinstance(key, (list, tuple)): + branching = True + result = from_iterable( + apply_path(obj, branch, is_last)[0] for branch in key) + + elif key is Ellipsis: + branching = True + if isinstance(obj, compat_http_cookies.Morsel): + obj = dict(obj, key=obj.key, value=obj.value) + if isinstance(obj, compat_collections_abc.Mapping): + result = obj.values() + elif is_iterable_like(obj, (compat_collections_abc.Iterable, compat_etree_Element)): + result = obj + elif isinstance(obj, compat_re_Match): + result = obj.groups() + elif _traverse_string: + branching = False + result = str(obj) + else: + result = () + + elif callable(key): + branching = True + if isinstance(obj, compat_http_cookies.Morsel): + obj = dict(obj, key=obj.key, value=obj.value) + if isinstance(obj, compat_collections_abc.Mapping): + iter_obj = obj.items() + elif is_iterable_like(obj, (compat_collections_abc.Iterable, compat_etree_Element)): + iter_obj = enumerate(obj) + elif isinstance(obj, compat_re_Match): + iter_obj = itertools.chain( + enumerate(itertools.chain((obj.group(),), obj.groups())), + obj.groupdict().items()) + elif _traverse_string: + branching = False + iter_obj = enumerate(str(obj)) + else: + iter_obj = () + + result = (v for k, v in iter_obj if try_call(key, args=(k, v))) + if not branching: # string traversal + result = ''.join(result) + + elif isinstance(key, dict): + iter_obj = ((k, _traverse_obj(obj, v, False, is_last)) for k, v in key.items()) + result = dict((k, v if v is not None else default) for k, v in iter_obj + if v is not None or default is not NO_DEFAULT) or None + + elif isinstance(obj, compat_collections_abc.Mapping): + if isinstance(obj, compat_http_cookies.Morsel): + obj = dict(obj, key=obj.key, value=obj.value) + result = (try_call(obj.get, args=(key,)) + if casesense or try_call(obj.__contains__, args=(key,)) + else next((v for k, v in obj.items() if casefold(k) == key), None)) + + elif isinstance(obj, compat_re_Match): + result = None + if isinstance(key, int) or casesense: + # Py 2.6 doesn't have methods in the Match class/type + result = lookup_or_none(obj, key, getter=lambda _, k: obj.group(k)) + + elif isinstance(key, str): + result = next((v for k, v in obj.groupdict().items() + if casefold(k) == key), None) + + else: + result = None + if isinstance(key, (int, slice)): + if is_iterable_like(obj, (compat_collections_abc.Sequence, compat_etree_Element)): + branching = isinstance(key, slice) + result = lookup_or_none(obj, key) + elif _traverse_string: + result = lookup_or_none(str(obj), key) + + elif isinstance(obj, compat_etree_Element) and isinstance(key, str): + xpath, _, special = key.rpartition('/') + if not special.startswith('@') and not special.endswith('()'): + xpath = key + special = None + + # Allow abbreviations of relative paths, absolute paths error + if xpath.startswith('/'): + xpath = '.' + xpath + elif xpath and not xpath.startswith('./'): + xpath = './' + xpath + + def apply_specials(element): + if special is None: + return element + if special == '@': + return element.attrib + if special.startswith('@'): + return try_call(element.attrib.get, args=(special[1:],)) + if special == 'text()': + return element.text + raise SyntaxError('apply_specials is missing case for {0!r}'.format(special)) + + if xpath: + result = list(map(apply_specials, compat_etree_iterfind(obj, xpath))) + else: + result = apply_specials(obj) + + return branching, result if branching else (result,) + + def lazy_last(iterable): + iterator = iter(iterable) + prev = next(iterator, NO_DEFAULT) + if prev is NO_DEFAULT: + return + + for item in iterator: + yield False, prev + prev = item + + yield True, prev + + def apply_path(start_obj, path, test_type): + objs = (start_obj,) + has_branched = False + + key = None + for last, key in lazy_last(variadic(path, (str, bytes, dict, set))): + if not casesense and isinstance(key, str): + key = compat_casefold(key) + + if key in (any, all): + has_branched = False + filtered_objs = (obj for obj in objs if obj not in (None, {})) + if key is any: + objs = (next(filtered_objs, None),) + else: + objs = (list(filtered_objs),) + continue + + if __debug__ and callable(key): + # Verify function signature + _try_bind_args(key, None, None) + + new_objs = [] + for obj in objs: + branching, results = apply_key(key, obj, last) + has_branched |= branching + new_objs.append(results) + + objs = from_iterable(new_objs) + + if test_type and not isinstance(key, (dict, list, tuple)): + objs = map(type_test, objs) + + return objs, has_branched, isinstance(key, dict) + + def _traverse_obj(obj, path, allow_empty, test_type): + results, has_branched, is_dict = apply_path(obj, path, test_type) + results = LazyList(x for x in results if x not in (None, {})) + + if get_all and has_branched: + if results: + return results.exhaust() + if allow_empty: + return [] if default is NO_DEFAULT else default + return None + + return results[0] if results else {} if allow_empty and is_dict else None + + for index, path in enumerate(paths, 1): + result = _traverse_obj(obj, path, index == len(paths), True) + if result is not None: + return result + + return None if default is NO_DEFAULT else default + + +def dict_get(d, key_or_keys, default=None, skip_false_values=True): + exp = (lambda x: x or None) if skip_false_values else IDENTITY + return traverse_obj(d, *variadic(key_or_keys), expected_type=exp, + default=default, get_all=False) + + +def get_first(obj, keys, **kwargs): + return traverse_obj(obj, (Ellipsis,) + tuple(variadic(keys)), get_all=False, **kwargs) + + +def T(*x): + """ For use in yt-dl instead of {type, ...} or set((type, ...)) """ + return set(x) diff --git a/youtube_dl/utils.py b/youtube_dl/utils.py index ac1e78002..490ec6b94 100644 --- a/youtube_dl/utils.py +++ b/youtube_dl/utils.py @@ -15,7 +15,6 @@ import email.utils import email.header import errno import functools -import inspect import io import itertools import json @@ -42,26 +41,20 @@ from .compat import ( compat_HTMLParser, compat_basestring, compat_brotli as brotli, - compat_casefold, compat_chr, - compat_collections_abc, compat_contextlib_suppress, compat_cookiejar, compat_ctypes_WINFUNCTYPE, compat_datetime_timedelta_total_seconds, - compat_etree_Element, compat_etree_fromstring, - compat_etree_iterfind, compat_expanduser, compat_html_entities, compat_html_entities_html5, compat_http_client, - compat_http_cookies, compat_integer_types, compat_kwargs, compat_ncompress as ncompress, compat_os_name, - compat_re_Match, compat_re_Pattern, compat_shlex_quote, compat_str, @@ -83,6 +76,16 @@ from .socks import ( sockssocket, ) +from .traversal import ( # noqa + get_first, + dict_get, + T, + LazyList, + variadic, + try_call, + traverse_obj +) + def register_socks_protocols(): # "Register" SOCKS protocols @@ -4002,105 +4005,6 @@ def detect_exe_version(output, version_re=None, unrecognized='present'): return unrecognized -class LazyList(compat_collections_abc.Iterable): - """Lazy immutable list from an iterable - Note that slices of a LazyList are lists and not LazyList""" - - class IndexError(IndexError): - def __init__(self, cause=None): - if cause: - # reproduce `raise from` - self.__cause__ = cause - super(IndexError, self).__init__() - - def __init__(self, iterable, **kwargs): - # kwarg-only - reverse = kwargs.get('reverse', False) - _cache = kwargs.get('_cache') - - self._iterable = iter(iterable) - self._cache = [] if _cache is None else _cache - self._reversed = reverse - - def __iter__(self): - if self._reversed: - # We need to consume the entire iterable to iterate in reverse - for item in self.exhaust(): - yield item - return - for item in self._cache: - yield item - for item in self._iterable: - self._cache.append(item) - yield item - - def _exhaust(self): - self._cache.extend(self._iterable) - self._iterable = [] # Discard the emptied iterable to make it pickle-able - return self._cache - - def exhaust(self): - """Evaluate the entire iterable""" - return self._exhaust()[::-1 if self._reversed else 1] - - @staticmethod - def _reverse_index(x): - return None if x is None else ~x - - def __getitem__(self, idx): - if isinstance(idx, slice): - if self._reversed: - idx = slice(self._reverse_index(idx.start), self._reverse_index(idx.stop), -(idx.step or 1)) - start, stop, step = idx.start, idx.stop, idx.step or 1 - elif isinstance(idx, int): - if self._reversed: - idx = self._reverse_index(idx) - start, stop, step = idx, idx, 0 - else: - raise TypeError('indices must be integers or slices') - if ((start or 0) < 0 or (stop or 0) < 0 - or (start is None and step < 0) - or (stop is None and step > 0)): - # We need to consume the entire iterable to be able to slice from the end - # Obviously, never use this with infinite iterables - self._exhaust() - try: - return self._cache[idx] - except IndexError as e: - raise self.IndexError(e) - n = max(start or 0, stop or 0) - len(self._cache) + 1 - if n > 0: - self._cache.extend(itertools.islice(self._iterable, n)) - try: - return self._cache[idx] - except IndexError as e: - raise self.IndexError(e) - - def __bool__(self): - try: - self[-1] if self._reversed else self[0] - except self.IndexError: - return False - return True - - def __len__(self): - self._exhaust() - return len(self._cache) - - def __reversed__(self): - return type(self)(self._iterable, reverse=not self._reversed, _cache=self._cache) - - def __copy__(self): - return type(self)(self._iterable, reverse=self._reversed, _cache=self._cache) - - def __repr__(self): - # repr and str should mimic a list. So we exhaust the iterable - return repr(self.exhaust()) - - def __str__(self): - return repr(self.exhaust()) - - class PagedList(object): def __len__(self): # This is only useful for tests @@ -4337,41 +4241,6 @@ def multipart_encode(data, boundary=None): return out, content_type -def is_iterable_like(x, allowed_types=compat_collections_abc.Iterable, blocked_types=NO_DEFAULT): - if blocked_types is NO_DEFAULT: - blocked_types = (compat_str, bytes, compat_collections_abc.Mapping) - return isinstance(x, allowed_types) and not isinstance(x, blocked_types) - - -def variadic(x, allowed_types=NO_DEFAULT): - if isinstance(allowed_types, compat_collections_abc.Iterable): - allowed_types = tuple(allowed_types) - return x if is_iterable_like(x, blocked_types=allowed_types) else (x,) - - -def dict_get(d, key_or_keys, default=None, skip_false_values=True): - exp = (lambda x: x or None) if skip_false_values else IDENTITY - return traverse_obj(d, *variadic(key_or_keys), expected_type=exp, - default=default, get_all=False) - - -def try_call(*funcs, **kwargs): - - # parameter defaults - expected_type = kwargs.get('expected_type') - fargs = kwargs.get('args', []) - fkwargs = kwargs.get('kwargs', {}) - - for f in funcs: - try: - val = f(*fargs, **fkwargs) - except (AttributeError, KeyError, TypeError, IndexError, ZeroDivisionError): - pass - else: - if expected_type is None or isinstance(val, expected_type): - return val - - def try_get(src, getter, expected_type=None): if not isinstance(getter, (list, tuple)): getter = [getter] @@ -6211,335 +6080,6 @@ def clean_podcast_url(url): )/''', '', url) -if __debug__: - # Raise TypeError if args can't be bound - # needs compat owing to unstable inspect API, thanks PSF :-( - try: - inspect.signature - - def _try_bind_args(fn, *args, **kwargs): - inspect.signature(fn).bind(*args, **kwargs) - except AttributeError: - # Py < 3.3 - def _try_bind_args(fn, *args, **kwargs): - fn_args = inspect.getargspec(fn) - # Py2: ArgInfo(args, varargs, keywords, defaults) - # Py3: ArgSpec(args, varargs, keywords, defaults) - if not fn_args.keywords: - for k in kwargs: - if k not in (fn_args.args or []): - raise TypeError("got an unexpected keyword argument: '{0}'".format(k)) - if not fn_args.varargs: - args_to_bind = len(args) - bindable = len(fn_args.args or []) - if args_to_bind > bindable: - raise TypeError('too many positional arguments') - bindable -= len(fn_args.defaults or []) - if args_to_bind < bindable: - if kwargs: - bindable -= len(set(fn_args.args or []) & set(kwargs)) - if bindable > args_to_bind: - raise TypeError("missing a required argument: '{0}'".format(fn_args.args[args_to_bind])) - - -def traverse_obj(obj, *paths, **kwargs): - """ - Safely traverse nested `dict`s and `Iterable`s, etc - - >>> obj = [{}, {"key": "value"}] - >>> traverse_obj(obj, (1, "key")) - 'value' - - Each of the provided `paths` is tested and the first producing a valid result will be returned. - The next path will also be tested if the path branched but no results could be found. - Supported values for traversal are `Mapping`, `Iterable`, `re.Match`, `xml.etree.ElementTree` - (xpath) and `http.cookies.Morsel`. - Unhelpful values (`{}`, `None`) are treated as the absence of a value and discarded. - - The paths will be wrapped in `variadic`, so that `'key'` is conveniently the same as `('key', )`. - - The keys in the path can be one of: - - `None`: Return the current object. - - `set`: Requires the only item in the set to be a type or function, - like `{type}`/`{type, type, ...}`/`{func}`. If one or more `type`s, - return only values that have one of the types. If a function, - return `func(obj)`. - - `str`/`int`: Return `obj[key]`. For `re.Match`, return `obj.group(key)`. - - `slice`: Branch out and return all values in `obj[key]`. - - `Ellipsis`: Branch out and return a list of all values. - - `tuple`/`list`: Branch out and return a list of all matching values. - Read as: `[traverse_obj(obj, branch) for branch in branches]`. - - `function`: Branch out and return values filtered by the function. - Read as: `[value for key, value in obj if function(key, value)]`. - For `Sequence`s, `key` is the index of the value. - For `Iterable`s, `key` is the enumeration count of the value. - For `re.Match`es, `key` is the group number (0 = full match) - as well as additionally any group names, if given. - - `dict`: Transform the current object and return a matching dict. - Read as: `{key: traverse_obj(obj, path) for key, path in dct.items()}`. - - `any`-builtin: Take the first matching object and return it, resetting branching. - - `all`-builtin: Take all matching objects and return them as a list, resetting branching. - - `tuple`, `list`, and `dict` all support nested paths and branches. - - @params paths Paths which to traverse by. - Keyword arguments: - @param default Value to return if the paths do not match. - If the last key in the path is a `dict`, it will apply to each value inside - the dict instead, depth first. Try to avoid if using nested `dict` keys. - @param expected_type If a `type`, only accept final values of this type. - If any other callable, try to call the function on each result. - If the last key in the path is a `dict`, it will apply to each value inside - the dict instead, recursively. This does respect branching paths. - @param get_all If `False`, return the first matching result, otherwise all matching ones. - @param casesense If `False`, consider string dictionary keys as case insensitive. - - The following is only meant to be used by YoutubeDL.prepare_outtmpl and is not part of the API - - @param _traverse_string Whether to traverse into objects as strings. - If `True`, any non-compatible object will first be - converted into a string and then traversed into. - The return value of that path will be a string instead, - not respecting any further branching. - - - @returns The result of the object traversal. - If successful, `get_all=True`, and the path branches at least once, - then a list of results is returned instead. - A list is always returned if the last path branches and no `default` is given. - If a path ends on a `dict` that result will always be a `dict`. - """ - - # parameter defaults - default = kwargs.get('default', NO_DEFAULT) - expected_type = kwargs.get('expected_type') - get_all = kwargs.get('get_all', True) - casesense = kwargs.get('casesense', True) - _traverse_string = kwargs.get('_traverse_string', False) - - # instant compat - str = compat_str - - casefold = lambda k: compat_casefold(k) if isinstance(k, str) else k - - if isinstance(expected_type, type): - type_test = lambda val: val if isinstance(val, expected_type) else None - else: - type_test = lambda val: try_call(expected_type or IDENTITY, args=(val,)) - - def lookup_or_none(v, k, getter=None): - with compat_contextlib_suppress(LookupError): - return getter(v, k) if getter else v[k] - - def from_iterable(iterables): - # chain.from_iterable(['ABC', 'DEF']) --> A B C D E F - for it in iterables: - for item in it: - yield item - - def apply_key(key, obj, is_last): - branching = False - - if obj is None and _traverse_string: - if key is Ellipsis or callable(key) or isinstance(key, slice): - branching = True - result = () - else: - result = None - - elif key is None: - result = obj - - elif isinstance(key, set): - assert len(key) >= 1, 'At least one item is required in a `set` key' - if all(isinstance(item, type) for item in key): - result = obj if isinstance(obj, tuple(key)) else None - else: - item = next(iter(key)) - assert len(key) == 1, 'Multiple items in a `set` key must all be types' - result = try_call(item, args=(obj,)) if not isinstance(item, type) else None - - elif isinstance(key, (list, tuple)): - branching = True - result = from_iterable( - apply_path(obj, branch, is_last)[0] for branch in key) - - elif key is Ellipsis: - branching = True - if isinstance(obj, compat_http_cookies.Morsel): - obj = dict(obj, key=obj.key, value=obj.value) - if isinstance(obj, compat_collections_abc.Mapping): - result = obj.values() - elif is_iterable_like(obj, (compat_collections_abc.Iterable, compat_etree_Element)): - result = obj - elif isinstance(obj, compat_re_Match): - result = obj.groups() - elif _traverse_string: - branching = False - result = str(obj) - else: - result = () - - elif callable(key): - branching = True - if isinstance(obj, compat_http_cookies.Morsel): - obj = dict(obj, key=obj.key, value=obj.value) - if isinstance(obj, compat_collections_abc.Mapping): - iter_obj = obj.items() - elif is_iterable_like(obj, (compat_collections_abc.Iterable, compat_etree_Element)): - iter_obj = enumerate(obj) - elif isinstance(obj, compat_re_Match): - iter_obj = itertools.chain( - enumerate(itertools.chain((obj.group(),), obj.groups())), - obj.groupdict().items()) - elif _traverse_string: - branching = False - iter_obj = enumerate(str(obj)) - else: - iter_obj = () - - result = (v for k, v in iter_obj if try_call(key, args=(k, v))) - if not branching: # string traversal - result = ''.join(result) - - elif isinstance(key, dict): - iter_obj = ((k, _traverse_obj(obj, v, False, is_last)) for k, v in key.items()) - result = dict((k, v if v is not None else default) for k, v in iter_obj - if v is not None or default is not NO_DEFAULT) or None - - elif isinstance(obj, compat_collections_abc.Mapping): - if isinstance(obj, compat_http_cookies.Morsel): - obj = dict(obj, key=obj.key, value=obj.value) - result = (try_call(obj.get, args=(key,)) - if casesense or try_call(obj.__contains__, args=(key,)) - else next((v for k, v in obj.items() if casefold(k) == key), None)) - - elif isinstance(obj, compat_re_Match): - result = None - if isinstance(key, int) or casesense: - # Py 2.6 doesn't have methods in the Match class/type - result = lookup_or_none(obj, key, getter=lambda _, k: obj.group(k)) - - elif isinstance(key, str): - result = next((v for k, v in obj.groupdict().items() - if casefold(k) == key), None) - - else: - result = None - if isinstance(key, (int, slice)): - if is_iterable_like(obj, (compat_collections_abc.Sequence, compat_etree_Element)): - branching = isinstance(key, slice) - result = lookup_or_none(obj, key) - elif _traverse_string: - result = lookup_or_none(str(obj), key) - - elif isinstance(obj, compat_etree_Element) and isinstance(key, str): - xpath, _, special = key.rpartition('/') - if not special.startswith('@') and not special.endswith('()'): - xpath = key - special = None - - # Allow abbreviations of relative paths, absolute paths error - if xpath.startswith('/'): - xpath = '.' + xpath - elif xpath and not xpath.startswith('./'): - xpath = './' + xpath - - def apply_specials(element): - if special is None: - return element - if special == '@': - return element.attrib - if special.startswith('@'): - return try_call(element.attrib.get, args=(special[1:],)) - if special == 'text()': - return element.text - raise SyntaxError('apply_specials is missing case for {0!r}'.format(special)) - - if xpath: - result = list(map(apply_specials, compat_etree_iterfind(obj, xpath))) - else: - result = apply_specials(obj) - - return branching, result if branching else (result,) - - def lazy_last(iterable): - iterator = iter(iterable) - prev = next(iterator, NO_DEFAULT) - if prev is NO_DEFAULT: - return - - for item in iterator: - yield False, prev - prev = item - - yield True, prev - - def apply_path(start_obj, path, test_type): - objs = (start_obj,) - has_branched = False - - key = None - for last, key in lazy_last(variadic(path, (str, bytes, dict, set))): - if not casesense and isinstance(key, str): - key = compat_casefold(key) - - if key in (any, all): - has_branched = False - filtered_objs = (obj for obj in objs if obj not in (None, {})) - if key is any: - objs = (next(filtered_objs, None),) - else: - objs = (list(filtered_objs),) - continue - - if __debug__ and callable(key): - # Verify function signature - _try_bind_args(key, None, None) - - new_objs = [] - for obj in objs: - branching, results = apply_key(key, obj, last) - has_branched |= branching - new_objs.append(results) - - objs = from_iterable(new_objs) - - if test_type and not isinstance(key, (dict, list, tuple)): - objs = map(type_test, objs) - - return objs, has_branched, isinstance(key, dict) - - def _traverse_obj(obj, path, allow_empty, test_type): - results, has_branched, is_dict = apply_path(obj, path, test_type) - results = LazyList(x for x in results if x not in (None, {})) - - if get_all and has_branched: - if results: - return results.exhaust() - if allow_empty: - return [] if default is NO_DEFAULT else default - return None - - return results[0] if results else {} if allow_empty and is_dict else None - - for index, path in enumerate(paths, 1): - result = _traverse_obj(obj, path, index == len(paths), True) - if result is not None: - return result - - return None if default is NO_DEFAULT else default - - -def T(*x): - """ For use in yt-dl instead of {type, ...} or set((type, ...)) """ - return set(x) - - -def get_first(obj, keys, **kwargs): - return traverse_obj(obj, (Ellipsis,) + tuple(variadic(keys)), get_all=False, **kwargs) - - def join_nonempty(*values, **kwargs): # parameter defaults