from datetime import timedelta
from hashlib import sha256
from logging import getLogger
from string import Template
from typing import Callable, Optional, Type, Union, List
from urllib.parse import urlencode

from django.core.cache import BaseCache, caches
from django.http import HttpRequest, HttpResponse, JsonResponse, QueryDict
from django.shortcuts import redirect, render
from django.utils.encoding import force_bytes
from django.utils.module_loading import import_string

from axes.conf import settings
from axes.models import AccessBase

log = getLogger(__name__)

try:
    import ipware.ip

    IPWARE_INSTALLED = True
except ImportError:
    IPWARE_INSTALLED = False


def get_cache() -> BaseCache:
    """
    Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set.
    """

    return caches[getattr(settings, "AXES_CACHE", "default")]


def get_cache_timeout(request: Optional[HttpRequest] = None) -> Optional[int]:
    """
    Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME.

    The cache timeout can be either None if not configured or integer of seconds if configured.

    Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, float, integer, callable, or str path,
    and this function offers a unified _integer or None_ representation of that configuration
    for use with the Django cache backends.
    """

    cool_off = get_cool_off(request)
    if cool_off is None:
        return None
    return int(cool_off.total_seconds())


def get_cool_off(request: Optional[HttpRequest] = None) -> Optional[timedelta]:
    """
    Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME.

    The return value is either None or timedelta.

    Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, integer/float of hours,
    a path to a callable or a callable taking 1 argument (the request). This function
    offers a unified _timedelta or None_ representation of that configuration for use with the
    Axes internal implementations.

    :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
    """

    cool_off = settings.AXES_COOLOFF_TIME

    if isinstance(cool_off, int):
        return timedelta(hours=cool_off)
    if isinstance(cool_off, float):
        return timedelta(minutes=cool_off * 60)
    if isinstance(cool_off, str):
        cool_off_func = import_string(cool_off)
        return cool_off_func(request)
    if callable(cool_off):
        return cool_off(request)  # pylint: disable=not-callable

    return cool_off


def get_cool_off_iso8601(delta: timedelta) -> str:
    """
    Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs.
    """

    seconds = delta.total_seconds()
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)

    days_str = f"{days:.0f}D" if days else ""

    time_str = "".join(
        f"{value:.0f}{designator}"
        for value, designator in [[hours, "H"], [minutes, "M"], [seconds, "S"]]
        if value
    )

    if time_str:
        return f"P{days_str}T{time_str}"
    return f"P{days_str}"


def get_credentials(username: Optional[str] = None, **kwargs) -> dict:
    """
    Calculate credentials for Axes to use internally from given username and kwargs.

    Axes will set the username value into the key defined with ``settings.AXES_USERNAME_FORM_FIELD``
    and update the credentials dictionary with the kwargs given on top of that.
    """

    credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
    credentials.update(kwargs)
    return credentials


def get_client_username(
    request: HttpRequest, credentials: Optional[dict] = None
) -> str:
    """
    Resolve client username from the given request or credentials if supplied.

    The order of preference for fetching the username is as follows:

    1. If configured, use ``AXES_USERNAME_CALLABLE``, and supply ``request, credentials`` as arguments
    2. If given, use ``credentials`` and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``)
    3. Use request.POST and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``)

    :param request: incoming Django ``HttpRequest`` or similar object from authentication backend or other source
    :param credentials: incoming credentials ``dict`` or similar object from authentication backend or other source
    """

    if settings.AXES_USERNAME_CALLABLE:
        log.debug("Using settings.AXES_USERNAME_CALLABLE to get username")

        if callable(settings.AXES_USERNAME_CALLABLE):
            return settings.AXES_USERNAME_CALLABLE(  # pylint: disable=not-callable
                request, credentials
            )
        if isinstance(settings.AXES_USERNAME_CALLABLE, str):
            return import_string(settings.AXES_USERNAME_CALLABLE)(request, credentials)
        raise TypeError(
            "settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None."
        )

    if credentials:
        log.debug(
            "Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD"
        )
        return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)

    log.debug(
        "Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD"
    )

    request_data = getattr(request, "data", request.POST)
    return request_data.get(settings.AXES_USERNAME_FORM_FIELD, None)


def get_client_ip_address(
    request: HttpRequest,
    use_ipware: Optional[bool] = None,
) -> Optional[str]:
    """
    Get client IP address as configured by the user.

    The order of preference for address resolution is as follows:

    1. If configured, use ``AXES_CLIENT_IP_CALLABLE``, and supply ``request`` as argument
    2. If available, use django-ipware package (parameters can be configured in the Axes package)
    3. Use ``request.META.get('REMOTE_ADDR', None)`` as a fallback

    :param request: incoming Django ``HttpRequest`` or similar object from authentication backend or other source
    """

    if settings.AXES_CLIENT_IP_CALLABLE:
        log.debug("Using settings.AXES_CLIENT_IP_CALLABLE to get client IP address")

        if callable(settings.AXES_CLIENT_IP_CALLABLE):
            return settings.AXES_CLIENT_IP_CALLABLE(  # pylint: disable=not-callable
                request
            )
        if isinstance(settings.AXES_CLIENT_IP_CALLABLE, str):
            return import_string(settings.AXES_CLIENT_IP_CALLABLE)(request)
        raise TypeError(
            "settings.AXES_CLIENT_IP_CALLABLE needs to be a string, callable, or None."
        )

    # Resolve using django-ipware from a configuration flag that can be set to False to explicitly disable
    # this is added to both enable or disable the branch when ipware is installed in the test environment
    if use_ipware is None:
        use_ipware = IPWARE_INSTALLED
    if use_ipware:
        log.debug("Using django-ipware to get client IP address")

        client_ip_address, _ = ipware.ip.get_client_ip(
            request,
            proxy_order=settings.AXES_IPWARE_PROXY_ORDER,
            proxy_count=settings.AXES_IPWARE_PROXY_COUNT,
            proxy_trusted_ips=settings.AXES_IPWARE_PROXY_TRUSTED_IPS,
            request_header_order=settings.AXES_IPWARE_META_PRECEDENCE_ORDER,
        )
        return client_ip_address

    log.debug(
        "Using request.META.get('REMOTE_ADDR', None) fallback method to get client IP address"
    )
    return request.META.get("REMOTE_ADDR", None)


def get_client_user_agent(request: HttpRequest) -> str:
    return request.META.get("HTTP_USER_AGENT", "<unknown>")[:255]


def get_client_path_info(request: HttpRequest) -> str:
    return request.META.get("PATH_INFO", "<unknown>")[:255]


def get_client_http_accept(request: HttpRequest) -> str:
    return request.META.get("HTTP_ACCEPT", "<unknown>")[:1025]


def get_lockout_parameters(
    request_or_attempt: Union[HttpRequest, AccessBase],
    credentials: Optional[dict] = None,
) -> List[Union[str, List[str]]]:
    if callable(settings.AXES_LOCKOUT_PARAMETERS):
        return settings.AXES_LOCKOUT_PARAMETERS(request_or_attempt, credentials)

    if isinstance(settings.AXES_LOCKOUT_PARAMETERS, str):
        return import_string(settings.AXES_LOCKOUT_PARAMETERS)(
            request_or_attempt, credentials
        )

    if isinstance(settings.AXES_LOCKOUT_PARAMETERS, list):
        return settings.AXES_LOCKOUT_PARAMETERS

    raise TypeError(
        "settings.AXES_LOCKOUT_PARAMETERS needs to be a callable or iterable"
    )


def get_client_parameters(
    username: str,
    ip_address: str,
    user_agent: str,
    request_or_attempt: Union[HttpRequest, AccessBase],
    credentials: Optional[dict] = None,
) -> List[dict]:
    """
    Get query parameters for filtering AccessAttempt queryset.

    This method returns a dict that guarantees iteration order for keys and values,
    and can so be used in e.g. the generation of hash keys or other deterministic functions.

    Returns list of dict, every item of list are separate parameters
    """
    lockout_parameters = get_lockout_parameters(request_or_attempt, credentials)

    parameters_dict = {
        "username": username,
        "ip_address": ip_address,
        "user_agent": user_agent,
    }

    filter_kwargs = []

    for parameter in lockout_parameters:
        try:
            if isinstance(parameter, str):
                filter_kwarg = {parameter: parameters_dict[parameter]}
            else:
                filter_kwarg = {
                    combined_parameter: parameters_dict[combined_parameter]
                    for combined_parameter in parameter
                }
            filter_kwargs.append(filter_kwarg)

        except KeyError as e:
            error_msg = (
                f"{e} lockout parameter is not allowed. "
                f"Allowed parameters: {', '.join(parameters_dict.keys())}"
            )
            log.exception(error_msg)
            raise ValueError(error_msg) from e

    return filter_kwargs


def make_cache_key_list(filter_kwargs_list: List[dict]) -> List[str]:
    cache_keys = []
    for filter_kwargs in filter_kwargs_list:
        cache_key_components = "".join(
            value for value in filter_kwargs.values() if value
        )
        cache_key_digest = sha256(cache_key_components.encode()).hexdigest()
        cache_keys.append(f"axes-{cache_key_digest}")
    return cache_keys


def get_client_cache_keys(
    request_or_attempt: Union[HttpRequest, AccessBase],
    credentials: Optional[dict] = None,
) -> List[str]:
    """
    Build cache key name from request or AccessAttempt object.

    :param request_or_attempt: HttpRequest or AccessAttempt object
    :param credentials: credentials containing user information
    :return cache_key: Hash key that is usable for Django cache backends
    """

    if isinstance(request_or_attempt, AccessBase):
        username = request_or_attempt.username
        ip_address = request_or_attempt.ip_address
        user_agent = request_or_attempt.user_agent
    else:
        username = get_client_username(request_or_attempt, credentials)
        ip_address = get_client_ip_address(request_or_attempt)
        user_agent = get_client_user_agent(request_or_attempt)

    filter_kwargs_list = get_client_parameters(
        username, ip_address, user_agent, request_or_attempt, credentials
    )

    return make_cache_key_list(filter_kwargs_list)


def get_client_str(
    username: str,
    ip_address: str,
    user_agent: str,
    path_info: str,
    request: HttpRequest,
) -> str:
    """
    Get a readable string that can be used in e.g. logging to distinguish client requests.

    Example log format would be
    ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}``
    """

    if settings.AXES_CLIENT_STR_CALLABLE:
        log.debug("Using settings.AXES_CLIENT_STR_CALLABLE to get client string.")

        if callable(settings.AXES_CLIENT_STR_CALLABLE):
            return settings.AXES_CLIENT_STR_CALLABLE(  # pylint: disable=not-callable
                username, ip_address, user_agent, path_info, request
            )
        if isinstance(settings.AXES_CLIENT_STR_CALLABLE, str):
            return import_string(settings.AXES_CLIENT_STR_CALLABLE)(
                username, ip_address, user_agent, path_info, request
            )
        raise TypeError(
            "settings.AXES_CLIENT_STR_CALLABLE needs to be a string, callable or None."
        )

    client_dict = {}

    if settings.AXES_VERBOSE:
        # Verbose mode logs every attribute that is available
        client_dict["username"] = username
        client_dict["ip_address"] = ip_address
        client_dict["user_agent"] = user_agent
    else:
        # Other modes initialize the attributes that are used for the actual lockouts
        client_list = get_client_parameters(username, ip_address, user_agent, request)
        client_dict = {}
        for client in client_list:
            client_dict.update(client)
    client_dict = cleanse_parameters(client_dict.copy())
    # Path info is always included as last component in the client string for traceability purposes
    if path_info and isinstance(path_info, (tuple, list)):
        path_info = path_info[0]
    client_dict["path_info"] = path_info

    # Template the internal dictionary representation into a readable and concatenated {key: "value"} format
    template = Template('$key: "$value"')
    items = [{"key": k, "value": v} for k, v in client_dict.items()]
    client_str = ", ".join(template.substitute(item) for item in items)
    client_str = "{" + client_str + "}"
    return client_str


def cleanse_parameters(params: dict) -> dict:
    """
    Replace sensitive parameter values in a parameter dict with
    a safe placeholder value.

    Parameters name ``'password'`` will always be cleansed.  Additionally,
    parameters named in ``settings.AXES_SENSITIVE_PARAMETERS`` and
    ``settings.AXES_PASSWORD_FORM_FIELD will be cleansed.

    This is used to prevent passwords and similar values from
    being logged in cleartext.
    """
    sensitive_parameters = ["password"] + settings.AXES_SENSITIVE_PARAMETERS
    if settings.AXES_PASSWORD_FORM_FIELD:
        sensitive_parameters.append(settings.AXES_PASSWORD_FORM_FIELD)

    if sensitive_parameters:
        cleansed = params.copy()
        for param in sensitive_parameters:
            if param in cleansed:
                cleansed[param] = "********************"
        return cleansed
    return params


def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str:
    """
    Turns a query dictionary into an easy-to-read list of key-value pairs.

    If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` or if the fieldname is included
    in ``settings.AXES_SENSITIVE_PARAMETERS`` its value will be masked.

    The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads.
    """

    query_dict = cleanse_parameters(query.copy())

    template = Template("$key=$value")
    items = [{"key": k, "value": v} for k, v in query_dict.items()]
    query_str = "\n".join(template.substitute(item) for item in items)

    return query_str[:max_length]


def get_failure_limit(request: HttpRequest, credentials) -> int:
    if callable(settings.AXES_FAILURE_LIMIT):
        return settings.AXES_FAILURE_LIMIT(  # pylint: disable=not-callable
            request, credentials
        )
    if isinstance(settings.AXES_FAILURE_LIMIT, str):
        return import_string(settings.AXES_FAILURE_LIMIT)(request, credentials)
    if isinstance(settings.AXES_FAILURE_LIMIT, int):
        return settings.AXES_FAILURE_LIMIT
    raise TypeError("settings.AXES_FAILURE_LIMIT needs to be a callable or an integer")


def get_lockout_message() -> str:
    if settings.AXES_COOLOFF_TIME:
        return settings.AXES_COOLOFF_MESSAGE
    return settings.AXES_PERMALOCK_MESSAGE


def get_lockout_response(
    request: HttpRequest, credentials: Optional[dict] = None
) -> HttpResponse:
    if settings.AXES_LOCKOUT_CALLABLE:
        if callable(settings.AXES_LOCKOUT_CALLABLE):
            return settings.AXES_LOCKOUT_CALLABLE(  # pylint: disable=not-callable
                request, credentials
            )
        if isinstance(settings.AXES_LOCKOUT_CALLABLE, str):
            return import_string(settings.AXES_LOCKOUT_CALLABLE)(request, credentials)
        raise TypeError(
            "settings.AXES_LOCKOUT_CALLABLE needs to be a string, callable, or None."
        )

    status = settings.AXES_HTTP_RESPONSE_CODE
    context = {
        "failure_limit": get_failure_limit(request, credentials),
        "username": get_client_username(request, credentials) or "",
    }

    cool_off = get_cool_off(request)
    if cool_off:
        context.update(
            {
                "cooloff_time": get_cool_off_iso8601(
                    cool_off
                ),  # differing old name is kept for backwards compatibility
                "cooloff_timedelta": cool_off,
            }
        )

    if request.META.get("HTTP_X_REQUESTED_WITH") == "XMLHttpRequest":
        json_response = JsonResponse(context, status=status)
        json_response["Access-Control-Allow-Origin"] = (
            settings.AXES_ALLOWED_CORS_ORIGINS
        )
        json_response["Access-Control-Allow-Methods"] = "POST, OPTIONS"
        json_response["Access-Control-Allow-Headers"] = (
            "Origin, Content-Type, Accept, Authorization, x-requested-with"
        )
        return json_response

    if settings.AXES_LOCKOUT_TEMPLATE:
        return render(request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status)

    if settings.AXES_LOCKOUT_URL:
        lockout_url = settings.AXES_LOCKOUT_URL
        query_string = urlencode({"username": context["username"]})
        url = f"{lockout_url}?{query_string}"
        return redirect(url)

    return HttpResponse(get_lockout_message(), status=status)


def is_ip_address_in_whitelist(ip_address: str) -> bool:
    if not settings.AXES_IP_WHITELIST:
        return False

    return (  # pylint: disable=unsupported-membership-test
        ip_address in settings.AXES_IP_WHITELIST
    )


def is_ip_address_in_blacklist(ip_address: str) -> bool:
    if not settings.AXES_IP_BLACKLIST:
        return False

    return (  # pylint: disable=unsupported-membership-test
        ip_address in settings.AXES_IP_BLACKLIST
    )


def is_client_ip_address_whitelisted(request: HttpRequest):
    """
    Check if the given request refers to a whitelisted IP.
    """

    if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(
        request.axes_ip_address
    ):
        return True

    if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(
        request.axes_ip_address
    ):
        return True

    return False


def is_client_ip_address_blacklisted(request: HttpRequest) -> bool:
    """
    Check if the given request refers to a blacklisted IP.
    """

    if is_ip_address_in_blacklist(request.axes_ip_address):
        return True

    if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(
        request.axes_ip_address
    ):
        return True

    return False


def is_client_method_whitelisted(request: HttpRequest) -> bool:
    """
    Check if the given request uses a whitelisted method.
    """

    if settings.AXES_NEVER_LOCKOUT_GET and request.method == "GET":
        return True

    return False


def is_user_attempt_whitelisted(
    request: HttpRequest, credentials: Optional[dict] = None
) -> bool:
    """
    Check if the given request or credentials refer to a whitelisted username.

    This method invokes the ``settings.AXES_WHITELIST`` callable
    with ``request`` and ``credentials`` arguments.

    This function could use the following implementation for checking
    the lockout flags from a specific property in the user object:

    .. code-block: python

       username_value = get_client_username(request, credentials)
       username_field = getattr(
           get_user_model(),
           "USERNAME_FIELD",
           "username"
       )
       kwargs = {username_field: username_value}

       user_model = get_user_model()
       user = user_model.objects.get(**kwargs)
       return user.nolockout
    """

    whitelist_callable = settings.AXES_WHITELIST_CALLABLE
    if whitelist_callable is None:
        return False
    if callable(whitelist_callable):
        return whitelist_callable(request, credentials)  # pylint: disable=not-callable
    if isinstance(whitelist_callable, str):
        return import_string(whitelist_callable)(request, credentials)

    raise TypeError(
        "settings.AXES_WHITELIST_CALLABLE needs to be a string, callable, or None."
    )


def toggleable(func) -> Callable:
    """
    Decorator that toggles function execution based on settings.

    If the ``settings.AXES_ENABLED`` flag is set to ``False``
    the decorated function never runs and a None is returned.

    This decorator is only suitable for functions that do not
    require return values to be passed back to callers.
    """

    def inner(*args, **kwargs):  # pylint: disable=inconsistent-return-statements
        if settings.AXES_ENABLED:
            return func(*args, **kwargs)

    return inner


def get_client_session_hash(request: HttpRequest) -> str:
    """
    Get client session and returns the SHA256 hash of session key, forcing session creation if required.

    If no session is available on request returns an empty string.
    """
    try:
        session = request.session
    except AttributeError:
        # when no session is available just return an empty string
        return ""

    # ensure that a session key exists at this point
    # because session middleware usually creates the session key at the end
    # of request cycle
    if session.session_key is None:
        session.create()

    return sha256(force_bytes(session.session_key)).hexdigest()
