PySpark with livy

Mauro Schneider

Hi

I'm trying to execute PySpark code with Zeppelin and Livy, but without success. Scala code works well through Livy, but when I execute the PySpark code below I get an exception from Zeppelin.

<code>
%livy.pyspark
txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
counts = (txtFile.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b))
counts.collect()
</code>

<exception>
An error occurred while calling o47.textFile.
: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I
at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:316)
at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:79)
at org.apache.spark.io.SnappyCompressionCodec.compressedOutputStream(CompressionCodec.scala:156)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$4.apply(TorrentBroadcast.scala:200)
at scala.Option.map(Option.scala:145)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:200)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:102)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:85)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:63)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1334)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1022)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1.apply(SparkContext.scala:1019)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1019)
at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:840)
at org.apache.spark.SparkContext$$anonfun$textFile$1.apply(SparkContext.scala:838)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:722)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:838)
at org.apache.spark.api.java.JavaSparkContext.textFile(JavaSparkContext.scala:188)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
</exception>

I have also tested the code below with curl and Livy, and it works correctly.

<code /user/mulisses/test.py>
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="Hello Spark")
    # Count the occurrences of each space-separated word in the input file
    txtFile = sc.textFile("/data/staging/zeppelin_test/data.txt")
    counts = (txtFile.flatMap(lambda line: line.split(" "))
                     .map(lambda word: (word, 1))
                     .reduceByKey(lambda a, b: a + b))
    # Write the (word, count) pairs to the "test_wc_py" output directory
    counts.saveAsTextFile("test_wc_py")
    sc.stop()
</code>

<cUrl>
curl  -i --negotiate -u : -X POST --data '{"file": "/user/mulisses/test.py"}' -H "Content-Type: application/json" dtbhad02p.bvs.corp:8998/batches
</cUrl>

Does anyone know how to solve this? I have tested with Zeppelin 0.7.2 and Zeppelin 0.7.3.
Am I missing some configuration?

Best regards,

Mauro Schneider



Re: PySpark with livy

Jeff Zhang
This is most likely an issue with your Spark configuration. Could you run this code in the pyspark shell?
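One possible way to follow up on the configuration question is to dump the Spark configuration in both environments (the pyspark shell and the %livy.pyspark paragraph) and diff the two. The sketch below is only an illustration: `diff_conf` is a hypothetical helper, the sample values are made up and not taken from this cluster, and it assumes each configuration is available as a list of (key, value) pairs, which is the shape returned by `SparkConf.getAll()`. How to reach the `SparkConf` of a running context depends on the Spark version.

```python
def diff_conf(shell_conf, livy_conf):
    """Return {key: (shell_value, livy_value)} for keys whose values differ.

    Each argument is a list of (key, value) pairs. A key that is missing
    on one side is reported with None for that side.
    """
    shell, livy = dict(shell_conf), dict(livy_conf)
    return {
        key: (shell.get(key), livy.get(key))
        for key in sorted(set(shell) | set(livy))
        if shell.get(key) != livy.get(key)
    }


# Made-up sample values for illustration only.
shell_conf = [
    ("spark.app.name", "PySparkShell"),
    ("spark.driver.extraLibraryPath", "/opt/example/native"),
]
livy_conf = [
    ("spark.app.name", "livy-session-0"),
]

for key, (shell_value, livy_value) in diff_conf(shell_conf, livy_conf).items():
    print(key, "| shell:", shell_value, "| livy:", livy_value)
```

Properties that differ only by application name can be ignored; anything related to native libraries or compression would be worth a closer look, given the UnsatisfiedLinkError raised from the Snappy codec in the stack trace.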



(In reply to the message from Mauro Schneider <[hidden email]> of Fri, Sep 29, 2017, 11:24 PM.)


Re: PySpark with livy

Mauro Schneider
Hi Jeff

Yes, the code works in the PySpark shell and with spark-submit on the same server where Zeppelin and Livy are running. I also ran another test: I executed the same code through Livy using curl, and it worked.





Mauro Schneider


(In reply to the message from Jeff Zhang <[hidden email]> of Fri, Sep 29, 2017, 8:26 PM.)

Re: PySpark with livy

Jeff Zhang

I see your test with Livy via the curl command, but it seems you are submitting it as a batch. Could you try it via an interactive Livy session? That is what Zeppelin's Livy interpreter does.
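For reference, a minimal sketch of what an interactive-session test could look like from Python, using Livy's REST endpoints `POST /sessions` and `POST /sessions/{id}/statements` instead of `POST /batches`. The helper names (`session_request`, `statement_request`, `call`, `run_interactive`) are hypothetical, the host and port are copied from the curl command earlier in the thread, and the Kerberos/SPNEGO authentication that `curl --negotiate` provides is not handled here, so on a secured cluster this would need an additional authentication layer.

```python
import json
import time
import urllib.request

# Host and port as used in the curl command earlier in this thread.
LIVY_URL = "http://dtbhad02p.bvs.corp:8998"


def session_request(kind="pyspark"):
    """Path and JSON body for POST /sessions (start an interactive session)."""
    return "/sessions", {"kind": kind}


def statement_request(session_id, code):
    """Path and JSON body for POST /sessions/{id}/statements (run code)."""
    return "/sessions/{}/statements".format(session_id), {"code": code}


def call(base_url, path, payload=None):
    """Send a JSON request to Livy: POST if a payload is given, GET otherwise."""
    data = None if payload is None else json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(
        base_url + path, data=data, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def run_interactive(code, base_url=LIVY_URL, poll_seconds=2):
    """Start a pyspark session, run one statement, and return its output.

    No authentication and no error handling: a sketch, not a client.
    """
    session = call(base_url, *session_request())
    session_path = "/sessions/{}".format(session["id"])
    # Wait while the session is still starting up.
    while call(base_url, session_path)["state"] in ("not_started", "starting"):
        time.sleep(poll_seconds)
    path, body = statement_request(session["id"], code)
    stmt = call(base_url, path, body)
    # Poll the statement until it is no longer queued or running.
    while stmt["state"] in ("waiting", "running"):
        time.sleep(poll_seconds)
        stmt = call(base_url, "{}/{}".format(path, stmt["id"]))
    return stmt["output"]
```

Calling `run_interactive('sc.textFile("/data/staging/zeppelin_test/data.txt").count()')` would exercise the same `textFile` call that fails in the notebook. If it raises the same UnsatisfiedLinkError, the problem would seem to lie in how Livy launches interactive sessions rather than in Zeppelin itself.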


(In reply to the message from Mauro Schneider <[hidden email]> of Sun, Oct 1, 2017, 4:55 AM.)