Tuesday, February 12, 2013

Quick Start Cassandra with Java Client Hector


I have found Cassandra to be a very useful data store for many different kinds of applications.

Apache Cassandra is an open source, distributed, decentralized, elastically scalable, highly available, fault-tolerant, tunably consistent, column-oriented database that bases its distribution design on Amazon’s Dynamo and its data model on Google’s Bigtable. Created at Facebook, it is now used at some of the most popular sites on the Web.

Cassandra is designed to distribute data over several machines (nodes) that operate together and appear to the end user as a single instance. Cassandra assigns data to the nodes of a cluster by arranging them in a ring. A cluster is a container for keyspaces, a keyspace contains column families, and a column family holds rows of columns. I like to view Cassandra column families as a four-dimensional hash:

[Keyspace][ColumnFamily][Key][Column]
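
As a rough illustration, the four-dimensional view can be modeled with nested Java maps. This is only a toy in-memory sketch: the class name FourDimensionalHash and its put/get methods are made up for this example and are not part of Cassandra or Hector. The "demo" keyspace and "users" column family are the names used later in this post.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of [Keyspace][ColumnFamily][Key][Column]; hypothetical, not a Cassandra API. */
class FourDimensionalHash {
    // keyspace -> column family -> row key -> column name -> value
    private final Map<String, Map<String, Map<String, Map<String, Object>>>> data = new TreeMap<>();

    /** Stores a value, creating the intermediate maps on demand. */
    void put(String keyspace, String columnFamily, String rowKey, String column, Object value) {
        data.computeIfAbsent(keyspace, k -> new TreeMap<>())
            .computeIfAbsent(columnFamily, k -> new TreeMap<>())
            .computeIfAbsent(rowKey, k -> new TreeMap<>())
            .put(column, value);
    }

    /** Returns the stored value, or null if any level of the path is missing. */
    Object get(String keyspace, String columnFamily, String rowKey, String column) {
        Map<String, Map<String, Map<String, Object>>> ks = data.get(keyspace);
        if (ks == null) return null;
        Map<String, Map<String, Object>> cf = ks.get(columnFamily);
        if (cf == null) return null;
        Map<String, Object> row = cf.get(rowKey);
        return row == null ? null : row.get(column);
    }

    public static void main(String[] args) {
        FourDimensionalHash store = new FourDimensionalHash();
        store.put("demo", "users", "mike_johnson", "full_name", "mike johnson");
        store.put("demo", "users", "mike_johnson", "birth_year", 1980L);
        System.out.println(store.get("demo", "users", "mike_johnson", "full_name"));
        // A column that was never written simply has no entry in the row
        System.out.println(store.get("demo", "users", "mike_johnson", "email"));
    }
}
```

Rows in the same column family do not need to carry the same set of columns, which is why the innermost level is a map and not a fixed record.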

I am using Cassandra version 0.8 for this post; you can find more information at http://www.datastax.com/docs/0.8/

I followed the wiki page to install Cassandra 0.8 on localhost, listening on port 9160.

There are a number of client options, listed at http://wiki.apache.org/cassandra/ClientOptions. Personally, I like Hector.

  


Here is the Maven project file (pom.xml):
    
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lei.cassandra.sample</groupId>
  <artifactId>cassandra-quickstart-with-hector</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>cassandra-quickstart-with-hector</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>

    <dependency>
      <groupId>me.prettyprint</groupId>
      <artifactId>hector-core</artifactId>
      <version>1.0-2</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.6.1</version>
    </dependency>

  </dependencies>
</project>



To connect to and disconnect from the Cassandra cluster:
    
 /**
  * Connect to the "demo" keyspace of the Cassandra cluster on localhost:9160,
  * reading at consistency level ONE by default
  */
 public void connect() {
  cluster = HFactory.getOrCreateCluster(DEF_CLUSTER, DEF_LOCALHOST_ID);
  System.out.println("Cluster instantiated");
  ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
  ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.ONE);
  keyspace = HFactory.createKeyspace("demo", cluster, ccl);
 }

 /**
  * Disconnect from the Cassandra cluster on localhost:9160
  */
 public void disconnect() {
  cluster.getConnectionManager().shutdown();
 }
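
The connect() method above sets the default read consistency level to ONE, which is where the "tunable" in tunable consistency shows up in client code. The replica-count arithmetic behind that choice can be sketched in plain Java. The class name ConsistencyMath and the replication factor of 3 are assumptions for illustration only; nothing here calls Cassandra or Hector.

```java
/** Replica-count arithmetic behind tunable consistency; hypothetical helper, plain Java only. */
class ConsistencyMath {
    /** Number of replicas that make up a quorum for the given replication factor. */
    static int quorum(int replicationFactor) {
        return replicationFactor / 2 + 1;
    }

    /** True when every set of read replicas must overlap every set of written replicas (R + W > N). */
    static boolean readsSeeLatestWrite(int readReplicas, int writeReplicas, int replicationFactor) {
        return readReplicas + writeReplicas > replicationFactor;
    }

    public static void main(String[] args) {
        int n = 3; // assumed replication factor, not taken from the post
        System.out.println("quorum for N=3: " + quorum(n));
        System.out.println("read ONE, write ONE overlaps: " + readsSeeLatestWrite(1, 1, n));
        System.out.println("read ONE, write ALL overlaps: " + readsSeeLatestWrite(1, n, n));
        System.out.println("read QUORUM, write QUORUM overlaps: "
                + readsSeeLatestWrite(quorum(n), quorum(n), n));
    }
}
```

Reading at ONE asks a single replica, so it is fast, but on a multi-node cluster it may return a slightly stale value unless writes go to enough replicas. On a single local node, as in this post, the distinction does not matter.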





To insert a row and its columns:
    
 /**
  * insert a row and its columns for the rowkey using ColumnFamilyTemplate
  * 
  * @return the row's column values, read back after the insert
  */
    public List<Object> insertRow () {
     String rowkey = "mike_johnson";
     
     ColumnFamilyTemplate<String, String> userCFTemplate = new ThriftColumnFamilyTemplate<String, String>(
       this.getKeyspace(),
       DEF_USER_CF_NAME, StringSerializer.get(), StringSerializer.get());

     ColumnFamilyUpdater<String, String> updater = userCFTemplate.createUpdater(rowkey);
     updater.setString("full_name", "mike johnson");
     updater.setString("address", "90 Elm Street");
     updater.setString("state", "NY");
     updater.setString("gender", "male");
     updater.setString("email", "mikej@gmail.com");
     updater.setLong("birth_year", 1980L);
     try {
      userCFTemplate.update(updater);
      System.out.println("value inserted");
     } catch (HectorException e) {
      System.out.println("Error during insertion : " + e.getMessage() );

     }  
     
     return this.getColumnValues("mike_johnson");
    }


 /**
  * insert a row and its columns for the rowkey using Mutator
  * 
  * @return the row's column values, read back after the insert
  */
    public List<Object> insertRowV2 () {
     Mutator<String> mutator = HFactory.createMutator(this.getKeyspace(), StringSerializer.get() );
     mutator.addInsertion("michael_johnson", "users", HFactory.createStringColumn("full_name", "michael johnson"));
     mutator.addInsertion("michael_johnson", "users", HFactory.createStringColumn("address", "9890 Elm Street"));
     mutator.addInsertion("michael_johnson", "users", HFactory.createStringColumn("state", "CT"));
     mutator.addInsertion("michael_johnson", "users", HFactory.createStringColumn("gender", "male"));
     mutator.addInsertion("michael_johnson", "users", HFactory.createStringColumn("email", "michael_johnson@gmail.com"));
     mutator.addInsertion("michael_johnson", "users", HFactory.createColumn("birth_year", 1978L, StringSerializer.get(), LongSerializer.get()));
     
     MutationResult mr = mutator.execute();
     
     return this.getColumnValues("michael_johnson");
    }




To query a column value for a key:
    

 /**
  * build the query for fetching a row's column
  * 
  * @return QueryResult
  */
    public QueryResult<HColumn<String,String>> query() {        
        ColumnQuery<String, String, String> columnQuery = HFactory.createStringColumnQuery(this.getKeyspace());
        columnQuery.setColumnFamily(DEF_USER_CF_NAME);
        columnQuery.setKey("john_smith");
        columnQuery.setName("full_name");
        QueryResult<HColumn<String, String>> result = columnQuery.execute();
        return result;
    }




To query several columns for a key:
    

 /**
  * build the query for fetching a row's columns 
  * 
  * @return QueryResult
  */
    public QueryResult<ColumnSlice<String,String>> query() {
        SliceQuery<String, String, String> sliceQuery = 
            HFactory.createSliceQuery(this.getKeyspace(), StringSerializer.get(), StringSerializer.get(), StringSerializer.get());
        sliceQuery.setColumnFamily(DEF_USER_CF_NAME);
        sliceQuery.setKey("mary_leaman");
        sliceQuery.setColumnNames("email","full_name","gender","address", "state");
        QueryResult<ColumnSlice<String, String>> result = sliceQuery.execute();
        return result;
    }



To fetch column values:
    

 /**
  * get the column values of the row with the given rowkey as a list of objects
  *  
  * @param rowkey key of the row to read
  * 
  * @return the row's column values associated with the rowkey
  */
    public List<Object> getColumnValues (String rowkey) {
     List<Object> listColumnObj = new LinkedList<Object> ();
     
  try {
      ColumnFamilyTemplate<String, String> userCFTemplate = new ThriftColumnFamilyTemplate<String, String>(
        this.getKeyspace(),
        DEF_USER_CF_NAME, StringSerializer.get(), StringSerializer.get());

   ColumnFamilyResult<String, String> result = userCFTemplate.queryColumns(rowkey);
   String full_name = result.getString("full_name");
   String address = result.getString("address");
   String state = result.getString("state");
   String gender = result.getString("gender");
   String email = result.getString("email");
   Long birth_year = result.getLong("birth_year");
   
   System.out.println("full_name [" + full_name + "]");
   System.out.println("address [" + address + "]");
   System.out.println("state [" + state + "]");
   System.out.println("gender [" + gender + "]");
   System.out.println("birth_year [" + birth_year + "]");
   System.out.println("email [" + email + "]");
   
   listColumnObj.add(full_name);
   listColumnObj.add(address);
   listColumnObj.add(state);
   listColumnObj.add(gender);
   listColumnObj.add(email);
   listColumnObj.add(birth_year);
   
     } catch (HectorException e) {
      System.out.println("Error reading the column with key " + rowkey);
      System.out.println(e.getMessage());
     }

  return listColumnObj;
    }
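
Cassandra stores every column value as raw bytes, so typed getters such as getString and getLong have to decode them. The sketch below shows that idea using only the JDK. It assumes string values are UTF-8 and long values are 8-byte big-endian, which is my understanding of what Hector's StringSerializer and LongSerializer produce; treat that as an assumption, not a statement about the library. The class ColumnBytes and its method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing how column values map to bytes and back; JDK only. */
class ColumnBytes {
    /** Encodes a string column value as UTF-8 bytes (assumed encoding). */
    static ByteBuffer encodeString(String value) {
        return ByteBuffer.wrap(value.getBytes(StandardCharsets.UTF_8));
    }

    /** Encodes a long column value as 8 big-endian bytes (assumed encoding). */
    static ByteBuffer encodeLong(long value) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putLong(value);
        buf.flip(); // switch from writing to reading
        return buf;
    }

    /** Decodes UTF-8 bytes without disturbing the caller's buffer position. */
    static String decodeString(ByteBuffer bytes) {
        return StandardCharsets.UTF_8.decode(bytes.duplicate()).toString();
    }

    /** Decodes 8 big-endian bytes without disturbing the caller's buffer position. */
    static long decodeLong(ByteBuffer bytes) {
        return bytes.duplicate().getLong();
    }

    public static void main(String[] args) {
        ByteBuffer state = encodeString("NY");
        ByteBuffer birthYear = encodeLong(1980L);
        System.out.println("state [" + decodeString(state) + "]");
        System.out.println("birth_year [" + decodeLong(birthYear) + "]");
    }
}
```

This is also why birth_year needs getLong while the other columns use getString: decoding the 8 bytes of a long as text would yield unreadable characters.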



To delete a row:
    

 /**
  * delete the row with rowkey using ColumnFamilyTemplate
  * 
  * @return the row's column values, read back after the deletion
  */
    public List<Object> deleteRow() {
     
     List<Object> list = this.insertRow();
     String rowkey = "mike_johnson";
     ColumnFamilyTemplate<String, String> userCFTemplate = new ThriftColumnFamilyTemplate<String, String>(
       this.getKeyspace(),
       DEF_USER_CF_NAME, StringSerializer.get(), StringSerializer.get());
     try {
      userCFTemplate.deleteRow(rowkey) ;
      System.out.println("Row deleted .... ");
     } catch (HectorException e) {
      System.out.println("Error during deletion : " + e.getMessage() );
     }  
     
     return this.getColumnValues("mike_johnson");
    }


 /**
  * delete the row with rowkey using Mutator
  * 
  * @return the row's column values, read back after the deletion
  */
    public List<Object> deleteRowV2() {
     
     List<Object> list = this.insertRowV2();
     String rowkey = "michael_johnson";
     
        Mutator<String> mutator = HFactory.createMutator(this.getKeyspace(), StringSerializer.get() );
        mutator.addDeletion(rowkey, "users", null, StringSerializer.get() );
        MutationResult mr = mutator.execute();
     return this.getColumnValues(rowkey);
    }


To delete a single column from a row:
    

 /**
  * delete a single column ("email") of the row with rowkey
  * 
  * @return the row's column values, read back after the deletion
  */
    public List<Object> deleteRowColumn () {
     
     List<Object> list = this.insertRow();
     String rowkey = "mike_johnson";
     
     ColumnFamilyTemplate<String, String> userCFTemplate = new ThriftColumnFamilyTemplate<String, String>(
       this.getKeyspace(),
       DEF_USER_CF_NAME, StringSerializer.get(), StringSerializer.get());

     try {
      userCFTemplate.deleteColumn(rowkey, "email") ;
      System.out.println("Column deleted .... ");
     } catch (HectorException e) {
      System.out.println("Error during deletion : " + e.getMessage() );

     }  
     return this.getColumnValues("mike_johnson");
    }




To list all rows:
    

 /**
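  * Demo to list all rows, fetching them page by page
  * 
  * @param maxRowCount number of rows to fetch per page (must be at least 2, because each later page starts with a row that was already seen)
  * @return the rows that contain at least one column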
  */
 private List<Object> listAllEntries(int maxRowCount) {
     
     List<Object> list = new ArrayList<Object> ();

     RangeSlicesQuery<String, String, ByteBuffer> rangeSlicesQuery = 
          HFactory.createRangeSlicesQuery(this.getKeyspace(), StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get())
            .setColumnFamily(DEF_USER_CF_NAME)
            .setRange(null, null, false, 10)
            .setRowCount(maxRowCount);

        String last_key = null;

        while (true) {
         rangeSlicesQuery.setKeys(last_key, null);

         QueryResult<OrderedRows<String, String, ByteBuffer>> result = rangeSlicesQuery.execute();
         OrderedRows<String, String, ByteBuffer> rows = result.get();
         Iterator<Row<String, String, ByteBuffer>> rowsIterator = rows.iterator();


            // Every page after the first starts at last_key, which was already processed, so skip it
            if (last_key != null && rowsIterator.hasNext()) {
             rowsIterator.next();   
            }

            while (rowsIterator.hasNext()) {
             Row<String, String, ByteBuffer> row = rowsIterator.next();
             last_key = row.getKey();

             if (row.getColumnSlice().getColumns().isEmpty()) {
              continue;
             }

             System.out.println("rowKey [" + row.getKey() + "]");
             
             for(HColumn<String, ByteBuffer> cols : row.getColumnSlice().getColumns())
             {
              String colName = cols.getName();
           System.out.println("colName [" + colName + "] colValue [" + getColumnValue(colName, cols.getValue() ) + "]");
             }
             list.add(row);
             
            }

            if (rows.getCount() < maxRowCount) {
                break;
            }
        }
        
        return list;
    }
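
The paging pattern in listAllEntries can be hard to follow in the middle of the Hector types, so here is a minimal stand-alone sketch of the same loop over an in-memory sorted set. The class KeyPaging and its methods are hypothetical, and the TreeSet only stands in for "rows in some stable order"; a real cluster may order rows by token, not by key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

/** Hypothetical stand-in for paging through row keys with an inclusive start key. */
class KeyPaging {
    /** Stand-in for a range query: up to pageSize keys, starting at startKey inclusive. */
    static List<String> fetchPage(NavigableSet<String> keys, String startKey, int pageSize) {
        NavigableSet<String> tail = (startKey == null) ? keys : keys.tailSet(startKey, true);
        List<String> page = new ArrayList<>();
        for (String key : tail) {
            if (page.size() == pageSize) break;
            page.add(key);
        }
        return page;
    }

    /** Walks all keys page by page, skipping the repeated first key of each later page. */
    static List<String> listAll(NavigableSet<String> keys, int pageSize) {
        if (pageSize < 2) {
            // With a page size of 1 every later page holds only the repeated key,
            // so the loop would never advance.
            throw new IllegalArgumentException("pageSize must be at least 2");
        }
        List<String> all = new ArrayList<>();
        String lastKey = null;
        while (true) {
            List<String> page = fetchPage(keys, lastKey, pageSize);
            Iterator<String> it = page.iterator();
            if (lastKey != null && it.hasNext()) {
                it.next(); // already returned as the last key of the previous page
            }
            while (it.hasNext()) {
                lastKey = it.next();
                all.add(lastKey);
            }
            if (page.size() < pageSize) break; // a short page means there is nothing left
        }
        return all;
    }

    public static void main(String[] args) {
        NavigableSet<String> keys = new TreeSet<>(Arrays.asList(
                "john_smith", "mary_leaman", "michael_johnson", "mike_johnson"));
        System.out.println(listAll(keys, 2));
    }
}
```

Because the start key is inclusive, each page after the first repeats one key. That is the reason for the extra next() call, and also why a page size below 2 cannot make progress.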




To fetch columns for multiple keys with MultigetSliceQuery:
    

 /**
  * build the query with multiple keys
  * 
  * @return QueryResult
  */
    public QueryResult<Rows<String,String,String>> query() {        
        MultigetSliceQuery<String, String, String> multigetSlicesQuery =
          HFactory.createMultigetSliceQuery( this.getKeyspace(), StringSerializer.get(), StringSerializer.get(), StringSerializer.get());
        multigetSlicesQuery.setColumnFamily(DEF_USER_CF_NAME);
        multigetSlicesQuery.setColumnNames("email","full_name","gender","address", "state");        
        multigetSlicesQuery.setKeys("mary_leaman","john_smith");
        QueryResult<Rows<String, String, String>> results = multigetSlicesQuery.execute();
        return results;
    }



You can access the code on GitHub:
https://github.com/leizou123/cassandra-quickstart-with-hector.git

Have fun and enjoy the journey.

2 comments:

  1. Hi, Do you have any idea of changing the default connecting port?

  2. You well described on how to quick start Cassandra with Java Client Hector. I was looking for it and got here. You saved lots of my time. Thanks!