Monday, April 29, 2013

HBase row keys design rules


Monotonically increasing row keys are bad.

If you have a heavy load of events, they will all land in a single region. That region will eventually split, but newly arriving entries will still go to only one of the resulting regions, and so on. So think about a key distribution that will not cause region imbalance.
(The next two rules have improvements in the latest HBase versions - see Data Block Encodings.)

Use the shortest column family names possible.

This is because of the KeyValue structure: the names are stored inside the structure.

Use the shortest row keys possible.

This is because of the KeyValue structure: the row key is also stored inside every KeyValue.
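To see why this matters, here is a minimal sketch (with made-up key, family, and qualifier names, against the 0.94-era client API) that builds two KeyValues carrying the same value and compares the size of the structures HBase would actually store:

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class KeyValueSizeDemo {
    public static void main(String[] args) {
        long ts = System.currentTimeMillis();
        byte[] value = Bytes.toBytes("42");

        // Long row key and long family/qualifier names (hypothetical).
        KeyValue verbose = new KeyValue(
                Bytes.toBytes("customer_event_2013_04_29_0000001"),
                Bytes.toBytes("customer_details"),
                Bytes.toBytes("purchase_amount"),
                ts, value);

        // Short row key and short names carrying the same information.
        KeyValue compact = new KeyValue(
                Bytes.toBytes("ce130429-1"),
                Bytes.toBytes("d"),
                Bytes.toBytes("pa"),
                ts, value);

        // Every stored cell repeats the row key, family, and qualifier,
        // so the size difference is paid once per cell.
        System.out.println("verbose KeyValue length: " + verbose.getLength());
        System.out.println("compact KeyValue length: " + compact.getLength());
    }
}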

Construct the row key in the proper way and check the byte array that actually gets constructed.

Example: (screenshots of the key construction code and of the resulting byte arrays)
As you can see, the resulting arrays are different.
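Since the screenshots are not reproduced here, below is a minimal sketch of the same idea (the prefix and id are made up): the way you assemble the row key determines the bytes that actually get stored, so print and compare the arrays.

import java.util.Arrays;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyBytesDemo {
    public static void main(String[] args) {
        long userId = 1234L;

        // Variant 1: string concatenation - the id is encoded as ASCII digits.
        byte[] key1 = Bytes.toBytes("user" + userId);

        // Variant 2: fixed-width binary id appended to the prefix.
        byte[] key2 = Bytes.add(Bytes.toBytes("user"), Bytes.toBytes(userId));

        System.out.println(Arrays.toString(key1)); // [117, 115, 101, 114, 49, 50, 51, 52]
        System.out.println(Arrays.toString(key2)); // [117, 115, 101, 114, 0, 0, 0, 0, 0, 0, 4, -46]
    }
}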
Rowkeys are scoped to ColumnFamilies. Thus, the same rowkey could exist in each ColumnFamily that exists in a table without collision

Always keep the ASCII table in mind to see the correct order of the characters.

(image: the full ASCII table)
According to the ASCII table you can also see that the numerics [0-9] are followed by [A-Z], and only then do you get the set of [a-z]. Remember this when you split tables.
When possible, it is better to use only capital letters in the key.
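A quick way to convince yourself of the ordering is to compare raw key bytes with the Bytes utility (a small sketch; the key strings are made up):

import org.apache.hadoop.hbase.util.Bytes;

public class KeyOrderDemo {
    public static void main(String[] args) {
        // '9' (57) < 'A' (65) < 'a' (97) in ASCII, and HBase sorts keys by raw bytes.
        System.out.println(Bytes.compareTo(Bytes.toBytes("9ROW"), Bytes.toBytes("AROW"))); // negative
        System.out.println(Bytes.compareTo(Bytes.toBytes("AROW"), Bytes.toBytes("aROW"))); // negative
        System.out.println(Bytes.compareTo(Bytes.toBytes("aROW"), Bytes.toBytes("AROW"))); // positive
    }
}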

Sunday, April 28, 2013

10 Must know things about HBase performance

Facts you should know about HBase

1. A write in HBase takes 1-3 ms.


2. A read takes 0-3 ms if the data is cached, and 10-30 ms if it is not cached and HBase reads it from disk.


3. It is preferred to work with cell sizes of about 3 MB.


4. Define the region max size as big as possible to avoid splits. This is controlled by the property hbase.hregion.max.filesize.


6. You can disable automatic major compactions (set hbase.hregion.majorcompaction=0) and run them manually instead.


7. Know how to define the client scanner cache size via hbase.client.scanner.caching, and use setBatch to limit the number of columns fetched.


8. Know how to limit the number of MapReduce tasks:

mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum.

9. Use the property hfile.block.cache.size to define the percentage of your heap used for block caching. The default is 0.25; I am using 0.5.

10. Enable data compression - use Snappy or LZO.
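For point 10, here is a minimal sketch of enabling Snappy compression when creating a table through the Java admin API. The table and family names are made up, the import paths match the 0.94-era client, and the native Snappy/LZO libraries must be installed on the cluster:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.hfile.Compression;

public class CreateCompressedTable {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor desc = new HTableDescriptor("events");   // hypothetical table
        HColumnDescriptor family = new HColumnDescriptor("d");    // short family name
        family.setCompressionType(Compression.Algorithm.SNAPPY);  // or Compression.Algorithm.LZO
        desc.addFamily(family);

        admin.createTable(desc);
        admin.close();
    }
}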



Saturday, April 27, 2013

HBase - Scan Best practice

What should we know to create efficient scan commands?

1. Example code:


Scan scan1 = new Scan();
ResultScanner scanner1 = table.getScanner(scan1);
for (Result res : scanner1) {
    System.out.println(res);
}
scanner1.close();

2. When iterating over a ResultScanner we execute the next() method.

3. Each call to next() will be a separate RPC for each row—even when you use the next(int nbRows) method, because it is nothing else but a client-side loop over next() calls.

4. It would make sense to fetch more than one row per RPC if possible. This is called scanner caching, and it is disabled by default.

5. Caching improves performance but impacts memory, since a single row can consist of hundreds of columns that will all be fetched, and this has to fit into the client process. The batching feature limits the number of columns fetched per bulk.


6. Improved example:

Scan scan = new Scan();
scan.setCaching(caching); // number of rows fetched per RPC
scan.setBatch(batch);     // number of columns returned per call
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    System.out.println(result);
}
scanner.close();

Example of fetches and RPCs (from the book HBase: The Definitive Guide):


7. Fetch only the relevant information instead of the full table (specific column families/columns, etc.).

8. When performing a table scan where only the row keys are needed (no families, qualifiers, values, or timestamps), add a FilterList with a MUST_PASS_ALL operator to the scanner using setFilter(). The filter list should include both a FirstKeyOnlyFilter and a KeyOnlyFilter instance. Using this filter combination will cause the region server to only load the row key of the first KeyValue (i.e., from the first column) found and return it to the client, resulting in minimized network traffic. (A sketch of this is shown after this list.)


9. Make sure the input Scan instance has setCaching() set to something greater than the default of 1. Using the default value means that the map task will make callbacks to the region server for every record processed. Setting this value to 500, for example, will transfer 500 rows at a time to the client to be processed.


10. You can disable the WAL on Put commands by calling setWriteToWAL(false). Just know what you are doing: the consequence is that if there is a region server failure, there will be data loss.
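Here is the row-key-only scan from point 8 as a minimal sketch (assuming an existing HTable instance and the 0.94-era filter classes):

import java.io.IOException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class RowKeyOnlyScan {
    // Prints only the row keys of the given table: the region server loads just
    // the first KeyValue of each row and strips its value before sending it back.
    public static void printRowKeys(HTable table) throws IOException {
        Scan scan = new Scan();
        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filters.addFilter(new FirstKeyOnlyFilter());
        filters.addFilter(new KeyOnlyFilter());
        scan.setFilter(filters);

        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result result : scanner) {
                System.out.println(Bytes.toString(result.getRow()));
            }
        } finally {
            scanner.close();
        }
    }
}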

HBase client API summary

Single Put
HTable table = new HTable(conf, "testtable");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val2"));
table.put(put);


MultiPut(cache)
table.setAutoFlush(false)
table.put...
table.put...
table.put....
table.flushCommits();
A put can fail with an exception. That is why it is better to call flushCommits() in the catch block as well.
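One possible way to arrange that pattern, reusing the table from the Single Put example above and flushing in a finally block so buffered puts are not silently dropped:

// Client-side write buffer: puts are collected locally and sent in fewer round trips.
table.setAutoFlush(false);
try {
    Put put1 = new Put(Bytes.toBytes("row1"));
    put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
    table.put(put1);

    Put put2 = new Put(Bytes.toBytes("row2"));
    put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
    table.put(put2);
} finally {
    // Flush even if one of the puts threw, so the buffered mutations are not lost.
    table.flushCommits();
}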


MultiPut(batch)
List<Put> puts = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
puts.add(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val2"));
puts.add(put2);
Put put3 = new Put(Bytes.toBytes("row2"));
put3.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val3"));
puts.add(put3);
table.put(puts);


SingleGet
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));
Result result = table.get(get);


MultiGet
List<Get> gets = new ArrayList<Get>();
Get get1 = new Get(row1);
get1.addColumn(cf1, qf1);
gets.add(get1);
Get get2 = new Get(row2);
get2.addColumn(cf1, qf1);
gets.add(get2);
Get get3 = new Get(row2);
get3.addColumn(cf1, qf2);
gets.add(get3);
Result[] results = table.get(gets);


Delete(Single)
Delete delete = new Delete(Bytes.toBytes("row1"));
delete.deleteFamily(Bytes.toBytes("colfam3"));
table.delete(delete);


Delete(Multi)
List<Delete> deletes = new ArrayList<Delete>();
Delete delete1 = new Delete(Bytes.toBytes("row1"));
delete1.setTimestamp(4);
deletes.add(delete1);
Delete delete2 = new Delete(Bytes.toBytes("row2"));
delete2.deleteColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));
delete2.deleteColumns(Bytes.toBytes("colfam2"), Bytes.toBytes("qual3"), 5);
deletes.add(delete2);
Delete delete3 = new Delete(Bytes.toBytes("row3"));
delete3.deleteFamily(Bytes.toBytes("colfam1"));
delete3.deleteFamily(Bytes.toBytes("colfam2"), 3);
deletes.add(delete3);
table.delete(deletes);


Batch
List<Row> batch = new ArrayList<Row>();

Put put = new Put(ROW2);
put.add(COLFAM2, QUAL1, Bytes.toBytes("val5"));
batch.add(put);

Get get1 = new Get(ROW1);
get1.addColumn(COLFAM1, QUAL1);
batch.add(get1);

Delete delete = new Delete(ROW1);
delete.deleteColumns(COLFAM1, QUAL2);
batch.add(delete);

Object[] results = new Object[batch.size()];
try {
    table.batch(batch, results);
} catch (Exception e) {
    System.err.println("Error: " + e);
}



Scan
Scan scan1 = new Scan();
ResultScanner scanner1 = table.getScanner(scan1);
for (Result res : scanner1) {
    System.out.println(res);
}
scanner1.close();

So far, each call to next() will be a separate RPC for each row—even when you use the next(int nbRows) method, because it is nothing else but a client-side loop over next() calls. Thus it would make sense to fetch more than one row per RPC if possible.
Cache - number of rows:
scan.setCaching(caching);
Batch - number of columns:
scan.setBatch(batch);

Thursday, April 25, 2013

HBase - how it stores data?


Tables in HBase can scale to billions of rows and millions of columns. The size of each table can run into terabytes and sometimes even petabytes. Tables are split into smaller chunks that are distributed across multiple servers. These smaller chunks are called regions. Servers that host regions are called RegionServers.

The master process does the distribution of regions among RegionServers, and each RegionServer typically hosts multiple regions


Region assignment happens when regions split (as they grow in size), when RegionServers die, or when new RegionServers are added to the deployment.

Two special tables in HBase, -ROOT- and .META., help find where regions for various tables are hosted.

When a client application wants to access a particular row, it goes to the -ROOT- table and asks it where it can find the region responsible for that particular row. -ROOT- points it to the region of the .META. table that contains the answer to that question. The .META. table consists of entries that the client application uses to determine which RegionServer is hosting the region in question.
The interaction happens as follows:


Exploring the metadata:

In the HBase shell:
zk_dump

Active master address: localhost,37150,1332994662074 - Master
Region server holding ROOT: localhost,41893,1332994662888 - the ROOT holder
Region servers:
localhost,41893,1332994662888 - Region servers



hbase(main):003:0> scan '-ROOT-'
ROW                                                         COLUMN+CELL                                                                                                                                                                    
 .META.,,1                                                  column=info:regioninfo, timestamp=1366823396043, value={NAME => '.META.,,1', STARTKEY => '', ENDKEY => '', ENCODED => 1028785192,}                                             
 .META.,,1                                                  column=info:server, timestamp=1366896097668, value=illin793:60202                                                                                                              
 .META.,,1                                                  column=info:serverstartcode, timestamp=1366896097668, value=1366896069920                                                                                                      
 .META.,,1                                                  column=info:v, timestamp=1366823396043, value=\x00\x00                                                                                                                         
1 row(s) in 0.0250 seconds

hbase(main):004:0> 

hbase(main):004:0> scan '.META.'
ROW                                                         COLUMN+CELL                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
 test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7 column=info:regioninfo, timestamp=1366892381710, value={NAME => 'test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7.', STARTKEY => '', ENDKEY => 'aR1', ENCODED => e0d8
 .                                                          f83583650b2662832ea48873d6f7,}                                                                                                                                                 
 test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7 column=info:server, timestamp=1366896098296, value=illin793:12200                                                                                                              
 .                                                                                                                                                                                                                                         
 test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7 column=info:serverstartcode, timestamp=1366896098296, value=1366896064271                                                                                                      
 .                                                                                                                                                                                                                                         
 test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202 column=info:regioninfo, timestamp=1366892381698, value={NAME => 'test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202bb2.', STARTKEY => 'aR1', ENDKEY => '', ENCODED => 6
 bb2.                                                       d8a82416e782463f3ebe813f9202bb2,}                                                                                                                                              
 test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202 column=info:server, timestamp=1366896098041, value=illin793:60203                                                                                                              
 bb2.                                                                                                                                                                                                                                      
 test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202 column=info:serverstartcode, timestamp=1366896098041, value=1366896071092                                                                                                      
 bb2.                                                                                                                                                                                                                                      

You can see that the table is split into 2 regions.

Investigating from the Java client API:
Given a specific table, the code below shows the region servers, the key range of every region each of them holds, and the entries for those regions:
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "10.232.230.11");
conf.set("hbase.zookeeper.property.clientPort", "2183");
String tableName = "XXXXXXXXXX";
System.out.println("Table details:");
HTable investTable = new HTable(conf, tableName);
NavigableMap<HRegionInfo, ServerName> map = investTable.getRegionLocations();
Set<Entry<HRegionInfo, ServerName>> resultSet = map.entrySet();
for (Entry<HRegionInfo, ServerName> entry : resultSet) {
    System.out.println(entry.getKey().getRegionNameAsString() + " [ ----]  " + entry.getValue().getServerName());
    System.out.println("Start key : " + new String(entry.getKey().getStartKey()) + " [-----]  "
            + "End key : " + new String(entry.getKey().getEndKey()));
    Scan scanner = new Scan(entry.getKey().getStartKey(), entry.getKey().getEndKey());
    ResultScanner resultScanner = investTable.getScanner(scanner);
    int counter = 0;
    for (Result result : resultScanner) {
        System.out.println(counter + " : " + result);
        counter++;
    }
}
Note: from the book "Hadoop in Action".

HDFS Java API - Tutorial

In this tutorial I will list the basic HDFS commands needed, like:

Connecting to the file system, creating a directory, copying/deleting/creating files, etc.

1. Connecting to HDFS file system:
Configuration config = new Configuration();
config.set("fs.default.name","hdfs://127.0.0.1:9000/");
FileSystem dfs = FileSystem.get(config);


2. Creating directory

Path src = new Path(dfs.getWorkingDirectory()+"/"+"rami");
dfs.mkdirs(src);


3. Delete directory or file:

Path src = new Path(dfs.getWorkingDirectory()+"/"+"rami");
dfs.delete(src, true); // true = delete recursively


4. Copy files from the local FS to HDFS and back:

Path src = new Path("E://HDFS/file1.txt");
Path dst = new Path(dfs.getWorkingDirectory()+"/directory/");
dfs.copyFromLocalFile(src, dst);

Or back:
dfs.copyToLocalFile(src, dst);

Note: the destination should be a Path object that contains the directory to copy the source file to.
The source should be a Path object that contains the path to the file, including the file name itself.


5. Create a file:

Path src = new Path(dfs.getWorkingDirectory()+"/rami.txt");
dfs.createNewFile(src);


6. Reading file:

Path src = new Path(dfs.getWorkingDirectory()+"/rami.txt");
FSDataInputStream fs = dfs.open(src);
String str = null;
while ((str = fs.readLine()) != null)
{
System.out.println(str);
}

7. Writing a file:

Path src = new Path(dfs.getWorkingDirectory()+"/rami.txt");
FSDataOutputStream fs = dfs.create(src);
byte[] btr = new byte[]{1,2,3,4,5,6,7,8,9};
fs.write(btr);
fs.close();

Tuesday, April 23, 2013

HBase put methods for multithreading environment

According to an interesting article I found about HBase 0.20, the author reached the following conclusions:

1. Write performance seems to be good when fewer threads are used for writing.
2. All 3 variants seem to perform more or less similarly. In our application, we use an HTablePool-based approach (see the sketch after this list), and it is nice to see that it performs equally well. Another thing I observed during my tests is that the pool size may not have much of a role to play; with a pool size of 10 too, the results are more or less similar.
3. As the number of threads increases, the write performance deteriorates. A potential reason for this behavior is that HBase opens only one connection per region server from a single JVM. So, will increasing the number of JVMs help in horizontally scaling the writes?
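Here is a minimal sketch of the HTablePool-based approach mentioned in point 2 (0.94-era API; the table name, column names, and pool size are made up). Each worker thread borrows a table from the shared pool, writes, and returns it by closing it:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PooledWriter {
    private static final Configuration CONF = HBaseConfiguration.create();
    // One shared pool per JVM; 10 matches the pool size used in the tests above.
    private static final HTablePool POOL = new HTablePool(CONF, 10);

    // Called concurrently from many threads.
    public static void write(String rowKey, String value) throws Exception {
        HTableInterface table = POOL.getTable("events"); // hypothetical table name
        try {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.add(Bytes.toBytes("d"), Bytes.toBytes("v"), Bytes.toBytes(value));
            table.put(put);
        } finally {
            table.close(); // in 0.94, closing a pooled table returns it to the pool
        }
    }
}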

It would be interesting to rerun those tests on 0.94 and see what happens with the performance now.
That is probably what I will do in the upcoming days.

Link: http://www.srikanthps.com/2011/06/hbase-benchmarking-for-multi-threaded.html

Monday, April 22, 2013

HBase internals

Tables in HBase are organized into rows and columns. HBase treats columns a little differently than a relational database. Columns in HBase are organized into groups called column families.

Architecture

Table—HBase organizes data into tables. Table names are Strings and composed of characters that are safe for use in a file system path.
Row—Within a table, data is stored according to its row. Rows are identified uniquely by their rowkey. Rowkeys don’t have a data type and are always treated as a byte[].
Column family—Data within a row is grouped by column family. Column families also impact the physical arrangement of data stored in HBase. For this reason, they must be defined up front and aren’t easily modified. Every row in a table has the same column families, although a row need not store data in all its families. Column family names are Strings and composed of characters that are safe for use in a file system path.
Column qualifier—Data within a column family is addressed via its column qualifier, or column. Column qualifiers need not be specified in advance. Column qualifiers need not be consistent between rows. Like rowkeys, column qualifiers don’t have a data type and are always treated as a byte[].
Cell—A combination of rowkey, column family, and column qualifier uniquely identifies a cell. The data stored in a cell is referred to as that cell’s value. Values also don’t have a data type and are always treated as a byte[].
Version—Values within a cell are versioned. Versions are identified by their timestamp, a long. When a version isn’t specified, the current timestamp is used as the basis for the operation. The number of cell value versions retained by HBase is configured via the column family. The default number of cell versions is three.
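As a small illustration of the Version point, a fragment (with made-up table and family names, against the 0.94-era client API) that raises the retained versions from the default 3 to 5:

// Keep 5 versions of every cell in family "d" instead of the default 3.
HTableDescriptor desc = new HTableDescriptor("events");
HColumnDescriptor family = new HColumnDescriptor("d");
family.setMaxVersions(5);
desc.addFamily(family);
admin.createTable(desc); // admin is an existing HBaseAdmin instance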
 

Commands :

How Write Works

HBase receives the command and writes the change to 2 destinations:
1] The Write-Ahead Log (WAL) - the HLog.
2] The MemStore - which is flushed to disk in the form of an HFile; every flush creates a new file.
An HFile belongs to a column family, and there is 1 MemStore per column family.
If a failure occurs, data can be recovered from the WAL, which is a single log per server shared by all regions. You can disable the WAL, but then recovery is disabled for that command: if the system crashes, you lose your data.

How Read Works:

HBase has an LRU cache for reads – the BlockCache. The BlockCache is designed to keep frequently accessed data from the HFiles in memory so as to avoid disk reads. Each column family has its own BlockCache. The “Block” in BlockCache is the unit of data that HBase reads from disk in a single pass.
A read first checks the MemStore for pending modifications that were not flushed, then accesses the BlockCache if a cached block contains the row we want, and only then goes to the HFiles to load the data.
Note: the data of a single row can be spread over many HFiles.

 

How Delete Works

The Delete command marks the record for deletion, indicating that no scan or get should return those values. HFiles are immutable, meaning they are never updated until a major compaction occurs.
 

Compaction


Minor and Major Compaction

A minor compaction folds HFiles together, creating a larger HFile from multiple smaller HFiles. HBase reads the content of the existing HFiles, writing records into a new one. Then, it swaps in the new HFile as the current active one and deletes the old ones that formed the new one. Minor compactions are designed to be minimally detrimental to HBase performance, so there is an upper limit on the number of HFiles involved. All of these settings are configurable.
When a compaction operates over all HFiles in a column family in a given region, it’s called a major compaction. Upon completion of a major compaction, all HFiles in the column family are merged into a single file. Major compactions can also be triggered for the entire table (or a particular region) manually from the shell. This is a relatively expensive operation and isn’t done often. Minor compactions, on the other hand, are relatively lightweight and happen more frequently. Major compactions are the only chance HBase has to clean up deleted records. Resolving a delete requires removing both the deleted record and the deletion marker. There’s no guarantee that both the record and marker are in the same HFile. A major compaction is the only time when HBase is guaranteed to have access to both of these entries at the same time.
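The same manual trigger is also available from the Java admin API; a minimal sketch (the table name is hypothetical, 0.94-era HBaseAdmin):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class TriggerMajorCompaction {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        // Asynchronously requests a major compaction of the whole table;
        // pass a region name instead to compact a single region.
        admin.majorCompact("events");
        admin.close();
    }
}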

All examples and details are taken from the book HBase in Action.

Sunday, April 21, 2013

Moving data into Hadoop


If you want to push all of your production server system log files into HDFS, use Flume.



Apache Flume is a distributed system for collecting streaming data. It’s an Apache
project in incubator status, originally developed by Cloudera. It offers various levels of
reliability and transport delivery guarantees that can be tuned to your needs. It’s
highly customizable and supports a plugin architecture where you can add custom
data sources and data sinks.




Link:

If you need to automate the process by which files on remote servers are copied into HDFS, use the HDFS File Slurper.
Features:

  • After a successful file copy you can either remove the source file, or have it moved into another directory.
  • Destination files can be compressed as part of the write codec with any compression codec which extends org.apache.hadoop.io.compress.CompressionCodec.
  • Capability to write "done" file after completion of copy
  • Verify destination file post-copy with CRC32 checksum comparison with source
  • Ignores hidden files (filenames that start with ".")
  • Customizable destination via a script which can be called for every source file, or alternatively let the utility know a single destination directory and all files are copied into that location.
  • Customizable pre-processing of files prior to transfer via a script.
  • A daemon mode which is compatible with inittab respawn
  • Multi-threaded data transfer

Link:

If you want to automate periodic tasks for downloading content from an HTTP server into HDFS, use:
Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
Link:
http://oozie.apache.org/
If you want to import relational data using MapReduce, use the DBInputFormat class.


You can do the same using Sqoop:
http://sqoop.apache.org/

If you want to move data from HBase to HDFS, you can use the Export class of HBase:


$ bin/run.sh org.apache.hadoop.hbase.mapreduce.Export \
    stocks_example \
    output

(stocks_example is the table name and output is the HDFS output directory)

Or specific column family:


$ bin/run.sh org.apache.hadoop.hbase.mapreduce.Export \
-D hbase.mapreduce.scan.column.family=details \
-D mapred.output.compress=true \
-D mapred.output.compression.codec=\
org.apache.hadoop.io.compress.SnappyCodec \
stocks_example output



The Export class writes the HBase output in the SequenceFile format, where the HBase rowkey is stored in the SequenceFile record key using org.apache.hadoop.hbase.io.ImmutableBytesWritable, and the HBase value is stored in the SequenceFile record value using org.apache.hadoop.hbase.client.Result.

Now it is time to read the exported data back from HDFS (an example with Stock records):


import static com.manning.hip.ch2.HBaseWriteAvroStock.*;

public class HBaseExportedStockReader {
    public static void main(String... args) throws IOException {
        read(new Path(args[0]));
    }

    public static void read(Path inputPath) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inputPath, conf);
        HBaseScanAvroStock.AvroStockReader stockReader =
                new HBaseScanAvroStock.AvroStockReader();
        try {
            ImmutableBytesWritable key = new ImmutableBytesWritable();
            Result value = new Result();
            while (reader.next(key, value)) {
                Stock stock = stockReader.decode(value.getValue(
                        STOCK_DETAILS_COLUMN_FAMILY_AS_BYTES,
                        STOCK_COLUMN_QUALIFIER_AS_BYTES));
                System.out.println(new String(key.get()) + ": "
                        + ToStringBuilder.reflectionToString(stock, ToStringStyle.SIMPLE_STYLE));
            }
        } finally {
            reader.close();
        }
    }
}


Saturday, April 20, 2013

Serialization of data for NoSQL

In order to serialize requests or data that will be stored, we should use some serialization system.
After a deep investigation and comparison of stability and of the serialization/deserialization engine structure, the following was found:

Protocol Buffers - https://code.google.com/p/protobuf/
Avro - http://avro.apache.org/
Thrift - http://thrift.apache.org/ (see also http://diwakergupta.github.io/thrift-missing-guide/)

Main comparison conclusion
1. Protobuf and Thrift need code generation, while Avro does not: the metadata (schema) is simply defined on both sides. In addition, because of the metadata file there is no need to tag the data types inside the buffer.

2. Thrift vs. Protocol Buffers - the differences in benchmark results are on the order of nanoseconds for serialization/deserialization times, and the storage difference is around 0.05%.

Comparison links :
https://code.google.com/p/thrift-protobuf-compare/wiki/Benchmarking


NoSQL and BigData Books

In order to get into Big Data technologies, I would recommend the following books.


For non-computer-science newbies
Head First Java
Database Systems: A Practical Approach

For computer science newbies
Seven Databases in Seven Weeks
Professional NoSQL

Hadoop
Hadoop - The Definitive Guide
Hadoop in Practice
Hadoop MapReduce Cookbook
Hadoop Operations
Hadoop Real-World Solutions Cookbook
MapReduce Design Patterns

HBase
HBase - The Definitive Guide
HBase in Action

Hive
Programming Hive

Pig
Programming Pig


Links:
Contact me if books are needed

Thursday, April 18, 2013

Zookeeper Summary

ZooKeeper is a distributed, open-source coordination service for distributed applications.

It is used for synchronization, configuration maintenance, groups, and naming.


ZooKeeper allows distributed processes to coordinate with each other
through a shared hierarchal namespace which is organized similarly to a standard file system.


 Like the distributed processes it coordinates, ZooKeeper itself is
intended to be replicated over a set of hosts called an ensemble.


The name space provided by ZooKeeper is much like that of a standard file system. A name
is a sequence of path elements separated by a slash (/). Every node in ZooKeeper's name
space is identified by a path

So basically what happens here is that you create a tree which can be updated from any client of the system; the model is shared and all the actions are ordered.
For example, if you want to implement a distributed queue, you create a znode and put data into it under child paths; on the other side you retrieve those values and remove them (see the sketch below).
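A minimal sketch of that queue idea with the plain ZooKeeper Java API (the connection string and paths are made up; watches and error handling are omitted):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class TinyQueue {
    public static void main(String[] args) throws Exception {
        // Connect to the ensemble (connection string is hypothetical, no default watcher).
        ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null);

        // Producer: each element becomes a sequential child znode under /queue.
        if (zk.exists("/queue", false) == null) {
            zk.create("/queue", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        zk.create("/queue/item-", "payload".getBytes(), Ids.OPEN_ACL_UNSAFE,
                CreateMode.PERSISTENT_SEQUENTIAL);

        // Consumer: take the lowest-numbered child, read it, then remove it.
        String first = java.util.Collections.min(zk.getChildren("/queue", false));
        byte[] data = zk.getData("/queue/" + first, false, null);
        zk.delete("/queue/" + first, -1);
        System.out.println(new String(data));

        zk.close();
    }
}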


API:
Zookeeper Javadoc


General architecture

Every ZooKeeper server services clients. Clients connect to exactly one server to submit
requests. Read requests are serviced from the local replica of each server database. Requests
that change the state of the service, write requests, are processed by an agreement protocol.
As part of the agreement protocol all write requests from clients are forwarded to a single
server, called the leader. The rest of the ZooKeeper servers, called followers, receive
message proposals from the leader and agree upon message delivery. The messaging layer
takes care of replacing leaders on failures and syncing followers with leaders.
ZooKeeper uses a custom atomic messaging protocol. Since the messaging layer is atomic,
ZooKeeper can guarantee that the local replicas never diverge. When the leader receives a
write request, it calculates what the state of the system is when the write is to be applied and
transforms this into a transaction that captures this new state.



Additional features

1. Watches - clients can set a watch on a znode. A watch will be triggered and removed when the znode changes.

2. Sequential Consistency - Updates from a client will be applied in the order that they were
sent.

3. Atomicity - Updates either succeed or fail. No partial results.

4. Single System Image - A client will see the same view of the service regardless of the
server that it connects to.

5. Reliability - Once an update has been applied, it will persist from that time forward until
a client overwrites the update.

6. Timeliness - The clients' view of the system is guaranteed to be up-to-date within a certain time bound.
7. Observers - in order not to hurt write performance with very many clients, use Observer nodes.

Observers forward these requests to the Leader like Followers do, but they then simply wait to hear the result of the vote. Because of this, we can increase the number of Observers as much as we like without harming the performance of votes.




Architecture
ZooKeeper can be run in standalone mode or replicated. A replicated group of servers in the same application is called a quorum, and in replicated mode, all servers in the quorum have copies of the same configuration file.


Performance
In version 3.2 r/w performance improved by ~2x compared to the previous 3.1 release

What can be implemented
Shared Barriers
Shared Queues
Shared Locks
Two-phase commit
etc
Documentation, tutorials and examples: ZooKeeper documentation


Wednesday, April 17, 2013

Double-checked locking

It is a software design pattern used to reduce the overhead of acquiring a lock by first testing the locking criterion (the "lock hint") without actually acquiring the lock. Only if the locking criterion check indicates that locking is required does the actual locking logic proceed.

It is typically used to reduce locking overhead when implementing "lazy initialization" in a multi-threaded environment, especially as part of the Singleton pattern. Lazy initialization avoids initializing a value until the first time it is accessed.



class Foo {
    private Helper helper = null;
    public Helper getHelper() {
        if (helper == null) {
            helper = new Helper();
        }
        return helper;
    }
 
    // other functions and members...
}

The problem is that this does not work when using multiple threads. A lock must be obtained in case two threads call getHelper() simultaneously. Otherwise, either they may both try to create the object at the same time, or one may wind up getting a reference to an incompletely initialized object.
The lock is obtained by expensive synchronizing, as is shown in the following example.
// Correct but possibly expensive multithreaded version
class Foo {
    private Helper helper = null;
    public synchronized Helper getHelper() {
        if (helper == null) {
            helper = new Helper();
        }
        return helper;
    }
 
    // other functions and members...
}
However, the first call to getHelper() will create the object and only the few threads trying to access it during that time need to be synchronized; after that all calls just get a reference to the member variable. Since synchronizing a method can decrease performance by a factor of 100 or higher,[3] the overhead of acquiring and releasing a lock every time this method is called seems unnecessary: once the initialization has been completed, acquiring and releasing the locks would appear unnecessary. Many programmers have attempted to optimize this situation in the following manner:
  1. Check that the variable is initialized (without obtaining the lock). If it is initialized, return it immediately.
  2. Obtain the lock.
  3. Double-check whether the variable has already been initialized: if another thread acquired the lock first, it may have already done the initialization. If so, return the initialized variable.
  4. Otherwise, initialize and return the variable.
// Broken multithreaded version
// "Double-Checked Locking" idiom
class Foo {
    private Helper helper = null;
    public Helper getHelper() {
        if (helper == null) {
            synchronized(this) {
                if (helper == null) {
                    helper = new Helper();
                }
            }
        }
        return helper;
    }
 
    // other functions and members...
}
Intuitively, this algorithm seems like an efficient solution to the problem. However, this technique has many subtle problems and should usually be avoided. For example, consider the following sequence of events:
  1. Thread A notices that the value is not initialized, so it obtains the lock and begins to initialize the value.
  2. Due to the semantics of some programming languages, the code generated by the compiler is allowed to update the shared variable to point to a partially constructed object before A has finished performing the initialization. For example, in Java if a call to a constructor has been inlined then the shared variable may immediately be updated once the storage has been allocated but before the inlined constructor initializes the object.[4]
  3. Thread B notices that the shared variable has been initialized (or so it appears), and returns its value. Because thread B believes the value is already initialized, it does not acquire the lock. If B uses the object before all of the initialization done by A is seen by B (either because A has not finished initializing it or because some of the initialized values in the object have not yet percolated to the memory B uses (cache coherence)), the program will likely crash.
One of the dangers of using double-checked locking in J2SE 1.4 (and earlier versions) is that it will often appear to work: it is not easy to distinguish between a correct implementation of the technique and one that has subtle problems. Depending on the compiler, the interleaving of threads by the scheduler and the nature of other concurrent system activity, failures resulting from an incorrect implementation of double-checked locking may only occur intermittently. Reproducing the failures can be difficult.
As of J2SE 5.0, this problem has been fixed. The volatile keyword now ensures that multiple threads handle the singleton instance correctly. This new idiom is described in [4]:
// Works with acquire/release semantics for volatile
// Broken under Java 1.4 and earlier semantics for volatile
class Foo {
    private volatile Helper helper = null;
    public Helper getHelper() {
        Helper result = helper;
        if (result == null) {
            synchronized(this) {
                result = helper;
                if (result == null) {
                    helper = result = new Helper();
                }
            }
        }
        return result;
    }
 
    // other functions and members...
}

Note the usage of the local variable result which seems unnecessary. For some versions of the Java VM, it will make the code 25% faster and for others, it won't hurt.[5]
If the helper object is static (one per class loader), an alternative is the initialization-on-demand holder idiom.[6] See Listing 16.6 in [7].
// Correct lazy initialization in Java 
@ThreadSafe
class Foo {
    private static class HelperHolder {
       public static Helper helper = new Helper();
    }
 
    public static Helper getHelper() {
        return HelperHolder.helper;
    }
}
This relies on the fact that inner classes are not loaded until they are referenced.
Semantics of final field in Java 5 can be employed to safely publish the helper object without using volatile:[8]
public class FinalWrapper<T> {
    public final T value;
    public FinalWrapper(T value) { 
        this.value = value; 
    }
}
 
public class Foo {
   private FinalWrapper<Helper> helperWrapper = null;
 
   public Helper getHelper() {
      FinalWrapper<Helper> wrapper = helperWrapper;
 
      if (wrapper == null) {
          synchronized(this) {
              if (helperWrapper == null) {
                  helperWrapper = new FinalWrapper<Helper>(new Helper());
              }
              wrapper = helperWrapper;
          }
      }
      return wrapper.value;
   }
}
The local variable wrapper is required for correctness. Performance of this implementation is not necessarily better than the volatile implementation.