However, when it comes to bigger problems (like analyzing sequences of >=8 amino acids), the scalability of map/reduce really starts to pay off and makes it possible to analyze huge amounts of data (it's part of the "big data" hype, after all :) )
The data
Since the human genome was fully sequenced, we know all types of proteins (and their amino acid sequences) that exist in the human body. In fact, you can download this information as a gzipped text file from NCBI: ftp://ftp.ncbi.nih.gov/genomes/Homo_sapiens/protein/ - download protein.fa.gz. This file is only 11.3 MB in size ... our whole life in such a small file ..
Proteins are chains of amino acids (AA). Only 22 different AAs are used in human proteins, so each of them is assigned a letter. You can look up these assignments on Wikipedia.
In the file, each pair of lines holds the name and the actual AA sequence of one protein found in the human body.
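Parsing this layout is straightforward. The sketch below is a standalone Python illustration (not part of the original streaming code) and assumes the strict two-lines-per-protein format described above; general FASTA files may wrap a sequence over several lines.

```python
import gzip

def read_proteins(lines):
    """Pair consecutive non-empty lines: a name line, then its AA sequence."""
    stripped = [l.rstrip() for l in lines if l.strip()]
    return list(zip(stripped[0::2], stripped[1::2]))

# hypothetical usage on the downloaded file:
# with gzip.open("protein.fa.gz", "rt") as f:
#     proteins = read_proteins(f)
```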
In this example, the occurrences of each possible triple of AAs are counted across all proteins in the input file. The following example shows what is done:
Let's consider the following AA sequence: AABABAB
From this sequence, we can extract the following triples and their counts of occurrences:
- AAB: 1x (AABABAB)
- ABA: 2x (AABABAB)
- BAB: 2x (AABABAB)
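This counting rule can be sketched in a few lines of Python (a standalone illustration, not the Hadoop streaming code shown later in the post):

```python
from collections import Counter

def count_triples(sequence):
    """Count every overlapping triple of amino acids in a sequence."""
    return Counter(sequence[i:i + 3] for i in range(len(sequence) - 2))

counts = count_triples("AABABAB")
# yields AAB once, and ABA and BAB twice each
```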
The Results
It is remarkable that there are indeed a few triples that occur very often in the analyzed AA chains. Below is a graph showing the number of occurrences of the top 400 triples (each triple is assigned a number, starting with triple 1 - the triple with the most occurrences - almost 80K).
The logarithmic pattern of the distribution continues throughout the whole dataset.
The following is a table of the 10 most occurring triples:
| TRIPLE | OCCURRENCES |
|---|---|
| SSS | 78314 |
| EEE | 66653 |
| LLL | 63178 |
| PPP | 59174 |
| AAA | 48985 |
| SSL | 46638 |
| SLL | 43912 |
| LLS | 43323 |
| LSS | 43262 |
You can download a sorted CSV file with all triples and their occurrences here.
How it's done
Map/Reduce is a proven programming model for analyzing data. What makes it so special is its ability to be executed (massively) in parallel. Paired with modern implementations like Hadoop and its distributed filesystem HDFS, Map/Reduce can be applied as a massively parallel procedure without running into problems like locks, dirty reads, or all the other things we dislike about parallel algorithms :)
Map/Reduce works as follows in Hadoop:
- The input file (for example the amino acid sequence file) is split into several parts (chosen by the system or configured by the user)
- A number of MAPPER tasks are spawned. Each mapper is fed one of the parts of the input file split in step 1. (The mapper receives the data via stdin.)
- The MAPPER writes out a number of KEY-VALUE-PAIRS (k, v) to stdout. In our example, this would be the sequence of 3 amino acids as key, and the (constant) count of 1 as value.
- After all mappers have finished, a number of REDUCER tasks are spawned. Each reducer is filled with a tuple that consists of a key and a LIST of values (k, ListOf(v)). In our example, a reducer could get an input that looks like this, if there were 4 key-value pairs emitted for the amino acid combination "FLI":
- (FLI, (1, 1, 1, 1))
- The reducer now can aggregate the values of the lists (like calculating the sum or the average of the values). The reducer outputs another key-value pair (k, v'). In our example, it should output:
- (FLI, 4)
After the reduce step we see that the subsequence "FLI" was found 4 times across all the data (of ALL mappers and reducers).
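The steps above can be simulated locally in plain Python. This is a toy sketch of Hadoop's behavior (not the streaming code used on the cluster); with two made-up input sequences that each contain "FLI" twice, the reduce step indeed yields (FLI, 4):

```python
from itertools import groupby
from operator import itemgetter

def map_phase(lines):
    # each mapper emits a (triple, 1) key-value pair per 3-AA window
    return [(line[i:i + 3], 1) for line in lines for i in range(len(line) - 2)]

def shuffle(pairs):
    # between map and reduce, Hadoop groups all pairs by key:
    # (k, v), (k, v), ...  ->  (k, ListOf(v)), ...
    pairs = sorted(pairs, key=itemgetter(0))
    return [(k, [v for _, v in grp]) for k, grp in groupby(pairs, itemgetter(0))]

def reduce_phase(grouped):
    # each reducer aggregates one key's list of values, here by summing
    return {k: sum(vs) for k, vs in grouped}

counts = reduce_phase(shuffle(map_phase(["MFLIAFLI", "KFLIWFLI"])))
# counts["FLI"] == 4
```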
This was just a very brief overview; much more can be done with Map/Reduce. Wikipedia is (as always) a good entry point: https://en.wikipedia.org/wiki/MapReduce
This is an illustration of how the map/reduce was used for this particular problem:
The Implementation
Most steps of the process are executed automatically by Hadoop. The only custom code to write is the mapper code and the reducer code.
In this example, Python was used as the implementation language for the mapper and the reducer process. Here is the code:
mapper.py
```python
#!/usr/bin/env python
import sys

def main(separator='\t'):
    for line in sys.stdin:
        line = line.rstrip()
        if line.startswith('>'):
            # skip the protein name lines; only sequences are analyzed
            continue
        for n in range(len(line) - 2):  # every triple, n from 0 to len-3
            print '%s%s%d' % (line[n:n + 3], separator, 1)

if __name__ == "__main__":
    main()
```
reducer.py
The reducer code was taken from a great blog post by Michael Noll: http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/
I recommend this blog post for additional info on how to write map/reduce code.
```python
#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
```
Combiner
Combiners are useful to reduce the amount of data that has to be transferred and processed during the shuffling step, especially if mappers run on different machines. They can be introduced as additional processes that are executed directly after the mapping process but before the shuffling process. In this example, reducer.py is also used as a combiner. This means that all results from each mapper are reduced directly after the mapping process.
A combiner works only on the results of each mapper separately! So if there is more than one mapper process, a combiner can drastically speed up the whole job (by pre-aggregating the data), but the reduce step is still needed to process the data of all mappers. (Reusing the reducer as a combiner is valid here because summing counts is associative and commutative.)
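The effect can be illustrated with a small standalone sketch (invented numbers, plain Python): combining per mapper shrinks the data that has to be shuffled while leaving the final reduce result unchanged.

```python
from collections import Counter

# assumed raw output of two separate mappers
mapper1 = [("FLI", 1), ("SSS", 1), ("FLI", 1)]
mapper2 = [("FLI", 1), ("FLI", 1), ("SSS", 1)]

def combine(pairs):
    # runs per mapper, pre-summing locally; valid because addition is
    # associative and commutative
    totals = Counter()
    for key, value in pairs:
        totals[key] += value
    return list(totals.items())

def reduce_all(*per_mapper):
    # the real reduce step sees the data of ALL mappers
    totals = Counter()
    for pairs in per_mapper:
        for key, value in pairs:
            totals[key] += value
    return totals

plain = reduce_all(mapper1, mapper2)
combined = reduce_all(combine(mapper1), combine(mapper2))
# both give FLI: 4 and SSS: 2, but only 4 pairs are shuffled instead of 6
```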
Cluster creation and process management
To run a map/reduce process on Amazon, you first have to create a cluster and add an execution step. This can be done via the GUI or via the AWS command line interface. In this example, the AWS CLI was used.
Furthermore, an S3 bucket and the following subfolders need to be created:
- Folder for log files
- Folder for source files
- Here the Python code files are placed
- Folder for input file(s)
- Folder for output files
You can name these folders as you wish.
Locally you need:
- steps.json
Cluster creation command
```shell
aws emr create-cluster --release-label emr-4.0.0 --use-default-roles \
  --log-uri s3://yourS3bucket/logfolder --enable-debugging \
  --applications Name=Hadoop --ec2-attributes AvailabilityZone=YOUR-ZONE \
  --steps file://./steps.json \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge \
                    InstanceGroupType=CORE,InstanceCount=4,InstanceType=m3.xlarge \
  --auto-terminate
```
steps.json
This file needs to be placed locally (= on your machine) and serves as the definition of the step you want to execute. It contains information on where to find the necessary files for map/reduce:

```json
[
  {
    "Name": "Protenom analyzer",
    "Type": "STREAMING",
    "ActionOnFailure": "CONTINUE",
    "Args": [
      "-files",
      "s3://yourS3bucket/src/mapper.py,s3://yourS3bucket/src/reducer.py",
      "-mapper",
      "mapper.py",
      "-reducer",
      "reducer.py",
      "-combiner",
      "reducer.py",
      "-input",
      "s3://yourS3bucket/inputFiles",
      "-output",
      "s3://yourS3bucket/outputFiles"
    ]
  }
]
```
After starting the cluster, you can monitor the cluster in the AWS web console:
In this example, 1 master and 4 worker (virtual) machines were started as a cluster to run the map/reduce processes.
The output files are written into the corresponding folder specified in steps.json. In the log folder you will find detailed logs on all map/reduce tasks, including the execution time, the number of spawned mappers/combiners/reducers, etc.
How many of those processes and tasks are spawned is determined automatically by Hadoop. However, you can configure many parameters to influence the system. Please consult the corresponding documentation. This is a good starting point: http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-what-is-emr.html


