Saturday, April 27, 2013

HBase - Scan Best practice

What should we know in order to create efficient scan commands?

1. Example code:


Scan scan1 = new Scan();
ResultScanner scanner1 = table.getScanner(scan1);
for (Result res : scanner1) {
    System.out.println(res);
}
scanner1.close();

2. When iterating over a ResultScanner, we execute the next() method.

3. Each call to next() is a separate RPC for each row, even when you use the
next(int nbRows) method, because that method is nothing more than a client-side loop over
next() calls.

4. It would make sense to fetch more than one row per RPC if possible. This is called scanner caching, and it is disabled by default.

5. Caching improves performance but impacts memory, since a single row can consist of hundreds of columns, all of which will be fetched, and the result has to fit into the client process. The batching feature limits the number of columns fetched per batch.


6. Improved example

Scan scan = new Scan();
scan.setCaching(caching);  // number of rows fetched per RPC
scan.setBatch(batch);      // number of columns fetched per call
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    // process result
}
scanner.close();

Example of fetches and RPCs (from the book HBase: The Definitive Guide):


7. Fetch only the relevant information instead of the full table (specific column families, columns, etc.).

8. When performing a table scan where only the row keys are needed (no families,
qualifiers, values, or timestamps), add a FilterList with a MUST_PASS_ALL operator
to the scanner using setFilter().
The filter list should include both a FirstKeyOnlyFilter and a KeyOnlyFilter instance. Using this filter combination will cause the region server to only load the row key of the first KeyValue (i.e., from the first column) found and return it to the client, resulting in minimized network traffic. A sketch of this combination is shown below.
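
A minimal sketch of the row-key-only scan described in point 8, assuming a table handle named table already exists:

Scan scan = new Scan();
FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ALL);
filters.addFilter(new FirstKeyOnlyFilter());
filters.addFilter(new KeyOnlyFilter());
scan.setFilter(filters);

ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    // only the row key is populated; the values come back empty
    System.out.println(Bytes.toString(result.getRow()));
}
scanner.close();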


9. Make sure the input Scan instance has setCaching() set to something
greater than the default of 1. Using the default value means that the map task will
make a callback to the region server for every record processed. Setting this value
to 500, for example, will transfer 500 rows at a time to the client to be processed (see the sketch below).
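
A sketch of wiring such a Scan into a table-input MapReduce job; the table name "testtable", the MyMapper class, and the job object are assumptions for illustration:

Scan scan = new Scan();
scan.setCaching(500);        // fetch 500 rows per RPC
scan.setCacheBlocks(false);  // commonly recommended for MapReduce scans

TableMapReduceUtil.initTableMapperJob(
    "testtable", scan, MyMapper.class,
    ImmutableBytesWritable.class, Result.class, job);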


10. You can disable the WAL on Put commands by calling writeToWAL(false). Just know what you are doing: the consequence is that if there is a region server failure, that data will be lost. A sketch is shown below.
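
A short sketch of point 10, assuming the 0.9x-era client API where Put exposes setWriteToWAL():

Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
put.setWriteToWAL(false);  // skip the write-ahead log: faster, but lost on a region server crash
table.put(put);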

HBase client API summary

Single Put
HTable table = new HTable(conf, "testtable");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val2"));
table.put(put);


MultiPut(cache)
table.setAutoFlush(false);
table.put...
table.put...
table.put....
table.flushCommits();
A put can fail with an exception. That is why it is better to call flushCommits() in the catch (or finally) block, so the buffered puts are not silently dropped (see the sketch below).
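
A minimal sketch of this buffered-put pattern with an explicit flush; the table name and the puts are placeholders:

HTable table = new HTable(conf, "testtable");
table.setAutoFlush(false);
try {
    table.put(put1);
    table.put(put2);
    table.put(put3);
} finally {
    // flush whatever is still buffered, even if one of the puts threw
    table.flushCommits();
    table.close();
}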


MultiPut(batch)
List<Put> puts = new ArrayList<Put>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val1"));
puts.add(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"),
Bytes.toBytes("val2"));
puts.add(put2);
Put put3 = new Put(Bytes.toBytes("row2"));
put3.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"),
Bytes.toBytes("val3"));
puts.add(put3);
table.put(puts);


SingleGet
Get get = new Get(Bytes.toBytes("row1"));
get.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));
Result result = table.get(get);
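
To read the returned value out of the Result, a small follow-up sketch:

byte[] val = result.getValue(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));
System.out.println("Value: " + Bytes.toString(val));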


MultiGet
List<Get> gets = new ArrayList<Get>();
Get get1 = new Get(row1);
get1.addColumn(cf1, qf1);
gets.add(get1);
Get get2 = new Get(row2);
get2.addColumn(cf1, qf1);
gets.add(get2);
Get get3 = new Get(row2);
get3.addColumn(cf1, qf2);
gets.add(get3);
Result[] results = table.get(gets);


Delete(Single)
Delete delete = new Delete(Bytes.toBytes("row1"));
delete.deleteFamily(Bytes.toBytes("colfam3"));
table.delete(delete);


Delete(Multi)
List<Delete> deletes = new ArrayList<Delete>();
Delete delete1 = new Delete(Bytes.toBytes("row1"));
delete1.setTimestamp(4);
deletes.add(delete1);
Delete delete2 = new Delete(Bytes.toBytes("row2"));
delete2.deleteColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));
delete2.deleteColumns(Bytes.toBytes("colfam2"), Bytes.toBytes("qual3"), 5);
deletes.add(delete2);
Delete delete3 = new Delete(Bytes.toBytes("row3"));
delete3.deleteFamily(Bytes.toBytes("colfam1"));
delete3.deleteFamily(Bytes.toBytes("colfam2"), 3);
deletes.add(delete3);
table.delete(deletes);


Batch
List<Row> batch = new ArrayList<Row>();

Put put = new Put(ROW2);
put.add(COLFAM2, QUAL1, Bytes.toBytes("val5"));
batch.add(put);

Get get1 = new Get(ROW1);
get1.addColumn(COLFAM1, QUAL1);
batch.add(get1);

Delete delete = new Delete(ROW1);
delete.deleteColumns(COLFAM1, QUAL2);
batch.add(delete);

Object[] results = new Object[batch.size()];
try {
    table.batch(batch, results);
} catch (Exception e) {
    System.err.println("Error: " + e);
}


Scan
Scan scan1 = new Scan();
ResultScanner scanner1 = table.getScanner(scan1);
for (Result res : scanner1) {
    System.out.println(res);
}
scanner1.close();

Each call to next() is a separate RPC for each row, even when you use the next(int nbRows) method, because it is nothing more than a client-side loop over next() calls. Thus it would make sense to fetch more than one row per RPC if possible.

Caching - number of rows:
scan.setCaching(caching);
Batching - number of columns:
scan.setBatch(batch);

Thursday, April 25, 2013

HBase - how does it store data?


Tables in HBase can scale to billions of rows and millions of columns. The size of each table can run into terabytes and sometimes even petabytes. Tables are split into smaller chunks that are distributed across multiple servers. These smaller chunks are called regions. Servers that host regions are called RegionServers.

The master process does the distribution of regions among RegionServers, and each RegionServer typically hosts multiple regions


Region assignment happens when regions split (as they grow in size), when RegionServers die, or when new RegionServers are added to the deployment

Two special tables in HBase, -ROOT- and .META., help find where regions for various tables are hosted

When a client application wants to access a particular row, it goes to the -ROOT- table and asks it where it can find the region responsible for that particular row. -ROOT- points it to the region of the .META. table that contains the answer to that question. The .META. table consists of entries that the client application uses to determine which RegionServer is hosting the region in question.
The interaction happens as follows:


Exploring the metadata:

In the hbase shell:
zk_dump

Active master address: localhost,37150,1332994662074 - Master
Region server holding ROOT: localhost,41893,1332994662888 - the ROOT holder
Region servers:
localhost,41893,1332994662888 - Region servers



hbase(main):003:0> scan '-ROOT-'
ROW                                                         COLUMN+CELL                                                                                                                                                                    
 .META.,,1                                                  column=info:regioninfo, timestamp=1366823396043, value={NAME => '.META.,,1', STARTKEY => '', ENDKEY => '', ENCODED => 1028785192,}                                             
 .META.,,1                                                  column=info:server, timestamp=1366896097668, value=illin793:60202                                                                                                              
 .META.,,1                                                  column=info:serverstartcode, timestamp=1366896097668, value=1366896069920                                                                                                      
 .META.,,1                                                  column=info:v, timestamp=1366823396043, value=\x00\x00                                                                                                                         
1 row(s) in 0.0250 seconds

hbase(main):004:0> 

hbase(main):004:0> scan '.META.'
ROW                                                         COLUMN+CELL                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 
 test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7 column=info:regioninfo, timestamp=1366892381710, value={NAME => 'test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7.', STARTKEY => '', ENDKEY => 'aR1', ENCODED => e0d8
 .                                                          f83583650b2662832ea48873d6f7,}                                                                                                                                                 
 test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7 column=info:server, timestamp=1366896098296, value=illin793:12200                                                                                                              
 .                                                                                                                                                                                                                                         
 test_table,,1366892381501.e0d8f83583650b2662832ea48873d6f7 column=info:serverstartcode, timestamp=1366896098296, value=1366896064271                                                                                                      
 .                                                                                                                                                                                                                                         
 test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202 column=info:regioninfo, timestamp=1366892381698, value={NAME => 'test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202bb2.', STARTKEY => 'aR1', ENDKEY => '', ENCODED => 6
 bb2.                                                       d8a82416e782463f3ebe813f9202bb2,}                                                                                                                                              
 test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202 column=info:server, timestamp=1366896098041, value=illin793:60203                                                                                                              
 bb2.                                                                                                                                                                                                                                      
 test_table,aR1,1366892381501.6d8a82416e782463f3ebe813f9202 column=info:serverstartcode, timestamp=1366896098041, value=1366896071092                                                                                                      
 bb2.                                                                                                                                                                                                                                      

You can see that the table is split into two regions.

Investigating from the Java client API:
Given a specific table, the code below shows the region servers, the key range that each region holds, and the entries for those regions:
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "10.232.230.11");
conf.set("hbase.zookeeper.property.clientPort", "2183");
String tableName = "XXXXXXXXXX";
System.out.println("Table details:");

HTable investTable = new HTable(conf, tableName);
NavigableMap<HRegionInfo, ServerName> map = investTable.getRegionLocations();
Set<Entry<HRegionInfo, ServerName>> resultSet = map.entrySet();
for (Entry<HRegionInfo, ServerName> entry : resultSet) {
    // region name and the region server hosting it
    System.out.println(entry.getKey().getRegionNameAsString() + " [----] " + entry.getValue().getServerName());
    System.out.println("Start key : " + new String(entry.getKey().getStartKey()) + " [-----] "
        + "End key : " + new String(entry.getKey().getEndKey()));

    // scan the key range covered by this region and print its entries
    Scan scan = new Scan(entry.getKey().getStartKey(), entry.getKey().getEndKey());
    ResultScanner resultScanner = investTable.getScanner(scan);
    int counter = 0;
    for (Result result : resultScanner) {
        System.out.println(counter + " : " + result);
        counter++;
    }
    resultScanner.close();
}
Note: from the book "Hadoop in Action".

HDFS Java API - Tutorial

In this tutorial I will list the basic HDFS commands needed, such as:

connecting to the file system, creating a directory, copying/deleting/creating files, etc.

1. Connecting to HDFS file system:
Configuration config = new Configuration();
config.set("fs.default.name","hdfs://127.0.0.1:9000/");
FileSystem dfs = FileSystem.get(config);


2. Creating directory

Path src = new Path(dfs.getWorkingDirectory()+"/"+"rami");
dfs.mkdirs(src);


3. Delete directory or file:

Path src = new Path(dfs.getWorkingDirectory()+"/"+"rami");
dfs.delete(src, true);  // true = delete recursively


4. Copy files from the local FS to HDFS and back:

Path src = new Path("E://HDFS/file1.txt");
Path dst = new Path(dfs.getWorkingDirectory()+"/directory/");
dfs.copyFromLocalFile(src, dst);

Or Back :
dfs.copyToLocalFile(src, dst);

Note: the destination should be a Path object that contains the directory to copy the source file to.
The source should be a Path object that contains the path to the file, including the file name itself.


5. Create file:

Path src = new Path(dfs.getWorkingDirectory()+"/rami.txt");
dfs.createNewFile(src);


6. Reading file:

Path src = new Path(dfs.getWorkingDirectory() + "/rami.txt");
FSDataInputStream in = dfs.open(src);
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line;
while ((line = reader.readLine()) != null)
{
    System.out.println(line);
}
reader.close();

7. Writing file:

Path src = new Path(dfs.getWorkingDirectory()+"/rami.txt");
FSDataOutputStream fs = dfs.create(src);
byte[] btr = new byte[]{1,2,3,4,5,6,7,8,9};
fs.write(btr);
fs.close();

Tuesday, April 23, 2013

HBase put methods for multithreading environment

According to an interesting article I found on HBase 0.20, they reached the following conclusions:

1. Write performance seems to be better when fewer threads are used for writing.
2. All three variants seem to perform more or less similarly. In our application we use the HTablePool-based approach, and it is nice to see that it performs equally well (a sketch of this approach follows below). Another thing I observed during my tests is that pool size may not play much of a role: with a pool size of 10, the results are more or less the same.
3. As the number of threads increases, write performance deteriorates. A potential reason for this behavior is that HBase opens only one connection per region server from a single JVM. So, would increasing the number of JVMs help in horizontally scaling the writes?
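
A minimal sketch of the HTablePool approach mentioned in point 2; the pool size and table name are placeholders, and the close/return semantics vary between releases:

HTablePool pool = new HTablePool(conf, 10);  // pool of up to 10 table handles

// each worker thread borrows a handle, writes, and returns it
HTableInterface table = pool.getTable("testtable");
try {
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
    table.put(put);
} finally {
    table.close();  // in newer releases this returns the handle to the pool (older ones use pool.putTable(table))
}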

It would be interesting to rerun those tests on 0.94 and see what happens with the performance now.
That is probably what I will do in the upcoming days.

Link: http://www.srikanthps.com/2011/06/hbase-benchmarking-for-multi-threaded.html

Monday, April 22, 2013

HBase internals

Tables in HBase are organized into rows and columns. HBase treats columns a little differently than a relational database. Columns in HBase are organized into groups called column families.

Architecture

Table—HBase organizes data into tables. Table names are Strings and composed of characters that are safe for use in a file system path.
Row—Within a table, data is stored according to its row. Rows are identified uniquely by their rowkey. Rowkeys don’t have a data type and are always treated as a byte[].
Column family—Data within a row is grouped by column family. Column families also impact the physical arrangement of data stored in HBase. For this reason, they must be defined up front and aren’t easily modified. Every row in a table has the same column families, although a row need not store data in all its families. Column family names are Strings and composed of characters that are safe for use in a file system path.
Column qualifier—Data within a column family is addressed via its column qualifier, or column. Column qualifiers need not be specified in advance. Column qualifiers need not be consistent between rows. Like rowkeys, column qualifiers don’t have a data type and are always treated as a byte[].
Cell—A combination of rowkey, column family, and column qualifier uniquely identifies a cell. The data stored in a cell is referred to as that cell’s value. Values also don’t have a data type and are always treated as a byte[].
Version—Values within a cell are versioned. Versions are identified by their timestamp, a long. When a version isn’t specified, the current timestamp is used as the basis for the operation. The number of cell value versions retained by HBase is configured via the column family. The default number of cell versions is three.
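
A small sketch illustrating how a cell is addressed and how versions come back through the Java client; the table, row, and column names are placeholders:

Get get = new Get(Bytes.toBytes("row1"));                         // rowkey
get.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"));  // column family + qualifier
get.setMaxVersions(3);                                            // ask for up to three versions of the cell

Result result = table.get(get);
for (KeyValue kv : result.raw()) {
    // each KeyValue is one version of the cell, identified by its timestamp
    System.out.println(kv.getTimestamp() + " -> " + Bytes.toString(kv.getValue()));
}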
 

Commands :

How Write Works

HBase receives the command and writes the change to two destinations:
1] The Write-Ahead Log (WAL) - the HLog.
2] The MemStore, which is flushed to disk in the form of an HFile; every flush creates a new file.
An HFile belongs to a column family, and there is one MemStore per column family.
If a failure occurs, data can be recovered from the WAL, which is a single log per server shared by all regions. You can disable the WAL, but then recovery is disabled for that command: if the system crashes, you lose that data.

How Read Works:

HBase has an LRU cache for reads - the BlockCache. The BlockCache is designed to keep frequently accessed data from the HFiles in memory so as to avoid disk reads. Each column family has its own BlockCache. The "block" in BlockCache is the unit of data that HBase reads from disk in a single pass.
A read first checks the MemStore for pending modifications that have not been flushed yet, then accesses the BlockCache if a cached block contains the row we want, and finally goes to the HFiles to load the data.
Note: the data of a single row can be spread over many HFiles.
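
A related sketch: for large sequential scans it is common to turn block caching off on the Scan so the BlockCache is not flooded (this is an assumption on top of the text above, not part of the original notes):

Scan scan = new Scan();
scan.setCacheBlocks(false);  // do not pollute the BlockCache with a full-table scan
ResultScanner scanner = table.getScanner(scan);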

 

How delete works

The Delete command marks the record for deletion, indicating that no scan or get should return those values. HFiles are immutable, meaning they are never updated until a major compaction occurs.
 

Compaction


Minor and major compaction

A minor compaction folds HFiles together, creating a larger HFile from multiple smaller HFiles. HBase reads the content of the existing HFiles and writes the records into a new one. Then it swaps in the new HFile as the current active one and deletes the old ones that formed it. Minor compactions are designed to be minimally detrimental to HBase performance, so there is an upper limit on the number of HFiles involved. All of these settings are configurable.
When a compaction operates over all HFiles in a column family in a given region, it's called a major compaction. Upon completion of a major compaction, all HFiles in the column family are merged into a single file. Major compactions can also be triggered for the entire table (or a particular region) manually from the shell; this is a relatively expensive operation and isn't done often. Minor compactions, on the other hand, are relatively lightweight and happen more frequently. Major compactions are the only chance HBase has to clean up deleted records: resolving a delete requires removing both the deleted record and the deletion marker, and there's no guarantee that both are in the same HFile. A major compaction is the only time when HBase is guaranteed to have access to both of these entries at the same time.
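
A hedged sketch of triggering a major compaction programmatically via the HBaseAdmin API of that era; the table name is a placeholder:

HBaseAdmin admin = new HBaseAdmin(conf);
admin.majorCompact("testtable");  // can also be given a specific region name
admin.close();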

All examples and details are taken from the book HBase in Action.

Sunday, April 21, 2013

Moving data into Hadoop


If you want to push all of your production server system log files into HDFS, use Flume.



Apache Flume is a distributed system for collecting streaming data. It’s an Apache project in incubator status, originally developed by Cloudera. It offers various levels of reliability and transport delivery guarantees that can be tuned to your needs. It’s highly customizable and supports a plugin architecture where you can add custom data sources and data sinks.




Link:

If you need to automate the process by which files on remote servers are copied into HDFS, use the HDFS File Slurper.
Features

  • After a successful file copy you can either remove the source file, or have it moved into another directory.
  • Destination files can be compressed as part of the write codec with any compression codec which extendsorg.apache.hadoop.io.compress.CompressionCodec.
  • Capability to write "done" file after completion of copy
  • Verify destination file post-copy with CRC32 checksum comparison with source
  • Ignores hidden files (filenames that start with ".")
  • Customizable destination via a script which can be called for every source file. Or alternatively let the utility know a single destination directory and all files are copied into that location.
  • Customizable pre-processing of files prior to transfer via a script
  • A daemon mode which is compatible with inittab respawn
  • Multi-threaded data transfer

Link:

If you want to automate periodic tasks for downloading content from an HTTP server into HDFS, use Oozie.
Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
Link:
http://oozie.apache.org/
If you want to import relational data using MapReduce, use the DBInputFormat class.


You can do the same using Sqoop:
http://sqoop.apache.org/

If you want to move data from HBase to HDFS, you can use the Export class of HBase:


$ bin/run.sh org.apache.hadoop.hbase.mapreduce.Export \
stocks_example \
output

where stocks_example is the table name and output is the HDFS output directory.

Or specific column family:


$ bin/run.sh org.apache.hadoop.hbase.mapreduce.Export \
-D hbase.mapreduce.scan.column.family=details \
-D mapred.output.compress=true \
-D mapred.output.compression.codec=\
org.apache.hadoop.io.compress.SnappyCodec \
stocks_example output



The Export class writes the HBase output in the SequenceFile format, where the HBase rowkey is stored in the SequenceFile record key using org.apache.hadoop.hbase.io.ImmutableBytesWritable, and the HBase value is stored in the SequenceFile record value using org.apache.hadoop.hbase.client.Result.

Now it's time to work with the data moved to HDFS (an example reading the exported stock records):


import static com.manning.hip.ch2.HBaseWriteAvroStock.*;

public class HBaseExportedStockReader {
    public static void main(String... args) throws IOException {
        read(new Path(args[0]));
    }

    public static void read(Path inputPath) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, inputPath, conf);
        HBaseScanAvroStock.AvroStockReader stockReader =
            new HBaseScanAvroStock.AvroStockReader();
        try {
            ImmutableBytesWritable key = new ImmutableBytesWritable();
            Result value = new Result();
            while (reader.next(key, value)) {
                Stock stock = stockReader.decode(value.getValue(
                    STOCK_DETAILS_COLUMN_FAMILY_AS_BYTES,
                    STOCK_COLUMN_QUALIFIER_AS_BYTES));
                System.out.println(new String(key.get()) + ": " +
                    ToStringBuilder.reflectionToString(stock, ToStringStyle.SIMPLE_STYLE));
            }
        } finally {
            reader.close();
        }
    }
}