Continuing from my previous post on sentiment analysis, let's explore how to perform the same analysis using RHadoop.
What will be covered?
- Environment & Pre-requisites
- RHadoop in action
- Setting RHadoop environment variables
- Setting working folder paths
- Loading data
- Scoring function
- Writing Mapper
- Writing Reducer
- Run your Map-Reduce program
- Read output from Hadoop into an R data frame
Environment:
I performed this analysis on the following setup:
- A single-node Hadoop cluster set up on Ubuntu 14.04 (learn how to set it up here!)
- RHadoop, a collection of four R packages that allow users to manage and analyse data with Hadoop (learn how to set it up here!)
Pre-requisites:
Ensure that all Hadoop processes are running. You can do this by running the following commands in your terminal:
start-dfs.sh and start-yarn.sh
Then run the command jps in your terminal; the result should look similar to the screenshot below.
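For reference, on a single-node setup the jps output typically lists the HDFS and YARN daemons, something along these lines (process IDs will differ):

$ jps
4823 NameNode
4961 DataNode
5150 SecondaryNameNode
5312 ResourceManager
5441 NodeManager
5738 Jps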
RHadoop In Action:
Set the environment variables; note that the paths may vary depending on your Ubuntu and Hadoop installation (I'm using Hadoop 2.4.0).
Sys.setenv("HADOOP_CMD"="/usr/local/hadoop/bin/hadoop" Sys.setenv("HADOOP_STREAMING"="/usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.4.0.jar")
Setting folder paths:
To ease testing during the development stage, it helps to be able to switch between the local and Hadoop back ends. In the code below, setting LOCAL = T uses files from the local folder, while LOCAL = F uses files from Hadoop.
# Root folder path
setwd('/home/manohar/example/Sentiment_Analysis')

# Set "LOCAL" variable to T to execute using rmr's local backend.
# Otherwise, use Hadoop (which needs to be running, correctly configured, etc.)
LOCAL = F

if (LOCAL) {
  rmr.options(backend = 'local')
  # we have smaller extracts of the data in this project's 'local' subdirectory
  hdfs.data.root = '/home/manohar/example/Sentiment_Analysis/'
  hdfs.data = file.path(hdfs.data.root, 'data', 'data.csv')
  hdfs.out.root = hdfs.data.root
} else {
  rmr.options(backend = 'hadoop')
  # assumes 'Sentiment_Analysis/data' input path exists on HDFS under /home/manohar/example
  hdfs.data.root = '/home/manohar/example/Sentiment_Analysis/'
  hdfs.data = file.path(hdfs.data.root, 'data')
  # writes output to 'Sentiment_Analysis' directory in user's HDFS home
  # (e.g., /home/manohar/example/Sentiment_Analysis/)
  hdfs.out.root = 'Sentiment_Analysis'
}
hdfs.out = file.path(hdfs.out.root, 'out')
Loading Data:
The code below copies the file from the local file system to Hadoop; if the file already exists there, it simply returns TRUE.
# equivalent to: hadoop dfs -copyFromLocal
hdfs.put(hdfs.data, hdfs.data)
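Optionally, you can confirm that the data landed in HDFS. A quick check, assuming rhdfs is loaded, is:

# list the contents of the data directory on HDFS
hdfs.ls(file.path(hdfs.data.root, 'data'))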
Our data is in a CSV file, so we define the input format up front for better code readability, especially in the mapper stage.
# asa.csv.input.format() - read CSV data files and label field names
# for better code readability (especially in the mapper)
asa.csv.input.format = make.input.format(format = 'csv', mode = 'text',
                                         streaming.format = NULL, sep = ',',
                                         col.names = c('ID', 'Name', 'Gender', 'Age', 'OverAllRating',
                                                       'ReviewType', 'ReviewTitle', 'Benefits', 'Money',
                                                       'Experience', 'Purchase', 'claimsProcess',
                                                       'SpeedResolution', 'Fairness', 'ReviewDate',
                                                       'Review', 'Recommend', 'ColCount'),
                                         stringsAsFactors = F)
Load the opinion lexicons; the files and the accompanying paper on opinion lexicons can be found here.
pos_words <- scan('/home/manohar/example/Sentiment_Analysis/data/positive-words.txt',
                  what = 'character', comment.char = ';')
neg_words <- scan('/home/manohar/example/Sentiment_Analysis/data/negative-words.txt',
                  what = 'character', comment.char = ';')
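The comment.char = ';' argument skips the comment header at the top of each lexicon file. As a quick, optional check, the Hu and Liu lexicon contains roughly 2,000 positive and 4,800 negative terms:

length(pos_words)   # roughly 2,000 positive terms
length(neg_words)   # roughly 4,800 negative terms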
Scoring Function:
Below is the main function that calculates the sentiment score, written by Jeffrey Breen (source here!)
score.sentiment = function(sentence, pos.words, neg.words) {
  require(plyr)
  require(stringr)
  score = laply(sentence, function(sentence, pos.words, neg.words) {
    # clean up sentences with R's regex-driven global substitute, gsub():
    sentence = gsub('[[:punct:]]', '', sentence)
    sentence = gsub('[[:cntrl:]]', '', sentence)
    sentence = gsub('\\d+', '', sentence)
    # and convert to lower case:
    sentence = tolower(sentence)
    # split into words. str_split is in the stringr package
    word.list = str_split(sentence, '\\s+')
    # sometimes a list() is one level of hierarchy too much
    words = unlist(word.list)
    # compare our words to the dictionaries of positive & negative terms
    pos.matches = match(words, pos.words)
    neg.matches = match(words, neg.words)
    # match() returns the position of the matched term or NA
    # we just want a TRUE/FALSE:
    pos.matches = !is.na(pos.matches)
    neg.matches = !is.na(neg.matches)
    # and conveniently enough, TRUE/FALSE will be treated as 1/0 by sum():
    score = sum(pos.matches) - sum(neg.matches)
    return(score)
  }, pos.words, neg.words)
  score.df = data.frame(score)
  return(score.df)
}
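Before wiring this function into MapReduce, it can help to sanity-check it locally on a couple of made-up sentences; this is purely illustrative, and the exact scores depend on the lexicon files:

score.sentiment(c('very good experience', 'terrible and slow claims process'),
                pos_words, neg_words)
# the positive sentence should score above zero and the negative one below zero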
Mapper:
The map phase is the first stage of the MapReduce process. In the classic word-count example, it splits each line into separate words (also called tokenizing) and, for every word it sees, emits that word together with a count of 1. The map phase runs in parallel because Hadoop uses a divide-and-conquer approach to solve the problem, which is what makes the computation fast; this is where the processing and distribution of data take place. In our case, however, we are running on a single node, so the code and logic are fairly simple.
The mapper receives keys and values from the input format. In our case, the key is NULL and the value is a data.frame produced by read.table().
mapper = function(key, val.df) {
  # Remove header lines
  val.df = subset(val.df, Review != 'Review')
  output.key = data.frame(Review = as.character(val.df$Review), stringsAsFactors = F)
  output.val = data.frame(val.df$Review)
  return( keyval(output.key, output.val) )
}
Reducer:
In the word-count example, the reduce phase sums up the number of times each word was seen and writes that count together with the word as output. In our case, the reducer applies the scoring function to the reviews grouped under each key.
Two internal sub-steps run before the reducer produces its final result: shuffle and sort. Shuffle collects all records with the same key into a single group, and sort orders the data by key before it reaches the reducer, as illustrated below.
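A small conceptual illustration of shuffle and sort, using the word-count example rather than our actual data:

# map output (key, value) pairs:        reducer input after shuffle & sort:
#   ("good", 1), ("bad", 1)               "bad"  -> (1)
#   ("good", 1)                           "good" -> (1, 1)
# each reducer call then sees all values for one key grouped together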
reducer = function(key, val.df) {
  output.key = key
  output.val = data.frame(score.sentiment(val.df, pos_words, neg_words))
  return( keyval(output.key, output.val) )
}
Running your Map-Reduce:
Now execute the MapReduce job using the mapper and reducer defined above.
mr.sa = function(input, output) {
  mapreduce(input = input,
            output = output,
            input.format = asa.csv.input.format,
            map = mapper,
            reduce = reducer,
            verbose = T)
}
out = mr.sa(hdfs.data, hdfs.out)

------- output on screen ------
> out = mr.sa(hdfs.data, hdfs.out)
16/09/09 10:11:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
packageJobJar: [/usr/local/hadoop/data/hadoop-unjar2099064477903127749/] [] /tmp/streamjob6583314935744487158.jar tmpDir=null
16/09/09 10:11:33 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8050
16/09/09 10:11:33 INFO client.RMProxy: Connecting to ResourceManager at localhost/127.0.0.1:8050
16/09/09 10:11:36 INFO mapred.FileInputFormat: Total input paths to process : 3
16/09/09 10:11:36 INFO mapreduce.JobSubmitter: number of splits:4
16/09/09 10:11:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1473394634383_0002
16/09/09 10:11:39 INFO impl.YarnClientImpl: Submitted application application_1473394634383_0002
16/09/09 10:11:39 INFO mapreduce.Job: The url to track the job: http://manohar-dt:8088/proxy/application_1473394634383_0002/
16/09/09 10:11:39 INFO mapreduce.Job: Running job: job_1473394634383_0002
16/09/09 10:11:58 INFO mapreduce.Job: Job job_1473394634383_0002 running in uber mode : false
16/09/09 10:11:58 INFO mapreduce.Job:  map 0% reduce 0%
16/09/09 10:12:27 INFO mapreduce.Job:  map 48% reduce 0%
16/09/09 10:12:37 INFO mapreduce.Job:  map 100% reduce 0%
16/09/09 10:13:22 INFO mapreduce.Job:  map 100% reduce 100%
16/09/09 10:13:35 INFO mapreduce.Job: Job job_1473394634383_0002 completed successfully
16/09/09 10:13:36 INFO mapreduce.Job: Counters: 50
	File System Counters
		FILE: Number of bytes read=1045750
		FILE: Number of bytes written=2580683
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=679293
		HDFS: Number of bytes written=578577
		HDFS: Number of read operations=15
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=4
		Launched reduce tasks=1
		Data-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=148275
		Total time spent by all reduces in occupied slots (ms)=53759
		Total time spent by all map tasks (ms)=148275
		Total time spent by all reduce tasks (ms)=53759
		Total vcore-seconds taken by all map tasks=148275
		Total vcore-seconds taken by all reduce tasks=53759
		Total megabyte-seconds taken by all map tasks=151833600
		Total megabyte-seconds taken by all reduce tasks=55049216
	Map-Reduce Framework
		Map input records=9198
		Map output records=1818
		Map output bytes=1037580
		Map output materialized bytes=1045768
		Input split bytes=528
		Combine input records=0
		Combine output records=0
		Reduce input groups=1616
		Reduce shuffle bytes=1045768
		Reduce input records=1818
		Reduce output records=1720
		Spilled Records=3636
		Shuffled Maps =4
		Failed Shuffles=0
		Merged Map outputs=4
		GC time elapsed (ms)=1606
		CPU time spent (ms)=22310
		Physical memory (bytes) snapshot=1142579200
		Virtual memory (bytes) snapshot=5270970368
		Total committed heap usage (bytes)=947912704
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=678765
	File Output Format Counters
		Bytes Written=578577
	rmr reduce calls=1616
16/09/09 10:13:36 INFO streaming.StreamJob: Output directory: /Sentiment_Analysis/out
Load output from Hadoop into an R data frame:
Read the output from the Hadoop folder into an R variable and convert it to a data frame for further processing.
results = from.dfs(out)

# put the result in a dataframe
df = sapply(results, c)
df = data.frame(df)                   # convert to dataframe
colnames(df) <- c('Review', 'score')  # assign column names
print(head(df))

------- Result -----
                                              Review score
1                               Very good experience     1
2                          It was a classic scenario     1
3                I have chosen for all my insurances     0
4            As long as customers understand the t&c     0
5      time will tell if live up to our expectations     0
6  Good price good customer service happy to help..      3
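If you want to keep the scored reviews for later analysis or visualization, one simple option (the file name here is just an example) is to write the data frame out locally:

# save the scored reviews to a local CSV file
write.csv(df, 'review_sentiment_scores.csv', row.names = FALSE)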
Now we have a sentiment score for each review. This opens up opportunities for further analysis, such as classifying emotion and polarity, and a whole lot of visualization for insight. Please see my previous post here to learn more about this.
You can find the full working code in my github account here!