Archive for the ‘Distributed Systems’ Category
Network Operating System?
I’ve just begun dealing with Software Defined Networks (SDN) for my Master’s thesis, and I’m experimenting on top of Floodlight, an open source OpenFlow controller from Big Switch Networks. In OpenFlow, a logically centralised entity known as the controller can control the forwarding tables of a bunch of switches which speak OpenFlow. OpenFlow applications then talk to the controller using some controller-specific API to ‘program’ the network (manipulate forwarding tables on the switches). The high level architecture looks something like this:
Just like an operating system abstracts away the complexities of the underlying hardware for a user-space application, the controller abstracts away the complexities of the network for OpenFlow applications. For this reason, the controller is often referred to as a “network operating system”. Applications have some API to talk to the network-OS, and it translates those APIs into OpenFlow commands that control the switches.
For my thesis, the plan for my architecture was to have two applications that provide different services to the network, that are expected to run simultaneously. Both of them collect information from the OpenFlow switches and some other framework specific agents situated at the edges of the network to make some optimisation type decisions. But as soon as I implemented one of the applications, it was clear that I had no straightforward way of ensuring that both my applications wouldn’t make decisions that counteract each other. Although I really don’t like the idea of doing this, the easiest way to solve this is to wrap both applications into one. And from the looks of it, this is a problem that hasn’t been solved yet.
Controllers like NOX and Onix make the assumption that only one OpenFlow application is running on a given network at any point of time. This is a reasonable assumption from a systems perspective. But what’s gotten me confused is how OpenFlow applications fit into the “SDN for enterprises” picture. I was under the impression that a network operator using a particular controller could choose between different 3rd party OpenFlow applications to handle different complexities with the network: a load balancing application from vendor A for the edge, a routing daemon application from vendor B, and so forth. While these are relatively orthogonal applications, it looks like it’s possible for two OpenFlow applications to make decisions and choices that adversely affect each other (leading to oscillations in switch state). Floodlight allows you to run multiple applications at the same time, but leaves it to the developer (or user?) to ensure that applications can safely co-exist with each other.
So again, if my observation isn’t mistaken, how do OpenFlow applications fit cleanly into the SDN ecosystem? How can I manage my network using building blocks of applications from different vendors? Will I need to rely on OneBigApplianceFromBigBadVendor per network? Does this necessitate something analogous to per-process resource allocation as in traditional operating systems? I can see that FlowVisor style slicing is one way to go about it, but will that suffice?
So what *should* the network operating system do here? Let the applications run wild and fight it out? Or provide some mechanism to enforce policies between applications?
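To make the second option concrete, here is a minimal sketch of what such a mechanism could look like. Everything in it is hypothetical (`FlowRule`, `Arbiter`, the application names) and it is not Floodlight's API: a toy arbiter sits between applications and switches and refuses a rule when another application already owns the same (switch, match) entry with a different action.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlowRule:
    """A hypothetical, highly simplified forwarding-table entry."""
    switch: str   # switch identifier
    match: str    # e.g. a destination prefix
    action: str   # e.g. an output port


class Arbiter:
    """Toy policy layer: the first application to claim a (switch, match)
    pair owns it, and conflicting writes from other applications are refused."""

    def __init__(self):
        self._owners = {}  # (switch, match) -> (app name, action)

    def install(self, app, rule):
        key = (rule.switch, rule.match)
        owner = self._owners.get(key)
        if owner is not None and owner[0] != app and owner[1] != rule.action:
            return False  # would overwrite another application's decision
        self._owners[key] = (app, rule.action)
        return True


arbiter = Arbiter()
accepted = arbiter.install("load_balancer", FlowRule("s1", "10.0.0.0/24", "port2"))
rejected = arbiter.install("router", FlowRule("s1", "10.0.0.0/24", "port3"))
print(accepted, rejected)
```

First-come-first-served ownership is only one possible policy; priorities or per-application slices of the flow space would fit the same interface.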
If I am indeed mistaken in my assumption, please do let me know what I’m missing here!
Towards a Scalable and Highly Available HDFS Namenode
After 3 months of intense hacking, I’m pleased to be writing about a little something I worked on for a project course here at KTH.
The premise
So we’re all familiar with Hadoop, right? It’s the little yellow elephant that provides an excellent platform for distributed computing, which is seeing rapid adoption by the industry, and involvement from major players like Yahoo!, Facebook and recently, Microsoft. Well, Hadoop and friends use the Hadoop Distributed File System (HDFS) as their underlying storage layer. Given the kind of jobs that are expected to run on top of it, HDFS is designed to store large files, and is optimised for throughput as opposed to latency.
HDFS is a single-master-server based distributed file system. Architecturally speaking, HDFS comprises three important entities:
- Clients, who read/write files from/to the filesystem.
- Datanodes, which actually store the data (blocks) associated with the files.
- The Namenode, which is a central server that stores all the metadata associated with the files, and blocks.
This division between metadata storage and data storage is important, because typical use cases of HDFS are data intensive, and not metadata intensive. That’s fine, but the problem is, if the Namenode crashes, the entire file system becomes inoperable because clients and Datanodes still need the metadata to do anything useful. Furthermore, since the Namenode maintains all the metadata only in memory, the number of files you can store on the filesystem is directly proportional to the amount of RAM the Namenode has. As if that’s not enough, the Namenode will be completely saturated under write intensive workloads, and will be unable to respond to even simple client side queries like “ls”. Have a look at Shvachko’s paper which describes these problems at great length and depth, on which we’ve based our work.
Long story short, the needs of the hour are:
- High availability for the Namenode, i.e., no single point of failure.
- Horizontal scalability for the Namenode, i.e., to handle heavier loads, one would only need to add more Namenodes to the system rather than having to upgrade a single Namenode’s hardware.
Our solution
In order to recover from crashes, the Namenode maintains a journal of all changes that it makes to the metadata. This pretty much involves logging every operation made to disk, and there is quite a huge piece of code related to this as well. However, the database community has been doing journaling, checkpointing and replicated storage for quite a while. So if you haven’t guessed our solution yet, here it is:
“Move all of the Namenode’s metadata storage into an in-memory, replicated, share-nothing distributed database.”
In short, Namenodes themselves are reduced to a stateless frontend to the database, and fetch state into memory only when required. This comes with the added advantage of being able to have multiple stateless Namenodes for the same filesystem namespace. We chose MySQL Cluster as our database because of its widespread use and stability. So for the filesystem to scale to a larger number of files, one needs to add more MySQL Cluster Datanodes, thus moving the bottleneck from the Namenode’s RAM to the DB’s storage capacity. For the filesystem to handle heavier workloads, one only needs to add more Namenode machines and divide the load amongst them. Another interesting aspect is that if a single Namenode machine has to reboot, it needn’t fetch any state into memory and will be ready for action within a few seconds (although it still has to sync with Datanodes). Another advantage of our design is that the modifications will not affect the clients or Datanodes in any way, except that we might need to find a way to divide the load among the Namenodes.
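The idea can be illustrated with a toy sketch. It is not the KTHFS code: `MetadataStore` is a plain dict standing in for the replicated database, and the class and method names are made up for illustration.

```python
class MetadataStore:
    """Stand-in for the replicated database (a plain dict here)."""

    def __init__(self):
        self.inodes = {}  # path -> metadata dict


class Namenode:
    """Stateless frontend: keeps no metadata of its own between calls."""

    def __init__(self, name, store):
        self.name = name
        self.store = store

    def create(self, path, size=0):
        self.store.inodes[path] = {"size": size, "created_by": self.name}

    def ls(self, directory):
        prefix = directory.rstrip("/") + "/"
        return sorted(p for p in self.store.inodes if p.startswith(prefix))


store = MetadataStore()
nn1, nn2 = Namenode("nn1", store), Namenode("nn2", store)
nn1.create("/logs/a.txt")
nn2.create("/logs/b.txt")
print(nn1.ls("/logs"), nn2.ls("/logs"))

# A "rebooted" Namenode has nothing to reload before serving requests.
nn1 = Namenode("nn1", store)
print(nn1.ls("/logs"))
```

Both frontends see the same listing regardless of which one performed the write, because the only copy of the state lives in the shared store.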
How we did it
We first dissected all the internal protocols being used in HDFS, i.e., the client-Namenode, Namenode-Datanode, and client-Datanode protocols. Next, we stripped out all the Namenode code that we didn’t need. This was pretty much the code related to journaling, checkpointing, the secondary Namenode and so forth.
Next, we identified the key data structures we needed to move to the DB. We picked the two most memory intensive data-structures to migrate first: the Inodes, and the Blocks.
Since we were heavily time constrained (three months to deliver the project and the report), we decided to focus on functional correctness first, and then optimise later. So the easiest course of action seemed to be to modify the lowest levels of the call chain, replacing reads/writes from/to memory with query, insert, update and delete operations on the DB. We developed two helper classes, one each for Inodes and Blocks, and interfaced with the DB through these methods. We used the ClusterJ connector to talk to MySQL. This obviously meant that we needed a flat row representation for Inodes and Blocks in the DB, and we had some other problems to think of as well on the way. How do we index Inodes? How do we index Blocks? What about Triplets?
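To give a feel for what a flat row representation involves, here is a small sketch. The schema is an assumption made for illustration, not the one we actually used: Inode rows are indexed by (parent id, name) so that a path can be resolved one component at a time, and Block rows carry the id of the file they belong to.

```python
from dataclasses import dataclass


@dataclass
class InodeRow:
    inode_id: int
    parent_id: int   # 0 marks the root's parent in this sketch
    name: str
    is_dir: bool


@dataclass
class BlockRow:
    block_id: int
    inode_id: int    # file the block belongs to
    index: int       # position of the block within the file


inodes = [
    InodeRow(1, 0, "", True),           # root directory
    InodeRow(2, 1, "user", True),
    InodeRow(3, 2, "data.bin", False),
]
blocks = [BlockRow(101, 3, 1), BlockRow(100, 3, 0)]

# Secondary index: (parent_id, name) -> row, mimicking an indexed DB lookup.
by_parent_and_name = {(row.parent_id, row.name): row for row in inodes}


def resolve(path):
    """Walk the path one component at a time, one lookup per component."""
    current = by_parent_and_name[(0, "")]
    for part in filter(None, path.split("/")):
        current = by_parent_and_name[(current.inode_id, part)]
    return current


def blocks_of(inode_id):
    """Return the file's blocks in order of their position in the file."""
    return sorted((b for b in blocks if b.inode_id == inode_id),
                  key=lambda b: b.index)


target = resolve("/user/data.bin")
print(target.inode_id, [b.block_id for b in blocks_of(target.inode_id)])
```

The trade-off in a layout like this is one database round trip per path component, which is exactly the kind of cost that shows up once memory reads become network reads.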
All in all, we tackled the problem of scaling the Namenode with a set of design decisions which we later found to be consistent with Shvachko’s update paper on the Namenode’s scalability, except that he suggests using HBase as the DB.
Current status
- Multiple stateless Namenodes run merrily, which store Inode and Block related metadata in MySQL Cluster. As a validation test, Clients can do an “ls” query to any Namenode and see a consistent view of the filesystem regardless of which Namenode updated the DB with the content.
- We’re trying to ensure functional correctness using the HDFS unit tests. We got the most important ones to pass, and decided to keep some more bug fixing until later because we needed to evaluate the system as part of the course.
- We’ve been evaluating the system using the Synthetic Load Generator. Horizontal scalability has been clearly achieved; adding more Namenodes improves the average number of operations per second for different workloads. With write-intensive workloads, the scalability is linear in terms of total operations/sec that are executed.
Current limitations
Obviously, our work isn’t rainbows and sunshine; there’s a long way to go. Here’s what we don’t have yet and are currently addressing:
- Performance improvements. With a single load-generator thread throwing requests at the Namenode, we’re within a 10th of the original Namenode’s performance because reads/writes from/to memory now go over a network to a database cluster (which is OK, I guess). But with more LoadGen threads, we’re experiencing a hefty bottleneck, which I’ll describe in the next point.
- The Namenode isn’t fully stateless yet. The most important data structures we’re yet to move are the DatanodeDescriptor entities and the file leases. There’ll surely be more, but these are the most crucial ones. Once full statelessness is achieved, we can eliminate the read-write locks in the code, which are no longer needed in our implementation (the Namenode currently uses a multiple-reader-single-writer concurrency model). Profiling experiments indicated that the Namenode spends around 70% of its time waiting to acquire write locks. If we keep the Namenode fully stateless, we can wrap FSNamesystem operations into database transactions which can be batched, and let MySQL Cluster handle serialisability for us (it can handle write-heavy transactions really well). We can even break away from the single-writer model that the Namenode currently uses. Will this lead to a higher throughput for write operations than the original Namenode? Maybe.
- Clients and Datanodes have to be statically partitioned as of now (it sufficed for our evaluations). We need a way for them to pick a random Namenode to perform their operations with.
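As a rough idea of what the last point could look like on the client side, here is a minimal sketch. The addresses and the `is_alive` probe are hypothetical; this is not something we have implemented.

```python
import random


def pick_namenode(namenodes, is_alive, rng=random):
    """Return a randomly chosen live Namenode address, or None if all are down."""
    candidates = list(namenodes)
    rng.shuffle(candidates)
    for address in candidates:
        if is_alive(address):
            return address
    return None


namenodes = ["nn1:8020", "nn2:8020", "nn3:8020"]  # hypothetical addresses
down = {"nn2:8020"}
choice = pick_namenode(namenodes, lambda a: a not in down, random.Random(7))
print(choice)
```

Shuffling once and walking the list gives random selection and failover in the same loop, without retrying a Namenode that was just found to be down.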
Talk is cheap, show me the code!
The code is publicly available here for thy scrutiny. You’ll also need to have a MySQL Cluster set up in order to test this (we have a hard-coded set of defaults in DBConnector.java which you can politely ignore).
Here’s our presentation on it as well. We’ve dubbed our prototype implementation KTHFS (because we’re students at KTH, but yes, no points for creativity on that one).
Future work
As an academic obligation, here’s future work (and no I’m not going to write stuff we’ll never do).
One member (not me) from the team will be continuing this work as part of his Master’s thesis, and plans to address the above-mentioned limitations as part of his work. I’ll try to contribute as well during my free time (what are weekends for anyway?). Let’s see how this goes.
Magic Numbers in Distributed Systems
I’m officially done with the first half of my Master’s as of now, and it’s been a fun swim so far in the violent sea of distributed systems, which by the way, is a really big zoo where anything can go wrong. To quote Leslie Lamport:
“A distributed system is one in which the failure of a computer you didn’t even know existed can render your own computer unusable”
Thus, researchers over the last few decades have gone to great lengths to design distributed algorithms that ensure correct behaviour in the light of node failures and random activity by other processes. But if you’re a student and you’re going through these algorithms, it’s not entirely obvious what some of the magic numbers mean at times, and what the intuition is behind them that ensures that an algorithm works correctly. This post is meant to help in that direction. As per distributed systems convention, N refers to the number of processes in the system and F refers to the number of failures that can be tolerated.
- > N/2
This is a simple majority “quorum”. There are many algorithms that exploit the fact that two quorums overlap in at least one process. Let’s take an example to illustrate this. Consider a system with N processes, serving as a distributed replicated database. There is a single process writing to the database and there can be more than one reader. One approach to doing this would be to have the writing process write to more than N/2 replicas. If the reading process reads from more than N/2 nodes, there will be at least one replica which has seen the latest write. This can be used in scenarios where fewer than half the nodes can fail. Some examples include Majority ACK Uniform Reliable Broadcast, Majority Voting Regular Register, and Read Impose Write Majority (1,N) Atomic Registers.
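The overlap property is easy to check by brute force for small N. The sketch below does that, and then plays out the single-writer example with a toy register; the (timestamp, value) layout and the particular quorums are assumptions made for illustration.

```python
from itertools import combinations


def majority(n):
    """Smallest integer strictly greater than n / 2."""
    return n // 2 + 1


def quorums_always_intersect(n):
    """Brute-force check that any two majority quorums share a process."""
    q = majority(n)
    quorums = [set(c) for c in combinations(range(n), q)]
    return all(a & b for a in quorums for b in quorums)


# Toy replicated register: each replica stores a (timestamp, value) pair.
replicas = [(0, None)] * 5
write_quorum = [0, 1, 2]           # any majority(5) == 3 replicas
for i in write_quorum:
    replicas[i] = (1, "x=42")

read_quorum = [2, 3, 4]            # a different majority
latest = max((replicas[i] for i in read_quorum), key=lambda r: r[0])
print(quorums_always_intersect(5), latest)
```

Replica 2 is the only one in both quorums, and that single overlap is enough for the reader to pick up the latest write by its timestamp.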
- > (N+F)/2
Referred to as “Byzantine Quorum”, this one’s a bread and butter number for many Byzantine Fault Tolerant algorithms, which are algorithms that tolerate arbitrary failures. This means that the distributed algorithm produces a correct result in spite of having processes that either do not respond, send out garbage values and so forth. The behaviour can be either due to faulty hardware, software, or due to control by a malicious user. The essence of the number itself is that any two sets of size > (N+F)/2 overlap in at least one correct process. This is valid only when N > 3F (you can learn the “why” behind this from the Byzantine Generals Problem). Example algorithms: Authenticated Data Byzantine Quorum based Regular Register and Byzantine Randomised Consensus.
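The overlap claim can be checked with a short counting argument. This is a sketch in which Q_1 and Q_2 are simply names for two arbitrary sets of more than (N+F)/2 processes:

```latex
\begin{align*}
|Q_1 \cap Q_2| &= |Q_1| + |Q_2| - |Q_1 \cup Q_2| \\
               &\ge |Q_1| + |Q_2| - N \\
               &> \frac{N+F}{2} + \frac{N+F}{2} - N = F
\end{align*}
```

So the two sets share at least F+1 processes, of which at most F can be faulty, leaving at least one correct process in the overlap. For such a quorum to be reachable even when F processes stay silent, its size must not exceed N-F, i.e. (N+F)/2 < N-F, which rearranges to N > 3F.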
- > (N+2F)/2
This is known as a Byzantine Masking Quorum. The intuition here is that any two quorums of size > (N+2F)/2 will overlap in at least 2F + 1 processes, thus ensuring that you have a worst case split up of F+1 correct processes and F faulty ones. Note that N > 4F is necessary for this condition to be true. This is particularly useful in the context of Byzantine Registers, and can be seen in algorithms that implement Byzantine Safe Registers. Let’s take the example of our distributed replicated database that we’d discussed above, but with byzantine faults. The writing node attempts to write to all nodes and returns from the write upon receiving ACKs from > (N+2F)/2 replicas. A node that needs to read from the replicated servers reads from any > (N+2F)/2 nodes, and only picks a value if it has more than F occurrences in the set that was read (the Byzantine process can return garbage values upon the read request). The quorum size guarantees that there would be at least F+1 correct processes in the overlap, so that reads that are not concurrent with a write will for sure give you F+1 correct and up-to-date values.
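Here is a small sketch of that read rule with N = 5 and F = 1. The replica contents, the choice of which replica is Byzantine, and the function names are all made up for illustration.

```python
from collections import Counter


def masking_quorum(n, f):
    """Smallest integer strictly greater than (n + 2f) / 2."""
    return (n + 2 * f) // 2 + 1


def masked_read(replies, f):
    """Return the value reported more than f times, or None if there is none."""
    value, count = Counter(replies).most_common(1)[0]
    return value if count > f else None


n, f = 5, 1                       # N > 4F holds
q = masking_quorum(n, f)          # 4 replicas per quorum
# The write was ACKed by replicas 0-3; replica 4 is stale, replica 0 is Byzantine.
stored = ["v1", "v1", "v1", "v1", "v0"]
read_set = [0, 2, 3, 4]           # any q replicas
replies = ["garbage" if i == 0 else stored[i] for i in read_set]
print(q, masked_read(replies, f))
```

The read set overlaps the write quorum in replicas 0, 2 and 3 (that is 2F + 1 of them); even with replica 0 lying, the two honest copies of `v1` exceed the F threshold while the garbage and stale values do not.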
- > F
This one is pretty straightforward, and you begin seeing this as soon as you’re dealing with algorithms wherein processes crash. If you receive a message from at least F+1 processes, then you’re sure that you’ve received the message from at least one correct process (a process that will not fail).
- > 2F
This follows from the previous one: if you receive 2F + 1 messages, you’re sure that at least F+1 of them come from correct processes. This is particularly seen in Byzantine algorithms where making a decision based on > F is risky. For instance, consider situations where uniform agreement is required. One such example is that of Leader Election in a distributed system. For correct functioning of the system, it is entirely necessary that all nodes have the same view of who the leader is. The algorithm fails if two nodes have picked two different nodes as their respective leaders, and in the presence of byzantine faults, this is quite possible. Consider the Rotating Byzantine Leader Detection algorithm wherein nodes can choose to “complain” about the current leader under the suspicion of being byzantine. A node decides that it should shift to the next leader when it hears more than 2F complaints. Here’s why we can’t use > F alone here. Let’s assume our system should tolerate one byzantine fault (F = 1). Assume that a process P incorrectly suspects the current leader (who is legitimate) of being byzantine and broadcasts a “complaint” message. Now a byzantine node in the same system can go against the protocol, and send process P a “complaint” message, at which point process P now has > F (2 > 1) complaints about the current leader, causing it to change leaders. Since the byzantine process sent the complaint message only to process P and not the other processes in the system (who didn’t find anything strange about the leader in order to complain), we now have process P with a leader L1 and the remaining processes at leader L2 such that L1 ≠ L2. Byzantine process wins. On the other hand, if you rely on the 2F + 1 magic number, you’re sure that there are at least F+1 correct entries in your set.
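The scenario can be replayed in a few lines. This is only a sketch of the counting, not the Rotating Byzantine Leader Detection algorithm itself; the node names Q, R and B are made up, with B being the byzantine node.

```python
def should_change_leader(complaints, threshold):
    """A node moves to the next leader once complaints exceed the threshold."""
    return len(complaints) > threshold


F = 1
# P wrongly suspects the leader and broadcasts its complaint to everyone;
# the byzantine node B sends its own complaint to P only.
complaints_seen_by = {
    "P": {"P", "B"},
    "Q": {"P"},
    "R": {"P"},
}


def decide(threshold):
    """Map each correct node to whether it would switch leaders."""
    return {node: should_change_leader(seen, threshold)
            for node, seen in complaints_seen_by.items()}


for threshold in (F, 2 * F):
    print(threshold, decide(threshold))
```

With the > F rule, P switches while Q and R stay put, so the correct nodes disagree; with the > 2F rule, two complaints are not enough and everyone keeps the same leader.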
- > N-F
In algorithms where faulty nodes have the liberty of “not responding” (crash-stop processes for instance), N-F is the lowest number of responses you can hope to get if you ping all processes in the system. In byzantine scenarios, a set of N-F processes contains at least N-2F correct processes (F responses out of N-F can be corrupt). This number can be seen in many algorithms which require uniform agreement, one example being Byzantine Randomized Consensus.
That’s it for now. Do let me know if there are any errors above or any important information I’ve missed out on.
Paper review: Adaptive Offloading for Pervasive Computing
My friend Marcus recently suggested a good idea to keep track of papers that we read by writing a publicly available review about the work. So here’s the first in a series of such posts.
Title: Adaptive Offloading for Pervasive Computing
Authors: Xiaohui Gu and Klara Nahrstedt (UIUC), Alan Messer, Ira Greenberg, and Dejan Milojicic (HP Labs)
Motivation: Certain applications have high memory requirements, and thus cannot be easily run on resource-constrained devices like mobile devices, which are an essential component of pervasive computing environments. In light of such constraints, the authors propose a scheme wherein the deployment of such applications on mobile devices is made possible by “offloading” objects in the code (the paper assumes an object oriented language like Java or C#) onto a network-nearby device, hereafter referred to as the surrogate. This needs to be achieved while keeping the application completely oblivious to what’s happening underneath.
The problem: when to trigger an offload, and which objects to offload.
Assumptions in the paper: Object oriented languages need to be used. High speed wireless link required.
Summary:
The core of the work involves describing each Java program as a graph of classes called the Application Execution Graph (AEG). Classes are chosen as the basic unit for representing an application because: 1) Classes map directly to interactions in the system, 2) classes allow more precise/fine-grained decision making for the offloading process (while this line isn’t explained clearly, I believe it has to do with the next point) 3) The said interactions are easier to represent with classes than with several thousand Java objects.
The graph of classes is to be partitioned into two chunks, one of which will remain on the device, and will be locally referenced, whereas the remaining chunk will be offloaded onto another device called the surrogate, usually a desktop or some non-resource-constrained device. Objects on the surrogate will be referenced using remote object invocations. This partitioning will be transparent to the application itself. This means that the Java application would be completely oblivious to the physical locations of the objects that it’s dealing with, but the underlying VM will perform this partitioning, and will use local or remote invocations as would be the case. The VM used for the work was HP’s Chai JVM.
The AEG has weights for the nodes, and the edges. Weights are assigned to the nodes based on the access frequency of the class, the memory size of an instance of the class, the current location of the object, and whether the object _has_ to be on the device (device specific classes like a touchscreen reader for instance, which make no sense on the surrogate). The last property is called the IsNative property.
The partitioning of the AEG is performed using a min cut algorithm, using the weights described above as a parameter for deciding the cut itself. Since finding an optimal partition under such constraints is NP-hard in general (a plain min-cut on its own can be computed in polynomial time), the algorithm produces several candidate min cut partitions, and maintains a set of such partitions. At run time, one of these partitions is picked, depending on which is judged best given the constraints in that scenario. All classes which have the IsNative property set to true are bound to the device and will _not_ be offloaded to the surrogate.
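To make the partitioning step concrete, here is a toy sketch. It is not the paper's heuristic: it brute-forces every subset of a tiny, made-up class graph, keeps native classes on the device, and picks the cheapest cut that fits a memory budget. The class names, weights and budget are all invented for illustration.

```python
from itertools import combinations

# Hypothetical class graph: memory per class (KB) and interaction weights.
memory = {"UI": 50, "Parser": 300, "Cache": 400, "Model": 200}
native = {"UI"}                                  # IsNative: must stay on device
edges = {("UI", "Parser"): 5, ("Parser", "Cache"): 40,
         ("Parser", "Model"): 30, ("UI", "Model"): 10,
         ("Cache", "Model"): 8}


def cut_weight(offloaded):
    """Total interaction weight crossing the device/surrogate boundary."""
    return sum(w for (a, b), w in edges.items()
               if (a in offloaded) != (b in offloaded))


def best_partition(device_budget):
    """Brute-force the cheapest cut that fits the device's memory budget."""
    movable = sorted(set(memory) - native)
    best = None
    for r in range(len(movable) + 1):
        for combo in combinations(movable, r):
            offloaded = set(combo)
            on_device = sum(m for c, m in memory.items() if c not in offloaded)
            if on_device > device_budget:
                continue  # this split does not relieve the device enough
            cost = cut_weight(offloaded)
            if best is None or cost < best[0]:
                best = (cost, offloaded)
    return best


print(best_partition(device_budget=300))
```

With a 300 KB budget, the cheapest feasible split offloads everything except the native UI class, because cutting between the tightly coupled Parser, Cache and Model classes would cost far more remote invocations. Brute force is only viable for a handful of classes, which is why a heuristic is needed for real applications.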
The architecture of the proposed solution is decomposed into a form wherein resource constraints are expressed using Fuzzy Logic. The fuzzy values like “low” and “high” depend on the application itself. Each application can have its own rules for the partitioning of the AEG. Depending on the rules and the current status of resource availability (like bandwidth and memory), we can pick the appropriate partition of the AEG that was generated in the step above. The partitioning is performed in a timely manner such that this computation needn’t be performed when an offload has to be initiated, and we already have a partition available when we hit the resource limit.
The solution was evaluated in a real setting (for a change!) and tools like Dia, Biomer and Java Notes were run. I won’t be elaborating much on the evaluation itself, as it is clearly explained in the paper. I don’t consider it very strong, as it does not explore cases where bandwidth was constrained, but it’s still better than some other evaluations out there.
Weaknesses in the work: Surrogates cannot be migrated on the fly, and this restricts how _far_ the mobile client can stray away from the surrogate. Furthermore, hacking the VM to achieve this may impact portability of the code, but this is much better than having to re-write applications.
Euro Trip!
A few days ago, I lived a part of my life which I like to call Happiness. Yes, I was finally accepted into the European Masters in Distributed Computing programme and that too with a Category A Erasmus Mundus scholarship. This programme is being offered by the consortium of universities formed by Royal Institute of Technology (KTH), Sweden; Instituto Superior Técnico (IST), Portugal and Universitat Politècnica de Catalunya (UPC), Barcelona. My study track involves being in IST, Portugal for the first year; KTH, Sweden during my 3rd semester and finally having my Master’s Thesis evaluated from IST, Portugal. Furthermore, my good friend Navaneeth has been offered the same course _and_ the same study track as well, to the relief of our parents.
So here’s what the Python list of my life looks like:
life.journey = ['Tehran, Iran', 'Calicut, India', 'Doha, Qatar', 'Palakkad, Kerala', 'Jaipur, India']
life.journey.append('Lisbon, Portugal')
life.journey.append('Stockholm, Sweden')
Guess it’s time for a Euro trip then eh?
