utils.py - Supporting Functions
Gate One Utility Functions and Classes
-
exception
gateone.core.utils.
MimeTypeFail
[source]¶ Raised by
create_data_uri
if the mimetype of a file could not be guessed.
-
exception
gateone.core.utils.
SSLGenerationError
[source]¶ Raised by
gen_self_signed_ssl
if an error is encountered generating a self-signed SSL certificate.
-
exception
gateone.core.utils.
ChownError
[source]¶ Raised by
recursive_chown
if an OSError is encountered while trying to recursively chown a directory.
-
class
gateone.core.utils.
AutoExpireDict
(*args, **kwargs)[source]¶ An override of Python's
dict
that expires keys after a given _expire_timeout timeout (datetime.timedelta
). The default expiration is one hour. It is used like so:
>>> expiring_dict = AutoExpireDict(timeout=timedelta(minutes=10))
>>> expiring_dict['somekey'] = 'some value'
>>> # You can see when this key was created:
>>> print(expiring_dict.creation_times['somekey'])
2013-04-15 18:44:18.224072
10 minutes later your key will be gone:
>>> 'somekey' in expiring_dict
False
The 'timeout' may be given as a datetime.timedelta object or a string like "1d" or "30s" (it will be passed through the convert_to_timedelta function).
By default AutoExpireDict will check for expired keys every 30 seconds but this can be changed by setting the 'interval':
>>> expiring_dict = AutoExpireDict(interval=5000) # 5 secs
>>> # Or to change it after you've created one:
>>> expiring_dict.interval = "10s"
The 'interval' may be an integer (milliseconds), a datetime.timedelta object, or a string such as '10s' or '5m' (it will be passed through the convert_to_timedelta function).
If there are no keys remaining, the tornado.ioloop.PeriodicCallback (self._key_watcher) that checks expiration will be automatically stopped. As soon as a new key is added it will be started back up again.
Note
Only works if there is a running instance of tornado.ioloop.IOLoop.
-
timeout
¶ A
property
that controls how long a key will last before being automatically removed. May be given as a datetime.timedelta object or a string like "1d" or "30s" (it will be passed through the convert_to_timedelta function).
-
interval
¶ A
property
that controls how often we check for expired keys. May be given as milliseconds (integer), a datetime.timedelta object, or a string like "1d" or "30s" (it will be passed through the convert_to_timedelta function).
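The expiration behaviour described above can be illustrated without Tornado. The following is only a minimal sketch: the class name SimpleExpiringDict and its purge_expired() method are hypothetical, and the purge must be called by hand, whereas AutoExpireDict runs its check from a PeriodicCallback.

```python
from datetime import datetime, timedelta


class SimpleExpiringDict(dict):
    """Hypothetical, Tornado-free illustration of key expiration."""

    def __init__(self, *args, timeout=timedelta(hours=1), **kwargs):
        self.timeout = timeout
        self.creation_times = {}
        super().__init__()
        # Route initial items through __setitem__ so they get timestamps.
        for key, value in dict(*args, **kwargs).items():
            self[key] = value

    def __setitem__(self, key, value):
        # Record when the key was (re)created.
        self.creation_times[key] = datetime.now()
        super().__setitem__(key, value)

    def __delitem__(self, key):
        self.creation_times.pop(key, None)
        super().__delitem__(key)

    def purge_expired(self, now=None):
        """Remove every key older than self.timeout; return the removed keys."""
        now = now or datetime.now()
        expired = [key for key, created in self.creation_times.items()
                   if now - created >= self.timeout]
        for key in expired:
            del self[key]
        return expired
```

Passing an explicit now value to purge_expired() makes the expiry easy to exercise without waiting for real time to pass.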
-
-
class
gateone.core.utils.
memoize
(fn, timeout=None)[source]¶ A memoization decorator that works with multiple arguments as well as unhashable arguments (e.g. dicts). It also self-expires any memoized calls after the timedelta specified via timeout.
If a timeout is not given, memoized information will be discarded after five minutes.
Note
Expiration checks will be performed every 30 seconds.
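One way to memoize calls that take unhashable arguments is to key the cache on a string form of the arguments. The sketch below assumes that approach; memoize_with_timeout is a hypothetical name, and entries are expired lazily on lookup instead of by the periodic check that memoize performs.

```python
import functools
import time


def memoize_with_timeout(timeout_seconds=300):
    """Hypothetical decorator factory; results expire after timeout_seconds."""
    def decorator(fn):
        cache = {}  # key -> (timestamp, result)

        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            # repr() yields a hashable key even for dict and list arguments.
            key = repr((args, sorted(kwargs.items())))
            now = time.monotonic()
            hit = cache.get(key)
            if hit is not None and now - hit[0] < timeout_seconds:
                return hit[1]
            result = fn(*args, **kwargs)
            cache[key] = (now, result)
            return result

        wrapper.cache = cache  # exposed for inspection
        return wrapper
    return decorator
```

Keying on repr() treats two arguments as equal only when their representations match, which is a trade-off accepted here for simplicity.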
-
gateone.core.utils.
debug_info
(name, *args, **kwargs)[source]¶ This function returns a string like this:
>>> debug_info('some_function', 5, 10, foo="bar")
'some_function(5, 10, foo="bar")'
Primarily aimed at debugging.
-
gateone.core.utils.
shell_command
(cmd, timeout_duration=5)[source]¶ Resets the SIGCHLD signal handler (if necessary), executes cmd via
getstatusoutput()
, then re-enables the SIGCHLD handler (if it was set to something other than SIG_DFL). Returns the result of getstatusoutput() which is a tuple in the form of:
(exitstatus, output)
If the command takes longer than timeout_duration seconds, it will be auto-killed and the following will be returned:
(255, _("ERROR: Timeout running shell command"))
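The return convention above (a status/output tuple, or a 255 tuple on timeout) can be approximated with the standard library. This is a sketch under stated assumptions: run_with_timeout and PYTHON are hypothetical names, the command is split with shlex instead of being handed to a shell, and no SIGCHLD handling is attempted.

```python
import shlex
import subprocess
import sys

# Quoted path to the running interpreter, used only for the usage example.
PYTHON = shlex.quote(sys.executable)


def run_with_timeout(cmd, timeout_duration=5):
    """Hypothetical stand-in: return (exitstatus, output) or a timeout tuple."""
    try:
        proc = subprocess.run(
            shlex.split(cmd),          # no shell, so the child is killed cleanly
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # merge stderr into the captured output
            timeout=timeout_duration,
            text=True,
        )
    except subprocess.TimeoutExpired:
        return (255, "ERROR: Timeout running shell command")
    return (proc.returncode, proc.stdout.rstrip("\n"))


# Usage: run a short Python one-liner and capture its output.
status, output = run_with_timeout(PYTHON + ' -c "print(42)"')
```

Because no shell is involved, shell features such as pipes and redirection are not available in this sketch.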
-
gateone.core.utils.
json_encode
(obj)[source]¶ On some platforms (CentOS 6.2, specifically)
tornado.escape.json_decode
doesn't seem to work just right when it comes to returning unicode strings. This is just a wrapper that ensures that the returned string is unicode.
-
gateone.core.utils.
gen_self_signed_ssl
(path=None)[source]¶ Generates a self-signed SSL certificate using
pyOpenSSL
or the openssl command, depending on what's available. The resulting key/certificate will use the RSA algorithm at 4096 bits.
-
gateone.core.utils.
gen_self_signed_openssl
(path=None)[source]¶ This method will generate a secure self-signed SSL key/certificate pair (using the openssl command) saving the result as 'certificate.pem' and 'keyfile.pem' to path. If path is not given the result will be saved in the current working directory. The certificate will be valid for 10 years.
Note
The self-signed certificate will utilize 4096-bit RSA encryption.
-
gateone.core.utils.
gen_self_signed_pyopenssl
(notAfter=None, path=None)[source]¶ This method will generate a secure self-signed SSL key/certificate pair (using
pyOpenSSL
) saving the result as 'certificate.pem' and 'keyfile.pem' in path. If path is not given the result will be saved in the current working directory. By default the certificate will be valid for 10 years but this can be overridden by passing a valid timestamp via the notAfter argument.
Examples:
>>> gen_self_signed_pyopenssl(60 * 60 * 24 * 365) # 1-year certificate
>>> gen_self_signed_pyopenssl() # 10-year certificate
Note
The self-signed certificate will utilize 4096-bit RSA encryption.
-
gateone.core.utils.
none_fix
(val)[source]¶ If val is a string that ultimately means 'none', return None. Otherwise return val as-is. Examples:
>>> none_fix('none')
None
>>> none_fix('0')
None
>>> none_fix('whatever')
'whatever'
-
gateone.core.utils.
str2bool
(val)[source]¶ Converts strings like, 'false', 'true', '0', and '1' into their boolean equivalents (in Python). If no logical match is found, return False. Examples:
>>> str2bool('false')
False
>>> str2bool('1')
True
>>> str2bool('whatever')
False
-
gateone.core.utils.
generate_session_id
()[source]¶ Returns a random, 45-character session ID. Example:
>>> generate_session_id()
"NzY4YzFmNDdhMTM1NDg3Y2FkZmZkMWJmYjYzNjBjM2Y5O"
-
gateone.core.utils.
mkdir_p
(path)[source]¶ Pythonic version of "mkdir -p". Example equivalents:
>>> mkdir_p('/tmp/test/testing') # Does the same thing as...
>>> from subprocess import call
>>> call(['mkdir', '-p', '/tmp/test/testing'])
Note
This doesn't actually call any external commands.
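In current Python the same effect is available directly from the standard library. A minimal sketch follows; mkdir_p_sketch is a hypothetical name and this is not necessarily how mkdir_p itself is written.

```python
import os
import tempfile


def mkdir_p_sketch(path):
    """Create path and any missing parents; do nothing if it already exists."""
    os.makedirs(path, exist_ok=True)


# Usage: safe to call repeatedly on the same path.
base = tempfile.mkdtemp()
target = os.path.join(base, "test", "testing")
mkdir_p_sketch(target)
mkdir_p_sketch(target)
```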
-
gateone.core.utils.
cmd_var_swap
(cmd, **kwargs)[source]¶ Returns cmd with %variable% replaced with the keys/values passed in via kwargs. This function is used by Gate One's Terminal application to swap the following Gate One variables in defined terminal 'commands':
%SESSION%       session
%SESSION_DIR%   session_dir
%SESSION_HASH%  session_hash
%USERDIR%       user_dir
%USER%          user
%TIME%          time
This allows for unique or user-specific values to be swapped into command line arguments like so:
ssh_connect.py -M -S '%SESSION_DIR%/%SESSION%/%r@%L:%p'
Could become:
ssh_connect.py -M -S '/tmp/gateone/NWI0YzYxNzAwMTA3NGYyZmI0OWJmODczYmQyMjQwMDYwM/%r@%L:%p'
Here's an example:
>>> cmd = "echo '%FOO% %BAR%'"
>>> cmd_var_swap(cmd, foo="FOOYEAH,", bar="BAR NONE!")
"echo 'FOOYEAH, BAR NONE!'"
Note
The variables passed into this function via kwargs are case insensitive. cmd_var_swap(cmd, session=var) would produce the same output as cmd_var_swap(cmd, SESSION=var).
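The substitution described above can be sketched in a few lines. This is an illustration only: cmd_var_swap_sketch is a hypothetical name, and it assumes plain string replacement with keyword names upper-cased before matching.

```python
def cmd_var_swap_sketch(cmd, **kwargs):
    """Replace each %KEY% in cmd with str(value); keys are upper-cased first."""
    for key, value in kwargs.items():
        cmd = cmd.replace("%" + key.upper() + "%", str(value))
    return cmd


# Usage: mirrors the documented example.
swapped = cmd_var_swap_sketch("echo '%FOO% %BAR%'", foo="FOOYEAH,", bar="BAR NONE!")
```

Tokens such as %r or %L that have no matching keyword are left untouched, so they can still be expanded later by whatever command consumes them.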
-
gateone.core.utils.
short_hash
(to_shorten)[source]¶ Converts to_shorten into a really short hash dependent on the length of to_shorten. The result will be safe for use as a file name.
Note
Collisions are possible but highly unlikely because of how this method is typically used.
-
gateone.core.utils.
random_words
(n=1)[source]¶ Returns n random English words (as a tuple of strings) from the
english_wordlist.txt
file (bundled with Gate One).
Note
In Python 2 the words will be Unicode strings.
-
gateone.core.utils.
get_process_tree
(parent_pid)[source]¶ Returns a list of child pids that were spawned from parent_pid.
Note
Will include parent_pid in the output list.
-
gateone.core.utils.
kill_dtached_proc_bsd
(session, location, term)[source]¶ A BSD-specific implementation of
kill_dtached_proc
since Macs don't have /proc. Seems simpler than kill_dtached_proc() but actually having to call a subprocess is less efficient (due to the sophisticated signal handling required by shell_command()).
-
gateone.core.utils.
killall_bsd
(session_dir, pid_file=None)[source]¶ A BSD-specific version of
killall
since Macs don't have /proc.
Note
pid_file is not used by this function. It is simply here to provide compatibility with
killall
.
-
gateone.core.utils.
kill_session_processes
(session)[source]¶ Kills all processes that match a given session (which is a unique, 45-character string).
-
gateone.core.utils.
entry_point_files
(ep_group, enabled=None)[source]¶ Given an entry point group name (ep_group), returns a dict of available Python, JS, and CSS plugins for Gate One:
{
    'css': ['editor/static/codemirror.css'],
    'js': ['editor/static/codemirror.js', 'editor/static/editor.js'],
    'py': [<module 'gateone.plugins.editor' from 'gateone/plugins/editor/__init__.pyc'>]
}
Optionally, the returned dict will include only those modules and files for plugins in the enabled list (if given).
Note
Python plugins will be imported automatically as part of the discovery process.
To do this it uses the
pkg_resources
module from setuptools. For plugins to be imported correctly they need to register themselves using the given entry_point group (ep_group) in their setup.py. Gate One (currently) uses the following entry point group names:
- go_plugins
- go_applications
- go_applications_plugins
...but this function can return the JS, CSS, and Python modules for any entry point that uses the same module_name/static/ layout.
-
gateone.core.utils.
load_modules
(modules)[source]¶ Given a list of Python modules, imports them.
Note
Assumes they're all in
sys.path
.
-
gateone.core.utils.
merge_handlers
(handlers)[source]¶ Takes a list of Tornado handlers like this:
[
    (r"/", MainHandler),
    (r"/ws", TerminalWebSocket),
    (r"/auth", AuthHandler),
    (r"/style", StyleHandler),
    ...
    (r"/style", SomePluginHandler),
]
...and returns a list with duplicate handlers removed; giving precedence to handlers with higher indexes. This allows plugins to override Gate One's default handlers. Given the above, this is what would be returned:
[
    (r"/", MainHandler),
    (r"/ws", TerminalWebSocket),
    (r"/auth", AuthHandler),
    ...
    (r"/style", SomePluginHandler),
]
This example would replace the default "/style" handler with SomePluginHandler; overriding Gate One's default StyleHandler.
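The precedence rule (later entries win) can be sketched by walking the list backwards and keeping the first handler seen for each URL pattern. merge_handlers_sketch is a hypothetical name, and strings stand in for the handler classes here.

```python
def merge_handlers_sketch(handlers):
    """Drop earlier duplicates of a URL pattern, keeping the last occurrence."""
    seen = set()
    merged = []
    for handler in reversed(handlers):
        pattern = handler[0]
        if pattern not in seen:
            seen.add(pattern)
            merged.append(handler)
    merged.reverse()  # restore the original relative order
    return merged


# Usage: the plugin's "/style" handler replaces the default one.
handlers = [
    (r"/", "MainHandler"),
    (r"/ws", "TerminalWebSocket"),
    (r"/auth", "AuthHandler"),
    (r"/style", "StyleHandler"),
    (r"/style", "SomePluginHandler"),
]
merged = merge_handlers_sketch(handlers)
```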
-
gateone.core.utils.
convert_to_timedelta
(time_val)[source]¶ Given a time_val (string) such as '5d', returns a
datetime.timedelta
object representing the given value (e.g. timedelta(days=5)). Accepts the following '<num><char>' formats:
Character  Meaning       Example
(none)     Milliseconds  '500' -> 500 Milliseconds
s          Seconds       '60s' -> 60 Seconds
m          Minutes       '5m' -> 5 Minutes
h          Hours         '24h' -> 24 Hours
d          Days          '7d' -> 7 Days
M          Months        '2M' -> 2 Months
y          Years         '10y' -> 10 Years
Examples:
>>> convert_to_timedelta('7d')
datetime.timedelta(7)
>>> convert_to_timedelta('24h')
datetime.timedelta(1)
>>> convert_to_timedelta('60m')
datetime.timedelta(0, 3600)
>>> convert_to_timedelta('120s')
datetime.timedelta(0, 120)
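A parser for the fixed-length units in the table might look like the sketch below. The name to_timedelta_sketch is hypothetical, and months ('M') and years ('y') are deliberately left out because their length in days would be an assumption.

```python
import re
from datetime import timedelta

# Maps the suffix character to the matching timedelta keyword argument.
# Months and years are omitted: this sketch makes no assumption about them.
_UNITS = {"": "milliseconds", "s": "seconds", "m": "minutes",
          "h": "hours", "d": "days"}


def to_timedelta_sketch(time_val):
    """Convert strings such as '500', '60s', '5m', '24h' or '7d'."""
    match = re.fullmatch(r"(\d+)([smhd]?)", time_val.strip())
    if match is None:
        raise ValueError("unsupported time value: %r" % (time_val,))
    number, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(number)})
```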
-
gateone.core.utils.
convert_to_bytes
(size_val)[source]¶ Given a size_val (string) such as '100M', returns an integer representing an equivalent amount of bytes. Accepts the following '<num><char>' formats:
Character    Meaning     Example
B (or none)  Bytes       '100' or '100b' -> 100
K            Kilobytes   '1k' -> 1024
M            Megabytes   '1m' -> 1048576
G            Gigabytes   '1g' -> 1073741824
T            Terabytes   '1t' -> 1099511627776
P            Petabytes   '1p' -> 1125899906842624
E            Exabytes    '1e' -> 1152921504606846976
Z            Zettabytes  '1z' -> 1180591620717411303424L
Y            Yottabytes  '1y' -> 1208925819614629174706176L
Note
If no character is given the size_val will be assumed to be in bytes.
Tip
All characters will be converted to upper case before conversion (case-insensitive).
Examples:
>>> convert_to_bytes('2M')
2097152
>>> convert_to_bytes('2g')
2147483648
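Each suffix in the table is a successive power of 1024, so the conversion reduces to an index lookup. A minimal sketch, with to_bytes_sketch as a hypothetical name:

```python
_SUFFIXES = "BKMGTPEZY"  # the suffix at index n multiplies by 1024 ** n


def to_bytes_sketch(size_val):
    """Convert strings such as '100', '100b', '2M' or '2g' to a byte count."""
    text = size_val.strip().upper()  # case-insensitive, as documented
    if text and text[-1] in _SUFFIXES:
        number, power = text[:-1], _SUFFIXES.index(text[-1])
    else:
        number, power = text, 0  # no suffix means plain bytes
    return int(number) * 1024 ** power
```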
-
gateone.core.utils.
total_seconds
(td)[source]¶ Given a timedelta (td) return an integer representing the equivalent of Python 2.7's
datetime.timedelta.total_seconds()
.
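The integer result can be computed from the three fields a timedelta stores. The following is a sketch of that arithmetic, with total_seconds_sketch as a hypothetical name; fractional seconds are truncated to match the integer return value described above.

```python
from datetime import timedelta


def total_seconds_sketch(td):
    """Whole seconds in td, computed from days, seconds and microseconds."""
    microseconds = td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6
    return microseconds // 10**6
```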
-
gateone.core.utils.
process_opt_esc_sequence
(chars)[source]¶ Parse the chars passed from
terminal.Terminal
by way of the special, optional escape sequence handler (e.g. '<plugin>|<text>') into a tuple of (<plugin name>, <text>). Here's an example:
>>> process_opt_esc_sequence('ssh|user@host:22')
('ssh', 'user@host:22')
-
gateone.core.utils.
raw
(text, replacement_dict=None)[source]¶ Returns text as a string with special characters replaced by visible equivalents using replacement_dict. If replacement_dict is None or False the global REPLACEMENT_DICT will be used. Example:
>>> test = '\x1b]0;Some xterm title\x07'
>>> print(raw(test))
^[]0;Some xterm title^G
-
gateone.core.utils.
create_data_uri
(filepath, mimetype=None)[source]¶ Given a file at filepath, return that file as a data URI.
Raises a
MimeTypeFail
exception if the mimetype could not be guessed.
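A data URI is the mimetype followed by the base64-encoded file contents. The sketch below shows that construction using only the standard library; data_uri_sketch and MimeTypeUnknown are hypothetical stand-ins for create_data_uri and MimeTypeFail.

```python
import base64
import mimetypes
import os
import tempfile


class MimeTypeUnknown(Exception):
    """Hypothetical stand-in for MimeTypeFail."""


def data_uri_sketch(filepath, mimetype=None):
    """Return the file at filepath as a base64 data URI."""
    if mimetype is None:
        mimetype = mimetypes.guess_type(filepath)[0]
    if mimetype is None:
        raise MimeTypeUnknown("could not guess the mimetype of %s" % filepath)
    with open(filepath, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    return "data:%s;base64,%s" % (mimetype, encoded)


# Usage: a two-byte text file.
tmp_dir = tempfile.mkdtemp()
text_path = os.path.join(tmp_dir, "hello.txt")
with open(text_path, "wb") as f:
    f.write(b"hi")
uri = data_uri_sketch(text_path)
```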
-
gateone.core.utils.
human_readable_bytes
(nbytes)[source]¶ Returns nbytes as a human-readable string in a similar fashion to how it would be displayed by
ls -lh
or df -h
.
-
gateone.core.utils.
which
(binary, path=None)[source]¶ Returns the full path of binary (string) just like the 'which' command. Optionally, a path (colon-delimited string) may be given to use instead of
os.environ['PATH']
.
-
gateone.core.utils.
touch
(path)[source]¶ Emulates the 'touch' command by creating the file at path if it does not exist. If the file exists its modification time will be updated.
-
gateone.core.utils.
timeout_func
(func, args=(), kwargs={}, timeout_duration=10, default=None)[source]¶ Sets a timeout on the given function, passing it the given args, kwargs, and a default value to return in the event of a timeout. If default is a function that function will be called in the event of a timeout.
-
gateone.core.utils.
valid_hostname
(hostname, allow_underscore=False)[source]¶ Returns True if the given hostname is valid according to RFC rules. Works with Internationalized Domain Names (IDN) and optionally, hostnames with an underscore (if allow_underscore is True).
The rules for hostnames:
- Must be less than 255 characters.
- Individual labels (separated by dots) must be <= 63 characters.
- Only ASCII letters (A-Z) and digits (0-9) are allowed along with dashes (-) and dots (.).
- May not start with a dash or a dot.
- May not end with a dash.
- If an IDN, when converted to Punycode it must comply with the above.
IP addresses will be validated according to their well-known specifications.
Examples:
>>> valid_hostname('foo.bar.com.') # Standard FQDN
True
>>> valid_hostname('2foo') # Short hostname
True
>>> valid_hostname('-2foo') # No good: Starts with a dash
False
>>> valid_hostname('host_a') # No good: Can't have underscore
False
>>> valid_hostname('host_a', allow_underscore=True) # Now it'll validate
True
>>> valid_hostname(u'ジェーピーニック.jp') # Example valid IDN
True
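The label rules listed above translate fairly directly into a regular expression applied to each dot-separated label. This is a sketch only: valid_hostname_sketch is a hypothetical name, the IDN step relies on Python's built-in 'idna' codec, and IP addresses get no special treatment.

```python
import re


def valid_hostname_sketch(hostname, allow_underscore=False):
    """Check hostname against the length and character rules listed above."""
    try:
        # Converts IDNs to Punycode; also rejects empty or over-long labels.
        ascii_name = hostname.encode("idna").decode("ascii")
    except UnicodeError:
        return False
    if ascii_name.endswith("."):
        ascii_name = ascii_name[:-1]  # a trailing dot marks a fully qualified name
    if not ascii_name or len(ascii_name) >= 255:
        return False
    allowed = r"[A-Za-z0-9_-]" if allow_underscore else r"[A-Za-z0-9-]"
    # 1-63 allowed characters, not starting or ending with a dash.
    label_re = re.compile(r"(?!-)%s{1,63}(?<!-)" % allowed)
    return all(label_re.fullmatch(label) for label in ascii_name.split("."))
```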
-
gateone.core.utils.
recursive_chown
(path, uid, gid)[source]¶ Emulates 'chown -R uid:gid path' in pure Python. Raises ChownError if an OSError is encountered.
-
gateone.core.utils.
check_write_permissions
(user, path)[source]¶ Returns
True
if the given user has write permissions to path. user can be a UID (int) or a username (string).
-
gateone.core.utils.
bind
(function, self)[source]¶ Will return function with self bound as the first argument. Allows one to write functions like this:
def foo(self, whatever): return whatever
...outside of the construct of a class.
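Binding a plain function to an instance can be done with types.MethodType from the standard library. The sketch below assumes that mechanism, which may differ from what bind does internally; bind_sketch, Greeter and greet are hypothetical names.

```python
import types


def bind_sketch(function, self):
    """Return function as a method bound to self."""
    return types.MethodType(function, self)


class Greeter:
    name = "Gate One"


def greet(self, greeting):
    # Written outside the class, yet it receives the instance as self.
    return "%s, %s!" % (greeting, self.name)


bound = bind_sketch(greet, Greeter())
```

Calling bound("Hello") then behaves as if greet had been defined inside Greeter.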
-
gateone.core.utils.
minify
(path_or_fileobj, kind)[source]¶ Returns path_or_fileobj as a minified string. kind should be one of 'js' or 'css'. Works with JavaScript and CSS files using
slimit
and cssmin, respectively.
-
gateone.core.utils.
_minify
(path_or_fileobj, kind)¶ Returns path_or_fileobj as a minified string. kind should be one of 'js' or 'css'. Works with JavaScript and CSS files using
slimit
and cssmin, respectively.
-
gateone.core.utils.
get_or_cache
(cache_dir, path, minify=True)[source]¶ Given a path, returns the cached version of that file. If the file has yet to be cached, cache it and return the result. If minify is
True
(the default), the file will be minified as part of the caching process (if possible).
-
gateone.core.utils.
drop_privileges
(uid='nobody', gid='nogroup', supl_groups=None)[source]¶ Drop privileges by changing the current process owner/group to uid/gid (both may be an integer or a string). If supl_groups (list) is given the process will be assigned those values as its effective supplemental groups. If supl_groups is None it will default to using 'tty' as the only supplemental group. Example:
drop_privileges('gateone', 'gateone', ['tty'])
This would change the current process owner to gateone/gateone with 'tty' as its only supplemental group.
Note
On most Unix systems users must belong to the 'tty' group to create new controlling TTYs which is necessary for 'pty.fork()' to work.
Tip
If you get errors like, "OSError: out of pty devices" it likely means that your OS uses something other than 'tty' as the group owner of the devpts filesystem. 'mount | grep pts' will tell you the owner (look for gid=<owner>).
-
gateone.core.utils.
strip_xss
(html, whitelist=None, replacement=u'\u2421')[source]¶ This function returns a tuple containing:
- html with all non-whitelisted HTML tags replaced with replacement. Any tags that contain JavaScript, VBScript, or other known XSS/executable functions will also be removed.
- A list containing the tags that were removed.
If whitelist is not given the following will be used:
whitelist = set([ 'a', 'abbr', 'aside', 'audio', 'bdi', 'bdo', 'blockquote', 'canvas', 'caption', 'code', 'col', 'colgroup', 'data', 'dd', 'del', 'details', 'div', 'dl', 'dt', 'em', 'figcaption', 'figure', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'hr', 'i', 'img', 'ins', 'kbd', 'li', 'mark', 'ol', 'p', 'pre', 'q', 'rp', 'rt', 'ruby', 's', 'samp', 'small', 'source', 'span', 'strong', 'sub', 'summary', 'sup', 'time', 'track', 'u', 'ul', 'var', 'video', 'wbr' ])
Example:
>>> html = '<span>Hello, exploit: <img src="javascript:alert("pwned!")"></span>'
>>> strip_xss(html)
(u'<span>Hello, exploit: \u2421</span>', ['<img src="javascript:alert("pwned!")">'])
Note
The default replacement is the unicode ␡ character (u"\u2421").
If replacement is "entities" bad HTML tags will be encoded into HTML entities. This allows things like <script>'whatever'</script> to be displayed without execution (which would be much less annoying to users that were merely trying to share a code example). Here's an example:
>>> html = '<span>Hello, exploit: <img src="javascript:alert("pwned!")"></span>'
>>> strip_xss(html, replacement="entities")
('<span>Hello, exploit: &lt;img src="javascript:alert("pwned!")"&gt;</span>', ['<img src="javascript:alert("pwned!")">'])
Note
This function should work to protect against all the XSS examples at OWASP. Please let us know if you find something we missed.
-
gateone.core.utils.
create_signature
(*parts, **kwargs)[source]¶ Creates an HMAC signature using the given parts and kwargs. The first argument must be the 'secret' followed by any arguments that are to be part of the hash. The only keyword argument used is 'hmac_algo', which may be any HMAC algorithm present in the hashlib module. If not provided, hashlib.sha1 will be used. Example usage:
create_signature(
    'secret',
    'some-api-key',
    'user@somehost',
    '1234567890123',
    hmac_algo=hashlib.sha1)
Note
The API 'secret' must be the first argument. Also, the order does matter.
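The signing scheme described above can be sketched with the standard hmac module. How the parts are combined is an assumption here (they are fed to the HMAC in order with no separator), and create_signature_sketch is a hypothetical name.

```python
import hashlib
import hmac


def create_signature_sketch(*parts, **kwargs):
    """HMAC the parts after the first, using the first part as the secret."""
    hmac_algo = kwargs.get("hmac_algo", hashlib.sha1)
    secret, message_parts = parts[0], parts[1:]
    mac = hmac.new(secret.encode("utf-8"), digestmod=hmac_algo)
    for part in message_parts:
        # Assumption: parts are concatenated in the order given.
        mac.update(str(part).encode("utf-8"))
    return mac.hexdigest()


# Usage: mirrors the documented call.
signature = create_signature_sketch(
    'secret', 'some-api-key', 'user@somehost', '1234567890123',
    hmac_algo=hashlib.sha1)
```

Because each part is fed to the HMAC in sequence, swapping two parts produces a different signature, which matches the note that order matters.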