Thursday 21 September 2017

Apache Spark core concepts [summary style]

Disclaimer: This is a summary from the Learning Spark book. It covers the core concepts of Spark and playing around with RDDs using various functions. This post helps in taking a quick revisit to refresh the concepts without any distractions.

Programming with RDDs

An RDD is simply a distributed collection of elements. Under the hood, Spark automatically distributes the data contained in RDDs across the cluster and parallelizes the operations you perform on them. An RDD is an immutable distributed collection of objects. Each RDD is split into multiple partitions, which may be computed on different nodes of the cluster. Users can create RDDs in two ways: by loading an external dataset, or by distributing a collection of objects (e.g., a list or set) in their driver program.
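For instance, both styles look like this in PySpark (a minimal sketch, assuming sc is an existing SparkContext; the file name is only a placeholder):

lines = sc.textFile("README.md")      # RDD from an external dataset (placeholder path)
nums = sc.parallelize([1, 2, 3, 4])   # RDD from an in-memory collection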

RDDs offer two types of operations: transformations and actions. Transformations construct a new RDD from a previous one. Actions compute a result based on an RDD and either return it to the driver program or save it to an external storage system. Spark computes RDDs the first time they are used in an action (lazy evaluation). Spark's RDDs are by default recomputed each time you run an action on them. To reuse an RDD in multiple actions, we have to persist it using RDD.persist(). After computing it the first time, Spark will store the RDD contents in memory (partitioned across the machines in your cluster) and reuse them in future actions.

To summarize, every Spark program and shell session will work as follows:
1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations like filter().
3. Ask Spark to persist() any intermediate RDDs that will need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
cache() is the same as calling persist() with the default storage level.
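A minimal sketch of these four steps in PySpark (assuming sc is an existing SparkContext; the file name is only a placeholder):

lines = sc.textFile("log.txt")                   # 1. create an input RDD
errors = lines.filter(lambda l: "error" in l)    # 2. transform it
errors.persist()                                 # 3. persist it for reuse
print(errors.count())                            # 4. action: triggers the computation
print(errors.first())                            # reuses the persisted RDD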

The ability to always recompute an RDD is actually why RDDs are called “resilient.” When a machine holding RDD data fails, Spark uses this ability to recompute the missing partitions, transparent to the user.
The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s parallelize() method:

lines = sc.parallelize(["pandas", "i like pandas"])

If you are ever confused whether a given function is a transformation or an action, you can look at its return type: transformations return RDDs, whereas actions return some other data type. Spark keeps track of the set of dependencies between different RDDs, called the lineage graph. It uses this information to compute each RDD on demand and to recover lost data if part of a persistent RDD is lost.

Examples of actions are count() and take(). RDDs also have a collect() function to retrieve the entire RDD. Keep in mind that your entire dataset must fit in memory on a single machine to use collect() on it, so collect() shouldn’t be used on large datasets.

In most cases RDDs can’t just be collect()ed to the driver because they are too large. In these cases, it’s common to write data out to a distributed storage system such as HDFS or Amazon S3. You can save the contents of an RDD using the saveAsTextFile() action, saveAsSequenceFile(), or any of a number of actions for various built-in formats.

Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is not immediately performed. Instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary.
Spark uses lazy evaluation to reduce the number of passes it has to take over our data by grouping operations together.

Functions
We have three options for passing functions into Spark. For shorter functions, we can pass in lambda expressions. Alternatively, we can pass in top-level functions, or locally defined functions.
word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s

word = rdd.filter(containsError)

One issue to watch out for when passing functions is inadvertently serializing the object containing the function.
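A common way this happens is passing a method or field of an object: Spark then serializes the whole containing object and ships it to the workers. The usual fix is to copy the needed field into a local variable first. A sketch of the pattern (class and names are illustrative, not from the original post):

class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def getMatchesMemberReference(self, rdd):
        # Problem: referencing self.query makes Spark serialize the whole object
        return rdd.filter(lambda x: self.query in x)

    def getMatchesNoReference(self, rdd):
        # Safe: extract only the field we need into a local variable
        query = self.query
        return rdd.filter(lambda x: query in x)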

Element-wise transformations
map(), filter(), flatMap() (flatMap() generates multiple outputs for a single input).
Set operations
distinct(), union(), intersection(), subtract(), cartesian()

map() takes a function and applies it to each element in the input RDD, with the result of the function becoming the new value of each element in the resulting RDD.
flatMap() is the same as map(), but instead of returning a single element for each input element, it returns multiple outputs for each element (our function returns an iterator with the return values; instead of an RDD of iterators, we get back an RDD consisting of the elements from all of those iterators).
filter() takes in a function and returns an RDD that only has elements that pass the filter() function.
distinct() is expensive, as it requires shuffling all data over the network.
union() does not remove duplicates.
intersection() removes duplicates. Its performance is worse since it requires a shuffle to identify common elements.
subtract() returns an RDD that has only values present in the first RDD; it performs a shuffle.
cartesian() is expensive on large datasets.
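Small illustrations of these operations (a sketch, assuming sc is an existing SparkContext; the results shown in the comments may come back in a different order):

nums = sc.parallelize([1, 2, 3, 3])
lines = sc.parallelize(["hello world", "hi"])

nums.map(lambda x: x * x).collect()              # [1, 4, 9, 9]
nums.filter(lambda x: x != 1).collect()          # [2, 3, 3]
lines.flatMap(lambda l: l.split(" ")).collect()  # ['hello', 'world', 'hi']

other = sc.parallelize([3, 4, 5])
nums.distinct().collect()                        # [1, 2, 3]
nums.union(other).collect()                      # duplicates are kept
nums.intersection(other).collect()               # [3]
nums.subtract(other).collect()                   # [1, 2]
nums.cartesian(other).take(3)                    # pairs such as (1, 3), (1, 4), ...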

RDD transformations
map(), filter(), flatMap(), distinct(), sample() (sample an RDD with or without replacement), union(), intersection(), subtract(), cartesian()

Actions
reduce(), fold(), aggregate(), collect(), take(), top(), takeSample(), foreach(), count(), countByValue(), takeOrdered()

reduce() is the most common aggregation function.
fold(zero)(func) - similar to reduce(), but takes an additional value called the identity element, which is used for the initial call on each partition. Applying the function to it multiple times should not change the value (e.g., 0 for +, 1 for *, or an empty list for concatenation).
Both reduce() and fold() require the same return type as the elements of the input RDD. In some cases we want to return a different type (for example, when computing a running average we have to track the count as well, so the output will be pairs). One way is to use map() to transform every element into a pair so that reduce() can work on pairs.
aggregate(zeroValue)(seqOp, combOp) helps in these cases. With aggregate(), like fold(), we supply an initial zero value of the type we want to return. We then supply a function to combine the elements from our RDD with the accumulator. Finally, we need to supply a second function to merge two accumulators, given that each node accumulates its own results locally. (A sketch of the running average with aggregate() follows these descriptions.)
collect() returns the entire RDD's contents to the driver program, where the entire RDD is expected to fit in memory.
take(n) returns n elements from the RDD and attempts to minimize the number of partitions it accesses, so it may represent a biased collection.
These operations do not return the elements in the order you might expect. They are useful for testing and debugging, but may become bottlenecks when dealing with large data.
top() will use the default ordering of the data, and we can supply our own comparison function to extract the top elements.
takeSample(withReplacement, num, [seed]) - if we need a sample of our data in our driver program.
foreach() - lets us perform a computation on each element in the RDD without bringing it back locally to the driver program.
count() - returns the number of elements.
countByValue() returns a map of each unique value to its count.
takeOrdered(num)(ordering) returns num elements based on the provided ordering.
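The running-average case mentioned under fold() and aggregate() can be written as follows (a minimal sketch, assuming sc is an existing SparkContext):

nums = sc.parallelize([1, 2, 3, 4])
sumCount = nums.aggregate(
    (0, 0),                                              # zero value: (sum, count)
    (lambda acc, value: (acc[0] + value, acc[1] + 1)),   # add one element to the accumulator
    (lambda a, b: (a[0] + b[0], a[1] + b[1])))           # merge accumulators across partitions
avg = sumCount[0] / float(sumCount[1])                   # 2.5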

Persistence (Caching)
When we ask Spark to persist an RDD, the nodes that compute the RDD store their partitions. If a node that has data persisted on it fails, Spark will recompute the lost partitions of the data when needed. We can also replicate our data on multiple nodes if we want to be able to handle node failure without slowdown.
Spark has many levels of persistence to choose from based on what our goals are: MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK, MEMORY_AND_DISK_SER, and DISK_ONLY.

import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))

If you attempt to cache too much data to fit in memory, Spark will automatically evict old partitions using a Least Recently Used (LRU) cache policy. RDDs come with a method called unpersist() that lets you manually remove them from the cache.

Working with Key/Value pairs

Pair RDDs allow you to act on each key in parallel or regroup data across the network. Pair RDDs have a reduceByKey() method that can aggregate data separately for each key, and a join() method that can merge two RDDs together by grouping elements with the same key.
When creating a pair RDD from an in-memory collection in Scala and Python, we only need to call SparkContext.parallelize() on a collection of pairs.
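For example (a sketch, assuming sc is an existing SparkContext):

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])      # pair RDD from an in-memory collection
lines = sc.parallelize(["hello world", "hi there"])
keyed = lines.map(lambda line: (line.split(" ")[0], line))  # key each line by its first word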

Transformations on pair RDDs
reduceByKey(), foldByKey(), groupByKey(), combineByKey(), mapValues(), flatMapValues(), keys(), values(), sortByKey()

Transformations on two pair RDDs are subtractByKey(), join(), rightOuterJoin(), leftOuterJoin(), cogroup().

reduceByKey() combines values with the same key.
groupByKey() groups values with the same key.
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) combines values with the same key using a different result type.
mapValues() applies a function to each value of a pair RDD without changing the key, e.g., rdd.mapValues(x => x + 1).
flatMapValues() applies a function that returns an iterator to each value of a pair RDD, and for each element returned, produces a key/value entry with the old key. Often used for tokenization.
keys() returns an RDD of just the keys.
values() returns an RDD of just the values.
sortByKey() returns an RDD sorted by the key.
cogroup() groups data from both RDDs sharing the same key.
reduceByKey() is similar to reduce(): it runs several parallel reduce operations, one for each key in the dataset. It is not implemented as an action that returns a value; instead it returns a new pair RDD.
foldByKey() is similar to fold().

Calling reduceByKey() and foldByKey() will automatically perform combining locally on each machine before computing global totals for each key, so the user does not need to specify a combiner. The more general combineByKey() interface allows you to customize the combining behavior.
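For example, a per-key sum can be written with either of them (a minimal sketch, assuming sc is an existing SparkContext; result order may vary):

rdd = sc.parallelize([("a", 3), ("b", 4), ("a", 1)])
rdd.reduceByKey(lambda x, y: x + y).collect()    # [('a', 4), ('b', 4)]
rdd.foldByKey(0, lambda x, y: x + y).collect()   # same result, with 0 as the zero value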

Word count in Python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

We can actually implement word count even faster by using the countByValue() function on the first RDD: rdd.flatMap(lambda x: x.split(" ")).countByValue().

combineByKey() is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. Like aggregate(), combineByKey() allows the user to return values that are not the same type as our input data. To understand combineByKey(), it’s useful to think of how it handles each element it processes. As combineByKey() goes through the elements in a partition, each element either has a key it hasn’t seen before or has the same key as a previous element. If it’s a new element, combineByKey() uses a function we provide, called createCombiner(), to create the initial value for the accumulator on that key. It’s important to note that this happens the first time a key is found in each partition, rather than only the first time the key is found in the RDD. If it is a value we have seen before while processing that partition, it will instead use the provided function, mergeValue(), with the current value for the accumulator for that key and the new value. Since each partition is processed independently, we can have multiple accumulators for the same key. When we are merging the results from each partition, if two or more partitions have an accumulator for the same key we merge the accumulators using the user-supplied mergeCombiners() function.
We can disable map-side aggregation in combineByKey() if we know that our data won’t benefit from it. For example, groupByKey() disables map-side aggregation as the aggregation function (appending to a list) does not save any space. If we want to disable map-side combines, we need to specify the partitioner; for now you can just use the partitioner on the source RDD by passing rdd.partitioner.

Per-key average using combineByKey() in Python
sumCount = nums.combineByKey((lambda x: (x, 1)),
                             (lambda acc, x: (acc[0] + x, acc[1] + 1)),
                             (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
sumCount.map(lambda kv: (kv[0], kv[1][0] / float(kv[1][1]))).collectAsMap()

** In any case, using one of the specialized aggregation functions in Spark can be much faster than the naive approach of grouping our data and then reducing it.

Tuning the level of parallelism
Every RDD has a fixed number of partitions that determine the degree of parallelism to use when executing operations on the RDD. When performing aggregations or grouping operations, we can ask Spark to use a specific number of partitions. Spark will always try to infer a sensible default value based on the size of your cluster, but in some cases you will want to tune the level of parallelism for better performance. Most of the operators accept a second parameter giving the number of partitions to use when creating the grouped or aggregated RDD.

reduceByKey() with custom parallelism in Python
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y) # Default parallelism
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10) # Custom parallelism

To change the partitioning of an RDD outside the context of grouping and aggregation operations, Spark provides the repartition() function, which shuffles the data across the network to create a new set of partitions. Repartitioning the data is a fairly expensive operation. An optimized version of repartition() is coalesce(), which allows avoiding data movement, but only if you are decreasing the number of RDD partitions.
To know whether you can safely call coalesce(), you can check the size of the RDD using rdd.partitions.size() in Java/Scala and rdd.getNumPartitions() in Python, and make sure that you are coalescing it to fewer partitions than it currently has.
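A quick sketch of both in Python (assuming sc is an existing SparkContext):

rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()        # 10
more = rdd.repartition(20)    # full shuffle into 20 partitions
fewer = more.coalesce(5)      # shrink to 5 partitions while avoiding a full shuffle
fewer.getNumPartitions()      # 5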

Grouping Data
If our data is already keyed in the way we want, groupByKey() will group our data using the key in our RDD. On an RDD consisting of keys of type K and values of type V, we get back an RDD of type [K, Iterable[V]].

*** If you find yourself writing code where you groupByKey() and then use a reduce() or fold() on the values, you can probably achieve the same result more efficiently by using one of the per-key aggregation functions. Rather than reducing the RDD to an in-memory value, we reduce the data per key and get back an RDD with the reduced values corresponding to each key. For example, rdd.reduceByKey(func) produces the same RDD as

rdd.groupByKey().mapValues(value => value.reduce(func))

but is more efficient as it avoids the step of creating a list of values for each key.

We can also group multiple RDDs sharing the same key using a function called cogroup(). cogroup() over two RDDs sharing the same key type, K, with the respective value types V and W, gives us back RDD[(K, (Iterable[V], Iterable[W]))]. If one of the RDDs doesn’t have elements for a given key that is present in the other RDD, the corresponding Iterable is simply empty. cogroup() gives us the power to group data from multiple RDDs.
cogroup() is used as a building block for the joins, but it can be used for much more than just implementing joins: we can also use it to implement intersect-by-key, and it can work on three or more RDDs at once.
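For example (a sketch in Python, assuming sc is an existing SparkContext; the iterables are materialized with list() just to show their contents):

rdd1 = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd2 = sc.parallelize([(3, 9)])
grouped = rdd1.cogroup(rdd2)
[(k, (list(v), list(w))) for k, (v, w) in grouped.collect()]
# e.g. [(1, ([2], [])), (3, ([4, 6], [9]))]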

Sorting
The sortByKey() function takes a parameter called ascending, indicating whether we want the result in ascending order (it defaults to true). For a custom sort order, we can provide our own comparison function.

Custom sort order in Python, sorting integers as if strings
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc = lambda x: str(x))

Actions on Pair RDDs
countByKey(), collectAsMap(), countByValue(), lookup()
countByKey() counts the number of elements for each key.
collectAsMap() collects the result as a map to provide easy lookup.
lookup() returns all values associated with the provided key.
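For example (a sketch, assuming sc is an existing SparkContext):

rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd.countByKey()      # {1: 1, 3: 2}
rdd.collectAsMap()    # {1: 2, 3: 6} (only one value is kept per duplicate key)
rdd.lookup(3)         # [4, 6]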

**Data Partitioning (Advanced)
Let's see how datasets are partitioned across nodes.
Partitioning will not be helpful in all applications—for example, if a given RDD is scanned only once, there is no point in partitioning it in advance. It is useful only when a dataset is reused multiple times in key-oriented operations such as joins.
Spark’s partitioning is available on all RDDs of key/value pairs, and causes the system to group elements based on a function of each key. Although Spark does not give explicit control of which worker node each key goes to (partly because the system is designed to work even if specific nodes fail), it lets the program ensure that a set of keys will appear together on some node. For example, you might choose to hash-partition an RDD into 100 partitions so that keys that have the same hash value modulo 100 appear on the same node. Or you might range-partition the RDD into sorted ranges of keys so that elements with keys in the same range appear on the same node.
As a simple example, consider an application that keeps a large table of user information in memory—say, an RDD of (UserID, UserInfo) pairs, where UserInfo contains a list of topics the user is subscribed to. The application periodically combines this table with a smaller file representing events that happened in the past five minutes—say, a table of (UserID, LinkInfo) pairs for users who have clicked a link on a website in those five minutes. For example, we may wish to count how many users visited a link that was not to one of their subscribed topics. We can perform this combination with Spark’s join() operation, which can be used to group the UserInfo and LinkInfo pairs for each UserID by key. Our application would look like the example below.

Example. Scala simple application
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events)  // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}

This code will run fine as is, but it will be inefficient. This is because the join() operation, called each time processNewLogs() is invoked, does not know anything about how the keys are partitioned in the datasets. By default, this operation will hash all the keys of both datasets, sending elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine. Because we expect the userData table to be much larger than the small log of events seen every five minutes, this wastes a lot of work: the userData table is hashed and shuffled across the network on every call, even though it doesn’t change.
Fixing this is simple: just use the partitionBy() transformation on userData to hash-partition it at the start of the program. We do this by passing a spark.HashPartitioner object to partitionBy(), as shown in the example below.

Example. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                 .partitionBy(new HashPartitioner(100))  // Create 100 partitions
                 .persist()
The processNewLogs() method can remain unchanged: the events RDD is local to processNewLogs(), and is used only once within this method, so there is no advantage in specifying a partitioner for events. Because we called partitionBy() when building userData, Spark will now know that it is hash-partitioned, and calls to join() on it will take advantage of this information. In particular, when we call userData.join(events), Spark will shuffle only the events RDD, sending events with each particular UserID to the machine that contains the corresponding hash partition of userData. The result is that a lot less data is communicated over the network, and the program runs significantly faster.
Also, the 100 passed to partitionBy() represents the number of partitions, which will control how many parallel tasks perform further operations on the RDD (e.g., joins); in general, make this at least as large as the number of cores in your cluster.
Failure to persist an RDD after it has been transformed with partitionBy() will cause subsequent uses of the RDD to repeat the partitioning of the data, resulting in repeated partitioning and shuffling of data across the network, similar to what occurs without any specified partitioner.
However, in Python, you cannot pass a HashPartitioner object to partitionBy(); instead, you just pass the number of partitions desired (e.g., rdd.partitionBy(100)).

To determine an RDD's partitioner, check its partitioner property (not available in Python):
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12
scala> pairs.partitioner
res0: Option[spark.Partitioner] = None
scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14
scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)

Without persist() added to the third line of input (where partitioned is defined), subsequent RDD actions will evaluate the entire lineage of partitioned, which will cause pairs to be hash-partitioned over and over.

The operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().

*** If you call map() on a hash-partitioned RDD of key/value pairs, the function passed to map() can in theory change the key of each element, so the result will not have a partitioner. Spark does not analyze your functions to check whether they retain the key. Instead, it provides two other operations, mapValues() and flatMapValues(), which guarantee that each tuple’s key remains the same.
All that said, here are all the operations that result in a partitioner being set on the output RDD:

cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner).

All other operations will produce a result with no partitioner.

Finally, for binary operations, which partitioner is set on the output depends on the parent RDDs’ partitioners. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner set, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent.
To maximize the potential for partitioning-related optimizations, you should use mapValues() or flatMapValues() whenever you are not changing an element’s key.

A custom partitioner can be built by extending the spark.Partitioner class and implementing the required methods. Many of the shuffle-based methods in Spark, such as join() and groupByKey(), can also take an optional Partitioner object to control the partitioning of the output.
In Python, you do not extend a Partitioner class, but instead pass a hash function as an additional argument to RDD.partitionBy().

Example. Python custom partitioner

import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

rdd.partitionBy(20, hash_domain)  # Create 20 partitions
