Disclaimer: This is a summary from the Learning Spark book. It covers
the core concepts of Spark and playing around with RDDs using various functions.
This post helps in taking a quick revisit to refresh the concepts without any distractions.
Programming with RDDs
An RDD is simply a distributed collection of elements. Under
the hood, Spark automatically distributes the data contained in an RDD across the
cluster and parallelizes the operations you perform on it. An RDD is an immutable distributed collection of objects. Each
RDD is split into multiple partitions, which may be computed on different nodes
of the cluster. Users can create RDDs in two ways: by loading an external
dataset, or by distributing a collection of objects (e.g., a list or set) in their
driver program.
RDDs offer two types of operations: transformations and
actions. Transformations construct a new RDD from a previous one. Actions compute a
result based on an RDD and either return it to the driver program or save it to
a storage system. Spark computes RDDs the first time they are used in an
action (lazy evaluation). Spark's RDDs are by default recomputed each time you run an
action on them. To reuse an RDD in multiple actions, we have to persist it using
RDD.persist(). After computing it the first time, Spark will store the RDD
contents in memory (partitioned across the machines in your cluster), and reuse
them in future actions.
To summarize, every Spark program and shell session will
work as follows:
1. Create some input RDDs from external data.
2. Transform them to define new RDDs using transformations
like filter().
3. Ask Spark to persist() any intermediate RDDs that will
need to be reused.
4. Launch actions such as count() and first() to kick off a parallel computation, which is then optimized and executed by Spark.
cache() is the same as calling persist() with the default
storage level.
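A rough sketch of that flow in Python (the file name log.txt is just illustrative):
lines = sc.textFile("log.txt")                 # 1. create an input RDD from external data
errors = lines.filter(lambda l: "error" in l)  # 2. transform it into a new RDD
errors.persist()                               # 3. persist the intermediate RDD for reuse
print(errors.count())                          # 4. actions kick off the parallel computation
print(errors.first())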
The ability to always recompute an RDD is actually why RDDs
are called “resilient.” When a machine holding RDD data fails, Spark uses this
ability to recompute the missing partitions, transparent to the user.
The simplest way to create RDDs is to take an existing
collection in your program and pass it to SparkContext's parallelize() method:
lines = sc.parallelize(["pandas", "i like pandas"])
If you are ever confused whether a given function is a
transformation or an action, you can look at its return type: transformations
return RDDs, whereas actions return some other data type. Spark keeps track of the set of dependencies between
different RDDs, called the lineage graph. It uses this information to compute
each RDD on demand and to recover lost data if part of a persistent RDD is
lost.
Actions include count(), take(), and so on. RDDs also have a collect()
function to retrieve the entire RDD. Keep in mind that your entire dataset must
fit in memory on a single machine to use collect() on it, so collect()
shouldn't be used on large datasets.
In most cases RDDs can’t just be collect()ed to the driver
because they are too large. In these cases, it’s common to write data out to a
distributed storage system such as HDFS or Amazon S3. You can save the contents
of an RDD using the saveAsTextFile() action, saveAsSequenceFile(), or any of a
number of actions for various built-in formats.
Lazy evaluation means that when we call a transformation on an RDD (for instance, calling map()), the operation is not immediately performed. Instead, Spark internally records metadata to indicate that this operation has been requested. Rather than thinking of an RDD as containing specific data, it is best to think of each RDD as consisting of instructions on how to compute the data that we build up through transformations. Loading data into an RDD is lazily evaluated in the same way transformations are. So, when we call sc.textFile(), the data is not loaded until it is necessary.
Spark uses lazy evaluation to reduce the number of passes it
has to take over our data by grouping operations together.
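A small sketch of what lazy evaluation buys us (the file name is illustrative):
lines = sc.textFile("README.md")                     # nothing is read yet
pythonLines = lines.filter(lambda l: "Python" in l)  # still nothing is read
print(pythonLines.first())                           # Spark scans the file only now, and only
                                                     # far enough to find the first matching line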
Functions
We have three options for passing functions into Spark. For
shorter functions, we can pass in lambda expressions. Alternatively, we can
pass in top-level functions, or locally defined functions.
word = rdd.filter(lambda s: "error" in s)

def containsError(s):
    return "error" in s

word = rdd.filter(containsError)
One issue to watch out for when passing functions is
inadvertently serializing the object containing the function.
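For instance (a hypothetical sketch of the pitfall): if the function you pass references a field of an object, Spark must serialize the whole containing object and ship it with each task; copying the field into a local variable avoids this.
class SearchFunctions(object):
    def __init__(self, query):
        self.query = query

    def get_matches_bad(self, rdd):
        # References self.query, so the entire SearchFunctions object
        # is serialized and sent with the task
        return rdd.filter(lambda x: self.query in x)

    def get_matches_good(self, rdd):
        # Copy the field into a local variable; only the string is shipped
        query = self.query
        return rdd.filter(lambda x: query in x)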
Element-wise transformations
The basic element-wise transformations are map() and filter(); flatMap() generates multiple output elements for each input element.
Set operations
distinct(), union(), intersection(), subtract(), cartesian()
map() takes a function as input and applies it to each element in the input RDD, with the result of the function becoming the new value of each element in the resulting RDD.
flatMap() is the same as map(), but instead of returning a single element for each input element, it returns multiple outputs per element (the function returns an iterator with our return values, and instead of getting back an RDD of iterators, we get back an RDD consisting of the elements from all of the iterators).
filter() takes in a function and returns an RDD that only has elements that pass the filter() function.
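A quick sketch of these three in Python:
nums = sc.parallelize([1, 2, 3, 4])
squares = nums.map(lambda x: x * x)                  # 1, 4, 9, 16

lines = sc.parallelize(["hello world", "hi"])
words = lines.flatMap(lambda line: line.split(" "))  # "hello", "world", "hi"
split = lines.map(lambda line: line.split(" "))      # ["hello", "world"], ["hi"]

longWords = words.filter(lambda w: len(w) > 2)       # "hello", "world"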
distinct() is expensive, as it requires shuffling all data over
the network.
union() operation does not remove duplicates.
intersection() operation will remove duplicates. Its
performance is worse since it requires a shuffle to identify common elements.
subtract() returns an RDD that has only the values present in the
first RDD and not in the second; it also performs a shuffle.
cartesian() is expensive on large data sets.
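A sketch of the set operations on two small RDDs (element order in the results may vary):
a = sc.parallelize(["coffee", "coffee", "panda", "monkey", "tea"])
b = sc.parallelize(["coffee", "monkey", "kitty"])

a.distinct().collect()       # coffee, panda, monkey, tea
a.union(b).collect()         # keeps duplicates: 8 elements in total
a.intersection(b).collect()  # coffee, monkey (duplicates removed)
a.subtract(b).collect()      # panda, tea
a.cartesian(b).count()       # 5 * 3 = 15 pairs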
RDD transformations
map(), filter(), flatMap(), distinct(), sample() - sample an
RDD with or without replacement, union(), intersection(),subtract(),
cartesian()
Actions
reduce(), fold(), aggregate(), collect(), take(), top(), takeSample(), foreach(), count(), countByValue(), takeOrdered()
reduce() is the most common aggregation function.
fold(zero)(func) is similar to reduce() but takes an additional value, the zero value, that is used for the initial call on each partition. It should be the identity element for your operation: applying it multiple times with your function should not change the value (e.g., 0 for +, 1 for *, or an empty list for concatenation).
Both reduce() and fold() require the return type to be the same as the element type of the
input RDD. In some cases we want to return a different type (e.g., when computing a
running average we have to track the count as well, so the output will be pairs). One
way is to use map() to transform every element so that reduce() can work on pairs.
aggregate(zeroValue)(seqOp, combOp) will help in these
cases. With aggregate(), like fold(), we supply an initial zero value of the
type we want to return. We then supply a function to combine the elements from
our RDD with the accumulator. Finally, we need to supply a second function to
merge two accumulators, given that each node accumulates its own results
locally.
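For example, a sketch of computing a running average with aggregate() in Python:
nums = sc.parallelize([1, 2, 3, 4])
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
avg = sumCount[0] / float(sumCount[1])  # 2.5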
collect() returns the entire RDD's contents to the driver program,
where the entire RDD contents are expected to fit in memory.
take(n) returns n elements from the RDD and attempts to
minimize the number of partitions it accesses, so it may represent a biased
collection.
These operations do not return elements in the order you might expect
and are useful for testing and debugging. There may be bottlenecks when dealing
with large data.
top() will use the default ordering on the data, and we can
supply our own comparison function to extract the top elements.
takeSample(withReplacement, num, [seed]) - if we need a
sample of our data in our driver program.
foreach() - lets us perform a computation on each element in
the RDD without bringing it back to the driver program.
count() - returns the number of elements.
countByValue() - returns a map of each unique value to its count.
takeOrdered(num)(ordering) - returns num elements based on the provided ordering.
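A few of these actions on a small example RDD (a sketch; the exact elements returned by take() depend on partitioning):
nums = sc.parallelize([3, 1, 2, 2])
nums.reduce(lambda x, y: x + y)  # 8
nums.count()                     # 4
nums.take(2)                     # e.g. [3, 1], not necessarily in any particular order
nums.top(2)                      # [3, 2]
nums.countByValue()              # {3: 1, 1: 1, 2: 2}
nums.takeOrdered(2)              # [1, 2]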
Persistence (Caching)
When we ask Spark to persist an RDD, the nodes that compute
the RDD store their partitions. If a node that has data persisted on it fails,
Spark will recompute the lost partitions of the data when needed. We can also
replicate our data on multiple nodes if we want to be able to handle node
failure without slowdown.
Spark has many levels of persistence to choose from based on
what our goals are: MEMORY_ONLY, MEMORY_ONLY_SER, MEMORY_AND_DISK,
MEMORY_AND_DISK_SER, and DISK_ONLY.
import org.apache.spark.storage.StorageLevel

val result = input.map(x => x * x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
If you attempt to cache more data than will fit in memory,
Spark will automatically evict old partitions using a Least Recently Used (LRU)
cache policy. RDDs come with a method called unpersist() that lets you manually
remove them from the cache.
Working with Key/Value pairs
Pair RDDs allow you to act on each key in parallel or
regroup data across the network. pair RDDs have a reduceByKey() method that can
aggregate data separately for each key, and a join() method that can merge two
RDDs together by grouping elements with the same key.
When creating a pair RDD from an in-memory collection in
Scala and Python, we only need to call SparkContext.parallelize() on a
collection of pairs.
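For example, a sketch in Python:
# From an in-memory collection of pairs
pairs = sc.parallelize([("a", 3), ("b", 4), ("a", 1)])

# Or by keying an existing RDD, e.g. using the first word of each line as the key
lines = sc.parallelize(["hello world", "hi there"])
keyed = lines.map(lambda line: (line.split(" ")[0], line))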
Transformations on pair RDDs
reduceByKey(), foldByKey(), groupByKey(), combineByKey(), mapValues(), flatMapValues(), keys(), values(), sortByKey()
Transformations on two pair RDDs include subtractByKey(), join(), rightOuterJoin(), leftOuterJoin(), and cogroup().
reduceByKey() combines values with the same key.
groupByKey() groups values with the same key.
combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) combines values with the same key using a different result type.
mapValues() applies a function to each value of a pair RDD without changing the key, e.g. rdd.mapValues(x => x + 1).
flatMapValues() applies a function that returns an iterator to each value of a pair RDD, and for each element returned, produces a key/value entry with the old key. Often used for tokenization.
keys() returns an RDD of just the keys.
values() returns an RDD of just the values.
sortByKey() returns an RDD sorted by the key.
cogroup() groups data from both RDDs sharing the same key.
reduceByKey() is similar to reduce(): it runs several parallel reduce operations, one for each key in the
dataset. It is not implemented as an action that returns a value to the driver program; instead it
returns a new pair RDD.
foldByKey() is similar to fold().
calling reduceByKey() and foldByKey() will automatically
perform combining locally on each machine before computing global totals for
each key. The user does not need to specify a combiner. The more general
combineByKey() interface allows you to customize combining behavior.
Word count in Python
rdd = sc.textFile("s3://...")
words = rdd.flatMap(lambda x: x.split(" "))
result = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
We can actually implement word count even faster by using
the countByValue() function on the first RDD: input.flatMap(x =>
x.split(" ")).countByValue().
combineByKey() is the most general of the per-key aggregation functions. Most of the other per-key combiners are implemented using it. Like aggregate(), combineByKey() allows the user to return values that are not the same type as our input data. To understand combineByKey(), it's useful to think of how it handles each element it processes. As combineByKey() goes through the elements in a partition, each element either has a key it hasn't seen before or has the same key as a previous element. If it's a new element, combineByKey() uses a function we provide, called createCombiner(), to create the initial value for the accumulator on that key. It's important to note that this happens the first time a key is found in each partition, rather than only the first time the key is found in the RDD. If it is a value we have seen before while processing that partition, it will instead use the provided function, mergeValue(), with the current value of the accumulator for that key and the new value. Since each partition is processed independently, we can have multiple accumulators for the same key. When we are merging the results from each partition, if two or more partitions have an accumulator for the same key we merge the accumulators using the user-supplied mergeCombiners() function.
We can disable map-side aggregation in combineByKey() if we
know that our data won't benefit from it. For example, groupByKey() disables
map-side aggregation as the aggregation function (appending to a list) does not
save any space. If we want to disable map-side combines, we need to specify the
partitioner; for now you can just use the partitioner on the source RDD by
passing rdd.partitioner.
Per-key average using combineByKey() in Python
sumCount = nums.combineByKey((lambda x: (x, 1)),
                             (lambda acc, y: (acc[0] + y, acc[1] + 1)),
                             (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
sumCount.map(lambda kv: (kv[0], kv[1][0] / float(kv[1][1]))).collectAsMap()
** In any case, using one of the specialized aggregation
functions in Spark can be much faster than the naive approach of grouping our
data and then reducing it.
Tuning the level of parallelism
Every RDD has a fixed number of partitions that determine
the degree of parallelism to use when executing operations on the RDD. When
performing aggregations or grouping operations, we can ask Spark to use a
specific number of partitions. Spark will always try to infer a sensible
default value based on the size of your cluster, but in some cases you will
want to tune the level of parallelism for better performance. Most of the
operators accept a second parameter giving the number of partitions to use when
creating the grouped or aggregated RDD.
reduceByKey() with custom parallelism in Python
data = [("a", 3), ("b", 4), ("a", 1)]
sc.parallelize(data).reduceByKey(lambda x, y: x + y)      # Default parallelism
sc.parallelize(data).reduceByKey(lambda x, y: x + y, 10)  # Custom parallelism
To change the partitioning of an RDD outside the context of
grouping and aggregation operations, Spark provides the repartition() function.
It shuffles the data across the network to create a new set of partitions.
Repartitioning the data is a fairly expensive operation. An optimized version
of repartition() is coalesce(), which avoids data movement, but only if
you are decreasing the number of RDD partitions.
To know whether you can safely call coalesce(), you can
check the size of the RDD using rdd.partitions.size() in Java/Scala and
rdd.getNumPartitions() in Python and make sure that you are coalescing it to
fewer partitions than it currently has.
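A sketch in Python:
rdd = sc.parallelize(range(1000), 10)
rdd.getNumPartitions()        # 10
smaller = rdd.coalesce(5)     # fine: 5 < 10, so no full shuffle is needed
bigger = rdd.repartition(20)  # growing the partition count requires a full shuffle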
Grouping Data
If our data is already keyed in the way we want,
groupByKey() will group our data using the key in our RDD. On an RDD consisting
of keys of type K and values of type V, we get back an RDD of type [K,
Iterable[V]].
*** If you find yourself writing code where you groupByKey()
and then use a reduce() or fold() on the values, you can probably achieve the
same result more efficiently by using one of the per-key aggregation functions.
Rather than reducing the RDD to an in-memory value, we reduce the data per key
and get back an RDD with the reduced values corresponding to each key. For
example, rdd.reduceByKey(func) produces the same RDD as
rdd.groupByKey().mapValues(value => value.reduce(func))
but is more efficient, as it avoids the step of creating a list of values for each key.
We can also group multiple RDDs sharing the same key using a
function called cogroup(). cogroup() over two RDDs sharing the same key type,
K, with the respective value types V and W gives us back RDD[(K, (Iterable[V],
Iterable[W]))]. If one of the RDDs doesn't have elements for a given key that
is present in the other RDD, the corresponding Iterable is simply empty.
cogroup() gives us the power to group data from multiple RDDs.
cogroup() is used as a building block for the joins.
cogroup() can be used for much more than just implementing joins. We can also
use it to implement intersect by key. Additionally, cogroup() can work on three
or more RDDs at once.
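A small cogroup() sketch in Python (the result order may vary; the Iterables are converted to lists for readability):
x = sc.parallelize([("a", 1), ("b", 4), ("a", 2)])
y = sc.parallelize([("a", "apple"), ("c", "cherry")])
grouped = x.cogroup(y).mapValues(lambda vw: (list(vw[0]), list(vw[1])))
grouped.collect()
# [('a', ([1, 2], ['apple'])), ('b', ([4], [])), ('c', ([], ['cherry']))]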
Sorting
The sortByKey() function takes a parameter called ascending
indicating whether we want it in ascending order (it defaults to true). And for
custom sort order, we can provide our own comparison functions.
Custom sort order in Python, sorting integers as if strings:
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: str(x))
Actions on Pair RDDs
countByKey(), collectAsMap(), countByValue(), lookup()
countByKey() counts the number of elements for each key.
collectAsMap() collects the result as a map to provide easy lookup.
lookup() returns all values associated with the provided key.
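For example, a sketch:
rdd = sc.parallelize([(1, 2), (3, 4), (3, 6)])
rdd.countByKey()    # {1: 1, 3: 2}
rdd.collectAsMap()  # {1: 2, 3: 6} -- only one value per key survives in the map
rdd.lookup(3)       # [4, 6]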
**Data Partitioning (Advanced)
Let's see how datasets are partitioned across nodes.
Partitioning will not be helpful in all applications—for
example, if a given RDD is scanned only once, there is no point in partitioning
it in advance. It is useful only when a dataset is reused multiple times in
key-oriented operations such as joins.
Spark’s partitioning is available on all RDDs of key/value
pairs, and causes the system to group elements based on a function of each key.
Although Spark does not give explicit control of which worker node each key
goes to (partly because the system is designed to work even if specific nodes
fail), it lets the program ensure that a set of keys will appear together on
some node. For example, you might choose to hash-partition an RDD into 100
partitions so that keys that have the same hash value modulo 100 appear on the
same node. Or you might range-partition the RDD into sorted ranges of keys so
that elements with keys in the same range appear on the same node.
As a simple example, consider an application that keeps a
large table of user information in memory—say, an RDD of (UserID, UserInfo)
pairs, where UserInfo contains a list of topics the user is subscribed to. The
application periodically combines this table with a smaller file representing
events that happened in the past five minutes—say, a table of (UserID,
LinkInfo) pairs for users who have clicked a link on a website in those five
minutes. For example, we may wish to count how many users visited a link that
was not to one of their subscribed topics. We can perform this combination with
Spark’s join() operation, which can be used to group the UserInfo and LinkInfo
pairs for each UserID by key. Our application would look like the example below.
Example. Scala simple application
// Initialization code; we load the user info from a Hadoop SequenceFile on HDFS.
// This distributes elements of userData by the HDFS block where they are found,
// and doesn't provide Spark with any way of knowing in which partition a
// particular UserID is located.
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...").persist()

// Function called periodically to process a logfile of events in the past 5 minutes;
// we assume that this is a SequenceFile containing (UserID, LinkInfo) pairs.
def processNewLogs(logFileName: String) {
  val events = sc.sequenceFile[UserID, LinkInfo](logFileName)
  val joined = userData.join(events) // RDD of (UserID, (UserInfo, LinkInfo)) pairs
  val offTopicVisits = joined.filter {
    case (userId, (userInfo, linkInfo)) => // Expand the tuple into its components
      !userInfo.topics.contains(linkInfo.topic)
  }.count()
  println("Number of visits to non-subscribed topics: " + offTopicVisits)
}
This code will run fine as is, but it will be inefficient.
This is because the join() operation, called each time processNewLogs() is
invoked, does not know anything about how the keys are partitioned in the
datasets. By default, this operation will hash all the keys of both datasets,
sending elements with the same key hash across the network to the same machine,
and then join together the elements with the same key on that machine. Because
we expect the userData table to be much larger than the small log of events
seen every five minutes, this wastes a lot of work: the userData table is
hashed and shuffled across the network on every call, even though it doesn’t
change.
Fixing this is simple: just use the partitionBy()
transformation on userData to hash-partition it at the start of the program. We
do this by passing a spark.HashPartitioner object to partitionBy, as shown in
Example
Example. Scala custom partitioner
val sc = new SparkContext(...)
val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://...")
                 .partitionBy(new HashPartitioner(100)) // Create 100 partitions
                 .persist()
The processNewLogs() method can remain unchanged: the events
RDD is local to processNewLogs(), and is used only once within this method, so
there is no advantage in specifying a partitioner for events. Because we called
partitionBy() when building userData, Spark will now know that it is
hash-partitioned, and calls to join() on it will take advantage of this
information. In particular, when we call userData.join(events), Spark will
shuffle only the events RDD, sending events with each particular UserID to the
machine that contains the corresponding hash partition of userData (see Figure
4-5). The result is that a lot less data is communicated over the network, and
the program runs significantly faster.
Also, the 100 passed to partitionBy() represents the number
of partitions, which will control how many parallel tasks perform further
operations on the RDD (e.g., joins); in general, make this at least as large as
the number of cores in your cluster.
Failure to persist an RDD after it has been transformed with
partitionBy() will cause subsequent uses of the RDD to repeat the partitioning
of the data, resulting in repeated partitioning and shuffling of data across
the network, similar to what occurs without any specified partitioner.
However, in Python, you cannot pass a HashPartitioner
object to partitionBy(); instead, you just pass the number of partitions desired
(e.g., rdd.partitionBy(100)).
To determine an RDD's partitioner, use its partitioner property (not available in Python).
scala> val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
pairs: spark.RDD[(Int, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:12

scala> pairs.partitioner
res0: Option[spark.Partitioner] = None

scala> val partitioned = pairs.partitionBy(new spark.HashPartitioner(2))
partitioned: spark.RDD[(Int, Int)] = ShuffledRDD[1] at partitionBy at <console>:14

scala> partitioned.partitioner
res1: Option[spark.Partitioner] = Some(spark.HashPartitioner@5147788d)
Without persist() in the third input of that session, subsequent RDD
actions would evaluate the entire lineage of partitioned, which would cause pairs
to be hash-partitioned over and over.
The operations that benefit from partitioning are cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), and lookup().
*** If you call map() on a hash-partitioned RDD of key/value pairs, the function passed to map() can in theory change the key of each element, so the result will not have a partitioner. Spark does not analyze your functions to check whether they retain the key. Instead, it provides two other operations, mapValues() and flatMapValues(), which guarantee that each tuple's key remains the same.
All that said, here are all the operations that result in a
partitioner being set on the output RDD:
cogroup(), groupWith(), join(), leftOuterJoin(), rightOuterJoin(), groupByKey(), reduceByKey(), combineByKey(), partitionBy(), sort(), mapValues() (if the parent RDD has a partitioner), flatMapValues() (if the parent has a partitioner), and filter() (if the parent has a partitioner).
All other operations will produce a result with no partitioner.
Finally, for binary operations, which partitioner is set on the output depends on the parent RDDs' partitioners. By default, it is a hash partitioner, with the number of partitions set to the level of parallelism of the operation. However, if one of the parents has a partitioner set, it will be that partitioner; and if both parents have a partitioner set, it will be the partitioner of the first parent.
To maximize the potential for partitioning-related
optimizations, you should use mapValues() or flatMapValues() whenever you are
not changing an element’s key.
A custom partitioner can be built by extending the spark.Partitioner class and implementing the required methods (numPartitions, getPartition, and equals). Many of the shuffle-based methods in Spark, such as join() and groupByKey(), can also take an optional Partitioner object to control the partitioning of the output.
In Python, you do not extend a Partitioner class, but
instead pass a hash function as an additional argument to RDD.partitionBy().
Example. Python custom partitioner
import urlparse

def hash_domain(url):
    return hash(urlparse.urlparse(url).netloc)

rdd.partitionBy(20, hash_domain)  # Create 20 partitions