zeppelin 0.7.1 and Spark cluster standalone - Reading and writing csv


Sofiane Cherchalli
Hi,

I have a Spark cluster in standalone mode with one worker. Zeppelin, the Spark master, and the Spark worker each run in their own Docker container.

I am trying to read and write a CSV file from a notebook, but I'm having issues.

First, my zeppelin-env.sh:
# spark home
export SPARK_HOME=/opt/spark-2.1.0

# set hadoop conf dir
export HADOOP_CONF_DIR=/opt/hadoop-2.7.3/etc/hadoop

# set options to pass spark-submit command
export SPARK_SUBMIT_OPTIONS="--packages com.databricks:spark-csv_2.11:1.5.0"

# driver memory
export ZEPPELIN_JAVA_OPTS="-Dspark.driver.memory=7g"

# master
export MASTER="spark://master:7077"


The notebook:
%spark.pyspark
data_in = '/data/01.csv'
data_out = '/data/02.csv'

def read_input(fin):
    '''
    Read input file from filesystem and return dataframe
    '''
    df = sqlContext.read.load(fin, format='com.databricks.spark.csv', mode='PERMISSIVE', header='false', inferSchema='true')
    return df
    
def write_output(df, fout):
    '''
    Write dataframe to filesystem
    '''
    df.write.mode('overwrite').format('com.databricks.spark.csv').options(delimiter=',', header='true').save(fout)

df = read_input(data_in)
write_output(df, data_out)

I copied the /data/01.csv file to the Spark worker container.

When I run the notebook, it fails, complaining that /data/01.csv was not found in the Zeppelin container:
Traceback (most recent call last):
File "/opt/spark-2.1.0/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.load.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/data/01.csv;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5030675375956428180.py", line 337, in <module>
exec(code)
File "<stdin>", line 1, in <module>
File "<stdin>", line 5, in read_input
File "/opt/spark-2.1.0/python/pyspark/sql/readwriter.py", line 149, in load
return self._df(self._jreader.load(path))
File "/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/spark-2.1.0/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Path does not exist: file:/data/01.csv;'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5030675375956428180.py", line 349, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/opt/spark-2.1.0/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o46.load.
: org.apache.spark.sql.AnalysisException: Path does not exist: file:/data/01.csv;
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:382)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:135)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5030675375956428180.py", line 337, in <module>
exec(code)
File "<stdin>", line 1, in <module>
File "<stdin>", line 5, in read_input
File "/opt/spark-2.1.0/python/pyspark/sql/readwriter.py", line 149, in load
return self._df(self._jreader.load(path))
File "/opt/spark-2.1.0/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/spark-2.1.0/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Path does not exist: file:/data/01.csv;'
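
To confirm which container's filesystem the notebook code actually sees, a check along these lines could be run in a notebook paragraph. This is only a sketch: the helper name describe_local_path is hypothetical, and it uses nothing beyond the Python standard library, so it reports on the process that runs the paragraph, not on the Spark worker.

```python
import os
import socket


def describe_local_path(path):
    """Describe `path` as seen by the process that runs this code.

    The hostname is included so the result shows which machine or
    container performed the check.
    """
    return {
        "host": socket.gethostname(),
        "path": path,
        "exists": os.path.exists(path),
        "is_dir": os.path.isdir(path),
    }


# Same input path as in the notebook above.
print(describe_local_path('/data/01.csv'))
```

If "exists" comes back False here while the file is present on the worker, the path is being checked on a different filesystem than the one the file was copied to.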


If I copy 01.csv into the /data directory of the Zeppelin container and run the notebook (which is not good practice, because the data should not have to be moved), a /data/02.csv directory is created in both the Zeppelin and the worker containers:

worker - ls -laRt /data/02.csv/
02.csv/:
total 0
drwxr-xr-x. 3 root root 24 Apr 28 09:55 .
drwxr-xr-x. 3 root root 15 Apr 28 09:55 _temporary
drwxr-xr-x. 3 root root 64 Apr 28 09:55 ..

02.csv/_temporary:
total 0
drwxr-xr-x. 5 root root 106 Apr 28 09:56 0
drwxr-xr-x. 3 root root  15 Apr 28 09:55 .
drwxr-xr-x. 3 root root  24 Apr 28 09:55 ..

02.csv/_temporary/0:
total 0
drwxr-xr-x. 5 root root 106 Apr 28 09:56 .
drwxr-xr-x. 2 root root   6 Apr 28 09:56 _temporary
drwxr-xr-x. 2 root root 129 Apr 28 09:56 task_20170428095632_0005_m_000000
drwxr-xr-x. 2 root root 129 Apr 28 09:55 task_20170428095516_0002_m_000000
drwxr-xr-x. 3 root root  15 Apr 28 09:55 ..

02.csv/_temporary/0/_temporary:
total 0
drwxr-xr-x. 2 root root   6 Apr 28 09:56 .
drwxr-xr-x. 5 root root 106 Apr 28 09:56 ..

02.csv/_temporary/0/task_20170428095632_0005_m_000000:
total 52
drwxr-xr-x. 5 root root   106 Apr 28 09:56 ..
-rw-r--r--. 1 root root   376 Apr 28 09:56 .part-00000-e39ebc76-5343-407e-b42e-c33e69b8fd1a.csv.crc
-rw-r--r--. 1 root root 46605 Apr 28 09:56 part-00000-e39ebc76-5343-407e-b42e-c33e69b8fd1a.csv
drwxr-xr-x. 2 root root   129 Apr 28 09:56 .

02.csv/_temporary/0/task_20170428095516_0002_m_000000:
total 52
drwxr-xr-x. 5 root root   106 Apr 28 09:56 ..
-rw-r--r--. 1 root root   376 Apr 28 09:55 .part-00000-c2ac5299-26f6-4b23-a74b-b3dc96464271.csv.crc
-rw-r--r--. 1 root root 46605 Apr 28 09:55 part-00000-c2ac5299-26f6-4b23-a74b-b3dc96464271.csv


zeppelin - ls -laRt 02.csv/
02.csv/:
total 12
drwxr-sr-x    2 root     10000700      4096 Apr 28 09:56 .
-rw-r--r--    1 root     10000700         8 Apr 28 09:56 ._SUCCESS.crc
-rw-r--r--    1 root     10000700         0 Apr 28 09:56 _SUCCESS
drwxrwsr-x    5 root     10000700      4096 Apr 28 09:56 ..
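
The two listings above can be summarized with a small script run in each container. This is a rough sketch using only the Python standard library; the function name summarize_output_dir is hypothetical, and treating any part-* file under a _temporary directory as "not committed" is an assumption based on the layout shown in the listings.

```python
import os


def summarize_output_dir(out_dir):
    """Split part files under `out_dir` into committed and uncommitted ones."""
    committed, uncommitted = [], []
    for root, _dirs, files in os.walk(out_dir):
        for name in files:
            if not name.startswith("part-"):
                continue  # skip _SUCCESS markers and hidden .crc files
            rel = os.path.relpath(os.path.join(root, name), out_dir)
            # Assumption: anything still below _temporary was never moved
            # to its final location.
            if "_temporary" in rel.split(os.sep):
                uncommitted.append(rel)
            else:
                committed.append(rel)
    return {
        "has_success_marker": os.path.exists(os.path.join(out_dir, "_SUCCESS")),
        "committed_parts": sorted(committed),
        "uncommitted_parts": sorted(uncommitted),
    }


if os.path.isdir('/data/02.csv'):
    print(summarize_output_dir('/data/02.csv'))
```

For the listings above, this would report part files only under _temporary on the worker, and a _SUCCESS marker with no part files in the Zeppelin container.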


What's wrong?

Thanks
Sofiane