I'm pleased to announce the release of a data store with python map reduce
capabilities that is also fully ACID compliant, supports SQL as a query
language, foreign keys and joins. It is also well supported by numerous
companies and already available in many hosted environments. Just kidding: I'm
talking, of course, about PostgreSQL. I only recently discovered the excellent
python support in Postgres,
which allows you to run python functions inside queries. This allows you to
create a python map reduce system with very little effort. Of course, the point
of map reduce is more about distributing the computation, but if you're prepared
to don some wooden headphones for a
moment...
First you need to install the plpython package (something like postgresql-plpython-9.2)
and install it in your database postgres createlang plpython3u DBNAME.
Now create our pymap function:
CREATE FUNCTION pymap (src text, table_name text)
RETURNS SETOF json
AS $$
import json
exec src
for row in plpy.execute("SELECT key, value FROM %s" % table_name):
for emission in map_fn(row["key"], json.loads(row["value"])):
yield json.dumps(emission)
$$ LANGUAGE plpythonu;
A test table, which is a key value table:
create table kv (key serial, value json);
insert into kv (value) values ('{"foo": 100}');
insert into kv (value) values ('{"foo": 200}');
insert into kv (value) values ('{"bar": 300}');
Now we can use our pymap function by passing in a table name and python source
(which defines a map_fn):
select pymap('kv;', '
def map_fn(k, v):
if "foo" in v:
yield {k: v["foo"]}'
'');
pymap
------------
{"1": 100}
{"2": 200}
(2 rows)
But this is python, so lets do something more interesting, like use scipy to
calculate the gamma function...
select pymap('kv;', '
def map_fn(k, v):
import scipy.special
if "foo" in v:
yield {k: scipy.special.gamma(float(v["foo"])/1000)}'
'');
pymap
---------------------------
{"1": 9.5135076986687324}
{"2": 4.5908437119988026}
(2 rows)
I'll leave the implementation of reduce as an exercise to the reader. I hope
this has shown how useful having a full python interpreter in the database can
be. There are currently versions for both python 2 and 3, but it would be
really nice if we had a link to libpypy-c which would then give JITed functions.
One potential use of plpython is that you can use it to create indexes on
computed values. If you've ever worked on a system that stores serialized
objects in a database, you'll understand what I mean. To add an index to these,
you have to alter your database and code to store extra the extra data and then index it.
With plpython you could generate the index in the database without altering your data storage code:
CREATE FUNCTION two_of_foo (v json)
RETURNS integer
AS $$
import json
obj = json.loads(v)
if 'foo' in obj:
return 2 * obj['foo']
else:
return None
$$ LANGUAGE plpythonu IMMUTABLE;
CREATE INDEX i_two_of_foo ON kv (two_of_foo(value));
Update: Anja Skrba kindly translated this page into Serbo-Croatian
Github has a nice OAuth2 API that we can use to manipulate git repos, gists,
etc. I went through the process of implementing a Tornado based client for the
API. This could allow a github backed application. Note here, we are still
performing server side operations, so the client is relatively simple. An
alternative approach is to do the github interaction in the browser. If you are
interested in that, check this library.
Tornado takes a callback based approach, and we need to code the entire OAuth2
dance in this way. There is an base class, tornado.auth.OAuth2Mixin, which can
perform a very small subset of the protocol. The steps we need to log in a
user:
-
Get an authorization code using authorize_redirect method from the base class.
This presents the user with a 'do you want to authorize this app dialog in
github'. Github then posts back to our app with the code
-
Get a user access token by exchanging the code for an access token. This is
done via a simple GET. This means the user is now logged in.
-
Get the user details by asking the API for the user information
-
Parse the user information and store the relevant details in a session/whatever
Here is a base class that takes care of many of these details.
get_authenticated_user needs an authorization code, which it uses to get an
access token. If it succeeds, it calls _on_access_code, which makes an API
request to /user. This is then returned to the callback set by the original
caller of get_authenticated_user. It also provides github_request, which makes
an API call and hands the data to _parse_response which in turn hands it to the
callback specified by the caller.
import urllib
import tornado.ioloop
import tornado.web
import tornado.auth
import tornado.httpclient
import tornado.escape
import tornado.httputil
import logging
class GithubMixin(tornado.auth.OAuth2Mixin):
""" Github OAuth Mixin, based on FacebookGraphMixin
"""
_OAUTH_AUTHORIZE_URL = 'https://github.com/login/oauth/authorize'
_OAUTH_ACCESS_TOKEN_URL = 'https://github.com/login/oauth/access_token'
_API_URL = 'https://api.github.com'
def get_authenticated_user(self, redirect_uri, client_id, client_secret,
code, callback, extra_fields=None):
""" Handles the login for Github, queries /user and returns a user object
"""
logging.debug('gau ' + redirect_uri)
http = tornado.httpclient.AsyncHTTPClient()
args = {
"redirect_uri": redirect_uri,
"code": code,
"client_id": client_id,
"client_secret": client_secret,
}
http.fetch(self._oauth_request_token_url(**args),
self.async_callback(self._on_access_token, redirect_uri, client_id,
client_secret, callback, fields))
def _on_access_token(self, redirect_uri, client_id, client_secret,
callback, fields, response):
""" callback for authentication url, if successful get the user details """
if response.error:
logging.warning('Github auth error: %s' % str(response))
callback(None)
return
args = tornado.escape.parse_qs_bytes(
tornado.escape.native_str(response.body))
if 'error' in args:
logging.error('oauth error ' + args['error'][-1])
raise Exception(args['error'][-1])
session = {
"access_token": args["access_token"][-1],
}
self.github_request(
method="/user",
callback=self.async_callback(
self._on_get_user_info, callback, session),
access_token=session["access_token"],
)
def _on_get_user_info(self, callback, session, user):
""" callback for github request /user to create a user """
logging.debug('user data from github ' + str(user))
if user is None:
callback(None)
return
callback({
"login": user["login"],
"name": user["name"],
"email": user["email"],
"access_token": session["access_token"],
})
def github_request(self, path, callback, access_token=None,
method='GET', body=None, **args):
""" Makes a github API request, hands callback the parsed data """
args["access_token"] = access_token
url = tornado.httputil.url_concat(self._API_URL + path, args)
logging.debug('request to ' + url)
http = tornado.httpclient.AsyncHTTPClient()
if body is not None:
body = tornado.escape.json_encode(body)
logging.debug('body is' + body)
http.fetch(url, callback=self.async_callback(
self._parse_response, callback), method=method, body=body)
def _parse_response(self, callback, response):
""" Parse the JSON from the API """
if response.error:
logging.warning("HTTP error from Github: %s", response.error)
callback(None)
return
try:
json = tornado.escape.json_decode(response.body)
except Exception:
logging.warning("Invalid JSON from Github: %r", response.body)
callback(None)
return
if isinstance(json, dict) and json.get("error_code"):
logging.warning("Facebook error: %d: %r", json["error_code"],
json.get("error_msg"))
callback(None)
return
callback(json)
We can use this, as below, to create handlers to login a user - and in this case
store the credentials in a secure cookie - or to make an API call for a logged
in user:
import tornado.ioloop
import tornado.web
import tornado.escape
import tornado.options
import tornado.httputil
import jinja2
import pyjade.compiler
import coffeescript
import markdown
import github
class GithubLoginHandler(tornado.web.RequestHandler, github.GithubMixin):
_OAUTH_REDIRECT_URL = 'http://localhost:8888/auth/github'
@tornado.web.asynchronous
def get(self):
# we can append next to the redirect uri, so the user gets the
# correct URL on login
redirect_uri = tornado.httputil.url_concat(
self._OAUTH_REDIRECT_URL, {'next': self.get_argument('next', '/')})
# if we have a code, we have been authorized so we can log in
if self.get_argument("code", False):
self.get_authenticated_user(
redirect_uri=redirect_uri,
client_id=self.settings["github_client_id"],
client_secret=self.settings["github_secret"],
code=self.get_argument("code"),
callback=self.async_callback(self._on_login)
)
return
# otherwise we need to request an authorization code
self.authorize_redirect(
redirect_uri=redirect_uri,
client_id=self.settings["github_client_id"],
extra_params={"scope": self.settings['github_scope'], "foo":1})
def _on_login(self, user):
""" This handles the user object from the login request """
if user:
logging.info('logged in user from github: ' + str(user))
self.set_secure_cookie("user", tornado.escape.json_encode(user))
else:
self.clear_cookie("user")
self.redirect(self.get_argument("next","/"))
class GistLister(BaseHandler, github.GithubMixin):
@tornado.web.authenticated
@tornado.web.asynchronous
def get(self):
self.github_request(
'/gists', self._on_get_gists,
access_token=self.current_user['access_token'])
def _on_get_gists(self, gists):
self.render('gists.jade', gists=gists)
I just uploaded the PyVCF 0.6 release to pypi. This release concentrated on performance and will use a cythonized inner loop if you install it with cython. There is a small backwards incompatible change, which is that the call data (the cells in a VCF file) are now stored as a namedtuple.
A simple performance benchmark parsing a 1kg file:
- pyvcf: 1.8s / 100%
- cyvcf: 2.3s / 127%
- vcftools: 2.9s / 161%