What are BigData and distributed file systems (e.g. HDFS)?

A) Unix Command Line Interface (CLI)
B) Distributed File Systems (DFS), HDFS (Hadoop DFS) Architecture and Scalability Problems
C) Tuning Distributed Storage Platform with File Types

D) Docker installation guide:
To make the learning process more convenient, our team has built Docker containers for each block of assignments. These containers are all you need to install to pass our courses. With these containers deployed, you can work with the same Jupyter notebooks as in Coursera’s interface and stay independent of internet connection problems or any other issues.

To use our containers, with Jupyter notebooks and the appropriate Hadoop services installed, follow the steps below. First, install the latest version of Docker. Then deploy the necessary containers from our repository on DockerHub. Finally, you can optionally work with a container in a terminal. This part of the guide is optional; it is a good choice if you are interested in learning how a Hadoop cluster works, not only in practicing the APIs of Hadoop services.

I. Installing Docker
To install Docker and use it with our containers for this course, you need a machine meeting the following requirements.

The minimal requirements for running notebooks on your computer are:

2-core processor
4 GB of RAM
20 GB of free disk space
The recommended requirements are:

4-core processor
8GB of RAM
50 GB of free disk space

Installing Docker on Unix
To install Docker on any of the most popular Linux distributions (Ubuntu, Debian, CentOS, Fedora, etc.), run the following command with sudo privileges:
>curl -sSL https://get.docker.com/ | sh

Then optionally you can add your user account to the docker group:
>sudo usermod -aG docker $USER

This allows you to deploy containers without sudo but we don’t recommend doing this on CentOS for security reasons.
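If you do add your user to the group, the change applies to new login sessions; a quick way to pick it up in the current terminal (standard Linux behaviour, nothing Docker-specific) is:

>newgrp docker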

Checking the installation
Now run the test container to check that the installation was successful:
>docker run hello-world

You should see some Docker deployment logs on the screen, followed by the “Hello from Docker!” message from the successfully deployed container.

II. Deploying containers
You can find all of the containers in our DockerHub repository. The appropriate container for each assignment is specified at the bottom of its self-reading, e.g. “If you want to deploy the environment on your own machine, please use the bigdatateam/spark-course1 Docker container.”

To run a Jupyter notebook on your own machine, do the following steps.

1. Pull the latest version of the container from DockerHub:
>docker pull <container-name>

2. Deploy the container:
>docker run --rm -it -p <local_port>:8888 <container-name>

3. Open Jupyter environment

Now Jupyter can be opened in a browser at localhost:<local_port>. If you’re working on Windows, open <docker_machine_ip>:<local_port> instead of localhost. The exact address to open can be found in the deployment logs of the container.
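As an illustration, here is how these steps might look for the HDFS container that appears later in this guide (check the self-reading of your assignment for the exact image name):

>docker pull bigdatateam/hdfs-notebook
>docker run --rm -it -p 8888:8888 bigdatateam/hdfs-notebook

After the container starts, Jupyter is available at localhost:8888.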

So now you have a full environment on your own machine, with Hadoop services installed and configured, prepared datasets, and demo code.

III. Working with Docker through a terminal
To start the container and work with it via a Unix terminal, do the following steps.
1. Start a Tmux session:
>tmux new -s my_docker

2. Run your container in the opened terminal:
>docker run --rm -it -p 8888:8888 -p 50070:50070 -p 8088:8088 bigdatateam/hdfs-notebook

(Please take into account that you can forward additional ports for the Hadoop web UIs, not only Jupyter’s port.)
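For reference, 50070 and 8088 are the usual Hadoop 2 defaults for the HDFS Namenode and YARN ResourceManager web UIs (this matches how the ports are used later in this guide, but check your container’s documentation if something differs). Once the container is up, you can check that the forwarded UIs respond, for example:

>curl -s -o /dev/null -w "%{http_code}\n" http://localhost:50070
>curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8088

Any HTTP status code in the output means the corresponding UI is being served.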

3. Detach the Tmux session by typing “Ctrl+b” and then “d”.

4. Check your container’s id:
>docker ps

In the output of the command you should find something like this “da1d48ac25fc”. This is your container’s id.

5. Finally, you can open the terminal within the container by executing:
>docker exec -it <container_id> /bin/bash

6. Now you’ve logged in to the container as root and you can execute Hadoop shell commands.
>root@da1d48ac25fc:~# hdfs dfs -ls /data
Found 1 items
drwxrwxrwx – jovyan supergroup 0 2017-10-15 16:30 /data/wiki
root@da1d48ac25fc:~#

Simultaneously, you can work with Jupyter via browser.

To stop the docker container you should do the following steps.
1. Attach the Tmux session.
>tmux a -t my_docker
(If you have forgotten the name of the Tmux session, you can type ‘tmux ls’.)

2. Stop the container via Ctrl+C.

3. Exit from the Tmux session.

We recommend learning Tmux; it has many useful features (e.g., you can start from this guide). However, these 3 commands are enough to work with our Docker containers:
>tmux new -s <session_name>
tmux a -t <session_name>
tmux ls

Name node architecture:
The discussion of WAL and NFS goes beyond the scope of this course. If you are not familiar with them, I suggest starting with the Wikipedia articles:
https://en.wikipedia.org/wiki/Write-ahead_logging
https://en.wikipedia.org/wiki/Network_File_System
WAL persists changes to storage before they are applied.
NFS helps you overcome node crashes: the changes can be restored from remote storage.

E) First assignment :
==============================================
!hdfs dfs -du -h /user/jovyan/assignment1/test.txt is not correct; use hdfs dfs -ls [filename] instead, because it gives more details about the file (including the owner).
!hdfs dfs -chown is not correct: chown changes the owner of the file, not the permissions on it; use chmod instead.
For the last question the used space value is wrong.

==============================================

Estimate minimum Namenode RAM size for HDFS with 1 PB capacity, block size 64 MB, average metadata size for each block is 300 B, replication factor is 3. Provide the formula for calculations and the result.

Ans:
1) Formula: minimum Namenode RAM for HDFS = (HDFS capacity) / (block size * replication factor) * (average metadata size per block on the Namenode)

Minimum Namenode RAM with the stated requirements = 1 PB / (64 MB * 3) * 300 B = (10^15 B) / (64 * 10^6 B * 3) * 300 B = 1562500000 bytes ≈ 1.56 GB

Remember that the level of granularity on the Namenode is the block, not the replica. The number of blocks is (HDFS capacity) / (block size * replication factor), say “x” blocks. The Namenode stores on average 300 bytes of metadata per block, hence 300x bytes in total.
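A quick sanity check of the arithmetic in decimal units (1 PB = 10^15 B, 64 MB = 64 * 10^6 B), using bc:

echo "10^15 * 300 / (64 * 10^6 * 3)" | bc    # prints 1562500000 (bytes), i.e. about 1.56 GB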

2) HDDs in your cluster have the following characteristics: average reading speed is 60 MB/s, seek time is 5 ms. You want to spend 0.5 % time for seeking the block, i.e. seek time should be 200 times less than the time to read the block. Estimate the minimum block size.

Ans:
Check the calculations and the result, they should both be correct.

Seek time should be at most 0.5% of the read time: 0.005 * block_size / (60 MB/s) >= 5 ms

block_size >= 5 ms * 200 * 60 MB/s = 1 s * 60 MB/s = 60 MB

So, the minimum block size is 60 MB (or 64 MB, the nearest standard value).
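The same arithmetic checked with bc (5 ms seek, factor of 200, 60 MB/s reading speed):

echo "0.005 * 200 * 60" | bc    # prints 60.000, i.e. a 60 MB minimum block size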

3) To complete this task use the ‘HDFS CLI Playground’ item.

Create text file ‘test.txt’ in a local fs. Use HDFS CLI to make the following operations:

create directory ‘assignment1’ in your home directory in HDFS (you can use a relative path or specify it explicitly: “/user/jovyan/…”)
put test.txt in it
output the size and the owner of the file
revoke ‘read’ permission for ‘other users’
read the first 10 lines of the file
rename it to ‘test2.txt’.
Provide all the commands to HDFS CLI.

Ans:
https://linode.com/docs/tools-reference/tools/modify-file-permissions-with-chmod/

!touch test.txt
!for i in 1 2 3 4 5 6 7 8 9 0 10 ; do echo line_No_$i >> test.txt ; done
!hdfs dfs -mkdir -p /user/jovyan/assignment1
!hdfs dfs -put test.txt /user/jovyan/assignment1/
!hdfs dfs -du -h /user/jovyan/assignment1/test.txt
141 /user/jovyan/assignment1/test.txt
!hdfs dfs -chown -R r-ou /user/jovyan/assignment1/test.txt
!hdfs dfs -cat /user/jovyan/assignment1/test.txt | head -10
!hdfs dfs -mv /user/jovyan/assignment1/test.txt /user/jovyan/assignment1/test2.txt

4)

To complete this task use the ‘HDFS CLI Playground’ item.

Use HDFS CLI to investigate the file ‘/data/wiki/en_articles_part/articles-part’ in HDFS:

get blocks and their locations in HDFS for this file, show the command without an output
get the information about any block of the file, show the command and the block locations from the output

!hdfs fsck -blockId blk_1073741825

Connecting to namenode via http://localhost:50070/fsck?ugi=jovyan&blockId=blk_1073741825+&path=%2F
FSCK started by jovyan (auth:SIMPLE) from /127.0.0.1 at Sun Apr 15 10:40:54 UTC 2018

Block Id: blk_1073741825
Block belongs to: /data/wiki/en_articles_part/articles-part
No. of Expected Replica: 1
No. of live Replica: 1
No. of excess Replica: 0
No. of stale Replica: 0
No. of decommissioned Replica: 0
No. of decommissioning Replica: 0
No. of corrupted Replica: 0
Block replica on datanode/rack: eca2de932db3/default-rack is HEALTHY -> this line shows the block location, i.e. datanode/rack: eca2de932db3/default-rack

RIGHT ANSWERS BELOW:

Check the formula and the result, they should both be correct.

1 PB / 64 MB / 3 * 300 B = (1024 * 1024 * 1024 MB) / 64 MB / 3 * 300 B = 1,677,721,600 B ≈ 1600 MB

The result may not be exactly the same; rounding and other units are possible, so 1600 MB, 1.6 GB, and 1.56 GB are all accepted.

=====

Check the calculations and the result, they should both be correct.

Seek time should be at most 0.5% of the read time: 0.005 * block_size / (60 MB/s) >= 5 ms

block_size >= 5 ms * 200 * 60 MB/s = 1 s * 60 MB/s = 60 MB

So, the minimum block size is 60 MB (or 64 MB, the nearest standard value).

============

Check the commands, they should be like these:

$ hdfs dfs -mkdir assignment1

$ hdfs dfs -put test.txt assignment1/

$ hdfs dfs -ls assignment1/test.txt or hdfs dfs -stat "%b %u" assignment1/test.txt

$ hdfs dfs -chmod o-r assignment1/test.txt

$ hdfs dfs -cat assignment1/test.txt | head -10

$ hdfs dfs -mv assignment1/test.txt assignment1/test2.txt

There can be the following differences:

‘hdfs dfs’ and ‘hadoop fs’ are the same
absolute paths are also allowed: ‘/user//assignment1/test.txt’ instead of ‘assignment1/test.txt’
the permissions argument can be in an octal form, like 0640 (see the example after this list)
the ‘text’ command can be used instead of ‘cat’
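For example, a hypothetical octal equivalent of revoking read access from other users, assuming the file starts with the default 644 permissions:

$ hdfs dfs -chmod 640 assignment1/test.txt

This leaves read/write for the owner, read for the group, and nothing for other users.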
======

Blocks and locations of ‘/data/wiki/en_articles_part/articles-part’:
$ hdfs fsck /data/wiki/en_articles_part/articles-part -files -blocks -locations

Block information (block id may be different):
$ hdfs fsck -blockId blk_1073971670

It outputs the block locations, example (nodes list will be different):

Block replica on datanode/rack: some_node_hostname/default-rack is HEALTHY

=====

Total capacity: 2.14 TB

Used space: 242.12 GB (=DFS Used) or 242.12+35.51 = 277.63 GB (=DFS Used + Non DFS Used) – the latter answer is more precise, but the former is also possible

Data nodes in the cluster: 4

F)

1. The name of the notebook may contain only Roman letters, numbers and characters “-” or “_”

2. You have to submit the worked-out notebook (the required outputs in all cells should be filled in).

3. While checking, the system runs the code from the notebook and reads the output of only the last filled cell, so to get this output all the preceding cells must also contain working code. If you decide to write some text in a cell, change the cell type to Markdown (Cell -> Cell Type -> Markdown).

4. Only the answer to your task should be printed to the output stream (stdout); there should be no other output in stdout. To get rid of junk lines, redirect the unneeded output to /dev/null.

Let’s look at this code snippet:

%%bash

OUT_DIR="wordcount_result_"$(date +"%s%6N")
NUM_REDUCERS=8

hdfs dfs -rm -r -skipTrash ${OUT_DIR}

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.job.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -combiner "python reducer.py" \
    -reducer "python reducer.py" \
    -input /data/wiki/en_articles_part1 \
    -output ${OUT_DIR}

hdfs dfs -cat ${OUT_DIR}/part-00000 | head

F.1)
The commands in lines (6) and (8) of the snippet, ‘hdfs dfs -rm’ and ‘yarn jar’, may output some junk lines to stdout, so you need to suppress their output by redirecting it to /dev/null. Let’s look at the fixed version of the snippet:

%%bash

OUT_DIR="wordcount_result_"$(date +"%s%6N")
NUM_REDUCERS=8

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.job.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -combiner "python reducer.py" \
    -reducer "python reducer.py" \
    -input /data/wiki/en_articles_part1 \
    -output ${OUT_DIR} > /dev/null

hdfs dfs -cat ${OUT_DIR}/part-00000 | head

Please take into account that you mustn’t redirect stderr anywhere: Hadoop, Hive, and Spark print their logs to stderr, and the grading system also reads and analyzes it.

5. One cell must contain only one programming language.

E.g. if a cell contains Python code and you also want to call a bash-command (using “!”) in it, you should move the bash to another cell.

6. When you submit the solution, the grading system converts it from an IPython notebook into a regular Python script for further execution on the Hadoop cluster. When the standard converter (nbconvert) finds Jupyter magic commands or anything else that is not valid plain Python, it generates a stub function “get_ipython()”, which doesn’t work by default. Our IPython converter is an improved version of nbconvert and can process most magics correctly (e.g. it converts “%%bash” into “subprocess.check_output()”). Nevertheless, we recommend avoiding magics where possible: in the cases when our converter can’t process a magic correctly, it falls back to nbconvert, and nbconvert generates the non-working “get_ipython()” calls.

Week2:

1. Question 1
Select the type of failure when a server in a cluster goes out of service because its power supply has burnt out

Fail-Stop

Fail-Recovery

This should not be selected
It’s not a Fail-Recovery because it requires somebody to fix the problem first and then to turn on the server

Byzantine failure

2. Question 2 (Correct, 1 / 1 points)
Select the type of failure when a server in a cluster reboots because of a momentary power failure

Fail-Stop

Byzantine failure

Fail-Recovery

Correct
That’s right, the server recovers after a failure by itself

3. Question 3 (Correct, 1 / 1 points)
Select the type of failure when a server in a cluster outputs unexpected results of the calculations

Fail-Recovery

Fail-Stop

Byzantine failure

Correct
Yes, it is a Byzantine failure, because it looks like the server doesn’t behave according to the protocol

4. Question 4 (Correct, 1 / 1 points)
Select the type of failure when some packets are lost regardless of the contents of the message

Fair-Loss Link

Correct
Yes, when packets are lost regardless of their contents, this is exactly a Fair-Loss Link

Byzantine Link

5. Question 5 (Correct, 1 / 1 points)
Select the facts about Byzantine Link:

Some packets are modified

Correct
Yes, in case of Byzantine Link packets can be modified

Some packets are created out of nowhere

Correct
Yes, in case of Byzantine Link packets can be created

Some packets are lost regardless of the contents of the message

Un-selected is correct
6. Question 6 (Correct, 1 / 1 points)
Select a mechanism for capturing events in chronological order in the distributed system

Synchronize the clocks on all the servers in the system

Use logical clocks, for example, Lamport timestamps

Both ways are possible; logical clocks hide inaccuracies of clock synchronization

Correct
That’s right, use logical clocks, but don’t forget about clock synchronization

7. Question 7 (Correct, 1 / 1 points)
Select the failure types specific to distributed computing, for example to Hadoop stack products

Fail-Stop, Perfect Link and Synchronous model

Byzantine failure, Byzantine Link and Asynchronous model

Fail-Recovery, Fair-Loss Link and Asynchronous model

Correct
That’s right, these types of failures are inherent in Hadoop

8. Question 8 (Correct, 1 / 1 points)
Which phase in the MapReduce paradigm is better suited for aggregating records by key?

Map

Shuffle & sort

Reduce

Correct
Yes, the reducer gets records sorted by key, so they are suitable for aggregation

9. Question 9 (Incorrect, 0 / 1 points)
Select the MapReduce phase whose input records are sorted by key:

Map

This should not be selected
No, because mapper input records can be (and usually are) unsorted

Shuffle & sort

Reduce

10. Question 10 (0.60 / 1 points)
What map and reduce functions should be used (in terms of Unix utilities) to select only the unique input records?

map=’uniq’, reduce=None

Un-selected is correct

map=’uniq’, reduce=’uniq’

This should be selected

map=’cat’, reduce=’uniq’

Correct
Yes, because ‘uniq’ on the sorted input records gives the required result

map=’sort -u’, reduce=’sort -u’

Un-selected is correct

map=’cat’, reduce=’sort -u’

This should not be selected
No, because the records have already been sorted before the Reduce phase, so using ‘sort’ here is overkill and can lead to an ‘out of memory’ error (though on small data the result will be correct)

11. Question 11 (Correct, 1 / 1 points)
What map and reduce functions should be used (in terms of Unix utilities) to select only the repeated input records?

map=’uniq’, reduce=None

Un-selected is correct

map=’uniq’, reduce=’cat’

Un-selected is correct

map=’uniq -d’, reduce=’uniq -d’

Un-selected is correct

map=’cat’, reduce=’uniq -d’

Correct
Yes, mappers pass all the records to reducers and then ‘uniq -d’ on the sorted records solves the task

12. Question 12 (Incorrect, 0 / 1 points)
What service in Hadoop MapReduce v1 is responsible for running map and reduce tasks:

TaskTracker

JobTracker

This should not be selected
No, JobTracker is responsible for jobs, whereas tasks are run by TaskTracker

13. Question 13 (Correct, 1 / 1 points)
In YARN, the Application Master is…

A service which processes requests for cluster resources

A process on a cluster node to run a YARN application (for example, a MapReduce job)

Correct
Yes, Application Master executes a YARN application on a node

A service to run and monitor containers for application-specific processes on cluster nodes

 

=================== WEEK3 ====================================

In this assignment Hadoop Streaming is used to process Wikipedia articles dump.

Dataset location: /data/wiki/en_articles

The stop words list is in the ‘/datasets/stop_words_en.txt’ file in the local filesystem.

Format: article_id <tab> article_text

To parse the articles, don’t forget about Unicode (even though this is an English Wikipedia dump, there are many characters from other languages), remove punctuation marks, and transform words to lowercase to get the correct quantities. To cope with Unicode we recommend using the following tokenizer:


#!/usr/bin/env python

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)

    # your code goes here

%%writefile mapper.py

import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        print >> sys.stderr, "reporter:counter:Wiki stats,Total words,%d" % 1
        print "%s\t%d" % (word.lower(), 1)

%%writefile reducer.py

import sys

current_key = None
word_sum = 0

%%writefile -a reducer.py

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    if current_key != key:
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print "%s\t%d" % (current_key, word_sum)

! hdfs dfs -ls /data/wiki

Found 1 items
drwxrwxrwx – jovyan supergroup 0 2017-10-17 13:15 /data/wiki/en_articles_part

%%bash

OUT_DIR="wordcount_result_"$(date +"%s%6N")
NUM_REDUCERS=8

hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null

yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
    -D mapred.job.name="Streaming wordCount" \
    -D mapreduce.job.reduces=${NUM_REDUCERS} \
    -files mapper.py,reducer.py \
    -mapper "python mapper.py" \
    -combiner "python reducer.py" \
    -reducer "python reducer.py" \
    -input /data/wiki/en_articles_part \
    -output ${OUT_DIR} > /dev/null

hdfs dfs -cat ${OUT_DIR}/part-00000 | head

0%however	1
0&\mathrm{if	1
0(8)320-1234	1
0)).(1	2
0,03	1
0,1,...,n	1
0,1,0	1
0,1,\dots,n	1
0,5	1
0,50	1
rm: `wordcount_result_1504600278440756': No such file or directory
17/09/05 12:31:23 INFO client.RMProxy: Connecting to ResourceManager at mipt-master.atp-fivt.org/93.175.29.106:8032
17/09/05 12:31:23 INFO client.RMProxy: Connecting to ResourceManager at mipt-master.atp-fivt.org/93.175.29.106:8032
17/09/05 12:31:24 INFO mapred.FileInputFormat: Total input paths to process : 1
17/09/05 12:31:24 INFO mapreduce.JobSubmitter: number of splits:3
17/09/05 12:31:24 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1503302131685_0379
17/09/05 12:31:25 INFO impl.YarnClientImpl: Submitted application application_1503302131685_0379
17/09/05 12:31:25 INFO mapreduce.Job: The url to track the job: http://mipt-master.atp-fivt.org:8088/proxy/application_1503302131685_0379/
17/09/05 12:31:25 INFO mapreduce.Job: Running job: job_1503302131685_0379
17/09/05 12:31:31 INFO mapreduce.Job: Job job_1503302131685_0379 running in uber mode : false
17/09/05 12:31:31 INFO mapreduce.Job:  map 0% reduce 0%
17/09/05 12:31:47 INFO mapreduce.Job:  map 33% reduce 0%
17/09/05 12:31:49 INFO mapreduce.Job:  map 55% reduce 0%
17/09/05 12:31:55 INFO mapreduce.Job:  map 67% reduce 0%
17/09/05 12:32:01 INFO mapreduce.Job:  map 78% reduce 0%
17/09/05 12:32:11 INFO mapreduce.Job:  map 100% reduce 0%
17/09/05 12:32:19 INFO mapreduce.Job:  map 100% reduce 100%
17/09/05 12:32:19 INFO mapreduce.Job: Job job_1503302131685_0379 completed successfully
17/09/05 12:32:19 INFO mapreduce.Job: Counters: 54
	File System Counters
		FILE: Number of bytes read=4851476
		FILE: Number of bytes written=11925190
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=76993447
		HDFS: Number of bytes written=5370513
		HDFS: Number of read operations=33
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=16
	Job Counters 
		Launched map tasks=3
		Launched reduce tasks=8
		Rack-local map tasks=3
		Total time spent by all maps in occupied slots (ms)=351040
		Total time spent by all reduces in occupied slots (ms)=231588
		Total time spent by all map tasks (ms)=87760
		Total time spent by all reduce tasks (ms)=38598
		Total vcore-milliseconds taken by all map tasks=87760
		Total vcore-milliseconds taken by all reduce tasks=38598
		Total megabyte-milliseconds taken by all map tasks=179732480
		Total megabyte-milliseconds taken by all reduce tasks=118573056
	Map-Reduce Framework
		Map input records=4100
		Map output records=11937375
		Map output bytes=97842436
		Map output materialized bytes=5627293
		Input split bytes=390
		Combine input records=11937375
		Combine output records=575818
		Reduce input groups=427175
		Reduce shuffle bytes=5627293
		Reduce input records=575818
		Reduce output records=427175
		Spilled Records=1151636
		Shuffled Maps =24
		Failed Shuffles=0
		Merged Map outputs=24
		GC time elapsed (ms)=1453
		CPU time spent (ms)=126530
		Physical memory (bytes) snapshot=4855193600
		Virtual memory (bytes) snapshot=32990945280
		Total committed heap usage (bytes)=7536115712
		Peak Map Physical memory (bytes)=906506240
		Peak Map Virtual memory (bytes)=2205544448
		Peak Reduce Physical memory (bytes)=281800704
		Peak Reduce Virtual memory (bytes)=3315249152
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	Wiki stats
		Total words=11937375
	File Input Format Counters 
		Bytes Read=76993057
	File Output Format Counters 
		Bytes Written=5370513
17/09/05 12:32:19 INFO streaming.StreamJob: Output directory: wordcount_result_1504600278440756
cat: Unable to write to output stream.

===== Testing (num. 700): STARTING =====
Executing notebook with kernel: python2

===== Testing (num. 700): SUMMARY =====
Tests passed: crs700_1 mrb17 sp7_1 res1 res2_1 res3 res6_1 res7_1
Tests failed: – – | – –
==================================================
Duration: 49.0 sec
VCoreSeconds: 178 sec

 

============================================================================

Hadoop Streaming assignment 1: Words Rating

============================================================================

Create your own WordCount program and process Wikipedia dump. Use the second job to sort words by quantity in the reverse order (most popular first). Output format:

word <tab> count

The result is the 7th word by popularity and its quantity.

Hint: it is possible to use exactly one reducer in the second job to obtain a totally ordered result.


This course uses a third-party tool, Hadoop Streaming assignment 1: Words Rating, to enhance your learning experience. The tool will reference basic information like your Coursera ID.
What helped:
=======================================================================
working with Hadoop streaming and python
=======================================================================

Hadoop streaming may give non-informative exceptions when mappers’ or reducers’ scripts work incorrectly:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads():

subprocess failed with code 1

It is better to find most of the bugs in your code before running it on Hadoop (which, by the way, saves your time). You can debug the mapper and reducer code locally (as simple Python scripts) before running it on Hadoop. To do this, you should emulate Hadoop’s workflow using bash.

You already know that every Hadoop job has 3 stages:

  • Map,
  • Shuffle & sort,
  • Reduce.

Since you already have the mapper and reducer code, you can emulate Hadoop’s behavior with a simple bash pipeline:

cat my_data | python ./mapper.py | sort | python ./reducer.py

This helps you understand whether your code works at all.
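For example, a quick local smoke test on a small sample (file and script names as in the snippets above; ‘2>/dev/null’ hides the counter lines that mapper.py writes to stderr):

head -n 100 my_data | python ./mapper.py 2>/dev/null | sort | python ./reducer.py | head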

 

================ Another note on using hadoop streams and python=============

1. Logs of the task should be displayed in the standard error stream (stderr).

Do not redirect stderr to /dev/null because in this case the system won’t receive any information about running Jobs.

2. Output directories (including intermediates) should have randomly generated paths, for example:

OUT_DIR=”wordcount_result_”$(date +”%s%6N”)

3. All the letters in the names of Hadoop counters, except the first one, should be lowercase (the first letter can be either case).

4. Please use relative HDFS paths, i.e. dir1/file1 instead of /user/jovyan/dir1/file1. When you submit the code, it will be executed on a real Hadoop cluster where the ‘jovyan’ user account may not exist.

5. In the Hadoop logs the stop words counter should appear before the total words counter. To achieve this, take into account that counters are printed in lexicographical order by default (see the sketch below).
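A minimal sketch for item 5, assuming the ‘Wiki stats’ counter group used in the snippets above: name the counters so that the stop words counter sorts before the total words counter (“Stop words” < “Total words” lexicographically). In shell terms, the mapper should write counter updates to stderr like these:

echo "reporter:counter:Wiki stats,Stop words,1" >&2
echo "reporter:counter:Wiki stats,Total words,1" >&2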

 

==========END ====== Another note on using hadoop streams and python=========

SPARK ASSIGNMENT 

Week5

In this assignment you will use Spark to compute various statistics for word pairs. At the same time, you will learn some simple techniques of natural language processing.

Dataset location: /data/wiki/en_articles

Format: article_id <tab> article_text

While parsing the articles, do not forget about Unicode (even though this is an English Wikipedia dump, there are many characters from other languages), remove punctuation marks and transform words to lowercase to get the correct quantities. Here is a starting snippet:

#!/usr/bin/env python

from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().setAppName("MyApp").setMaster("local[2]"))

import re

def parse_article(line):
    try:
        article_id, text = unicode(line.rstrip()).split('\t', 1)
    except ValueError as e:
        return []
    text = re.sub("^\W+|\W+$", "", text, flags=re.UNICODE)
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    return words

wiki = sc.textFile("/data/wiki/en_articles_part1/articles-part", 16).map(parse_article)
result = wiki.take(1)[0]

You can use this code as a starter template. Now proceed to LTI assignments.

If you want to deploy the environment on your own machine, please use bigdatateam/spark-course1 Docker container.

for word in result[:50]:
    print word

 

Spark QUIZ 1:

What functions must a dataset implement in order to be an RDD?

foo, bar, baz

getSplits, getRecordReader

2. Question 2

Mark all the things that can be used as RDDs.

3. Question 3

Is it possible to access a MySQL database from within the predicate in the ‘filter’ transformation?

No, it is not possible to use database handles in predicates at all.

Yes, it is possible to create a database handle and capture it within the closure.

4. Question 4

True or false? Mark only the correct statements about the ‘filter’ transform.

5. Question 5

True or false? Mark only the incorrect statements.

6. Question 6

Mark all the transformations with wide dependencies. Try to do this without sneaking into the documentation.

7. Question 7

Imagine you would like to print your dataset on the display. Which code is correct (in Python)?

myRDD.foreach(print)
8. Question 8

Imagine you would like to count items in your dataset. Which code is correct (in Python)?

count = 0
myRDD.foreach(lambda x: count += 1)
myRDD.reduce(lambda x, y: x + y)
9. Question 9

Consider the following implementation of the ‘sample’ transformation:

class MyRDD(RDD):
    def my_super_sample(self, ratio):
        return this.filter(lambda x: random.random() < ratio)

Are there any issues with the implementation?

No, it is completely valid implementation.

Yes, it is written in Python and thus very slow.

10. Question 10

Consider the following action that updates a counter in a MySQL database:

def update_mysql_counter():
    handle = connect_to_mysql()
    handle.execute("UPDATE counter SET value = value + 1")
    handle.close()
myRDD.foreach(update_mysql_counter)

Are there any issues with the implementation?

Yes, the action may produce incorrect results due to non-atomic increments.

Yes, the action is inefficient; it would be better to use ‘foreachPartition’.

Spark QUIZ 2:

=================

Congratulations! You passed!
1. Question 1

What is a job?

A unit of work performed by the executor.

That is how Spark calls my application.

A pipelineable part of the computation.

A dependency graph for the RDDs.

An activity you get paid for.

2. Question 2

What is a task?

An activity you get paid for.

A pipelineable part of the computation.

That is how Spark calls my application.

An activity spawned in the response to a Spark action.

A dependency graph for the RDDs.

3. Question 3

What is a job stage?

A subset of the dependency graph.

A single step of the job.

A place where a job is performed.

An activity spawned in the response to a Spark action.

A particular shuffle operation within the job.

4. Question 4

How does your application find out the executors to work with?

You statically define them in the configuration file.

The SparkContext object queries a discovery service to find them out.

5. Question 5

Mark all the statements that are true.

6. Question 6

Imagine that you need to deliver three floating-point parameters for a machine learning algorithm used in your tasks. What is the best way to do it?

Hardcode them into the algorithm and redeploy the application.

Make a broadcast variable and put these parameters there.

7. Question 7

Imagine that you need to somehow print corrupted records from the log file to the screen. How can you do that?

Use an accumulator variable to collect all the records and pass them back to the driver.

Use a broadcast variable to broadcast the corrupted records and listen for these events in the driver.

8. Question 8

How are broadcast variables distributed among the executors?

The executors are organized in a tree-like hierarchy, and the distribution follows the tree structure.

The driver sends the content in parallel to every executor.

The driver sends the content one-by-one to every executor.

9. Question 9

What will happen if you use a non-associative, non-commutative operator in the accumulator variables?

Spark will not allow me to do that.

I have tried that — everything works just fine.

The cluster will crash.

10. Question 10

Mark all the operators that are both associative and commutative.

11. Question 11

Does Spark guarantee that accumulator updates originating from actions are applied only once?

No.

12. Question 12

Does Spark guarantee that accumulator updates originating from transformations are applied at least once?

Yes.
