Donnerstag, 31. März 2016

Evaluating the distribution of amino acid triples for proteins of humans using Amazon's Elasic Map Reduce

This is a tutorial on how to use Amazon's EMR to analyze data parallel in a cluster. This is an educational article with the purpose to show how elastic Map/Reduce works. For the sake of simplicity, we are about to calculate sequences of 3 amino acids in this tutorial. This (rather small) problem could be solved by a classical (single-threaded) imperative program as well much faster and with much less overhead.
However, when it comes to bigger problems (like analyzing sequences of >=8 amino acids), the scalability of map/reduce really starts to kick in and makes it possible to analyze huge amount of data (it's part of the "big data - hype" after all :) )

The data

Since the human genome was fully sequenced, we know all types of proteins (and their amino acid sequence) that exist in the human body. In fact, you can download this information as zipped TXT files from NCBI: ftp://ftp.ncbi.nih.gov/genomes/Homo_sapiens/protein/ - download protein.fa.gz

This zip file is only 11,3MB of size ... our whole life in such a small file ..

Proteins are chains of Amino Acids (AA). Only 22 different AA's are used in human proteins, so each of them is assigned a letter. You can look up that assignments in wikipedia.

In the File, each pair of 2 lines represents the name and the actual sequence of AA's of every protein found in the human body.

In this example, the occurance of each possible triple of AA's are counted throughall all proteins in the input file. The following example shows what is done:

Let's consider the following AA-seqence: AABABAB

From this sequence, we can extract the following triples and there count of occurrences:

  • AAB: 1x (AABABAB)
  • ABA: 2x (AABABAB)
  • BAB: 2x (AABABAB)

The Results

It is remarkable that there are indeed a few triples that occure very often in the analyzed AA-chains. Below is a graph showing the number of occurrences of the top 400 triples (each triple is assigned a number, starting with triple 1 - the tiple with the most occurences - almost 80K).


The logarithmic pattern of the distribution continues throughout the whole dataset.

The following is a table of the 10 most occurring triples:

TRIPLEOCCURANCES
SSS78314
EEE66653
LLL63178
PPP59174
AAA48985
SSL46638
SLL43912
LLS43323
LSS43262

You can download here a sorted CSV file with all triples and their occurrences

How it's done

Map/Reduce is a proven programming model to analyze data. What makes it so special is its ability to be executed (massively) parallel. Paired with modern implementations like Hadoop and its flexible and fast filesystem HDFS, Map/Reduce can be applied as a massively parallel procedure without running into problems like locks, dirty reads or all the other stuff we dislike about parallel algorithms :)

Map/Reduce works as follows in Hadoop:
  1. The input file (for example the amino acid sequence file) is split into several parts (chosen by the system or configured by the user)
  2. A number of MAPPER tasks are spawned. Each mapper is filled with a part of the input file splittet in step 1. (The mapper gets the data via stdin)
  3. The MAPPER writes out a number of KEY-VALUE-PAIRS (k, v) to stdout. In our example, this would be the sequence of 3 amino acids as key, and the (constant) count of 1 as value.
  4. After all mappers have finished, a number of REDUCER tasks are spawned. Each reducer is filled with a tuple that consists of a key and a LIST of values (k, ListOf(v)). In our example, a reducer could get an input that looks like this, if there were 4 key-value pairs emitted for the amino acid combination "FLI":
    1. (FLI, (1, 1, 1, 1))
  5. The reducer now can aggregate the values of the lists (like calculating the sum or the average of the values). The reducer outputs another key-value pair (k, v'). In our example, it should output:
    1. (FLI, 4)

After the reduce step we see, that the subsequnce "FLI" was found 4 time amongst all the data (of ALL mappers and reducers).

This was just a very brief overview. There can be done much more using Map/Reduce. Wikipedia is (as always) a good entry point: https://en.wikipedia.org/wiki/MapReduce

This is an illustration of how the map/reduce was used for this particular problem:


The Implementation

Most steps of the process are executed automatically by hadoop. The only custom code to write are: the mapper code and the reducer code.

In this example, python was used as implementation language for the mapper and the reducer process. Here is the code:

mapper.py

#!/usr/bin/env python
import sys

def main(separator='\t'):
    for line in sys.stdin:
        for n in range(0, len(line) - 1 - 3):    # iterate line from 0 to len-4
            print '%s%s%d' % (line[n:n+3], separator, 1)

if __name__ == "__main__":
    main()


reducer.py

The reducer code was taken from a great blog post from Michael Noll: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

I recommend this blog post for additional info on how to write map/reduce code.

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()

Combiner

Combiners are useful to reduce the amount of data that is to be transfered and processed during the shuffeling step, especially if mappers run on different machines.
They can be introduced as additional processes that are executed directly after the mapping process but before the shuffeling process. In this example, the reducer.py is used also as a combiner. This means, that all results from each mapper are reduced directly after the mapping process.

Combine works only for the results of each mapper separately! So if there is more than 1 mapper process, combine can drastically speed up the whole process (by pre-aggregating the data), but reduce is still needed to process the data of all mappers.

Cluster creation and process management

To run a map/reduce process on amazon, you first have to create a cluster and add an execution step. This can be done via the GUI or via the AWS command line interface.

In this example, the AWS CLI was used.

Furthermore, a s3 bucket and the following subfolders need to be created:

  • Folder for log files
  • Folder for source files
    • Here the python code files are placed
  • Folder for input file(s)
  • Folder for output files
You can name these folders as you wish.

Locally you need:
  • steps.json

Cluster creation command

aws emr create-cluster --release-label emr-4.0.0 --use-default-roles \
--log-uri s3://yourS3bucket/logfolder --enable-debugging \
--applications Name=Hadoop --ec2-attributes AvailabilityZone=YOUR-ZONE \
--steps file://./steps.json \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=4,InstanceType=m3.xlarge \
--auto-terminate


steps.json

This file needs to be placed locally (=on your machine) and serves as definition of step you want to execute. This file contains information on where to find the neccessary files for map/reduce:

[
  {
     "Name": "Protenom analyzer",
     "Type": "STREAMING",
     "ActionOnFailure": "CONTINUE",
     "Args": [
         "-files",
         "s3://yourS3bucket/src/mapper.py,s3://yourS3bucket/src/reducer.py",
         "-mapper",
         "mapper.py",
         "-reducer",
         "reducer.py",
         "-combiner",
         "reducer.py",
         "-input",
         "s3://yourS3bucket/inputFiles",
         "-output",
         "s3://yourS3bucket/outputFiles"]
  }
]

After starting the cluster, you can monitor the cluster in the AWS web console:


In this example, 1 master and 4 worker (virtual) machines were started as cluster to run the map/reduce processes.

The output files are written into the corresponding folder specified in steps.json. In the log folder you will find detailed logs on the whole map/reduce tasks, including the execution time, the number of spawned mappers/combiners/reducers, etc.

How many of those processes and tasks are spawned is determined automatically by hadoop. However, you can configure a lot of parameters to influence the system. Please consult the corresponding documentation. This is a good starting point: http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-what-is-emr.html

Keine Kommentare:

Kommentar veröffentlichen