0:07
In this lecture we're going to be discussing stream processing.
Now, stream processing is a very old branch of distributed computing.
But in the last few years,
it has come into prominence because of the new workloads that we are seeing.
And one of the systems that has become very popular is the Storm system, and
we'll be discussing some of the details of Storm in this lecture.
So we'll discuss why stream processing is important in today's workloads and
then also we'll discuss some design details of Storm.
0:37
So today's workloads involve large amounts of data, whether these are feeds in
a social network, clickthrough information from particular websites or
from advertisement networks, or
logs of accesses in any distributed computing infrastructure.
So when you have such large amounts of information, you need
some real-time view of this data, so that it can be compressed
and distilled down into a form that is digestible to human users.
So, for instance, in social networks you have the capability of search, and
this is often real-time search as you might find in the Twitter social network,
where you can do a real-time search of the tweets that are being put out right now on
a particular topic or with a particular keyword.
When you have websites which are collecting data about
who is visiting them, you may want to compute website statistics, for
instance analytics like Google Analytics.
1:42
For instance, most data centers are running some form of intrusion detection system or
another, and all of these require real-time analysis of the data that's coming in.
Essentially, real-time analysis means that you have large amounts of data and
you need to process it within a few seconds, with very low
latency and with very high throughput, as much data per second as possible.
And you want to produce some amount of knowledge or
information out of the data that you're getting.
So you want to convert data into knowledge fairly quickly;
even if the data cannot be processed completely,
you still want to be able to glean at least some information out of it.
So that's the stream processing challenge. And of course you might think, well,
the MapReduce system is available and we've already used it for
batch computing before, so why not just use it?
And in fact there are variants of MapReduce that are aimed at
stream processing.
But essentially MapReduce is a batch processing framework.
And you need to wait for an entire computation on a large dataset to complete
before you can get the results.
There is no notion of partial results in MapReduce at all.
Even the variants of MapReduce that are tuned to stream processing or
to incremental processing (there are incremental versions of MapReduce as well)
still have a fairly high latency.
And essentially MapReduce is not intended for stream-processing applications.
It's not intended to be a long-running application,
which is what stream processing really is.
So the Storm system has become very popular in the last few years.
It's an Apache project in the Apache incubator.
It's one of the more highly active JVM projects today.
Multiple languages are supported in the API including Python and Ruby.
And Storm as of today is used by many companies, including Twitter, for
personalization and for search.
So every time you do a Twitter search, you are likely indirectly using
the Storm system that Twitter runs in its data centers.
Flipboard uses Storm for generating custom feeds.
And a variety of other companies and websites use Storm as well,
including the Weather Channel, WebMD.com, and others.
3:45
So Storm consists of many components, and the five major
keywords in Storm are tuples, streams, spouts, bolts, and topologies.
We'll study what these are over the next few slides.
So a tuple is essentially an ordered list of elements.
For instance,
a tuple might be, in the Twitter world, a tweeter followed by the tweet itself.
For instance, a tuple might be the tweeter name, which is Miley Cyrus,
followed by the tweet, which says, Hey!
Here's my new song!
A different tuple might be a different tweeter,
Justin Bieber, followed by the tweet itself, Hey!
Here's MY new song!
4:21
In a different world, where you're collecting clickthrough logs, the tuple
might be the URL, followed by the IP of the client that clicked, then the date and the time.
For instance, the URL might be coursera.org,
the IP is 101.201.301.401, and then you have the date and the time.
A different tuple might be from a different client, 901.801.701.601,
and a slightly later time.
Okay, so each of these is a tuple.
And you want to process these tuples.
You don't have just one tuple as you've seen here.
You have multiple tuples, and so a stream is said to be a sequence of tuples,
potentially unbounded, which means that the stream might
contain a potentially infinite number of tuples.
For instance, in our social network example,
we could have all the tweets that are being put out there, via a stream.
So for instance, the Miley Cyrus tweet, followed by the Justin Bieber's tweet,
followed by a tweet by the Rolling Stones that says,
Here's my old song that's still a super hit.
And so on and so forth, you can have other tuples coming in as well.
Similarly in the website example, the two tuples we discussed might be
part of a stream that contains other tuples, as well.
Essentially, you have a sequence of tuples coming in, or a stream.
And you need to process these tuples, potentially one tuple at a time.
But who generates these tuples?
The process or entity that generates these tuples is known as the spout in Storm.
The spout is a source of streams.
Often the spout reads from a crawler or from a database.
So, for instance it might be reading from a database in the case of Twitter,
the database that contains the actual tweets that are being posted.
Or in the case of a web analytics application it might be reading from
a crawler that is crawling the proxy logs of a particular webpage.
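To make this concrete, here is a minimal sketch of a spout, written against the org.apache.storm Java API. The TweetSpout class and its hard-coded list of tweets are hypothetical stand-ins; a real spout would read from the tweet database or a crawler as just described.

```java
import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical spout: emits (tweeter, tweet) tuples from a hard-coded list.
// A real spout would pull these from a database or a crawler instead.
public class TweetSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final String[][] tweets = {
        {"Miley Cyrus",   "Hey! Here's my new song!"},
        {"Justin Bieber", "Hey! Here's MY new song!"}
    };
    private int next = 0;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context,
                     SpoutOutputCollector collector) {
        this.collector = collector;               // handle used to emit tuples
    }

    @Override
    public void nextTuple() {
        // Storm calls this repeatedly; each call may emit zero or more tuples.
        String[] t = tweets[next++ % tweets.length];
        collector.emit(new Values(t[0], t[1]));   // one tuple = an ordered list of fields
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweeter", "tweet")); // names of the tuple's fields
    }
}
```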
6:11
How do you process these streams?
Well, the entity that processes a stream is known as a bolt in Storm.
A bolt processes input streams, typically
just one input stream, but it can process multiple input streams as well, and
in turn it generates an output stream which is then fed to other bolts.
In this example, I have a spout on the left generating a green stream for
the first bolt, which processes one tuple at a time,
maybe multiple tuples at a time and generates some output tuples.
In this case, the purple tuples which are then fed as input to another
bolt over here, the second bolt, which then generates yellow tuples
which are then fed potentially to another bolt and so on and so forth.
So a bolt is essentially a program that is given one or more streams as input, and
whenever it gets a tuple from these multiple streams or
from just one stream, whichever are its input streams,
it decides how to generate output tuples.
Okay, so that's typically the code you would be writing when you write
a Storm application.
You'd be writing code for each of the bolts in your application.
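As a sketch of what that per-bolt code looks like, here is a minimal bolt, again against the org.apache.storm Java API; the class name and field names are hypothetical. It consumes the (tweeter, tweet) tuples from a spout like the one above and emits one output tuple per word of the tweet. It extends BaseBasicBolt, which acknowledges each input tuple automatically once execute returns.

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt: splits each incoming tweet into words and emits
// one (word) tuple per word for downstream bolts to process.
public class SplitTweetBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // Called once per input tuple.
        String tweet = input.getStringByField("tweet");
        for (String word : tweet.split("\\s+")) {
            collector.emit(new Values(word));     // output stream of (word) tuples
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```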
And your application essentially consists of a series of bolts connected
in a topology.
So a directed graph of spouts and
bolts, along with the output bolts, is called a topology.
And that essentially corresponds to a Storm application.
So in this case I have three bolts in my application, one spout, and
one of these bolts is the output bolt.
The rightmost bolt is the output bolt.
And as shown here, essentially you would write code for
each of these three bolts in the system as per your application requirements.
And the final bolt, the rightmost bolt, will generate the output tuples for you.
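In code, a topology is wired together with a TopologyBuilder and then submitted to the cluster. The sketch below is hypothetical: it assumes the TweetSpout and SplitTweetBolt sketched earlier, plus two further made-up bolts (CountWordsBolt and ReportBolt) standing in for the middle and rightmost bolts of the figure.

```java
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class TweetTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // One spout feeding a chain of three bolts, as in the figure.
        // CountWordsBolt and ReportBolt are hypothetical placeholder bolts.
        builder.setSpout("tweets", new TweetSpout());
        builder.setBolt("split", new SplitTweetBolt()).shuffleGrouping("tweets");
        builder.setBolt("count", new CountWordsBolt()).shuffleGrouping("split");
        builder.setBolt("report", new ReportBolt()).shuffleGrouping("count"); // output bolt

        // Submit the directed graph of spouts and bolts as one Storm application.
        StormSubmitter.submitTopology("tweet-topology", new Config(),
                                      builder.createTopology());
    }
}
```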
7:44
Now, topologies are essentially directed graphs, and
topologies may contain loops as well;
they can have cycles, as shown over here, if your application in fact requires it.
But of course when you're creating cycles you have to be careful that you're not
creating infinite loops;
you need to be sure that tuples are not going around in the system forever.
You need to make sure that tuples are in fact being processed and that
within some amount of time they leave the system.
They don't stay around forever.
So bolts themselves,
the entities that process streams, can come in many different flavors.
Three of the more popular flavors are filter, join, and apply.
A filter essentially forwards a tuple only if it satisfies a condition.
So that's essentially very similar to the SQL filtering notion.
A join has a bolt receiving multiple streams,
say it receives two streams A and B.
It computes a cross product of all the tuples in streams A and B.
And for every pair of tuples, one from A and
one from B, it then checks whether the pair satisfies a condition, and if it does,
then it outputs something, perhaps something derived from those two tuples.
That's again very similar to the notion of a database join.
Finally, you have an apply or transform which takes a tuple and modifies it
according to a function perhaps even according to a filter function.
And there are of course many other functions that bolts can implement.
In fact, you can write your own bolt functionality that either
uses these functionalities or implements a new one.
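As one illustration of these flavors, here is a minimal sketch of a filter bolt, using the clickthrough-log tuples from earlier; the class name, constructor parameter, and field names are hypothetical. A join or apply bolt would have the same overall structure, just with different logic inside execute.

```java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical filter bolt: forwards a clickthrough tuple only if its URL
// starts with a given prefix; all other tuples are silently dropped.
public class UrlFilterBolt extends BaseBasicBolt {
    private final String prefix;

    public UrlFilterBolt(String prefix) {
        this.prefix = prefix;                     // e.g. "coursera.org"
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String url = input.getStringByField("url");
        if (url.startsWith(prefix)) {             // the filter condition
            collector.emit(new Values(url, input.getStringByField("ip")));
        }
        // No emit otherwise: the tuple does not pass the filter.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "ip"));
    }
}
```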
9:12
But no matter what bolt you implement, the bolt needs to process a lot of data.
Remember that we have very large amounts of data coming in,
potentially gigabytes per second,
maybe even terabytes per second, coming into each bolt.
You need to process this large amount of data, and process it fast.
So in order to make each bolt fast, you can parallelize bolts.
You can parallelize a bolt into multiple processes or tasks.
So a bolt might consist of multiple tasks, and
the incoming stream into that bolt is split among these tasks.
9:50
How the incoming stream is split among the tasks in a given bolt is decided by what is known as a grouping strategy.
So, there are three types of grouping that are popular,
which decide how the tuples of a stream are distributed among the tasks
of the bolt to which that stream is going to.
So, the first is shuffle grouping, which distributes the tuples of a stream
evenly among the tasks of the bolt, and this is done in a round-robin fashion.
So, essentially, as you receive tuples, you go through
the tasks in sequential order, giving each task one tuple or a batch of tuples, and
then you loop back around to the beginning of that list of tasks and
do the same for the incoming tuples.
10:23
However, this may not be what you want, because you may need to group
the tuples based on some of their fields, or a subset of the fields.
This is where fields grouping comes into play.
So here you group a stream by a subset of its fields.
For instance, you may want to have all the tweets,
all the tuples where the Twitter username starts with A through M or
0 through 4, go to task 1, and the remaining tuples go to task 2.
This allows you to, for instance,
aggregate over those individual classes of usernames.
And this is done via fields grouping.
Finally you have all grouping as well,
where all the tasks of the bolt receive all the input tuples.
This is especially useful for joins.
For instance, if you're trying to join the incoming stream with some other stream or
with an existing database, then an all grouping is fairly useful.
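In code, the grouping is chosen when a bolt subscribes to a stream in the TopologyBuilder. The sketch below builds on the hypothetical builder code shown earlier; the component names are made up, Fields is the same org.apache.storm.tuple.Fields class used in declareOutputFields, and the third argument to setBolt is the parallelism hint, that is, how many tasks the bolt is split into.

```java
// Shuffle grouping: tuples are spread round-robin across the bolt's 4 tasks.
builder.setBolt("clean", new CleanTweetBolt(), 4)
       .shuffleGrouping("tweets");

// Fields grouping: all tuples with the same "tweeter" value go to the same task,
// so per-user aggregation can be done entirely within that task.
builder.setBolt("perUserStats", new PerUserStatsBolt(), 4)
       .fieldsGrouping("clean", new Fields("tweeter"));

// All grouping: every task of this bolt receives every tuple (useful for joins).
builder.setBolt("joinWithProfiles", new ProfileJoinBolt(), 4)
       .allGrouping("clean");
```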
11:09
How is a Storm cluster run?
Let's discuss a little bit about the infrastructure components.
So the Storm cluster is run by a master node, again elected using leader election.
The master node runs a daemon called Nimbus.
And Nimbus is responsible for three functionalities, among others.
It is responsible for distributing code around the cluster.
So when a bolt is started up on another server Nimbus is responsible for
sending the code for that bolt to that particular server.
It's responsible for assigning tasks to machines.
So when a bolt is split up into multiple tasks, which task runs on which server,
that's decided by Nimbus.
Also Nimbus is responsible for failure detection.
If any of these servers or any bolts crashes, Nimbus needs to detect this and
then potentially restart the tasks or the bolts on other servers.
11:57
Each of the other servers runs a daemon known as a supervisor.
The supervisor listens for work assigned to its machine.
So it talks to the Nimbus server, and
it also keeps track of the tasks that are running on its machine, so that if any of
these tasks crashes, it can either be restarted, or the supervisor can ask
the Nimbus daemon for a new task to run in its place.
12:19
Storm also uses the Zookeeper system, which you have seen elsewhere in
this course, and which helps coordinate between the Nimbus and the supervisors.
It keeps all the state of the supervisors and
the Nimbus in a consistent place, so that if any of these crashes
you can then recover using what's kept in the Zookeeper system itself.
12:41
What do you do about failures?
So of course you have failures: a server might crash while it is in the midst of
processing a tuple, and if it does,
then any of the tuples that are in transit at that server might be lost.
So in order to tackle this, Storm uses what is known as anchoring,
where you can anchor an output tuple to one or more of its input tuples.
The failure of that output tuple then
causes one or more of those input tuples to be replayed in the system.
The way you do this is via an API which is specified in the OutputCollector class.
This has three major functions, emit, ack, and fail.
Emit essentially emits an output tuple perhaps anchored to an input tuple.
And if you want to anchor to an input tuple you specify that as the first
argument in emit.
You can acknowledge that a tuple has been done by calling the ack function.
And this means that the spout will now know that
this input tuple that it had generated has been processed by the last
bolt in the topology.
13:44
Or you can fail a tuple; this immediately fails the spout tuple at the root of
that tuple's tree in the topology, and the spout then starts
replaying the tuples that were missed, as indicated via the fail function itself.
This might be done, for instance, if there is a failure or
there is an exception from the database, and so on and so forth.
Typically, in Storm,
at least in the current version, you need to remember to explicitly acknowledge or
fail each tuple; there is no implicit failure detection of the tuples themselves.
14:14
And if you don't do so, then you can certainly miss tuples that
will not be processed because of failures. But
also, because tuples consume memory,
you might have memory leaks, because the tuples might stay around forever inside
a particular server and never be garbage collected.
So, in order to prevent this, you need to make sure that you use ack or
fail, in a consistent and exclusive manner,
across all the tuples that you are processing.
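Here is what anchoring, ack, and fail look like inside a bolt's execute method, sketched against the OutputCollector API just described. The bolt class, the field names, and the database lookup are hypothetical; the point is the emit/ack/fail pattern.

```java
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt showing explicit anchoring, ack, and fail.
public class EnrichClickBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context,
                        OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Hypothetical lookup that may throw, e.g. if the database is down.
            String owner = lookupSiteOwner(input.getStringByField("url"));

            // The first argument anchors the output tuple to the input tuple, so a
            // downstream failure of the output causes the input to be replayed.
            collector.emit(input, new Values(input.getStringByField("url"), owner));
            collector.ack(input);   // tell Storm this input tuple is fully processed
        } catch (Exception e) {
            collector.fail(input);  // fail it so the spout replays the tuple
        }
    }

    private String lookupSiteOwner(String url) {
        return "unknown";           // placeholder for a real database query
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("url", "owner"));
    }
}
```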
14:40
So, to wrap up our discussion of stream processing in Storm: processing
large amounts of data in real time,
within a few seconds, is a big requirement in today's systems.
And stream processing systems have been around for
a long period of time; for instance, IBM has had a very long-running project called
Streams, which is a very popular and widely used proprietary system.
One of the newer open source systems that has emerged for
processing streams today is Storm or Apache Storm.
There are also many sister systems that have emerged as offshoots of Storm,
for instance Spark Streaming from Berkeley. But in essence,
Storm allows you to process large amounts of data, large streams,
in very short periods of time with very high tuple throughput.
To do this it uses parallelism, it allows you to write
arbitrary applications using application topologies, and
it also supports some notion of fault tolerance.