In this session, we're going to focus on wide versus narrow dependencies, which describe the relationships between RDDs in graphs of computation, and which, as we'll see, have a lot to do with shuffling. A recurring theme lately has been that not all transformations are equal. Some are a lot more expensive in terms of latency than others. For example, some might require lots of data to be transferred over the network, perhaps unnecessarily so; operations that cause a shuffle are an example of this. In order to better understand when shuffling might occur, we have to look at how RDDs are represented, and then we have to think carefully about the operations that we're calling on our RDDs to understand when a shuffle might occur. We'll actually see that this way of representing RDDs is what makes fault tolerance possible in Spark. We mentioned in earlier sessions that Spark is fault tolerant; we'll get a glimpse of how that's possible, and actually kind of easy.

So let's start with some terminology, beginning with lineages. If you think about a group of computations that are done on an RDD, we can call that group of computations a lineage graph. That is, when you do operations on an RDD, those operations can be organized into a Directed Acyclic Graph representing the computations that were done on that RDD. Here's an example: we start with some input RDD, then we call a number of operations on that RDD. We can then reuse one of these RDDs in two subsequent computations: we can reuse the filtered RDD in a count, and we can also reuse it in a reduce. If you try to visualize it, it makes a graph like this (we'll also sketch this example in code in a moment). The input RDD is the RDD that's actually made from the input file. We then do a map and a filter; these transformations are on that RDD, and we get back a new RDD, called filtered here, which we could persist in memory. We can then use that filtered RDD in two different ways, with a count and with a reduce operation.

This forms a graph of computations without cycles; that's why it's called a Directed Acyclic Graph. In Spark, these DAGs are referred to as lineage graphs. That means it's possible to step back up this graph to figure out how the result of the count operation is derived from the input file. These lineage graphs are a big part of how RDDs are represented in Spark, and in fact Spark can analyze this representation and do optimizations. We'll see what that means in a moment. But first, let's look at how RDDs are represented.

Let's start with a visual example, and assume that this block here is an RDD. Usually we think of RDDs as some kind of abstraction that we invoke methods on, but an RDD itself is actually made up of partitions. Partitions are atomic pieces of the dataset, and they may exist on one or many compute nodes. So you might have one RDD spread out over many compute nodes, and all of the pieces that are spread out are the partitions. The next most important part of an RDD are its dependencies. Dependencies model the relationship between an RDD's partitions and the partitions of the RDDs it was derived from. So for example in this picture, if this is a parent RDD and this is a child RDD, and we're calling the map function on this RDD to get that RDD, then the dependencies are how these partitions map to those partitions via the map function.
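Here's roughly what that lineage example could look like in code, assuming a SparkContext named sc; the file path and the functions passed to map, filter and reduce are just illustrative assumptions:

```scala
// A minimal sketch of the lineage example above (file path and lambdas are illustrative).
val input    = sc.textFile("hdfs://path/to/input.txt")       // input RDD made from a file
val mapped   = input.map(line => line.toLowerCase)            // map: a new RDD
val filtered = mapped.filter(line => line.contains("spark"))  // filter: another new RDD
filtered.persist()                                            // keep the filtered RDD in memory

val howMany = filtered.count()                                // reuse #1: a count
val longest = filtered.reduce((a, b) => if (a.length >= b.length) a else b) // reuse #2: a reduce
```

Each of these calls just adds another node to the lineage graph that the count and the reduce at the end are derived from.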
So, while you might normally think of a map as one big arrow between this RDD and that RDD, dependencies actually model the relationship between individual partitions and the partitions that they were derived from. So you could say in this picture that this partition was derived from that partition, this partition was derived from that partition, and so on.

The last two parts of an RDD are, first, a function: you can think of this as the function that you might pass to a map operation on the previous slide. You can't compute a child partition without some kind of function to get that child partition from the parent partition. So functions are actually a big part of RDDs, because they say how to compute the dataset based on an RDD's parents, in other words, what to actually do. And finally, there is some metadata about the partitioning scheme and where data is placed, which is also part of an RDD. So these are the four parts of an RDD.

But you might ask, how does that relate to shuffles? If you recall, in a previous session we developed the following rule of thumb: a shuffle can occur when the resulting RDD depends on other elements from the same RDD or another RDD. Here's the key word: depends. Now we have an idea of what a dependency is, and in fact an RDD's dependencies actually encode when data must move over the network.

That brings us to transformations. We know that some transformations cause shuffles, and we know that dependency information can tell us when a shuffle might occur. To differentiate, we can define two kinds of dependencies, called narrow and wide dependencies, that tell us when shuffles will happen. We're going to go into more depth about what a narrow dependency and a wide dependency mean in this session. So let's define narrow and wide dependencies. A transformation has narrow dependencies when each partition of the parent RDD is used by at most one partition of the child RDD. That means a child partition has only one parent partition, not many. A transformation with wide dependencies, on the other hand, is one where each partition of the parent RDD may be depended on by multiple child partitions. That means you may have many child partitions which were all derived from a single parent partition. Transformations with this kind of dependency have wide dependencies.

So what does that mean for us? Well, as you might have guessed, transformations with narrow dependencies are typically quite fast. They require no shuffles, and optimizations like pipelining can occur, which is when many transformations are grouped together into one pass. And as you may have guessed, transformations with wide dependencies are slow, because they require some or all of the data to be shuffled over the network. These are the transformations that cause shuffles.

So let's try to visualize the differences between narrow and wide dependencies, starting with narrow dependencies. As I said on the previous slide, each partition of the parent RDD is used by at most one partition of the child RDD. Example transformations include map and filter, because each child partition depends on at most one parent partition. The same is true for union: we can essentially just put the data in this partition into the new resulting RDD here. There's no relationship involving many parent partitions when we ask where this partition was derived from. And what might surprise you is that this is also true for joins whose inputs are already co-partitioned.
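To tie those four parts together, here's a heavily simplified sketch of how they show up in Spark's RDD abstraction. This is an illustration rather than the actual source; the real class has more members and different access modifiers, but methods with these names and roles do exist on Spark's RDD:

```scala
import org.apache.spark.{Dependency, Partition, Partitioner, TaskContext}

// A simplified sketch of the four parts of an RDD -- not Spark's real RDD class.
abstract class SimplifiedRDD[T] {
  // 1. Partitions: the atomic pieces of the dataset.
  def getPartitions: Array[Partition]

  // 2. Dependencies: how this RDD's partitions relate to the partitions of its parents.
  def getDependencies: Seq[Dependency[_]]

  // 3. A function for computing one partition, given its parent partitions.
  def compute(part: Partition, ctx: TaskContext): Iterator[T]

  // 4. Metadata about the partitioning scheme and data placement.
  val partitioner: Option[Partitioner]
  def getPreferredLocations(part: Partition): Seq[String]
}
```

The partitioner metadata in particular is what lets Spark recognize the co-partitioned join case we're about to look at.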
If you recall from previous sessions, we were able to set up joins where no shuffle was required; that is the sort of join which has narrow dependencies. Going back to the definition of a narrow dependency: a join with co-partitioned inputs is a transformation with narrow dependencies, because each partition of the parent RDD is used by at most one partition of the child RDD. That is, this child partition here is derived from only one partition in each of the parent RDDs. Said another way, we don't have an arrow going from this partition to that child partition, and then also an arrow from this other partition going to that same child partition. We have just one relationship: this partition relates to that partition, and that's it.

On the other hand, a transformation with wide dependencies, as we said on the previous slide, is one where each partition of the parent RDD may be depended on by multiple child partitions. One operation that we already know causes a shuffle is groupByKey. When we visualize groupByKey with partitions and dependencies, we can see how a shuffle will occur. As the definition says, we have parent partitions which are depended on by multiple child partitions; for example, this partition is depended on by both of the child partitions. So these are wide dependencies. Another operation with wide dependencies, of course, is join when its inputs are not already partitioned. In that case we're going to have to move data all over the network to make sure that the resulting RDD has all of the values corresponding to some key in the same partition; before the join is called, they could be spread all over the cluster. So that's a pretty clear example of an operation that has wide dependencies: a join whose inputs are not already partitioned.

So let's take this visualization a step further and visualize a sample program and its dependencies. Let's assume we have the following DAG, the following graph of computations, where each one of these boxes is an RDD: this one's called A, this one's called B, and then C, D, E, F and G. They each have different numbers of partitions; A has three partitions, C has two partitions. And these big arrows represent the operations that we're calling on those RDDs. So A is the parent RDD, B is the child RDD, and the operation that B is derived from is groupBy.

So here's a quick exercise for you. If you had to draw the dependencies between each of these RDDs, what would they look like? Which dependencies are wide and which dependencies are narrow? Try it for yourself. Try to draw it out on a piece of paper, and think very carefully about what functions like groupBy and map actually semantically do. Ask yourself how many dependencies each one of these partitions might have on its parents.

Well, here's the answer. If you think very carefully about what groupBy has to do, it has to group values with the same key into the same partition of the resulting RDD. So it has to move key-value pairs around the network to make sure that they all end up in the same partition, which means that many of the parent RDD's partitions are used by each partition of the child RDD. Operations like map, on the other hand, have only one dependency per partition: this partition depends on only that partition, and it doesn't matter what's in this other partition. So the next question I asked you was: which dependencies are wide and which are narrow?
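Before looking at the answer, here's a rough Scala rendering of a DAG shaped like this one that you can build and poke at yourself. The datasets and the functions passed to the transformations are placeholder assumptions, chosen only so that the shape of the graph matches the picture (and assuming a SparkContext named sc):

```scala
// Placeholder data; the shapes are chosen only to mirror the DAG on the slide.
val A = sc.parallelize(1 to 100, 3)                      // RDD A, three partitions
val B = A.groupBy(x => x % 4)                            // A --groupBy--> B
B.cache()                                                // keep B around for reuse

val C = sc.parallelize(Seq(0 -> "zero", 1 -> "one"), 2)  // RDD C, two partitions
val D = C.map { case (k, v) => (k, v.toUpperCase) }      // C --map--> D
val E = sc.parallelize(Seq(2 -> "two"))                  // RDD E
val F = D.union(E)                                       // D, E --union--> F

val G = B.join(F)                                        // B, F --join--> G
```

Note the cache on B; we'll come back to why that matters for the join in a moment.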
Transformations like groupBy and join are wide, because each child partition may depend on multiple parent partitions. Transformations like map and union are narrow, because each child partition depends on only one parent partition; there's a one-to-one relationship between parent partitions and child partitions.

Well, hang on a second. You might be asking yourself, why is this side of the join narrow? The simple answer is that if you looked at the code, you probably cached B in memory, because you knew the groupBy that produced it does partitioning already, and you want to keep that partitioning in place when you reuse B. Put simply, B is already partitioned, and that partitioning is sitting in cached memory following the call to this groupBy here. That's why this part of the join can have narrow dependencies.

So I hope those visualizations were an aha moment for you. When thinking about computations that you have to do in Spark, you actually have to think carefully about whether an operation that you want to use might have narrow or wide dependencies. And of course we saw that some operations, like join, are sometimes narrow and sometimes wide. So I can make a list here of operations that are usually narrow and operations that are usually wide, though there are of course some exceptions, such as join, because we know that join is not always wide; sometimes it can be narrow, depending on the partitioning scheme. But here we can roughly break transformations down into narrow and wide. Again, it doesn't always hold, but it's a rough approximation. So you can always look back at this list and use it as a bit of a reference, but it's not always going to give you the answer; you're still going to have to think carefully about the operation that you're actually running on the cluster.

But sometimes, maybe you want to be a little more certain than thinking critically about the operations that you're using. For that, Spark gives you a method called dependencies. dependencies will give you a sequence of dependency objects, which are actually the dependencies that Spark uses in its scheduler to know how this RDD depends on other RDDs. When you call this method, you'll get back a sequence of dependency objects with different names. Dependencies called OneToOneDependency are narrow dependencies; you might also see a PruneDependency or a RangeDependency, but most often you'll see OneToOneDependency for narrow transformations. And finally, wide dependencies are called ShuffleDependency in Spark, which is a pretty clear indicator that a shuffle might occur. Here's a quick example of what the output of dependencies looks like. If you have some RDD of words, map it into a pair RDD, do a groupByKey and then call dependencies, what you get back from Spark is a list with a ShuffleDependency in it. So you know that this call to groupByKey is going to cause a shuffle.

Spark also comes with another method called toDebugString. This method prints out a visualization of the RDD's lineage, along with other information relevant to scheduling. So if I use the same example as on the previous slide, a map and then a groupByKey on wordsRdd, and call toDebugString, what we see is this: the resulting RDD from groupByKey is a ShuffledRDD, which came from a MapPartitionsRDD, which itself came from a ParallelCollectionRDD. The ParallelCollectionRDD corresponds to wordsRdd.
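Concretely, those two calls might look like this in the Spark shell. The contents of wordsRdd are an illustrative assumption, and the exact RDD ids, partition counts and call sites in the printed output will differ on your setup:

```scala
// Assuming an existing SparkContext `sc`; the data is just an illustrative assumption.
val wordsRdd = sc.parallelize(Seq("one", "two", "two", "three", "three", "three"))
val pairs    = wordsRdd.map(w => (w, 1)).groupByKey()

println(pairs.dependencies)
// List(org.apache.spark.ShuffleDependency@1f2e3d4c)   <- a wide (shuffle) dependency

println(pairs.toDebugString)
// Roughly:
// (4) ShuffledRDD[2] at groupByKey at <console>:26 []
//  +-(4) MapPartitionsRDD[1] at map at <console>:25 []
//     |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
```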
The MapPartitionsRDD corresponds to the result of the map on wordsRdd, and finally the ShuffledRDD corresponds to the result of groupByKey. The indentations here actually show how Spark groups these operations together. Operations at the same indentation level, like the MapPartitionsRDD here, belong to the same grouping, and these groupings are typically separated by shuffles. So you can print out a lineage using toDebugString, and you'll be able to see how your job will be broken up into different groupings of operations separated by shuffles. For example, you can have a group of many narrow transformations followed by one wide transformation, and you can see that from the indentation of the lineage printed out by toDebugString. And if you want to understand a little bit more about how Spark jobs are actually run: these groupings are called stages, and a Spark job is broken into stages by its scheduler.

So we've seen these lineage graphs, how they represent the relationships between RDDs, and how we can use them to determine when a shuffle will occur. But at the beginning of this session, I made the claim that lineage is related to fault tolerance. How might that be the case? Well, lineages are actually the key to fault tolerance in Spark, and ideas from functional programming are what enable it. RDDs are immutable, so we can't update or change any of the data inside an RDD. Then, by using higher-order functions like map, flatMap and filter to do functional transformations on that immutable data, what we end up with is effectively a Directed Acyclic Graph. And assuming that we keep around the functions that we passed to operations like map, flatMap and filter, we can recompute, at any point in time, any of the RDDs in our entire lineage graph.

So with these three aspects of Spark's design, RDDs being immutable, transformations being essentially higher-order functions, and the functions passed to those higher-order functions being kept around in a Directed Acyclic Graph of transformations, we can recover from failures by recomputing lost partitions from the lineage graph. If you keep this lineage information around, saved somewhere to stable storage, then you'll always be able to re-derive individual RDDs in your lineage graph. That's the key idea, and that's how Spark achieves fault tolerance without having to write intermediate data to disk. This is how Spark can still do everything in memory and also be fault tolerant.

Of course, this is always more easily understood visually. So let's assume one of our partitions from the previous example fails; let's say this partition here, for some reason, is no longer any good. Without having to checkpoint all of that data to disk, Spark can just re-derive it using the graph of dependencies that it already has. For example, we know that this piece of data is derived from this piece of data, which is derived from that piece of data. Assuming that none of it is cached in memory, so we can't simply reuse it, all we have to do is go back to the dependency information, along with the functions that are stored with those dependencies, and recompute these pieces of data, from which we recompute that lost piece of data, like so. And voilà. Now note that recomputing partitions like this is fast for narrow dependencies, but it's slow for wide dependencies.
Let's look at a visualization to see why that's the case. If we lose this partition instead, we have to trace our dependencies backwards. And because this child partition depends on parent partitions all the way back through this union, we've got to recompute all of these intermediate RDDs in order to recompute this one piece of data here. Of course, it helps that this RDD is cached in memory, so we can just reuse this partition without having to recompute the groupBy. But this is still going to be very expensive. So losing partitions that were derived from a transformation with wide dependencies can be much slower.