Problems submitting Kafka library with pyspark

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Problems submitting Kafka library with pyspark

Borja Garrido
Hi all,

I've a working version of Zeppelin with Spark 1.5.1, I'm using my own Spark build and not the one packaged with Zeppelin.

If I try streaming Kafka with Scala everything works just fine, however if I try it with Pyspark it just won't find the Utils library.

I'm using the SPARK_SUBMIT variable in zep-env.sh in order to load the kafka jar with --packages (as it is said in the Spark documentation), but always get the same error.

Py4JJavaError: An error occurred while calling o48.loadClass. : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
Anyone else with a similar problem?

Thanks in advance
Borja
Reply | Threaded
Open this post in threaded view
|

Re: Problems submitting Kafka library with pyspark

moon
Administrator
Hi Borja,

Thanks for sharing the issue. I could reproduce the problem.
So far, you can use %dep in the notebook instead of --packages in zeppelin-env.sh as a workaround.

%dep
z.reset()
z.load("org.apache.spark:spark-streaming-kafka_2.10:1.5.1")

%pyspark
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming import StreamingContext

ssc = StreamingContext(sc, ...)
kafkaStream = KafkaUtils.createStream(ssc, ...)


will work. I'll take a look why --packages does not work in pyspark.

Thanks,
moon

On Mon, Oct 12, 2015 at 9:08 AM Borja Garrido <[hidden email]> wrote:
Hi all,

I've a working version of Zeppelin with Spark 1.5.1, I'm using my own Spark build and not the one packaged with Zeppelin.

If I try streaming Kafka with Scala everything works just fine, however if I try it with Pyspark it just won't find the Utils library.

I'm using the SPARK_SUBMIT variable in zep-env.sh in order to load the kafka jar with --packages (as it is said in the Spark documentation), but always get the same error.

Py4JJavaError: An error occurred while calling o48.loadClass. : java.lang.ClassNotFoundException: org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745)
Anyone else with a similar problem?

Thanks in advance

Borja