Source code for faust.utils.urls

from typing import List, Optional, Union
from yarl import URL

__all__ = ['urllist', 'ensure_scheme']


[docs]def urllist(arg: Union[URL, str, List[URL]], *, default_scheme: str = None) -> List[URL]: if not arg: raise ValueError('URL list argument cannot be empty') if not isinstance(arg, list): urls = str(arg).split(';') scheme = URL(urls[0]).scheme or default_scheme return [ensure_scheme(scheme, u) for u in urls] else: scheme = arg[0].scheme or default_scheme return [ensure_scheme(scheme, u) for u in arg]
[docs]def ensure_scheme(default_scheme: Optional[str], url: Union[str, URL]) -> URL: scheme: Optional[str] = None if default_scheme: if isinstance(url, URL): scheme = url.scheme else: _scheme, has_scheme, _ = url.partition('://') scheme = _scheme if has_scheme else None if not scheme: return URL(f'{default_scheme}://{url}') return URL(url)