CNFE when running SQL query against Cassandra temp table


Robbie Strickland

When running a SQL statement against a Cassandra temp table from which no records have previously been materialized using the SQLContext, a ClassNotFoundException is thrown.

For example, we run the following code to register the table:

import com.datastax.spark.connector._
case class Stats(queue: String, time: Long, host: String, successes: Long)
val stats2 = sc.cassandraTable[Stats]("prod_analytics_events", "stats").select("queue", "time", "host", "successes").where("time >= 1442707200000 and time < 1442793600000")
stats2.toDF.registerTempTable("stats2")

If we immediately try to run a %sql query, such as:

%sql
select * from stats2 limit 10

we will get the following stack trace:

java.lang.ClassNotFoundException: $line551.$read
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.javaClass(JavaMirrors.scala:500)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1167)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1174)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToJava$1.apply(JavaMirrors.scala:1159)
	at scala.reflect.runtime.TwoWayCache.toJava(TwoWayCache.scala:49)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToJava(JavaMirrors.scala:1159)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToJavaClass(JavaMirrors.scala:1255)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:202)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.runtimeClass(JavaMirrors.scala:65)
	at com.datastax.spark.connector.rdd.reader.AnyObjectFactory.<init>(AnyObjectFactory.scala:30)
	at com.datastax.spark.connector.rdd.reader.GettableDataToMappedTypeConverter.<init>(GettableDataToMappedTypeConverter.scala:45)
	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReader.<init>(ClassBasedRowReader.scala:22)
	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:47)
	at com.datastax.spark.connector.rdd.reader.ClassBasedRowReaderFactory.rowReader(ClassBasedRowReader.scala:42)
	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.rowReader(CassandraTableRowReaderProvider.scala:48)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader$lzycompute(CassandraTableScanRDD.scala:59)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.rowReader(CassandraTableScanRDD.scala:59)
	at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:151)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59)
	at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:143)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:121)
	at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:125)
	at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1269)
	at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1203)
	at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1262)
	at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.zeppelin.spark.ZeppelinContext.showDF(ZeppelinContext.java:300)
	at org.apache.zeppelin.spark.SparkSqlInterpreter.interpret(SparkSqlInterpreter.java:142)
	at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57)
	at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93)
	at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276)
	at org.apache.zeppelin.scheduler.Job.run(Job.java:170)
	at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

However, it is possible to run a query directly using the SQLContext without issue:

sqlContext.sql("select * from stats2 limit 10").collect

returns the expected results:

stats2: com.datastax.spark.connector.rdd.CassandraTableScanRDD[Stats] = CassandraTableScanRDD[637] at RDD at CassandraRDD.scala:15
res155: Array[org.apache.spark.sql.Row] = Array([events_ANDROID_LocationUpdate,1442707206499,sink4x056,1821024], [events_ANDROID_LocationUpdate,1442707207062,sink4x019,1480357], [events_ANDROID_LocationUpdate,1442707266854,sink4x056,1821394], [events_ANDROID_LocationUpdate,1442707268281,sink4x019,1480675], [events_ANDROID_LocationUpdate,1442707329595,sink4x056,1821771], [events_ANDROID_LocationUpdate,1442707332608,sink4x019,1480979], [events_ANDROID_LocationUpdate,1442707389853,sink4x056,1822088], [events_ANDROID_LocationUpdate,1442707393107,sink4x019,1481257], [events_ANDROID_LocationUpdate,1442707451639,sink4x056,1822413], [events_ANDROID_LocationUpdate,1442707457504,sink4x019,1481591])

Additionally, if we first materialize some rows using the SQLContext (as in the example above), further queries using %sql work fine.

Relevant config from zeppelin-env.sh:

export ZEPPELIN_JAVA_OPTS="-Dspark.jars=/opt/spark/lib/spark-cassandra-connector-assembly.jar:/opt/hadoop/share/hadoop/tools/lib/*:/opt/jars/*:/opt/spark/lib/pyspark-cassandra.jar -Dspark.cassandra.connection.host=x.x.x.x -Dspark.cassandra.read.timeout_ms=300000 -Dspark.cassandra.auth.username=zeppelin -Dspark.cassandra.auth.password=[password]"

It's also worth noting that this was NOT a problem under Spark 1.3.1; the same steps worked there.

I have logged a JIRA against this here:

https://issues.apache.org/jira/browse/ZEPPELIN-332


--
Robbie Strickland | Director, Software Engineering
w: 770-226-2093 | e: [hidden email]