Source code for gateone.auth.authorization

# -*- coding: utf-8 -*-
#
#       Copyright 2013 Liftoff Software Corporation
#

# Meta
# Module-level metadata: licensing terms and the author's contact details.
__license__ = "AGPLv3 or Proprietary (see LICENSE.txt)"
__author__ = 'Dan McDougall <daniel.mcdougall@liftoffsoftware.com>'

# The module docstring is assigned explicitly.  It is reStructuredText: it
# opens with an ``authorization.py`` link target and ends with an empty
# "Docstrings" section heading.
__doc__ = """\
.. _authorization.py:

Authorization
==============
This module contains Gate One's authorization helpers.

Docstrings
==========
"""

# Import stdlib stuff
import functools
import os, logging, re

# Import our own stuff
from gateone.core.utils import noop
from gateone.core.utils import memoize
from gateone.core.configuration import RUDict
from gateone.core.locale import get_translation
from gateone.core.log import go_logger

# Localization support
# ``_`` is the callable this module wraps around its user-facing and log
# strings; it is whatever get_translation() returns.
_ = get_translation()

# Globals
# Logger obtained from go_logger() under the name 'gateone.auth'; failed
# authorization requirements are reported through it.
auth_log = go_logger('gateone.auth')

# Authorization stuff
@memoize
def applicable_policies(application, user, policies):
    """
    Given an *application* and a *user* object, returns the merged/resolved
    policies from the given *policies* :class:`RUDict`.

    Resolution starts from a copy of the default policy
    (``policies['*'][application]``, or an empty :class:`RUDict` if there is
    none) and then applies, in the iteration order of *policies*, every
    section whose key selects the given *user* and which contains settings
    for *application*.

    Supported keys:

        * ``user=<regex>`` or ``user.upn=<regex>``: Matched against
          ``user['upn']``.
        * ``user.<attribute>=<regex>``: Matched against ``user[<attribute>]``
          (e.g. ``user.ip_address=10.1.1.1``).

    A section is skipped when the *user* does not have the attribute being
    tested.  Keys that start with ``user.`` but contain no ``=`` are
    malformed; they are logged via ``auth_log`` and skipped.

    .. note:: Policy settings always start with '*', 'user', or 'group'.

    .. note::

        Patterns are applied with :func:`re.match` which only anchors at the
        start of the value.  End the pattern with ``$`` if an exact match is
        required.
    """
    # Start with the default policy
    try:
        policy = RUDict(policies['*'][application].copy())
    except KeyError:
        # No default policy--not good but not mandatory
        policy = RUDict()
    for key, value in policies.items():
        if key == '*':
            continue # Default policy was already handled
        if application not in value:
            continue # No sense processing inapplicable stuff
        if not key.startswith(('user=', 'user.')):
            # TODO: Group stuff here (need attribute repo stuff first)
            continue
        # Everything before the first '=' selects the attribute to test and
        # everything after it is the regular expression that must match.
        selector, separator, must_match = key.partition('=')
        if not separator:
            # e.g. 'user.ip_address' with no '=<regex>' part.  One bad key
            # must not prevent every other policy from being resolved.
            auth_log.error(
                _("Ignoring malformed policy key (missing '='): %s") % key)
            continue
        if selector == 'user':
            attribute = 'upn' # 'user=<regex>' is shorthand for 'user.upn='
        else:
            attribute = selector.split('.', 1)[1] # Drop the 'user.' part
        # A user lacking the attribute simply does not match this section
        if attribute in user and re.match(must_match, user[attribute]):
            policy.update(value[application])
    return policy

class require(object):
    """
    A decorator to add authorization requirements to any given function or
    method using condition classes.  Condition classes are classes with
    check() methods that return True if the condition is met.

    Example of using @require with is_user()::

        @require(is_user('administrator'))
        def admin_index(self):
            return 'Hello, Administrator!'

    This would only allow the user, 'administrator' access to the index page.
    In this example the *condition* is the `is_user` function which checks
    that the logged-in user's username (aka UPN) is 'administrator'.

    Conditions are checked in the order given.  At the first condition whose
    ``check()`` returns a false value the failure is logged through
    ``auth_log``, an attempt is made to send the condition's ``error`` message
    to the client, and `noop` is returned *instead of* calling the decorated
    function.  If every condition passes the decorated function is called and
    its result is returned.
    """
    def __init__(self, *conditions):
        self.conditions = conditions

    def __call__(self, f):
        conditions = self.conditions

        # The following only gets run when the wrapped method is called.
        # functools.wraps() keeps the name and docstring of the decorated
        # function intact.
        @functools.wraps(f)
        def wrapped_f(self, *args, **kwargs):
            # Now check the conditions
            for condition in conditions:
                # Conditions don't have access to self directly so we use the
                # 'self' associated with the user's open connection to update
                # the condition's 'instance' attribute
                condition.instance = self
                # This lets the condition know what it is being applied to:
                condition.function = f
                condition.f_args = args
                condition.f_kwargs = kwargs
                if condition.check():
                    continue
                # Always log a failed requirement.  When there is no logged-in
                # user (or the user has no UPN) it is attributed to
                # "unknown user" rather than going unrecorded.
                user = getattr(self, 'current_user', None)
                # NOTE: The message is translated *before* interpolation so
                # that the lookup is done on the message template.
                if user and 'upn' in user:
                    auth_log.error(_(
                        '{"ip_address": "%s"} %s -> %s '
                        'failed requirement: %s') % (
                            self.request.remote_ip, user['upn'],
                            f.__name__, str(condition)))
                else:
                    auth_log.error(_(
                        '{"ip_address": "%s"} unknown user -> %s '
                        'failed requirement: %s') % (
                            self.request.remote_ip,
                            f.__name__, str(condition)))
                # Try to notify the client of their failings
                msg = _("ERROR: %s") % condition.error
                try:
                    if hasattr(self, 'send_message'):
                        self.send_message(msg)
                    elif hasattr(self, 'ws'): # Inside an app, use ws
                        self.ws.send_message(msg)
                except AttributeError:
                    # This can happen if the client disconnects in the
                    # middle of this operation.  Ignore.
                    pass
                return noop
            return f(self, *args, **kwargs)
        return wrapped_f
class authenticated(object):
    """
    Condition class for the @require decorator: the condition is met when the
    instance the decorated function was called on has a truthy
    `current_user`.

    .. note::

        Only meant to be used with WebSockets.
        `tornado.web.RequestHandler` instances can use
        `@tornado.web.authenticated`
    """
    error = _("Only valid users may access this function")

    def __init__(self):
        # Placeholders only; @require fills these in right before check()
        self.instance = self.function = self.f_args = self.f_kwargs = None

    def __str__(self):
        return "authenticated"

    def check(self):
        """Returns `True` if there is a current user, `False` otherwise."""
        return bool(self.instance.current_user)
class is_user(object):
    """
    Condition class for the @require decorator: the condition is met when the
    given username/UPN is equal to the 'upn' of the `current_user` of the
    instance the decorated function was called on.
    """
    error = _("You are not authorized to perform this action")

    def __init__(self, upn):
        # NOTE: upn is the username (aka userPrincipalName)
        self.upn = upn
        # Placeholders only; @require fills these in right before check()
        self.instance = self.function = self.f_args = self.f_kwargs = None

    def __str__(self):
        return "is_user: %s" % self.upn

    def check(self):
        """
        Returns `True` if the current user's UPN equals `self.upn`.  Returns
        `False` if it differs, if there is no current user, or if the current
        user has no 'upn'.
        """
        current = self.instance.current_user
        if not current or 'upn' not in current:
            return False
        logging.debug("Checking if %s == %s" % (current['upn'], self.upn))
        return self.upn == current['upn']
class policies(object):
    """
    Condition class for the @require decorator that defers the decision to
    the policy-checking function registered for the given application (which
    normally enforces the limits in Gate One's settings, e.g. 50limits.conf).
    Here's an example::

        @require(authenticated(), policies('terminal'))
        def new_terminal(self, settings):
            # Actual function would be here
            pass

    That would apply all policies that are configured for the 'terminal'
    application.  It works like this:

    The :class:`~app_terminal.TerminalApplication` application registers its
    name and policy-checking function inside of
    :meth:`~app_terminal.TerminalApplication.initialize` like so::

        self.ws.security.update({'terminal': terminal_policies})

    Whenever a function decorated with ``@require(policies('terminal'))`` is
    called the registered policy-checking function (e.g.
    :func:`app_terminal.terminal_policies`) will be called, passing the
    current instance of :class:`policies` as the only argument.

    It is then up to the policy-checking function to decide whether or not
    the user is allowed to execute the decorated function; it must return
    `True` if allowed.  Because it receives this :class:`policies` instance
    it can also modify the function and its arguments if the security
    policies warrant it.

    If no policy-checking function is registered for the application the
    condition is considered met.

    .. note::

        If you write your own policy-checking function (like
        :func:`terminal_policies`) it is often a good idea to send a
        notification to the user indicating why they've been denied.  You can
        do this with the :meth:`instance.send_message` method.
    """
    # NOTE: In the future if we wish to use this function with Gate One itself
    # (as opposed to just a GOApplication) the 'app' will need to be 'gateone'.
    error = _("Your ability to perform this action has been restricted")

    def __init__(self, app):
        self.app = app
        # Placeholders only; @require fills these in right before check()
        self.instance = self.function = self.f_args = self.f_kwargs = None

    def __str__(self):
        return "policies: %s" % self.app

    def check(self):
        """
        Returns the verdict of the policy-checking function registered for
        `self.app` or `True` if nothing is registered for it.
        """
        registry = self.instance.security
        if self.app not in registry:
            return True # Nothing is registered for this application so it's OK
        # Let the application's registered 'security' function make its own
        # determination.
        policy_check = registry[self.app]
        return policy_check(self)