Monday, November 9, 2009

HDFS High Availability

I have encountered plenty of questions about the HDFS NameNode being a single point of failure. The most common concern is that if the NameNode dies, the whole cluster becomes unavailable, which would make HDFS unsuitable for applications that need a high degree of uptime. This is not a problem when you run map-reduce jobs on HDFS, because a map-reduce system is a batch system and the uptime requirements of a batch system are typically not that stringent.

Recently, I have started to see many distributed applications that use HDFS as a general-purpose storage system. These applications range from multimedia servers that store mail, photos, videos, etc. to systems that store updates from thousands of distributed sensors. They prefer HDFS because it can store a large volume of data. They do not use map-reduce to mine any of the information stored in HDFS, but they do need consistency and availability of data. Brewer's CAP Theorem states that most distributed systems need some tradeoffs among Consistency, Availability and Partition tolerance. HDFS does an excellent job of providing Consistency of data at all times. Traditionally, it has not addressed Availability and Partition tolerance. That could change to a certain extent with the HDFS 0.21 release. HDFS 0.21 has a new entity called the BackupNode that receives real-time updates about transactions from the NameNode. This is the first step in making HDFS highly available.

Here is a slide-deck that I wanted to present at ApacheCon 2009 about the current state of affairs regarding High Availability with HDFS.

Monday, October 19, 2009

Hadoop discussions at Microsoft Research

I was invited to present a talk about Hadoop File System Architecture at Microsoft Research at Seattle. This is a research group and is focussed on long-term research, so it is no surprise that they are interested in knowing how a growing company like Facebook is using Hadoop to its advantage.

I met a few folks who chatted with me about how Microsoft SQL Server is being modified to handle large-scale databases. These folks heartily agreed with a comment I made in my presentation that Dr. DeWitt and Dr. Stonebraker are missing the point when they compare performance numbers between Hadoop and traditional database systems, rather than comparing the scalability and fault-tolerance of these systems. I had learned some of the fundamentals of database systems from Professor DeWitt during my graduate studies at the University of Wisconsin Madison, but Dr. DeWitt is a Microsoft employee now!

The fact that Facebook uses the SQL interface of Hive layered over Hadoop makes it even more interesting to Microsoft. They wanted to know the performance difference between Hive and Pig, and would like to compare both to their distributed SQL Server software.

Here are the slides I used for my presentation.

Friday, October 2, 2009

I presented a set of slides that describes Hadoop development at Facebook at the HadoopWorld conference in New York today. It was well received by more than 100 people. I have presented at many conferences on the west coast, but this is the first time I have presented at a conference in New York. There are more Hadoop users here, versus mostly Hadoop developers on the west coast. There were plenty of questions, especially about Hadoop-Archive and Realtime-Hadoop. People also asked me about HDFS Symbolic links and the HDFS-scribe copier.

Earlier, I visited the University of Notre Dame to conduct a department seminar and present a guest lecture for the graduate students at the Department of Computer Science. There is plenty of interesting research being led by Prof. Douglas Thain. One interesting research idea that came up was to place HDFS block replicas by analyzing HDFS access patterns. It is possible that we can provide HDFS datanode/namenode logs to researchers who can analyze these logs to come up with better algorithms for HDFS block replica placement.

Monday, September 14, 2009

HDFS block replica placement in your hands now!

Most Hadoop administrators set the default replication factor for their files to be three. The main assumption here is that if you keep three copies of the data, your data is safe. I have observed this to be true in the big clusters that we manage and operate. In actuality, administrators are managing two failure aspects: data corruption and data availability.

If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for some time, the datanodes on that rack are inaccessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data availability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. Because HDFS actively monitors datanodes and, upon detecting a failure, immediately schedules re-replication of blocks (if needed), three copies of data on three different nodes are sufficient to avoid corrupted files.

HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes opens a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
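The policy described above can be sketched in a few lines. This is only an illustration, not HDFS code: the function name, the topology dictionary and the node names are all made up for the example, and it assumes the cluster has at least two racks and that the chosen remote rack has at least two nodes.

```python
import random


def place_replicas(topology, writer, rng=random):
    """Pick three (rack, node) targets following the policy described above.

    topology: dict mapping rack name -> list of node names (hypothetical shape).
    writer:   (rack, node) of the client that is writing the block.
    Assumes at least two racks, and at least two nodes on the remote rack.
    """
    local_rack, local_node = writer

    # Replica 1: the machine on which the writing client runs.
    first = (local_rack, local_node)

    # Replica 2: a random node on a randomly chosen rack other than the writer's.
    remote_rack = rng.choice([rack for rack in topology if rack != local_rack])
    second_node = rng.choice(topology[remote_rack])
    second = (remote_rack, second_node)

    # Replica 3: a different random node on that same remote rack.
    third_node = rng.choice(
        [node for node in topology[remote_rack] if node != second_node]
    )
    third = (remote_rack, third_node)

    return [first, second, third]


# Hypothetical three-rack cluster; the writer runs on node n2 of rack1.
topology = {
    "rack1": ["n1", "n2", "n3"],
    "rack2": ["n4", "n5", "n6"],
    "rack3": ["n7", "n8", "n9"],
}
replicas = place_replicas(topology, ("rack1", "n2"), random.Random(0))
racks_used = {rack for rack, _ in replicas}
```

Whatever the random choices are, the block ends up on three distinct nodes spread over exactly two racks. Each call is independent of any other, which mirrors the point that blocks of the same file are placed without regard to one another.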

The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many blocks of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these files together with fewer mapper instances. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.

Another reason one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.

Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of the nodes in the cluster is at a higher temperature than another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.

Well, now you can finally get your hands dirty! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any inefficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API.
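To give a feel for what a pluggable placement policy looks like, here is a small sketch of the idea. It is written in Python for brevity and is not the HDFS-385 Java API: the class names, the method name choose_targets and the temperature readings are all hypothetical. It uses the cooler-node preference from the HeatMap example as the custom policy.

```python
from abc import ABC, abstractmethod


class PlacementPolicy(ABC):
    """Hypothetical plug-in point: decides which nodes receive a block's replicas."""

    @abstractmethod
    def choose_targets(self, num_replicas, nodes):
        """Return the names of the nodes that should hold the replicas."""


class CoolestNodePolicy(PlacementPolicy):
    """Prefer the coolest nodes, as in the HeatMap idea described above."""

    def __init__(self, temperature):
        # temperature: dict mapping node name -> degrees Celsius (made-up readings).
        self.temperature = temperature

    def choose_targets(self, num_replicas, nodes):
        ranked = sorted(nodes, key=lambda node: self.temperature[node])
        return ranked[:num_replicas]


def allocate_block(policy, nodes, num_replicas=3):
    """Ask the configured policy for targets and check that it returned enough."""
    targets = policy.choose_targets(num_replicas, nodes)
    if len(targets) < num_replicas:
        raise RuntimeError("policy could not find enough target nodes")
    return targets


temps = {"n1": 41.0, "n2": 35.5, "n3": 38.0, "n4": 52.3}
targets = allocate_block(CoolestNodePolicy(temps), list(temps))
```

The allocation code only talks to the abstract interface, so swapping in a co-location policy or a two-data-center policy would not require touching it.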

Friday, August 28, 2009

HDFS and Erasure Codes (HDFS-RAID)

The Hadoop Distributed File System has been great in providing a cloud-type file system. It is robust (when administered correctly :-)) and highly scalable. However, one of the main drawbacks of HDFS is that each piece of data is replicated in three places. This is acceptable because disk storage is cheap and is becoming cheaper by the day; it isn't a problem if you have a relatively small to medium size cluster. The price difference (in absolute terms) is not much whether you use 15 disks or 10 disks. If we consider the cost of $1 per GByte, the price difference between fifteen 1 TB disks and ten 1 TB disks is only $5K. But when the total size of your cluster is 10 PBytes, then the cost savings in storing the data in two places versus three is a huge ten million dollars!
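The arithmetic behind these two numbers can be checked quickly. The sketch below assumes decimal units (1 TB = 1000 GB, 1 PB = 1000 TB) and reads "10 PBytes" as the amount of data before replication; both are my assumptions for the example, and the function name is made up.

```python
COST_PER_GB = 1      # dollars per GByte, the figure used above
GB_PER_TB = 1000     # assumption: decimal units
TB_PER_PB = 1000     # assumption: decimal units


def storage_cost(data_tb, replication):
    """Dollar cost of storing data_tb terabytes of data with the given replication."""
    return data_tb * replication * GB_PER_TB * COST_PER_GB


# Small cluster: 5 TB of data needs fifteen 1 TB disks at 3x, ten at 2x.
small_saving = storage_cost(5, 3) - storage_cost(5, 2)

# Large cluster: 10 PB of data stored three times versus two times.
large_saving = storage_cost(10 * TB_PER_PB, 3) - storage_cost(10 * TB_PER_PB, 2)
```

Under these assumptions small_saving comes to 5,000 dollars and large_saving to 10,000,000 dollars, which matches the $5K and ten million dollar figures.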

The reason HDFS stores disk blocks in triplicate is that it uses commodity hardware and there is a non-negligible probability of a disk failure. It has been observed that a replication factor of 3, together with the fact that HDFS aggressively detects failures and immediately re-replicates failed block replicas, is sufficient to never lose any data in practice. The challenge now is to achieve an effective replication factor of 3 while keeping the real physical replication factor close to 2! What better way to do it than by using Erasure Codes?

I heard about this idea called DiskReduce from the folks at CMU. The CMU PDL Lab has been a powerhouse of research in file systems and it is no surprise that they proposed an elegant way of implementing erasure codes in HDFS. I borrowed heavily from their idea in my implementation of Erasure Codes in HDFS described in HDFS-503. One of the main motivations of my design is to keep the HDFS Erasure Coding as a software layer above HDFS rather than intertwining it inside the HDFS code. The HDFS code is complex by itself and it is really nice to not have to make it more complex and heavyweight.

The Distributed Raid File System consists of two main software components. The first component is the RaidNode, a daemon that creates parity files from specified HDFS files. The second component, "raidfs", is software that is layered over an HDFS client and intercepts all calls that an application makes to the HDFS client. If the HDFS client encounters corrupted data while reading a file, the raidfs client detects it, uses the relevant parity blocks to recover the corrupted data (if possible) and returns the data to the application. The use of parity data to satisfy a read request is completely transparent to the application. The Distributed Raid File System can be configured in such a way that a set of data blocks of a file are combined together to form one or more parity blocks. This allows one to reduce the replication factor of an HDFS file from 3 to 2 while keeping the failure probability roughly the same as before.
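The recovery step can be illustrated with the simplest possible erasure code, a single XOR parity block per stripe. This is a minimal sketch under that assumption; the text above does not say which code is used, and the function names and sample blocks are invented for the example.

```python
def xor_blocks(blocks):
    """Byte-wise XOR of equally sized blocks."""
    size = len(blocks[0])
    result = bytearray(size)
    for block in blocks:
        if len(block) != size:
            raise ValueError("all blocks in a stripe must have the same size")
        for i, byte in enumerate(block):
            result[i] ^= byte
    return bytes(result)


def recover_block(stripe, parity, lost_index):
    """Rebuild the block at lost_index from the surviving blocks and the parity."""
    survivors = [block for i, block in enumerate(stripe) if i != lost_index]
    return xor_blocks(survivors + [parity])


# The RaidNode's job in this sketch: compute one parity block for a stripe.
stripe = [b"block-0!", b"block-1!", b"block-2!"]
parity = xor_blocks(stripe)

# The raidfs client's job: the read of block 1 hit corrupted data, so rebuild it.
recovered = recover_block(stripe, parity, lost_index=1)
```

XOR parity can rebuild only one missing block per stripe, which is why the data blocks themselves are still kept at a replication factor of 2 in the configuration described above.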

I have seen that using a stripe size of 10 blocks decreases the physical replication factor of a file to 2.2 while keeping the effective replication factor of a file at 3. This typically results in saving 25% to 30% of storage space in a HDFS cluster.
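These numbers can be reproduced with a little arithmetic. The sketch assumes one parity block per stripe of 10 data blocks, with both data and parity blocks stored at a replication factor of 2; that breakdown is my reading of the 2.2 figure, not something stated above.

```python
def physical_replication(stripe_length, data_replication, parity_replication,
                         parity_blocks_per_stripe=1):
    """Physical copies stored per data block, counting the parity overhead."""
    parity_overhead = parity_blocks_per_stripe * parity_replication / stripe_length
    return data_replication + parity_overhead


physical = physical_replication(stripe_length=10, data_replication=2,
                                parity_replication=2)

# Fraction of raw storage saved compared with plain 3x replication.
savings = 1 - physical / 3
```

With these inputs the physical replication works out to 2 + 2/10 = 2.2, and the saving relative to triplication is a little under 27%, inside the 25% to 30% range quoted above.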

One of the shortcomings of this implementation is that we need a parity file for every file in HDFS. This potentially increases the number of files in the NameNode. To alleviate this problem, I will enhance this implementation (in the future) to use the Hadoop Archive feature to archive all the parity files together in larger containers, so that the NameNode does not have to support additional files when the HDFS Erasure Coding is switched on. This works reasonably well because the parity files are very rarely used to satisfy a read request.

I am hoping that this feature becomes part of Hadoop 0.21 release scheduled for September 2009!

Tuesday, July 28, 2009

Hadoop and Condor

My graduate work in the mid-nineties at the University of Wisconsin focussed on Condor. Condor has an amazing way to do process checkpointing and migrating processes from one machine to another if needed. It also has a very powerful scheduler that matches job requirements with machine characteristics.

One of the major inefficiencies with Hadoop schedulers (the Fairshare and Capacity schedulers) is that they are not resource-aware. There is some work in progress in this area in HADOOP-5881. Condor's ClassAds mechanism can be used to match Hadoop jobs with machines very elegantly.
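As a rough illustration of resource-aware matching, here is a toy matchmaker. It is a one-sided simplification and does not use the real ClassAd language: the attribute names, the machine list and the job are all hypothetical, and a Python function stands in for a requirements expression.

```python
# Hypothetical machine advertisements: each machine describes its resources.
machines = [
    {"name": "m1", "memory_mb": 4096, "free_slots": 0, "disk_gb": 500},
    {"name": "m2", "memory_mb": 16384, "free_slots": 2, "disk_gb": 2000},
    {"name": "m3", "memory_mb": 8192, "free_slots": 1, "disk_gb": 250},
]

# Hypothetical job advertisement: a requirements predicate plus a rank
# function that orders the machines which satisfy the requirements.
job = {
    "name": "hive-join",
    "requirements": lambda m: m["memory_mb"] >= 8192 and m["free_slots"] > 0,
    "rank": lambda m: m["memory_mb"],
}


def match(job, machines):
    """Return the highest-ranked machine that meets the job's requirements, or None."""
    candidates = [m for m in machines if job["requirements"](m)]
    if not candidates:
        return None
    return max(candidates, key=job["rank"])


best = match(job, machines)
```

Here m1 is rejected because it has no free slot, and m2 is preferred over m3 because the job ranks machines by memory. A scheduler that is not resource-aware would treat all three machines as interchangeable.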

Here is one of my recent presentations at the Israeli Association of Grid Technologies that talks about the synergies between Condor and Hadoop.

Monday, June 22, 2009

Hadoop at Netflix

Netflix is interested in using Hadoop/Hive to process click logs from the users of their website. Here is what I presented to them in a meeting that was well attended by about 50 engineers. Following the meeting, a bunch of engineers asked me questions related to the integration of Scribe and HDFS and how Facebook imports click logs into Hadoop.

Here is a copy of my presentation slides.

Saturday, June 6, 2009

HDFS Scribe Integration

It is finally here: you can configure the open source log-aggregator, scribe, to log data directly into the Hadoop distributed file system.

Many Web 2.0 companies have to deploy a bunch of costly filers to capture weblogs being generated by their application. Currently, there is no option other than a costly filer because the write-rate for this stream is huge. The Hadoop-Scribe integration allows this write-load to be distributed among a bunch of commodity machines, thus reducing the total cost of this infrastructure.

The challenge was to make HDFS real-timeish in behaviour. Scribe uses libhdfs, which is the C interface to the HDFS client. There were various bugs in libhdfs that needed to be solved first. Then came the FileSystem API. One of the major issues was that the FileSystem API caches FileSystem handles and always returned the same FileSystem handle when called from multiple threads. There was no reference counting of the handle. This caused problems with Scribe, because Scribe is highly multi-threaded. A new API, FileSystem.newInstance(), was introduced to support Scribe.
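The handle-caching problem is easy to reproduce in a toy model. The sketch below is not Hadoop code: FakeFileSystem, get and new_instance are stand-ins invented for the example, the URI is hypothetical, and the two callers play the role of two Scribe threads.

```python
class FakeFileSystem:
    """Stand-in for a file system handle; only tracks whether it was closed."""

    def __init__(self, uri):
        self.uri = uri
        self.closed = False

    def close(self):
        self.closed = True


_cache = {}


def get(uri):
    """Cached lookup: every caller asking for the same URI shares one handle."""
    if uri not in _cache:
        _cache[uri] = FakeFileSystem(uri)
    return _cache[uri]


def new_instance(uri):
    """Uncached lookup: every caller gets a private handle."""
    return FakeFileSystem(uri)


URI = "hdfs://namenode.example:8020"  # hypothetical address

# Two callers share the cached handle; one closing it breaks the other.
handle_a = get(URI)
handle_b = get(URI)
handle_a.close()
shared_handle_broken = handle_b.closed

# With private handles, closing one leaves the other usable.
handle_c = new_instance(URI)
handle_d = new_instance(URI)
handle_c.close()
private_handle_ok = not handle_d.closed
```

Without reference counting, the cache cannot know that a second caller is still using the handle, which is the situation a highly multi-threaded writer runs into.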

Making the HDFS write code path more real-time was painful. There are various timeouts/settings in HDFS that were hardcoded and needed to be changed to allow the application to fail fast. At the bottom of this blog-post, I am attaching the settings that we have currently configured to make the HDFS write very real-timeish. The last of the JIRAs, HADOOP-2757, is in the pipeline to be committed to Hadoop trunk very soon.

What about the Namenode being the single point of failure? This is acceptable in a warehouse type of application but cannot be tolerated by a realtime application. Scribe typically aggregates click-logs from a bunch of webservers, and losing *all* click log data of a website for 10 minutes or so (the minimum time for a namenode restart) cannot be tolerated. The solution is to configure two overlapping clusters on the same hardware. Run two separate namenodes N1 and N2 on two different machines. Run one set of datanode software on all slave machines that report to N1 and the other set of datanode software on the same set of slave machines that report to N2. The two datanode instances on a single slave machine share the same data directories. This configuration allows HDFS to be highly available for writes!

The highly-available-for-writes HDFS configuration is also required for software upgrades on the cluster. We can shut down one of the overlapping HDFS clusters, upgrade it to the new Hadoop software, and then put it back online before starting the same process for the second HDFS cluster.

What are the main changes to scribe that were needed? Scribe already had the feature that it buffers data when it is unable to write to the configured storage. The default scribe behaviour is to replay this buffer back to the storage when the storage is back online. Scribe is configured to support no-buffer-replay when the primary storage is back online. Scribe-hdfs is configured to write data to a cluster N1 and if N1 fails then it writes data to cluster N2. Scribe treats N1 and N2 as two equivalent primary stores. The scribe configuration should have fs_type=hdfs. For scribe compilation, you can use ./configure --enable-hdfs LDFLAGS="-ljvm -lhdfs". A good example for configuring scribe-hdfs is in a file called hdfs_example2.conf in the scribe code base.
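The write path described above can be sketched as a small failover loop. This is an illustration of the behaviour, not Scribe code: FakeCluster, FailoverWriter and ClusterDown are invented names, and a list in memory stands in for an HDFS file.

```python
class ClusterDown(Exception):
    """Raised when a write reaches a cluster that is not available."""


class FakeCluster:
    """Stand-in for one of the two overlapping HDFS clusters."""

    def __init__(self, name):
        self.name = name
        self.up = True
        self.messages = []

    def append(self, message):
        if not self.up:
            raise ClusterDown(self.name)
        self.messages.append(message)


class FailoverWriter:
    """Write to the active cluster; on failure switch to the next one and stay there."""

    def __init__(self, clusters):
        self.clusters = list(clusters)
        self.active = 0

    def write(self, message):
        for _ in range(len(self.clusters)):
            cluster = self.clusters[self.active]
            try:
                cluster.append(message)
                return cluster.name
            except ClusterDown:
                # Try the next cluster; all clusters are equivalent primaries.
                self.active = (self.active + 1) % len(self.clusters)
        raise ClusterDown("all clusters are down")


n1, n2 = FakeCluster("N1"), FakeCluster("N2")
writer = FailoverWriter([n1, n2])

first = writer.write("click 1")   # N1 is up, so the write lands on N1
n1.up = False
second = writer.write("click 2")  # N1 failed, so the writer moves to N2
n1.up = True
third = writer.write("click 3")   # N1 is back, but nothing is replayed to it
```

Because N1 and N2 are treated as equivalent primary stores, the writer simply stays on N2 after N1 recovers, and the messages written during the outage are not copied back to N1.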

Here are the settings for the Hadoop 0.17 configuration that are needed by an application doing writes in realtime:

ipc.client.idlethreshold = 10000
    Defines the threshold number of connections after which connections will be inspected for idleness.

ipc.client.connection.maxidletime = 10000
    The maximum time in msec after which a client will bring down the connection to the server.

ipc.client.connect.max.retries = 2
    Indicates the number of retries a client will make to establish a server connection.

ipc.server.listen.queue.size = 128
    Indicates the length of the listen queue for servers accepting client connections.

ipc.server.tcpnodelay = true
    Turn on/off Nagle's algorithm for the TCP socket connection on the server. Setting to true disables the algorithm and may decrease latency with a cost of more/smaller packets.

ipc.client.tcpnodelay = true
    Turn on/off Nagle's algorithm for the TCP socket connection on the client. Setting to true disables the algorithm and may decrease latency with a cost of more/smaller packets.

ipc.ping.interval = 5000
    The Client sends a ping message to server every period. This is helpful to detect socket connections that were idle and have been terminated by a failed server.

ipc.client.connect.maxwaittime = 5000
    The Client waits for this much time for a socket connect call to be established with the server.

dfs.datanode.socket.write.timeout = 20000
    The dfs Client waits for this much time for a socket write call to the datanode.

ipc.client.ping = false
    HADOOP-2757



Thursday, May 28, 2009

Report from my visit to the Berkeley RAD Labs

I attended the UC Berkeley RAD Lab Spring Retreat held at Santa Cruz. This Lab has about 30 PhD students and the quality of their work really impressed me. Most of their work is based on research on distributed systems. There were many students who were working with Hadoop and it is amazing to see Hadoop being the core of so much research activity. When the Hadoop project started three years back, I definitely did not imagine that it would get to this state!

I had read David Patterson's papers during my graduate studies at the Univ of Wisconsin Madison, and it was really nice to be able to meet him in person. The group of students that he leads at the RAD Lab is of very high calibre. Most people must have already seen the Above the Clouds whitepaper that the RAD Lab has produced. It tries to clear up the muddle on what Cloud Computing really is, its benefits and its possible usage scenarios. A good read!

A paper titled Detecting Large Scale System Problems by Mining Console Logs talks about using application logs from a distributed application to detect bugs, problems and anomalies in the system. They provide an example whereby they process 24 million log lines produced by HDFS to detect a bug (page 11 of the paper). I am not really convinced about the bug, but this is an effort in the right direction.

My employer Facebook is an enabler for research in distributed systems. To this effect, Facebook has allowed researchers from premier universities to analyze Hadoop system logs. These logs typically record machine usage and Hadoop job performance metrics. The Hadoop job records are inserted into a MySQL database for easy analysis (HADOOP-3708). Some students at the RAD Lab have used this database to show that Machine Learning techniques can be used to predict Hadoop job performance. This paper is not yet published. There is another paper that analyzes the performance characteristics of the Hadoop Fair share scheduler. Most of the abstracts of these publications are available here.

Last but not least, SCADS is a scalable storage system that is specifically designed for social networking type software. It has a declarative query language and supports various consistency models. It supports a rich data model that includes joins on pre-computed queries.

Sunday, May 24, 2009

Better Late than Never

For quite a while, I have been thinking about blogging about Hadoop in general and the Hadoop distributed file system (HDFS) in particular. Why, you may ask?

Firstly, I have been contacted by students from as far as Bangladesh and Fiji asking me questions about HDFS via email. This made me think that disseminating internal details about HDFS to the whole wide world would really benefit a lot of people. I like to interact with these budding engineers; and their questions, though elementary in nature, sometimes make me really ruminate on why we adopted a particular design and not another. I will sprinkle a few of these examples next week.

Secondly, I visited a few universities last month, among them Carnegie Mellon University and my alma mater, the Univ of Wisconsin. On my flight, I was getting bored to death, because I really did not like the movie that was playing and I did not carry any material to read. (Usually I like to read and re-read Sherlock Holmes over and over again.) But like they say, "an idle mind is the devil's workshop"... I started to jot down some exotic design ideas about HDFS. And lo and behold, I have a list of ideas that I would like to share! I will post them next week as well.