A) Unix Command Line Interface (CLI)
B) Distributed File Systems (DFS), HDFS (Hadoop DFS) Architecture and Scalability Problems
C) Tuning Distributed Storage Platform with File Types
D) Docker installation guide:
To make the learning process more convenient, our team has built Docker containers for each block of assignments. These containers are everything that you should install to pass all our courses. Having these containers deployed you can work with the same Jupyter notebooks as in Coursera’s interface and be independent from the internet connection problems or any other problems.
To use our containers with Jupyter notebooks and appropriate Hadoop services installed, you should do the steps below. Firstly you should install the latest version of Docker. Then you should deploy the necessary containers from our repository in DockerHub. And finally if you want you can use a container in a terminal. It is an optional part of the guide because this is a good decision if you are interested to learn how Hadoop cluster works and not only to practice API of Hadoop services.
I. Installing Docker
For installing Docker and further using it with our containers for this course, you should have a machine answering the following requirements.
The minimal requirements for running notebooks on your computer are:
2-core processor
4 GB of RAM
20 GB of free disk space
The recommended requirements are:
4-core processor
8GB of RAM
50 GB of free disk space
Installing Docker on Unix
>curl -sSL https://get.docker.com/ | sh
To install Docker on any of the most popular Linux operating systems (Ubuntu, Debian, CentOs, fedora etc) you should run the following command with sudo privileges:
Then optionally you can add your user account to the docker group:
>usermod -aG docker $USER
This allows you to deploy containers without sudo but we don’t recommend doing this on CentOS for security reasons.
Checking the installation
Now run the test container to check the installation successful:
>docker run hello-world
You can see some Docker deployment logs on the screen and then “Hello from Docker!” message from successfully deployed container.
II. Deploying containers
You can find all of them in our Dockerhub repository by the following link. Appropriate is specified at the bottom of self-reading to each assignment. E.g. you can see “If you want to deploy the environment on your own machine, please use bigdatateam/spark-course1 Docker container.”
To run a Juputer notebook on you own machine you should do the following steps.
1. Pull the actual version of the container from DockerHub
>docker pull
2. Deploy the container
>docker run –rm -it -p :8888
3. Open Jupyter environment
Now Jupyter can be opened in a browser by typing localhost:. If you’re working on Windows, you should open : instead of localhost. You can find in the deployment logs of the container.
So now you have full environment with installed and adjusted Hadoop services, prepared datasets and demo codes on your own machine.
III. Working with docker through a terminal
To start the container and work with it via Unix terminal you should do the following steps.
1. Start a Tmux session:
>tmux new -s my_docker
2. Run your container in terminal opened:
docker run –rm -it -p 8888:8888 -p 50070:50070 -p 8088:8088 bigdatateam/hdfs-notebook
(please, take into account that you can forward some additional ports with Hadoop UIs, not only Jupyter’s port).
3. Detach the Tmux session by typing “Ctrl+b” and then “d”.
4. Check your container’s id:
>docker ps
In the output of the command you should find something like this “da1d48ac25fc”. This is your container’s id.
5. Finally, you can open the terminal within the container by executing:
>docker exec -it /bin/bash
6. Now you’ve logged in the container as root and you can execute Hadoop shell commands.
>root@da1d48ac25fc:~# hdfs dfs -ls /data
Found 1 items
drwxrwxrwx – jovyan supergroup 0 2017-10-15 16:30 /data/wiki
root@da1d48ac25fc:~#
Simultaneously, you can work with Jupyter via browser.
To stop the docker container you should do the following steps.
1. Attach the Tmux session.
>tmux a -t my_docker
(if you have forgotten the name of Tmux session, you can type ‘tmux ls’)
2. Stop the container via Ctrl+C.
3. Exit from the Tmux session.
We recommend to learn Tmux, it has many interesting features (e.g., you can start from this guide). However these 3 commands will be enough to work with our docker containers:
>tmux new -s
tmux a -t
tmus ls
Name node architecture:
The discussion about WAL and NFS is going beyond the scope of this course. If you are not familiar with them, I suggest you start from Wikipedia articles:
https://en.wikipedia.org/wiki/Write-ahead_logging
https://en.wikipedia.org/wiki/Network_File_System
WAL helps you to persist changes into the storage before applying them.
NFS helps you to overcome node crashes so you will be able to restore changes from a remote storage.
E) First assignment :
==============================================
!hdfs dfs -du -h /user/jovyan/assignment1/test.txt is not correct you should use instead hdfs dfs -ls [filename] because it gives more details about the file.
!hdfs dfs -chown is not correct chown change the owner of the file not the rights on that file, use chmod instead.
for the lase question the used space is false
==============================================
Estimate minimum Namenode RAM size for HDFS with 1 PB capacity, block size 64 MB, average metadata size for each block is 300 B, replication factor is 3. Provide the formula for calculations and the result.
Ans:
1) Formula for minimum Namenode RAM size for HDFS = (Size of original data ) / (Size of single data block * replication factor ) * Avg metadatasize of each block on name node)
Minimum Namenode RAM size with requirements stated = 1 PB / (64 MB * 3) * 300 = (10 ^15 B) / (64 *1000*1000* 3 * 300B) = 1562500000 bytes ~ 1.5GB
Remember the level of granularity on the Namenode is block, not replica . No of blocks on data node is nothing but (Size of original data ) / (Size of single data block * replication factor ) say “x” blocks . So to store one block avg name node metadatasize is 300 bytes. Hence for “x” blocks it is 300x.
2) HDDs in your cluster have the following characteristics: average reading speed is 60 MB/s, seek time is 5 ms. You want to spend 0.5 % time for seeking the block, i.e. seek time should be 200 times less than the time to read the block. Estimate the minimum block size.
Ans:
Check the calculations and the result, they should both be correct.
block_size / 60 MB/s * 0.5 / 100 >= 5 ms
block_size >= 60 MB/s * 0.005 s / 0.005 = 60 MB
So, the minimum block size is 60 MB or 64 MB.
3) To complete this task use the ‘HDFS CLI Playground’ item.
Create text file ‘test.txt’ in a local fs. Use HDFS CLI to make the following operations:
сreate directory ‘assignment1’ in your home directory in HDFS (you can use a relative path or prescribe it explicitly “/user/jovyan/…”)
put test.txt in it
output the size and the owner of the file
revoke ‘read’ permission for ‘other users’
read the first 10 lines of the file
rename it to ‘test2.txt’.
Provide all the commands to HDFS CLI.
Ans:
https://linode.com/docs/tools-reference/tools/modify-file-permissions-with-chmod/
!touch test.txt
!for i in 1 2 3 4 5 6 7 8 9 0 10 ; do echo line_No_$i >> test.txt ; done
!hdfs dfs -mkdir -p /user/jovyan/assignment1
!hdfs dfs -put test.txt /user/jovyan/assignment1/
!hdfs dfs -du -h /user/jovyan/assignment1/test.txt
141 /user/jovyan/assignment1/test.txt
!hdfs dfs -chown -R r-ou /user/jovyan/assignment1/test.txt
!hdfs dfs -cat /user/jovyan/assignment1/test.txt | head -10
!hdfs dfs -mv /user/jovyan/assignment1/test.txt /user/jovyan/assignment1/test2.txt
4)
To complete this task use the ‘HDFS CLI Playground’ item.
Use HDFS CLI to investigate the file ‘/data/wiki/en_articles_part/articles-part’ in HDFS:
get blocks and their locations in HDFS for this file, show the command without an output
get the information about any block of the file, show the command and the block locations from the output
!hdfs fsck -blockId blk_1073741825
Connecting to namenode via http://localhost:50070/fsck?ugi=jovyan&blockId=blk_1073741825+&path=%2F
FSCK started by jovyan (auth:SIMPLE) from /127.0.0.1 at Sun Apr 15 10:40:54 UTC 2018
Block Id: blk_1073741825
Block belongs to: /data/wiki/en_articles_part/articles-part
No. of Expected Replica: 1
No. of live Replica: 1
No. of excess Replica: 0
No. of stale Replica: 0
No. of decommissioned Replica: 0
No. of decommissioning Replica: 0
No. of corrupted Replica: 0
Block replica on datanode/rack: eca2de932db3/default-rack is HEALTHY —-> shows the block location ie datanode/rack: eca2de932db3/default-rack
RIGHT ANSWERS BELOW:
Check the formula and the result, they should both be correct.
1 PB / 64 MB / 3 * 300 B = 1024 * 1024 * 1024 / 64 / 3 * 300 = 1600 MB
The result may be not exactly the same, rounding and other units are possible. So 1600 MB, 1.6 GB, 1.56 GB are all allowed.
=====
Check the calculations and the result, they should both be correct.
block_size / 60 MB/s * 0.5 / 100 >= 5 ms
block_size >= 60 MB/s * 0.005 s / 0.005 = 60 MB
So, the minimum block size is 60 MB or 64 MB.
-============
Check the commands, they should be like these:
$ hdfs dfs -mkdir assignment1
$ hdfs dfs -put test.txt assignment1/
$ hdfs dfs -ls assignment1/test.txt or hdfs dfs -stat “%b %u” assignment1/test.txt
$ hdfs dfs -chmod o-r assignment1/test.txt
$ hdfs dfs -cat assignment1/test.txt | head -10
$ hdfs dfs -mv assignment1/test.txt assignment1/test2.txt
There can be the following differences:
‘hdfs dfs’ and ‘hadoop fs’ are the same
absolute paths are also allowed: ‘/user//assignment1/test.txt’ instead of ‘assignment1/test.txt’
the permissions argument can be in an octal form, like 0640
the ‘text’ command can be used instead of ‘cat’
======
Blocks and locations of ‘/data/wiki/en_articles_part/articles-part’:
$ hdfs fsck /data/wiki/en_articles_part/articles-part -files -blocks -locations
Block information (block id may be different):
$ hdfs fsck -blockId blk_1073971670
It outputs the block locations, example (nodes list will be different):
Block replica on datanode/rack: some_node_hostname/default-rack is HEALTHY
=====
Total capacity: 2.14 TB
Used space: 242.12 GB (=DFS Used) or 242.12+35.51 = 277.63 GB (=DFS Used + Non DFS Used) – the latter answer is more precise, but the former is also possible
Data nodes in the cluster: 4
F)
1. The name of the notebook may contain only Roman letters, numbers and characters “-” or “_”
2. You have to send the worked out notebook (the required outputs in all cells should be filled).
3. While checking the system runs the code from the notebook and reads the output of only the last filled cell. It is clear that for getting this output all the preceding cells should also contain working code.If you decide to write some text in a cell, you should change the style of the cell to Markdown (Cell -> Cell type -> Markdown).
4. Only the answer to your task should be printed in the output stream (stdout) There should be no more output in stdout. In order to get rid of garbage [junk lines] redirect the output to /dev/null.
Let’s see on this code snippet:
%%bash
OUT_DIR=”wordcount_result_”$(date +”%s%6N”)
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR}
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.jab.name=”Streaming wordCount” \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper.py,reducer.py \
-mapper “python mapper.py” \
-combiner “python reducer.py” \
-reducer “python reducer.py” \
-input /data/wiki/en_articles_part1 \
-output ${OUT_DIR}
hdfs dfs -cat ${OUT_DIR}/part-00000 | head
F.1)
The commands in lines (6) and (8) may output some junk lines to stdout. So you need to eliminate output of these commands by redirecting it to the /dev/null. Let’s look at the fixed version of the snippet:
%%bash
OUT_DIR=”wordcount_result_”$(date +”%s%6N”)
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.jab.name=”Streaming wordCount” \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper.py,reducer.py \
-mapper “python mapper.py” \
-combiner “python reducer.py” \
-reducer “python reducer.py” \
-input /data/wiki/en_articles_part1 \
-output ${OUT_DIR} > /dev/null
hdfs dfs -cat ${OUT_DIR}/part-00000 | head
Please take into account that you mustn’t redirect stderr to anywhere. Hadoop, Hive, and Spark print their logs to stderr and grading system also reads and analyses it.
5. One cell must contain only one programming language.
E.g. if a cell contains Python code and you also want to call a bash-command (using “!”) in it, you should move the bash to another cell.
6. When you submit the solution the grading system converts it from Ipython-notebook into usual Python-script for further execution on the Hadoop-cluster. When standard converter (nbconvert) finds Jupyter’s magic commands or anything else that can’t be found in usual Python it generates a stub function “get_ipython()”. This function doesn’t work by default. Our Ipython converter is an improved version of Nbconvert and it can process most of magics correctly (e.g. it can convert “%%bash” to the “subprocess.check_output()”). However, despite this, we recommend to avoid magics where possible. Because in the cases when our converter can’t process magics correctly, it escalates the conversion to the nbconvert and nbconvert generates “get_ipython()” functions.
Week2:
Select the type of failure when a server in a cluster gets out of service because of the power supply has burnt out
Fail-Stop
Fail-Recovery
This should not be selected
It’s not a Fail-Recovery because it requires somebody to fix the problem first and then to turn on the server
Byzantine failure
Question 2
Correct
1 / 1 points
2. Question 2
Select the type of failure when a server in a cluster reboots because of a momentary power failure
Fail-Stop
Byzantine failure
Fail-Recovery
Correct
That’s right, the server recovers after a failure by itself
Question 3
Correct
1 / 1 points
3. Question 3
Select the type of failure when a server in a cluster outputs unexpected results of the calculations
Fail-Recovery
Fail-Stop
Byzantine failure
Correct
Yes, it is a Byzantine failure, because it looks like the server doesn’t behave according to the protocol
Question 4
Correct
1 / 1 points
4. Question 4
Select the type of failure when some packets are lost regardless the contents of the message
Fair-Loss Link
Correct
Yes, when packets are lost regardless their contents, this is exactly a Fair-Loss Link
Byzantine Link
Question 5
Correct
1 / 1 points
5. Question 5
Select the facts about Byzantine Link:
Some packets are modified
Correct
Yes, in case of Byzantine Link packets can be modified
Some packets are created out of nowhere
Correct
Yes, in case of Byzantine Link packets can be created
Some packets are lost regardless the contents of the message
Un-selected is correct
Question 6
Correct
1 / 1 points
6. Question 6
Select a mechanism for capturing events in chronological order in the distributed system
Synchronize the clocks on all the servers in the system
Use a logical clocks, for example, Lamport timestamps
Both ways are possible, logical clocks hide inaccuracies of clocks synchronization
Correct
That’s right, use a logical clocks, but don’t forget about clocks synchronization
Question 7
Correct
1 / 1 points
7. Question 7
Select the failure types specific for distributed computing, for example for Hadoop stack products
Fail-Stop, Perfect Link and Synchronous model
Byzantine failure, Byzantine Link and Asynchronous model
Fail-Recovery, Fair-Loss Link and Asynchronous model
Correct
That’s right, these types of failures are inherent in Hadoop
Question 8
Correct
1 / 1 points
8. Question 8
What phase in MapReduce paradigm is better for aggregating records by key?
Map
Shuffle & sort
Reduce
Correct
Yes, the reducer gets records sorted by key, so they are suitable for aggregation
Question 9
Incorrect
0 / 1 points
9. Question 9
Select a MapReduce phase which input records are sorted by key:
Map
This should not be selected
No, because mapper input records can be (and usually are) unsorted
Shuffle & sort
Reduce
Question 10
0.60 / 1 points
10. Question 10
What map and reduce functions should be used (in terms of Unix utilities) to select the only unique input records?
map=’uniq’, reduce=None
Un-selected is correct
map=’uniq’, reduce=’uniq’
This should be selected
map=’cat’, reduce=’uniq’
Correct
Yes, because ‘uniq’ on the sorted input records gives the required result
map=’sort -u’, reduce=’sort -u’
Un-selected is correct
map=’cat’, reduce=’sort -u’
This should not be selected
No, because the records have already been sorted before the Reduce phase, so using ‘sort’ here is an overkill and can lead to the ‘out of memory’ error (though on small data the result will be correct)
Question 11
Correct
1 / 1 points
11. Question 11
What map and reduce functions should be used (in terms of Unix utilities) to select only the repeated input records?
map=’uniq’, reduce=None
Un-selected is correct
map=’uniq’, reduce=’cat’
Un-selected is correct
map=’uniq -d’, reduce=’uniq -d’
Un-selected is correct
map=’cat’, reduce=’uniq -d’
Correct
Yes, mappers pass all the records to reducers and then ‘uniq -d’ on the sorted records solves the task
Question 12
Incorrect
0 / 1 points
12. Question 12
What service in Hadoop MapReduce v1 is responsible for running map and reduce tasks:
TaskTracker
JobTracker
This should not be selected
No, JobTracker is responsible for jobs, whereas tasks are run by TaskTracker
Question 13
Correct
1 / 1 points
13. Question 13
In YARN Application Master is…
A service which processes requests for cluster resources
A process on a cluster node to run a YARN application (for example, a MapReduce job)
Correct
Yes, Application Master executes a YARN application on a node
A service to run and monitor containers for application-specific processes on cluster nodes
=================== WEEK3 ====================================
In this assignment Hadoop Streaming is used to process Wikipedia articles dump.
Dataset location: /data/wiki/en_articles
Stop words list is in ‘/datasets/stop_words_en.txt’ file in local filesystem.
Format: article_id <tab> article_text
To parse articles don’t forget about Unicode (even though this is an English Wikipedia dump, there are many characters from other languages), remove punctuation marks and transform words to lowercase to get the correct quantities. To cope with Unicode we recommend to use the following tokenizer:
In this assignment Hadoop Streaming is used to process Wikipedia articles dump.
Dataset location: /data/wiki/en_articles
Stop words list is in ‘/datasets/stop_words_en.txt’ file in local filesystem.
Format: article_id <tab> article_text
To parse articles don’t forget about Unicode (even though this is an English Wikipedia dump, there are many characters from other languages), remove punctuation marks and transform words to lowercase to get the correct quantities. To cope with Unicode we recommend to use the following tokenizer:
#!/usr/bin/env python
import sys
import re
reload(sys)
sys.setdefaultencoding(‘utf-8’)
for line in sys.stdin:
try:
article_id, text = unicode(line.strip()).split(‘\t’, 1)
except ValueError as e:
continue
text = re.sub(“^\W+|\W+$”, “”, text, flags=re.UNICODE)
words = re.split(“\W*\s+\W*”, text, flags=re.UNICODE)
# your code goes here
%%writefile mapper.py
import sys
import re
reload(sys)
sys.setdefaultencoding(‘utf-8’) # required to convert to unicode
for line in sys.stdin:
try:
article_id, text = unicode(line.strip()).split(‘\t’, 1)
except ValueError as e:
continue
words = re.split(“\W*\s+\W*”, text, flags=re.UNICODE)
for word in words:
print >> sys.stderr, “reporter:counter:Wiki stats,Total words,%d” % 1
print “%s\t%d” % (word.lower(), 1)
%%writefile reducer.py
import sys
current_key = None
word_sum = 0
%%writefile -a reducer.py
for line in sys.stdin:
try:
key, count = line.strip().split(‘\t’, 1)
count = int(count)
except ValueError as e:
continue
if current_key != key:
if current_key:
print “%s\t%d” % (current_key, word_sum)
word_sum = 0
current_key = key
word_sum += count
if current_key:
print “%s\t%d” % (current_key, word_sum)
! hdfs dfs -ls /data/wiki
Found 1 items
drwxrwxrwx – jovyan supergroup 0 2017-10-17 13:15 /data/wiki/en_articles_part
%%bash
OUT_DIR=”wordcount_result_”$(date +”%s%6N”)
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.jab.name=”Streaming wordCount” \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper.py,reducer.py \
-mapper “python mapper.py” \
-combiner “python reducer.py” \
-reducer “python reducer.py” \
-input /data/wiki/en_articles_part \
-output ${OUT_DIR} > /dev/null
hdfs dfs -cat ${OUT_DIR}/part-00000 | head
0%however 1
0&\mathrm{if 1
0(8)320-1234 1
0)).(1 2
0,03 1
0,1,...,n 1
0,1,0 1
0,1,\dots,n 1
0,5 1
0,50 1
rm: `wordcount_result_1504600278440756': No such file or directory
17/09/05 12:31:23 INFO client.RMProxy: Connecting to ResourceManager at mipt-master.atp-fivt.org/93.175.29.106:8032
17/09/05 12:31:23 INFO client.RMProxy: Connecting to ResourceManager at mipt-master.atp-fivt.org/93.175.29.106:8032
17/09/05 12:31:24 INFO mapred.FileInputFormat: Total input paths to process : 1
17/09/05 12:31:24 INFO mapreduce.JobSubmitter: number of splits:3
17/09/05 12:31:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1503302131685_0379
17/09/05 12:31:25 INFO impl.YarnClientImpl: Submitted application application_1503302131685_0379
17/09/05 12:31:25 INFO mapreduce.Job: The url to track the job: http://mipt-master.atp-fivt.org:8088/proxy/application_1503302131685_0379/
17/09/05 12:31:25 INFO mapreduce.Job: Running job: job_1503302131685_0379
17/09/05 12:31:31 INFO mapreduce.Job: Job job_1503302131685_0379 running in uber mode : false
17/09/05 12:31:31 INFO mapreduce.Job: map 0% reduce 0%
17/09/05 12:31:47 INFO mapreduce.Job: map 33% reduce 0%
17/09/05 12:31:49 INFO mapreduce.Job: map 55% reduce 0%
17/09/05 12:31:55 INFO mapreduce.Job: map 67% reduce 0%
17/09/05 12:32:01 INFO mapreduce.Job: map 78% reduce 0%
17/09/05 12:32:11 INFO mapreduce.Job: map 100% reduce 0%
17/09/05 12:32:19 INFO mapreduce.Job: map 100% reduce 100%
17/09/05 12:32:19 INFO mapreduce.Job: Job job_1503302131685_0379 completed successfully
17/09/05 12:32:19 INFO mapreduce.Job: Counters: 54
File System Counters
FILE: Number of bytes read=4851476
FILE: Number of bytes written=11925190
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=76993447
HDFS: Number of bytes written=5370513
HDFS: Number of read operations=33
HDFS: Number of large read operations=0
HDFS: Number of write operations=16
Job Counters
Launched map tasks=3
Launched reduce tasks=8
Rack-local map tasks=3
Total time spent by all maps in occupied slots (ms)=351040
Total time spent by all reduces in occupied slots (ms)=231588
Total time spent by all map tasks (ms)=87760
Total time spent by all reduce tasks (ms)=38598
Total vcore-milliseconds taken by all map tasks=87760
Total vcore-milliseconds taken by all reduce tasks=38598
Total megabyte-milliseconds taken by all map tasks=179732480
Total megabyte-milliseconds taken by all reduce tasks=118573056
Map-Reduce Framework
Map input records=4100
Map output records=11937375
Map output bytes=97842436
Map output materialized bytes=5627293
Input split bytes=390
Combine input records=11937375
Combine output records=575818
Reduce input groups=427175
Reduce shuffle bytes=5627293
Reduce input records=575818
Reduce output records=427175
Spilled Records=1151636
Shuffled Maps =24
Failed Shuffles=0
Merged Map outputs=24
GC time elapsed (ms)=1453
CPU time spent (ms)=126530
Physical memory (bytes) snapshot=4855193600
Virtual memory (bytes) snapshot=32990945280
Total committed heap usage (bytes)=7536115712
Peak Map Physical memory (bytes)=906506240
Peak Map Virtual memory (bytes)=2205544448
Peak Reduce Physical memory (bytes)=281800704
Peak Reduce Virtual memory (bytes)=3315249152
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
Wiki stats
Total words=11937375
File Input Format Counters
Bytes Read=76993057
File Output Format Counters
Bytes Written=5370513
17/09/05 12:32:19 INFO streaming.StreamJob: Output directory: wordcount_result_1504600278440756
cat: Unable to write to output stream.
===== Testing (num. 700): STARTING =====
Executing notebook with kernel: python2
===== Testing (num. 700): SUMMARY =====
Tests passed: crs700_1 mrb17 sp7_1 res1 res2_1 res3 res6_1 res7_1
Tests failed: – – | – –
==================================================
Duration: 49.0 sec
VCoreSeconds: 178 sec
============================================================================
Hadoop Streaming assignment 1: Words Rating
============================================================================
Helped:
=======================================================================
working with Hadoop streaming and python
=======================================================================
Hadoop streaming may give non-informative exceptions when mappers’ or reducers’ scripts work incorrectly:
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads():
subprocess failed with code 1
It would be better to find the main part of bugs in code before running it on Hadoop (it, by the way, saves your time). You can debug mappers’ and reducers’ code locally (as simple Python scripts) before running it on Hadoop. For doing this you should emulate Hadoop’s workflow using bash.
You already know that every Hadoop job has 3 stages:
- Map,
- Shuffle & sort,
- Reduce.
Since you already have mapper’s and reducer’s code you can emulate Hadoop’s behavior using a simple bash:
cat my_data | python ./mapper.py | sort | python ./reducer.py
It helps you to understand if your code is working at all.
================ Another note on using hadoop streams and python=============
1. Logs of the task should be displayed in the standard error stream (stderr).
Do not redirect stderr to /dev/null because in this case the system won’t receive any information about running Jobs.
2. Output directories (including intermediates) should have randomly generated paths, for example:
4. Please, use relative HDFS-paths, i.e. dir1/file1 instead of /user/jovyan/dir1/file1. When you submit the code it will be executed on a real Hadoop cluster. And ‘jovyan’ user’s account may not exist in it.
5. In the Hadoop logs the counter of stop words should be before the counter of total words. For doing this please take into account that the counters print in lexicographical order by default.
==========END ====== Another note on using hadoop streams and python=========
SPARK ASSIGNMENT
Week5
In this assignment you will use Spark to compute various statistics for word pairs. At the same time, you will learn some simple techniques of natural language processing.
Dataset location: /data/wiki/en_articles
Format: article_id <tab> article_text
While parsing the articles, do not forget about Unicode (even though this is an English Wikipedia dump, there are many characters from other languages), remove punctuation marks and transform words to lowercase to get the correct quantities. Here is a starting snippet:
#! /usr/bin/env python
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName(“MyApp”).setMaster(“local[2]”))
import re
def parse_article(line):
try:
article_id, text = unicode(line.rstrip()).split(‘\t’, 1)
except ValueError as e:
return []
text = re.sub(“^\W+|\W+$”, “”, text, flags=re.UNICODE)
words = re.split(“\W*\s+\W*”, text, flags=re.UNICODE)
return words
wiki = sc.textFile(“/data/wiki/en_articles_part1/articles-part”, 16).map(parse_article)
result = wiki.take(1)[0]
You can use this code as a starter template. Now proceed to LTI assignments.
If you want to deploy the environment on your own machine, please use bigdatateam/spark-course1 Docker container.
for word in result[:50]:
print word
Spark QUIZ 1:
What functions must a dataset implement in order to be an RDD?
getSplits, getRecordReader
partitions, iterator and dependencies
Correct
Correct. The RDD must have these three functions in order to be named ‘resilient’ and ‘distributed’ dataset.
Question 2
2. Question 2
Mark all the things that can be used as RDDs.
Correct
Correct. You can partition the table by its primary key and use it as the data source.
Correct
Correct. This is the example from the video ‘RDDs’.
A set of CSV files in my home folder
Correct
Correct. You can treat every file as a partition, why not?
Correct
Correct. This is the example from the video ‘RDDs’.
Question 3
3. Question 3
Is it possible to access a MySQL database from within the predicate in the ‘filter’ transformation?
No, it is not possible to use database handles in predicates at all.
Yes, it is possible to create a database handle and capture it within the closure.
Yes, but one need to create a database handle within the closure and close it upon returning from the predicate.
Correct
Correct. However, that is not an efficient solution. A better way would be to use the ‘mapPartition’ transformation which would allow you to reuse the handle between the predicate calls.
Question 4
4. Question 4
True or false? Mark only the correct statements about the ‘filter’ transform.
There is just one partition in the transformed RDD.
There may be many dependencies for some output partitions.
There are indefinitely many partitions in the transformed RDD.
There are no dependencies for the output partitions.
There is a single dependency on an input partition for every output partition.
Correct
Correct. Filtering establishes narrow dependencies between RDDs.
There is the same number of partitions in the transformed RDD as in the source RDD.
Correct
Correct. Filtering establishes one-to-one correspondence between the partitions.
Question 5
5. Question 5
True or false? Mark only the incorrect statements.
There is no native join transformation in Spark.
Correct
Incorrect statement. Rewatch the video ‘Transformations 2’.
Spark natively supports only inner joins.
Correct
Incorrect statement. There are outer joins as well.
You cannot do a map-side join or a reduce-side join in Spark.
Correct
Incorrect statement. Every MapReduce computation could be expressed in Spark terms. Therefore, map-side joins and reduce-side joins could be expressed in Spark as well. But nobody does this in practice.
There is a native join transformation in Spark, and its type signature is: RDD , RDD => RDD .
Correct
Incorrect statement. Join keys must be explicit in the RDD items.
Question 6
6. Question 6
Mark all the transformations with wide dependencies. Try to do this without sneaking into the documentation.
This should not be selected
Incorrect. The ‘map’ transformation can be implemented with the ‘flatMap’, which has narrow dependencies.
Correct
Correct. Cartesian product is a kind of all-to-all join, it has wide dependencies.
Correct
Correct. Reduction requires data shuffling to regroup data items — thus it has wide dependencies.
Correct
Correct. This transformation requires a data shuffle — this it has wide dependencies.
Correct
Correct. Repartitioning may join or split partitions.
Question 7
7. Question 7
Imagine you would like to print your dataset on the display. Which code is correct (in Python)?
Correct
Correct. You need to collect data to the driver program first.
Question 8
8. Question 8
Imagine you would like to count items in your dataset. Which code is correct (in Python)?
Correct
Correct. The ‘fold’ transformation updates an accumulator (which is zero initially) by calling the given function (which increments the value).
Question 9
9. Question 9
Consider the following implementation of the ‘sample’ transformation:
Are there any issues with the implementation?
No, it is completely valid implementation.
Yes, it is written in Python and thus very slow.
Yes, it exhibits nondeterminism thus making the result non-reproducible.
Correct
Correct. The major issue here is the random number generation. Two different runs over a dataset would lead to two different outcomes.
Question 10
10. Question 10
Consider the following action that updates a counter in a MySQL database:
Are there any issues with the implementation?
Yes, the action may produce incorrect results due to non-idempotent updates.
Correct
Yes. If the action fails while processing a partition, it would be re-executed, thus counting some items twice.
Yes, the action may produce incorrect results due to non-atomic increments.
Yes, the action is inefficient; it would be better to use ‘foreachPartition’.
Spark QUIZ 2:
=================
Congratulations! You passed!
Question 1
A unit of work performed by the executor.
An activity spawned in the response to a Spark action.
That is how Spark calls my application.
A pipelineable part of the computation.
A dependency graph for the RDDs.
An activity you get paid for.
Question 2
A unit of work performed by the executor.
An activity you get paid for.
A pipelineable part of the computation.
That is how Spark calls my application.
An activity spawned in the response to a Spark action.
A dependency graph for the RDDs.
Question 3
A subset of the dependency graph.
A single step of the job.
A place where a job is performed.
A pipelineable part of the computation.
An activity spawned in the response to a Spark action.
A particular shuffle operation within the job.
Question 4
4. Question 4
How does your application find out the executors to work with?
The SparkContext object allocates the executors by communicating with the cluster manager.
You statically define them in the configuration file.
The SparkContext object queries a discovery service to find them out.
Question 5
5. Question 5
Mark all the statements that are true.
Spark keeps all the intermediate data in the memory until the end of the computation, that is why it is a ‘lighting-fast computing’!
You can ask Spark to make several copies of your persistent dataset.
Correct
Yes, you can tune the replication factor.
While executing a job, Spark loads data from HDFS only once.
Spark can be hinted to keep particular datasets in the memory.
Data can be cached both on the disk and in the memory.
Correct
Yes, you can tune persistence level to use both the disk & the memory.
Every partition is stored in Spark in 3 replicas to achieve fault-tolerance.
It is advisable to cache every RDD in your computation for optimal performance.
Question 6
6. Question 6
Imagine that you need to deliver three floating-point parameters for a machine learning algorithm used in your tasks. What is the best way to do it?
Capture them into the closure to be sent during the task scheduling.
Correct
Yes, that is correct. Three floating-point numbers add a negligible overhead.
Hardcode them into the algorithm and redeploy the application.
Make a broadcast variable and put these parameters there.
Question 7
7. Question 7
Imagine that you need to somehow print corrupted records from the log file to the screen. How can you do that?
Use an accumulator variable to collect all the records and pass them back to the driver.
Use a broadcast variable to broadcast the corrupted records and listen for these events in the driver.
Use an action to collect filtered records in the driver.
Correct
There is no way to trick you!
Question 8
8. Question 8
How broadcast variables are distributed among the executors?
The executors are organized in a tree-like hierarchy, and the distribution follows the tree structure.
The driver sends the content in parallel to every executor.
The executors distribute the content with a peer-to-peer, torrent-like protocol, and the driver seeds the content.
The driver sends the content one-by-one to every executor.
Question 9
9. Question 9
What will happen if you use a non-associative, non-commutative operator in the accumulator variables?
Spark will not allow me to do that.
Operation semantics are ill-defined in this case.
Correct
Yes. As the order of the updates is unknown in advance, we must be able to apply them in any order. Thus, commutativity and associativity.
I have tried that — everything works just fine.
Question 10
10. Question 10
Mark all the operators that are both associative and commutative.
max(x, y) = if x > y then x else y end
min(x, y) = if x > y then y else x end
concat(x, y) = str(x) + str(y)
Question 11
11. Question 11
Does Spark guarantee that accumulator updates originating from actions are applied only once?
Question 12
12. Question 12
Does Spark guarantee that accumulator updates originating from transformations are applied at least once?