utils.py - Supporting Functions

Gate One Utility Functions and Classes

exception gateone.core.utils.MimeTypeFail[source]

Raised by create_data_uri if the mimetype of a file could not be guessed.

exception gateone.core.utils.SSLGenerationError[source]

Raised by gen_self_signed_ssl if an error is encountered generating a self-signed SSL certificate.

exception gateone.core.utils.ChownError[source]

Raised by recursive_chown if an OSError is encountered while trying to recursively chown a directory.

class gateone.core.utils.AutoExpireDict(*args, **kwargs)[source]

An override of Python's dict that expires keys after a given _expire_timeout timeout (datetime.timedelta). The default expiration is one hour. It is used like so:

>>> expiring_dict = AutoExpireDict(timeout=timedelta(minutes=10))
>>> expiring_dict['somekey'] = 'some value'
>>> # You can see when this key was created:
>>> print(expiring_dict.creation_times['somekey'])
2013-04-15 18:44:18.224072

10 minutes later your key will be gone:

>>> 'somekey' in expiring_dict
False

The 'timeout' may be be given as a datetime.timedelta object or a string like, "1d", "30s" (will be passed through the convert_to_timedelta function).

By default AutoExpireDict will check for expired keys every 30 seconds but this can be changed by setting the 'interval':

>>> expiring_dict = AutoExpireDict(interval=5000) # 5 secs
>>> # Or to change it after you've created one:
>>> expiring_dict.interval = "10s"

The 'interval' may be an integer, a datetime.timedelta object, or a string such as '10s' or '5m' (will be passed through the convert_to_timedelta function).

If there are no keys remaining the tornado.ioloop.PeriodicCallback ( self._key_watcher) that checks expiration will be automatically stopped. As soon as a new key is added it will be started back up again.

Note

Only works if there's a running instances of tornado.ioloop.IOLoop.

timeout

A property that controls how long a key will last before being automatically removed. May be be given as a datetime.timedelta object or a string like, "1d", "30s" (will be passed through the convert_to_timedelta function).

interval

A property that controls how often we check for expired keys. May be given as milliseconds (integer), a datetime.timedelta object, or a string like, "1d", "30s" (will be passed through the convert_to_timedelta function).

renew(key)[source]

Resets the timeout on the given key; like it was just created.

update(*args, **kwargs)[source]

An override that calls self.renew() for every key that gets updated.

clear()[source]

An override that empties self.creation_times and calls self._key_watcher.stop().

_timeout_checker()[source]

Walks self and removes keys that have passed the expiration point.

class gateone.core.utils.memoize(fn, timeout=None)[source]

A memoization decorator that works with multiple arguments as well as unhashable arguments (e.g. dicts). It also self-expires any memoized calls after the timedelta specified via timeout.

If a timeout is not given memoized information will be discared after five minutes.

Note

Expiration checks will be performed every 30 seconds.

gateone.core.utils.noop(*args, **kwargs)[source]

Do nothing (i.e. "No Operation")

gateone.core.utils.debug_info(name, *args, **kwargs)[source]

This function returns a string like this:

>>> debug_info('some_function', 5, 10, foo="bar")
'some_function(5, 10, foo="bar")'

Primarily aimed at debugging.

gateone.core.utils.write_pid(path)[source]

Writes our PID to path.

gateone.core.utils.read_pid(path)[source]

Reads our current PID from path.

gateone.core.utils.remove_pid(path)[source]

Removes the PID file at path.

gateone.core.utils.shell_command(cmd, timeout_duration=5)[source]

Resets the SIGCHLD signal handler (if necessary), executes cmd via getstatusoutput(), then re-enables the SIGCHLD handler (if it was set to something other than SIG_DFL). Returns the result of getstatusoutput() which is a tuple in the form of:

(exitstatus, output)

If the command takes longer than timeout_duration seconds, it will be auto-killed and the following will be returned:

(255, _("ERROR: Timeout running shell command"))
gateone.core.utils.json_encode(obj)[source]

On some platforms (CentOS 6.2, specifically) tornado.escape.json_decode doesn't seem to work just right when it comes to returning unicode strings. This is just a wrapper that ensures that the returned string is unicode.

gateone.core.utils.gen_self_signed_ssl(path=None)[source]

Generates a self-signed SSL certificate using pyOpenSSL or the openssl command depending on what's available, The resulting key/certificate will use the RSA algorithm at 4096 bits.

gateone.core.utils.gen_self_signed_openssl(path=None)[source]

This method will generate a secure self-signed SSL key/certificate pair (using the openssl command) saving the result as 'certificate.pem' and 'keyfile.pem' to path. If path is not given the result will be saved in the current working directory. The certificate will be valid for 10 years.

Note

The self-signed certificate will utilize 4096-bit RSA encryption.

gateone.core.utils.gen_self_signed_pyopenssl(notAfter=None, path=None)[source]

This method will generate a secure self-signed SSL key/certificate pair (using pyOpenSSL) saving the result as 'certificate.pem' and 'keyfile.pem' in path. If path is not given the result will be saved in the current working directory. By default the certificate will be valid for 10 years but this can be overridden by passing a valid timestamp via the notAfter argument.

Examples:

>>> gen_self_signed_ssl(60 * 60 * 24 * 365) # 1-year certificate
>>> gen_self_signed_ssl() # 10-year certificate

Note

The self-signed certificate will utilize 4096-bit RSA encryption.

gateone.core.utils.none_fix(val)[source]

If val is a string that utlimately means 'none', return None. Otherwise return val as-is. Examples:

>>> none_fix('none')
None
>>> none_fix('0')
None
>>> none_fix('whatever')
'whatever'
gateone.core.utils.str2bool(val)[source]

Converts strings like, 'false', 'true', '0', and '1' into their boolean equivalents (in Python). If no logical match is found, return False. Examples:

>>> str2bool('false')
False
>>> str2bool('1')
True
>>> str2bool('whatever')
False
gateone.core.utils.generate_session_id()[source]

Returns a random, 45-character session ID. Example:

>>> generate_session_id()
"NzY4YzFmNDdhMTM1NDg3Y2FkZmZkMWJmYjYzNjBjM2Y5O"
>>>
gateone.core.utils.mkdir_p(path)[source]

Pythonic version of "mkdir -p". Example equivalents:

>>> mkdir_p('/tmp/test/testing') # Does the same thing as...
>>> from subprocess import call
>>> call('mkdir -p /tmp/test/testing')

Note

This doesn't actually call any external commands.

gateone.core.utils.cmd_var_swap(cmd, **kwargs)[source]

Returns cmd with %variable% replaced with the keys/values passed in via kwargs. This function is used by Gate One's Terminal application to swap the following Gate One variables in defined terminal 'commands':

%SESSION% session
%SESSION_DIR% session_dir
%SESSION_HASH% session_hash
%USERDIR% user_dir
%USER% user
%TIME% time

This allows for unique or user-specific values to be swapped into command line arguments like so:

ssh_connect.py -M -S '%SESSION%/%SESSION%/%r@%L:%p'

Could become:

ssh_connect.py -M -S '/tmp/gateone/NWI0YzYxNzAwMTA3NGYyZmI0OWJmODczYmQyMjQwMDYwM/%r@%L:%p'

Here's an example:

>>> cmd = "echo '%FOO% %BAR%'"
>>> cmd_var_swap(cmd, foo="FOOYEAH,", bar="BAR NONE!")
"echo 'FOOYEAH, BAR NONE!'"

Note

The variables passed into this function via kwargs are case insensitive. cmd_var_swap(cmd, session=var) would produce the same output as cmd_var_swap(cmd, SESSION=var).

gateone.core.utils.short_hash(to_shorten)[source]

Converts to_shorten into a really short hash depenendent on the length of to_shorten. The result will be safe for use as a file name.

Note

Collisions are possible but highly unlikely because of how this method is typically used.

gateone.core.utils.random_words(n=1)[source]

Returns n random English words (as a tuple of strings) from the english_wordlist.txt file (bundled with Gate One).

Note

In Python 2 the words will be Unicode strings.

gateone.core.utils.get_process_tree(parent_pid)[source]

Returns a list of child pids that were spawned from parent_pid.

Note

Will include parent_pid in the output list.

gateone.core.utils.kill_dtached_proc_bsd(session, location, term)[source]

A BSD-specific implementation of kill_dtached_proc since Macs don't have /proc. Seems simpler than kill_dtached_proc() but actually having to call a subprocess is less efficient (due to the sophisticated signal handling required by shell_command()).

gateone.core.utils.killall_bsd(session_dir, pid_file=None)[source]

A BSD-specific version of killall since Macs don't have /proc.

Note

pid_file is not used by this function. It is simply here to provide compatibility with killall.

gateone.core.utils.kill_session_processes(session)[source]

Kills all processes that match a given session (which is a unique, 45-character string).

gateone.core.utils.entry_point_files(ep_group, enabled=None)[source]

Given an entry point group name (ep_group), returns a dict of available Python, JS, and CSS plugins for Gate One:

{
    'css': ['editor/static/codemirror.css'],
    'js': ['editor/static/codemirror.js', 'editor/static/editor.js'],
    'py': [<module 'gateone.plugins.editor' from 'gateone/plugins/editor/__init__.pyc'>]
}

Optionally, the returned dict will include only those modules and files for plugins in the enabled list (if given).

Note

Python plugins will be imported automatically as part of the discovery process.

To do this it uses the pkg_resources module from setuptools. For plugins to be imported correctly they need to register themselves using the given entry_point group (ep_group) in their setup.py. Gate One (currently) uses the following entry point group names:

  • go_plugins
  • go_applications
  • go_applications_plugins

...but this function can return the JS, CSS, and Python modules for any entry point that uses the same module_name/static/ layout.

gateone.core.utils.load_modules(modules)[source]

Given a list of Python modules, imports them.

Note

Assumes they're all in sys.path.

gateone.core.utils.merge_handlers(handlers)[source]

Takes a list of Tornado handlers like this:

[
    (r"/", MainHandler),
    (r"/ws", TerminalWebSocket),
    (r"/auth", AuthHandler),
    (r"/style", StyleHandler),
        ...
    (r"/style", SomePluginHandler),
]

...and returns a list with duplicate handlers removed; giving precedence to handlers with higher indexes. This allows plugins to override Gate One's default handlers. Given the above, this is what would be returned:

[
    (r"/", MainHandler),
    (r"/ws", TerminalWebSocket),
    (r"/auth", AuthHandler),
        ...
    (r"/style", SomePluginHandler),
]

This example would replace the default "/style" handler with SomePluginHandler; overriding Gate One's default StyleHandler.

gateone.core.utils.convert_to_timedelta(time_val)[source]

Given a time_val (string) such as '5d', returns a datetime.timedelta object representing the given value (e.g. timedelta(days=5)). Accepts the following '<num><char>' formats:

Character Meaning Example
(none) Milliseconds '500' -> 500 Milliseconds
s Seconds '60s' -> 60 Seconds
m Minutes '5m' -> 5 Minutes
h Hours '24h' -> 24 Hours
d Days '7d' -> 7 Days
M Months '2M' -> 2 Months
y Years '10y' -> 10 Years

Examples:

>>> convert_to_timedelta('7d')
datetime.timedelta(7)
>>> convert_to_timedelta('24h')
datetime.timedelta(1)
>>> convert_to_timedelta('60m')
datetime.timedelta(0, 3600)
>>> convert_to_timedelta('120s')
datetime.timedelta(0, 120)
gateone.core.utils.convert_to_bytes(size_val)[source]

Given a size_val (string) such as '100M', returns an integer representing an equivalent amount of bytes. Accepts the following '<num><char>' formats:

Character Meaning Example
B (or none) Bytes '100' or '100b' -> 100
K Kilobytes '1k' -> 1024
M Megabytes '1m' -> 1048576
G Gigabytes '1g' -> 1073741824
T Terabytes '1t' -> 1099511627776
P Petabytes '1p' -> 1125899906842624
E Exabytes '1e' -> 1152921504606846976
Z Zettabytes '1z' -> 1180591620717411303424L
Y Yottabytes '7y' -> 1208925819614629174706176L

Note

If no character is given the size_val will be assumed to be in bytes.

Tip

All characters will be converted to upper case before conversion (case-insensitive).

Examples:

>>> convert_to_bytes('2M')
2097152
>>> convert_to_bytes('2g')
2147483648
gateone.core.utils.total_seconds(td)[source]

Given a timedelta (td) return an integer representing the equivalent of Python 2.7's datetime.timdelta.total_seconds().

gateone.core.utils.process_opt_esc_sequence(chars)[source]

Parse the chars passed from terminal.Terminal by way of the special, optional escape sequence handler (e.g. '<plugin>|<text>') into a tuple of (<plugin name>, <text>). Here's an example:

>>> process_opt_esc_sequence('ssh|user@host:22')
('ssh', 'user@host:22')
gateone.core.utils.raw(text, replacement_dict=None)[source]

Returns text as a string with special characters replaced by visible equivalents using replacement_dict. If replacement_dict is None or False the global REPLACEMENT_DICT will be used. Example:

>>> test = '\x1b]0;Some xterm title\x07'
>>> print(raw(test))
'^[]0;Some title^G'
gateone.core.utils.create_data_uri(filepath, mimetype=None)[source]

Given a file at filepath, return that file as a data URI.

Raises a MimeTypeFail exception if the mimetype could not be guessed.

gateone.core.utils.human_readable_bytes(nbytes)[source]

Returns nbytes as a human-readable string in a similar fashion to how it would be displayed by ls -lh or df -h.

gateone.core.utils.which(binary, path=None)[source]

Returns the full path of binary (string) just like the 'which' command. Optionally, a path (colon-delimited string) may be given to use instead of os.environ['PATH'].

gateone.core.utils.touch(path)[source]

Emulates the 'touch' command by creating the file at path if it does not exist. If the file exist its modification time will be updated.

gateone.core.utils.timeout_func(func, args=(), kwargs={}, timeout_duration=10, default=None)[source]

Sets a timeout on the given function, passing it the given args, kwargs, and a default value to return in the event of a timeout. If default is a function that function will be called in the event of a timeout.

gateone.core.utils.valid_hostname(hostname, allow_underscore=False)[source]

Returns True if the given hostname is valid according to RFC rules. Works with Internationalized Domain Names (IDN) and optionally, hostnames with an underscore (if allow_underscore is True).

The rules for hostnames:

  • Must be less than 255 characters.
  • Individual labels (separated by dots) must be <= 63 characters.
  • Only the ASCII alphabet (A-Z) is allowed along with dashes (-) and dots (.).
  • May not start with a dash or a dot.
  • May not end with a dash.
  • If an IDN, when converted to Punycode it must comply with the above.

IP addresses will be validated according to their well-known specifications.

Examples:

>>> valid_hostname('foo.bar.com.') # Standard FQDN
True
>>> valid_hostname('2foo') # Short hostname
True
>>> valid_hostname('-2foo') # No good:  Starts with a dash
False
>>> valid_hostname('host_a') # No good: Can't have underscore
False
>>> valid_hostname('host_a', allow_underscore=True) # Now it'll validate
True
>>> valid_hostname(u'ジェーピーニック.jp') # Example valid IDN
True
gateone.core.utils.recursive_chown(path, uid, gid)[source]

Emulates 'chown -R uid:gid path' in pure Python

gateone.core.utils.check_write_permissions(user, path)[source]

Returns True if the given user has write permissions to path. user can be a UID (int) or a username (string).

gateone.core.utils.bind(function, self)[source]

Will return function with self bound as the first argument. Allows one to write functions like this:

def foo(self, whatever):
    return whatever

...outside of the construct of a class.

gateone.core.utils.minify(path_or_fileobj, kind)[source]

Returns path_or_fileobj as a minified string. kind should be one of 'js' or 'css'. Works with JavaScript and CSS files using slimit and cssmin, respectively.

gateone.core.utils._minify(path_or_fileobj, kind)

Returns path_or_fileobj as a minified string. kind should be one of 'js' or 'css'. Works with JavaScript and CSS files using slimit and cssmin, respectively.

gateone.core.utils.get_or_cache(cache_dir, path, minify=True)[source]

Given a path, returns the cached version of that file. If the file has yet to be cached, cache it and return the result. If minify is True (the default), the file will be minified as part of the caching process (if possible).

gateone.core.utils.drop_privileges(uid='nobody', gid='nogroup', supl_groups=None)[source]

Drop privileges by changing the current process owner/group to uid/gid (both may be an integer or a string). If supl_groups (list) is given the process will be assigned those values as its effective supplemental groups. If supl_groups is None it will default to using 'tty' as the only supplemental group. Example:

drop_privileges('gateone', 'gateone', ['tty'])

This would change the current process owner to gateone/gateone with 'tty' as its only supplemental group.

Note

On most Unix systems users must belong to the 'tty' group to create new controlling TTYs which is necessary for 'pty.fork()' to work.

Tip

If you get errors like, "OSError: out of pty devices" it likely means that your OS uses something other than 'tty' as the group owner of the devpts filesystem. 'mount | grep pts' will tell you the owner (look for gid=<owner>).

gateone.core.utils.strip_xss(html, whitelist=None, replacement=u'\u2421')[source]

This function returns a tuple containing:

  • html with all non-whitelisted HTML tags replaced with replacement. Any tags that contain JavaScript, VBScript, or other known XSS/executable functions will also be removed.
  • A list containing the tags that were removed.

If whitelist is not given the following will be used:

whitelist = set([
    'a', 'abbr', 'aside', 'audio', 'bdi', 'bdo', 'blockquote', 'canvas',
    'caption', 'code', 'col', 'colgroup', 'data', 'dd', 'del',
    'details', 'div', 'dl', 'dt', 'em', 'figcaption', 'figure', 'h1',
    'h2', 'h3', 'h4', 'h5', 'h6', 'hr', 'i', 'img', 'ins', 'kbd', 'li',
    'mark', 'ol', 'p', 'pre', 'q', 'rp', 'rt', 'ruby', 's', 'samp',
    'small', 'source', 'span', 'strong', 'sub', 'summary', 'sup',
    'time', 'track', 'u', 'ul', 'var', 'video', 'wbr'
])

Example:

>>> html = '<span>Hello, exploit: <img src="javascript:alert("pwned!")"></span>'
>>> strip_xss(html)
(u'<span>Hello, exploit: \u2421</span>', ['<img src="javascript:alert("pwned!")">'])

Note

The default replacement is the unicode ␡ character (u"u2421").

If replacement is "entities" bad HTML tags will be encoded into HTML entities. This allows things like <script>'whatever'</script> to be displayed without execution (which would be much less annoying to users that were merely trying to share a code example). Here's an example:

>>> html = '<span>Hello, exploit: <img src="javascript:alert("pwned!")"></span>'
>>> strip_xss(html, replacement="entities")
('<span>Hello, exploit: &lt;span&gt;Hello, exploit: &lt;img src="javascript:alert("pwned!")"&gt;&lt;/span&gt;</span>',
 ['<img src="javascript:alert("pwned!")">'])
(u'<span>Hello, exploit: \u2421</span>', ['<img src="javascript:alert("pwned!")">'])

Note

This function should work to protect against all the XSS examples at OWASP. Please let us know if you find something we missed.

gateone.core.utils.create_signature(*parts, **kwargs)[source]

Creates an HMAC signature using the given parts and kwargs. The first argument must be the 'secret' followed by any arguments that are to be part of the hash. The only kwargs that is used is 'hmac_algo'. 'hmac_algo' may be any HMAC algorithm present in the hashlib module. If not provided, hashlib.sha1 will be used. Example usage:

create_signature(
    'secret',
    'some-api-key',
    'user@somehost',
    '1234567890123',
    hmac_algo=hashlib.sha1)

Note

The API 'secret' must be the first argument. Also, the order does matter.

gateone.core.utils.kill_dtached_proc(session, location, term)[source]

Kills the dtach processes associated with the given session that matches the given location and term. All the dtach'd sub-processes will be killed as well.

gateone.core.utils.killall(session_dir, pid_file)[source]

Kills all running Gate One terminal processes including any detached dtach sessions.

Session_dir:The path to Gate One's session directory.
Pid_file:The path to Gate One's PID file