Source code for faust.types.auth
import ssl
from enum import Enum
from typing import Optional, Union
__all__ = [
'AUTH_PROTOCOLS_SSL',
'AUTH_PROTOCOLS_SASL',
'AuthProtocol',
'CredentialsArg',
'CredentialsT',
'SASLMechanism',
'to_credentials',
]
[docs]class AuthProtocol(Enum):
SSL = 'SSL'
PLAINTEXT = 'PLAINTEXT'
SASL_PLAINTEXT = 'SASL_PLAINTEXT'
SASL_SSL = 'SASL_SSL'
[docs]class SASLMechanism(Enum):
PLAIN = 'PLAIN'
GSSAPI = 'GSSAPI'
AUTH_PROTOCOLS_SSL = {AuthProtocol.SSL, AuthProtocol.SASL_SSL}
AUTH_PROTOCOLS_SASL = {AuthProtocol.SASL_PLAINTEXT, AuthProtocol.SASL_SSL}
[docs]class CredentialsT:
protocol: AuthProtocol
CredentialsArg = Union[CredentialsT, ssl.SSLContext]
[docs]def to_credentials(obj: CredentialsArg = None) -> Optional[CredentialsT]:
if obj is not None:
from faust.auth import SSLCredentials # XXX :(
if isinstance(obj, ssl.SSLContext):
return SSLCredentials(obj)
if isinstance(obj, CredentialsT):
return obj
from faust.exceptions import ImproperlyConfigured
raise ImproperlyConfigured(
f'Unknown credentials type {type(obj)}: {obj}')
return None