Source code for faust.auth
"""Authentication Credentials."""
import ssl
from typing import Any, Optional, Union
from faust.types.auth import AuthProtocol, CredentialsT, SASLMechanism
__all__ = [
'Credentials',
'SASLCredentials',
'GSSAPICredentials',
'SSLCredentials',
]
[docs]class Credentials(CredentialsT):
"""Base class for authentication credentials."""
[docs]class SASLCredentials(Credentials):
"""Describe SASL credentials."""
protocol = AuthProtocol.SASL_PLAINTEXT
mechanism: SASLMechanism = SASLMechanism.PLAIN
username: Optional[str]
password: Optional[str]
ssl_context: Optional[ssl.SSLContext]
def __init__(self, *,
username: str = None,
password: str = None,
ssl_context: ssl.SSLContext = None,
mechanism: Union[str, SASLMechanism] = None) -> None:
self.username = username
self.password = password
self.ssl_context = ssl_context
if ssl_context is not None:
self.protocol = AuthProtocol.SASL_SSL
if mechanism is not None:
self.mechanism = SASLMechanism(mechanism)
def __repr__(self) -> str:
return f'<{type(self).__name__}: username={self.username}>'
[docs]class GSSAPICredentials(Credentials):
"""Describe GSSAPI credentials over SASL."""
protocol = AuthProtocol.SASL_PLAINTEXT
mechanism: SASLMechanism = SASLMechanism.GSSAPI
ssl_context: Optional[ssl.SSLContext]
def __init__(self, *,
kerberos_service_name: str = 'kafka',
kerberos_domain_name: str = None,
ssl_context: ssl.SSLContext = None,
mechanism: Union[str, SASLMechanism] = None) -> None:
self.kerberos_service_name = kerberos_service_name
self.kerberos_domain_name = kerberos_domain_name
self.ssl_context = ssl_context
if ssl_context is not None:
self.protocol = AuthProtocol.SASL_SSL
if mechanism is not None:
self.mechanism = SASLMechanism(mechanism)
def __repr__(self) -> str:
return '<{0}: kerberos service={1!r} domain={2!r}'.format(
type(self).__name__,
self.kerberos_service_name,
self.kerberos_domain_name,
)
[docs]class SSLCredentials(Credentials):
"""Describe SSL credentials/settings."""
protocol = AuthProtocol.SSL
context: ssl.SSLContext
def __init__(self, context: ssl.SSLContext = None, *,
purpose: Any = None,
cafile: Optional[str] = None,
capath: Optional[str] = None,
cadata: Optional[str] = None) -> None:
if context is None:
context = ssl.create_default_context(
purpose=purpose,
cafile=cafile,
capath=capath,
cadata=cadata,
)
self.context = context
def __repr__(self) -> str:
return f'<{type(self).__name__}: context={self.context}>'