
Hi,

I have a DataFrame in a Spark context with 400k rows and 3 columns. The driver has 143.5 GB of Storage Memory:

16/03/21 19:52:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55613 with 143.5 GB RAM, BlockManagerId(driver, localhost, 55613)
16/03/21 19:52:35 INFO BlockManagerMaster: Registered BlockManager

I want to return the contents of this DataFrame as a Pandas DataFrame.

I did:

df_users = UserDistinct.toPandas()

but I get this error:

16/03/21 20:01:08 ERROR Executor: Exception in task 7.0 in stage 6.0 (TID 439)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/21 20:01:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

How is this possible if I have 143.5 GB RAM? What can I do?

EDIT

My spark-defaults.conf:


# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
#spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              200g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

My SparkContext:

from pyspark import SparkConf, SparkContext

conf = SparkConf()
conf.set("spark.app.name", "teste")
conf.set("spark.driver.maxResultSize", "0")

sc = SparkContext(conf=conf)


EDIT

All steps

# imports assumed from context
import pandas as pd
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.ml.feature import StringIndexer

# import data into a Pandas dataframe
df_ora = pd.read_sql(query, con=connection)

# convert to a Spark DataFrame and do some transformations
sqlContext = SQLContext(sc)
df_oraAS = sqlContext.createDataFrame(df_ora)
df_oraAS.registerTempTable("df_oraAS")

# new column
df_with_C = df_oraAS.withColumn("BUY", lit(1))

# index because I want to use a recommendation system
indexer = StringIndexer(inputCol="ENT_EMAIL", outputCol="user")
user_PK = indexer.fit(df_with_C).transform(df_with_C)

# distinct
UserDistinct = user_PK.dropDuplicates(['ENT_EMAIL', 'user'])

# data into a Pandas dataframe
df_users = UserDistinct.toPandas()

EDIT

Changed to driver 60g and executor 60g.

Error:

16/03/22 09:53:40 INFO MemoryStore: Block taskresult_446 stored as bytes in memory (estimated size 1978.5 MB, free 22.5 GB)
16/03/22 09:53:40 INFO BlockManagerInfo: Added taskresult_446 in memory on localhost:56281 (size: 1978.5 MB, free: 20.4 GB)
16/03/22 09:53:40 INFO Executor: Finished task 14.0 in stage 6.0 (TID 446). 2074557399 bytes result sent via BlockManager)
16/03/22 09:53:40 INFO TaskSetManager: Starting task 25.0 in stage 6.0 (TID 457, localhost, partition 25,NODE_LOCAL, 1999 bytes)
16/03/22 09:53:40 INFO Executor: Running task 25.0 in stage 6.0 (TID 457)
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:04 ERROR Executor: Exception in task 18.0 in stage 6.0 (TID 450)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:04 INFO TaskSetManager: Starting task 26.0 in stage 6.0 (TID 458, localhost, partition 26,NODE_LOCAL, 1999 bytes)
16/03/22 09:54:04 INFO Executor: Running task 26.0 in stage 6.0 (TID 458)
16/03/22 09:54:04 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:05 INFO SparkContext: Invoking stop() from shutdown hook
16/03/22 09:54:06 WARN QueuedThreadPool: 6 threads could not be stopped
16/03/22 09:54:06 INFO SparkUI: Stopped Spark web UI at http://10.10.5.105:4040
16/03/22 09:54:08 INFO DAGScheduler: ResultStage 6 (toPandas at <stdin>:1) failed in 385.120 s
16/03/22 09:54:08 INFO DAGScheduler: Job 3 failed: toPandas at <stdin>:1, took 398.921433 s
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-1
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "task-result-getter-1" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    ... 3 more
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 1378, in toPandas
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 280, in collect
    port = self._jdf.collectToPython()
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)
  • Do you mean the machine running the driver has 143.5GB RAM? What about your settings, what do you have in spark-defaults.conf and spark-env.sh? Are you passing any options when launching pyspark?
    – mattinbits
    Commented Mar 21, 2016 at 20:05
  • @mattinbits, I edited my question with my spark defaults and spark context.
    – Kardu
    Commented Mar 22, 2016 at 9:12
  • You could try to cache the table before feeding it to the indexer, if it fits in memory. This will speed everything up and avoid recreating a lot of small objects; see the sketch after these comments. Commented Mar 24, 2016 at 22:35
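A hedged sketch of that caching suggestion (my addition; df_with_C and indexer come from the steps above):

# Persist the intermediate DataFrame so the indexer's fit() and transform()
# passes reuse it instead of recomputing the whole lineage from the source.
df_with_C.cache()
user_PK = indexer.fit(df_with_C).transform(df_with_C)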

3 Answers


For some reason Spark wants to serialize some data. Apparently it does so by writing to a ByteArrayOutputStream. From the docs:

This class implements an output stream in which the data is written into a byte array. The buffer automatically grows as data is written to it. The data can be retrieved using toByteArray() and toString().

The key word here is a (one!) byte array. Java byte arrays have a maximum length of 2^31 − 1 = 2,147,483,647 bytes ≈ 2 GB. So as soon as Spark attempts to serialize anything greater than 2 GB, you'll get an OutOfMemoryError.

And that's exactly what happened here.

To solve this issue, file a bug report with Spark. The culprit is org.apache.spark.serializer.JavaSerializerInstance.serialize(), which assumes that nothing you ever want to serialize can be larger than 2GB in its serialized form.
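A hedged workaround sketch (my addition, not part of this answer): the 2 GB ceiling applies to each individual serialized task result, so spreading the rows over more, smaller partitions before collecting can keep every single result under the limit. The partition count of 64 is an arbitrary illustrative value.

# Repartition so each task serializes a smaller chunk of the data,
# keeping every per-task result below the 2 GB byte-array ceiling.
df_users = UserDistinct.repartition(64).toPandas()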

  • I am using pyspark and load data using binaryFiles('*.tar') and then flatMap(process). I observed this error when dealing with tar files >3GB, but it was totally fine with files under 2GB. I assume my process function returns a giant list that probably exceeds the 2GB limit Hendrik mentioned here.
    – B.Mr.W.
    Commented Apr 25, 2021 at 0:18

I am assuming the storage you are referring to is disk space.

What is happening is that your application is running out of RAM, not disk space.

OutOfMemoryError is covered extensively in this Stack Overflow question.

By default, only so much memory is allocated to your driver and executors, usually around 500 MB to 5 GB. If you are running Spark locally, you will need to adjust the driver memory.

The Spark Documentation - Memory Management details all of the parameters/options you can configure. These can be found in:

$SPARK_HOME/conf/spark-defaults.conf

Try adjusting spark.driver.memory in that file.
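For example (a sketch; 12g is an arbitrary value):

# in $SPARK_HOME/conf/spark-defaults.conf
spark.driver.memory    12g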

However, if you are running your application using spark-submit, you can pass the driver-memory as an option like so:

spark-1.6.1/bin/spark-submit \
  --class "MyClass" \
  --driver-memory 12g \
  --master local[*] \
  target/scala-2.10/simple-project_2.10-1.0.jar
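One caveat worth adding (my note, not part of the original answer): in client mode the driver JVM is already running by the time your Python code executes, so setting spark.driver.memory through SparkConf() inside the script has no effect; it has to come from spark-defaults.conf or the command line. For an interactive PySpark session that would look like:

bin/pyspark --driver-memory 12g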
  • Brian Vanover, it is RAM... I edited my question with the spark context and a picture of the executor. Thanks for your help
    – Kardu
    Commented Mar 22, 2016 at 9:13
  • @Kardu How are you submitting your application? Try using spark-submit like in my example above, but instead of a jar, submit your .py file. This link contains a Python example: spark.apache.org/docs/latest/submitting-applications.html
    – Brian
    Commented Mar 22, 2016 at 14:09
  • I tried, Brian... In my spark-defaults.conf I tried 20, 100, 150g, and many combinations between driver and executor... Same error... I also tried UserDistinct.collect() and I get the same error... I don't understand why I need so much RAM for such a small dataset...
    – Kardu
    Commented Mar 22, 2016 at 17:21
  • collect is a risky function to call, because it essentially brings your entire dataset into memory. I had the same problem as you before, and I fixed it by modifying my options when executing spark-submit.
    – Brian
    Commented Mar 22, 2016 at 18:19
  • When I call .toPandas(), is it the same idea? I bring the whole dataset into my driver memory, right? I really don't understand why it needs so much memory for a dataframe with 400k rows and 3 columns, but OK... Thanks for your help @Brian Vanover
    – Kardu
    Commented Mar 22, 2016 at 18:26

I had a similar situation. The best solution I came up with was to save the Spark DataFrame to a Parquet file and then read it back with fastparquet.

It would also be a good idea to switch from Pandas DataFrames to Dask DataFrames, since Dask can keep data on disk as well as in memory, unlike Pandas.
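A minimal sketch of that approach, assuming fastparquet is installed and /tmp/users.parquet is a writable example path:

# Write the Spark DataFrame as Parquet; Spark produces a directory of
# part files rather than a single file.
UserDistinct.write.parquet("/tmp/users.parquet")

# Read the part files back on the driver with fastparquet, bypassing
# Spark's collect()/toPandas() serialization path entirely.
import glob
from fastparquet import ParquetFile

parts = sorted(glob.glob("/tmp/users.parquet/part-*"))
df_users = ParquetFile(parts).to_pandas()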
