Writing Cassandra User Defined Types with Spark and Java

Published in Big Data on 10-Feb-2016

Apache Cassandra and Apache Spark allow you to very quickly accomplish some truly cool things with very large sets of data. As a developer primarily focused on Java-based technologies, I find that one complication of the Cassandra/Spark stack is that Spark is written in Scala. While Scala is a very cool language, not everyone is fully comfortable with it, and not every organization is willing to embrace the polyglotism that would let you use Scala as your interface to Spark while using Java or other languages for the rest of your application.

To help bridge this gap, DataStax has generously provided a Java API wrapper around their Scala Spark connector that allows you to access Spark in Java. Most of this API is fairly straightforward; however, as I've been working with it lately, I have run into a few hiccups that have caused me some headaches. The primary challenge so far has been dealing with Cassandra User Defined Types (UDTs). I could find no documentation, blog entries, or tea leaves that illustrated how to read or write Cassandra RDDs containing UDTs.

Reading UDTs is easy enough. If you have a CassandraTableScanJavaRDD of type CassandraRow, you can call the .getUDTValue(String columnName) method on each row to fetch a UDTValue object, which effectively is just a map that you can then query, much like a row, to fetch Integer, String or other values. However, writing new UDTs back to Cassandra rows is what left me scratching my head for a while.
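
For example, a read might look something like the following sketch, where the column name "address" and field name "street" are purely hypothetical placeholders for whatever your schema defines:

// A minimal sketch of reading a UDT column from a CassandraRow.
// "address" and "street" are hypothetical names; substitute your own.
private String extractStreet(CassandraRow row) {
    // Fetch the UDT column as a UDTValue...
    UDTValue address = row.getUDTValue("address");
    // ...then query the UDTValue much like a row to pull out its fields.
    return address.getString("street");
}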

It should be noted that it is entirely possible that there is a better way to accomplish this, and if there is, I would very much like to know about it. That being said, this is the solution I was able to work out.

If you have transformed and processed your RDD of CassandraRow objects into an RDD of some POJO type, the Spark Connector is really good about automagically using bean naming conventions to map the POJO fields to columns in Cassandra. For example, if you have followed standard bean naming conventions and the column names in your Cassandra table match the field names, you can do something like the following to load the CassandraRow objects from a table, process them into an RDD of POJOs, and then save those POJOs back as rows in the table:

CassandraTableScanJavaRDD<CassandraRow> rdd =
    CassandraJavaUtil
        .javaFunctions(sparkContext)
        .cassandraTable("keyspace", "tablename");

JavaRDD<MyPojo> pojoRdd = rdd.map(
    new Function<CassandraRow, MyPojo>() {
        @Override
        public MyPojo call(CassandraRow row) {
            return someMethodThatConvertsRowsToPojos(row);
        }
    }
);

CassandraJavaUtil
    .javaFunctions(pojoRdd)
    .writerBuilder(
        "keyspace",
        "tablename",
        CassandraJavaUtil.mapToRow(MyPojo.class))
    .saveToCassandra();

I would have assumed that if my POJO had a field that was itself a Java Bean, it would automatically be treated as a UDT and serialized accordingly. This did not seem to be the case, however. Any attempts I made at this resulted in a variety of errors about not being able to map the object to the relevant column.
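
Concretely, the shape I was working with was a bean nested inside a bean. As a sketch (the field names here are illustrative, chosen only to line up with the getters used in the RowWriter further down), the POJOs look something like this:

// Illustrative sketches of the beans used in the examples below.
// MyUdtPojo mirrors the fields of the Cassandra UDT.
public class MyUdtPojo implements Serializable {
    private String field1;
    private Integer field2;

    public String getField1() { return field1; }
    public void setField1(String field1) { this.field1 = field1; }

    public Integer getField2() { return field2; }
    public void setField2(Integer field2) { this.field2 = field2; }
}

// MyPojo maps to the table row and nests the UDT bean.
public class MyPojo implements Serializable {
    private String someBeanValue;
    private String someOtherBeanValue;
    private MyUdtPojo udtPojo;

    public String getSomeBeanValue() { return someBeanValue; }
    public void setSomeBeanValue(String v) { this.someBeanValue = v; }

    public String getSomeOtherBeanValue() { return someOtherBeanValue; }
    public void setSomeOtherBeanValue(String v) { this.someOtherBeanValue = v; }

    public MyUdtPojo getUdtPojo() { return udtPojo; }
    public void setUdtPojo(MyUdtPojo v) { this.udtPojo = v; }
}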

Fortunately, the API gives you the ability to specify your own RowWriter and RowReader objects so you can do custom mapping of POJOs to rows and vice versa. This is the route I had to take. The .writerBuilder() method illustrated above has an overload that allows you to specify a custom RowWriterFactory object. By creating a simple RowWriterFactory that returns a RowWriter suitable for our conversion of POJOs to UDTs, we can get one step closer to saving a row with a User Defined Type back to Cassandra. However, before we can save a UDTValue object, we must be able to create one.

com.datastax.spark.connector.UDTValue is a class that comes from the Scala portion of the Spark Cassandra Connector API. We will need to create an instance of this class, which presents us with a bit of a challenge. The UDTValue constructor takes two IndexedSeq arguments: the first representing the names of the fields in the UDT and the second representing the values to store in those fields. Java does not have a class that directly corresponds to IndexedSeq, however, so we need to figure out how to create them. This was one of my major roadblocks in this process.

Scala provides a utility called JavaConversions that converts between Scala and Java data types to help with interoperability between the two languages. It can be used to convert a Java Iterable to a Scala IndexedSeq like so:

private <T> IndexedSeq<T> toIndexedSeq(Iterable<T> iterable) {
    return JavaConversions
        .asScalaIterable(iterable)
        .toIndexedSeq();
}

Next, we can implement a custom RowWriter and RowWriterFactory that make use of this method to create UDTValue objects and stage them to be written back to Cassandra:

public class MyRowWriter
        implements RowWriter<MyPojo>, Serializable {

    /*
        Return the list of column names in the
        Cassandra table
    */
    @Override
    public Seq<String> columnNames() {
        return toIndexedSeq(
            Arrays.asList(
                "field1",
                "field2",
                "udtField"));
    }

    /*
        Populates the outputBuffer argument with
        the appropriate values extracted from
        the MyPojo object.  The order of the
        values in the outputBuffer array lines
        up with the order of the column names
        returned from the above method.
    */
    @Override
    public void readColumnValues(
            MyPojo myPojo,
            Object[] outputBuffer) {

        outputBuffer[0] = myPojo.getSomeBeanValue();
        outputBuffer[1] = myPojo.getSomeOtherBeanValue();

        MyUdtPojo myUdtPojo = myPojo.getUdtPojo();

        outputBuffer[2] = new com.datastax.spark.connector.UDTValue(
                toIndexedSeq(
                    Arrays.asList(
                        "udtField1",
                        "udtField2"
                    )
                ),
                toIndexedSeq(
                    Arrays.asList(
                        (Object)myUdtPojo.getField1(),
                        (Object)myUdtPojo.getField2()
                    )
                )
            );
    }
}

public class MyRowWriterFactory
        implements RowWriterFactory<MyPojo>, Serializable {

    private static final MyRowWriter rowWriter = new MyRowWriter();

    @Override
    public RowWriter<MyPojo> rowWriter(
            TableDef tableDef,
            IndexedSeq<ColumnRef> columnRefs) {
        return rowWriter;
    }
}

Finally, once we have defined our custom RowWriter and RowWriterFactory, we can pass an instance of our RowWriterFactory to the .writerBuilder() method:

CassandraJavaUtil
    .javaFunctions(pojoRdd)
    .writerBuilder(
        "keyspace",
        "tablename",
        new MyRowWriterFactory())
    .saveToCassandra();

In sum, the key problems I ran into with this exercise were:

  1. creating the Scala UDTValue object, which was resolved by using Scala's JavaConversions utility; and
  2. instructing Spark to convert my UDT POJO to a UDTValue when writing objects back to Cassandra, which I accomplished by creating a simple custom RowWriterFactory and RowWriter.

About the Author


Daniel Morton is a Software Developer with Shopify Plus in Waterloo, Ontario and the co-owner of Switch Case Technologies, a software development and consulting company. Daniel specializes in Enterprise Java Development and has worked and consulted in a variety of fields including WAN Optimization, Healthcare, Telematics, Media Publishing, and the Payment Card Industry.