-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dataset creation ERROR DatasetCoordinatorActor: #152
Comments
when we are using bulk_write mode as true we are getting null point exception while turning it off resolves it. for reference the rows_key column used is not unique but that's the requirement and we need to ingest all the records without overwriting them which happens when we set bulk_write as false. |
Ah, ok. May I ask what is your overall application and use case? You could try using for example a timestamp as a unique row key. Spark also has an auto incrementing function, but the last time I tried it it didn't work.
…On May 22 2018, at 2:58 am, shiwanshujalan ***@***.***> wrote:
when we are using bulk_write mode as true we are getting null point exception while turning it off resolves it. for reference the rows_key column used is not unique but that's the requirement and we need to ingest all the records without overwriting them which happens when we set bulk_write as false.
—
You are receiving this because you are subscribed to this thread.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA327GlREY-_oSj7v0efzqArxaOVwH9ks5t0-FagaJpZM4TOM-d).
|
Here is my use case. We are trying to insert 18 million records in filoDB using parquet files. we are converting the non supported datatypes using below command. after conversion the schema looks like below. Here is the distribution for all the column values. +-----+-------+--------+----------+---------+----------+--------+--------+----------+------+------+-------+-------+-------+-------+------+------------+-----------+-------+-------+-------------+------------+---------+--------+---------+---------+----------+---------+-----------+-----------+------------+------+-------+-----+-------+----------+----------+--------------------------------+----------------+--------+ From above distribution you can see that we have only one column("nextHopIp") which has most number of unique keys (near to 18 million). val dfResult1 = dfResult.sort($"protocolId") wirte command fails with NULLPOINTER exceptions. Earlier we used below keys partition key -> protocolId(total distinct count = 4) |
@shiwanshujalan the fix is simple, I'll see if I can get something for you to try. |
Hi, Apart from the above mentioned issue i have somehow ingested 18 million records in FiloDB (using small chunks) but the space occupied is very high(around 1.5G) as compared to parquet file used(900 Mb). Do you have any suggestion on how to reduce the space in FiloDB. Once this data is inserted we will be donig the adhoc aggregation on this data hence we would also appreciate if you could help us with the Data modelling. Thanks. |
Hi,
Timeline: most likely this weekend.
Size: chunk size is heavily dependent on your data model. We can go into it a little bit. It can be optimized, but even at optimal sizes it is likely to be not quite as small as Parquet. How data is compressed in Cassandra also makes a big difference. Using the highest setting would help as well.
Next version: in the fall, but it will be very different focus and premise. There is a branch you can look at for a preview.
I'm curious, how you found FiloDB and what you are comparing this against.
…On May 24 2018, at 6:38 am, shiwanshujalan ***@***.***> wrote:
Hi,
I will be much appreciated if we can get some timeline for the fix as this is a complete blocked for our POC.
Currently we have around 3 POCs which are centred around FiloDB.
Also can you tell us when the next version will be released.
Apart from the above mentioned issue i have somehow ingested 18 million records in FiloDB (using small chunks) but the space occupied is very high(around 1.5G) as compared to parquet file used(900 Mb). Do you have any suggestion on how to reduce the space in FiloDB.
Once this data is inserted we will be donig the adhoc aggregation on this data hence we would also appreciate if you could help us with the Data modelling.
Thanks.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA329IZ5se8_7SgLfLsCfnjoTESPg37ks5t1rfTgaJpZM4TOM-d).
|
Hi, ALso does the fix which you are working will resolve bulk write issue? Let me explain you the entire use case. Currently we are ingesting around 18 million records in the form of parquet files and apart from that we are running around 32 aggregations on that data with the time interval of 1 mins , 15 mins, 30 mins ,1 hour and 6 hrs and 24 hrs.
So in typical use case we will be having continuous stream of data comming into the system (roughly around 18 million records/ min) and we will be ingesting that data into FiloDB. i am attaching the aggregations file for your reference. in the attached Aggragation file pipe seperated columns are the grouping keys on the data and we will be carrying out 4 aggregations i.e. sum, min, max, count on the entire raw data, all the 4 aggregations will be acted on 4 different columns hence in all for each record in the attached aggregation file we are applying 16 computations (4 computations on each column). Our prob is as the data grows in parquet files performance starts degrading. Hence we are looking for a faster retrieval time from DB where FIloDB comes into picture. Few queries which i want to ask are
I am also attaching snapshot of our data for your reference. Thanks. |
@PallaviPersistent I just pushed a new version of filo-scala 0.3.7. So try modifying the dependency in build.sbt for filo-scala to 0.3.7 and see if that solves the problem - it should fix the NPE at least. |
Hi, Thanks for providing fix for NPE in quick time. I have tested the fix against 18 million records, unfortunately it is still failing with NPE intermitently. 18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job Driver stacktrace: Also when i take a look at memory consumption it is taking lot of memory for computation, is it normal behavior with FiloDB? I tried to use the bulk write option to check whether it appends the duplicate records in FiloDB but it doesn't seems to be working for us. Also currently its taking around 7 mins to ingest 18 million records into FiloDB, can you provide some suggestions around this as our expectation are around a min for ingestion. Appreciate your prompt help on this. Thanks. |
Hi, you are sure the error trace is from the new version of filo-scala right? I'm not quite sure how an NPE is possible from that line number.
Bulk write option - this might result in duplicates yes. What do you mean "it is not working", are you referring to the NPE?
7 min and memory - it's probably not good idea to measure performance when there are NPEs. When there is an error the whole ingestion stack gets restarted and Spark tries to restart the task and reingest multiple times.
Memory usage - which stat are you measuring? There is a static amount of memory always used because of the write buffers, and dynamic garbage which a lot probably gets generated since in order to do columnar writes we need to buffer enough data to turn into chunks.
Data model - I'll have more of a look later but you essentially have at least two different dimensions. If time is the primary dimension you care about you could put time into the partition key which would reduce the partitons you need to read, and if you need to select time within a partition you could put time as part of the row key as well which would result in being able to range select by time within a partition. Note that the selection happens whenyou issue the SQL statement or equivalently the df.where(....) condition, if that condition is on the partition and/or row keys, then those get pushed down to the FiloDB datasource, so that not all rows are read. This is key to read performance.
…On May 29 2018, at 5:58 am, shiwanshujalan ***@***.***> wrote:
Hi,
Thanks for providing fix for NPE in quick time. I have tested the fix against 18 million records, unfortunately it is still failing with NPE intermitently.
PFB the error logs.
scala> dfResult1.write.format("filodb.spark").option("dataset", "Netflow_protocol_3").option("row_keys", "exportMs,nextHopIp").option("partition_keys", "inIfId").option("filodb.reprojector.bulk-write-mode", "true").mode(SaveMode.Append).save()
[INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension
18/05/29 18:10:42 WARN Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 4:===========> (5 + 12) / 24]18/05/29 18:12:56 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.Netflow_protocol_3/0)
java.lang.NullPointerException
at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46)
at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314)
at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304)
at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59)
at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.velvia.filo.vectors.DictUTF8Vector$.shouldMakeDict(DictUTF8Vector.scala:46)
at org.velvia.filo.vectors.UTF8PtrAppendable.optimizedVector(UTF8Vector.scala:314)
at org.velvia.filo.vectors.UTF8PtrAppendable.suboptimize(UTF8Vector.scala:304)
at org.velvia.filo.vectors.ObjectVector.optimize(ObjectVector.scala:59)
at org.velvia.filo.BinaryVectorBuilder.toFiloBuffer(BinaryVector.scala:341)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at org.velvia.filo.RowToVectorBuilder$$anonfun$2.apply(RowToVectorBuilder.scala:86)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.velvia.filo.RowToVectorBuilder.convertToBytes(RowToVectorBuilder.scala:86)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:60)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:194)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-Netflow_protocol_3-0#432767698] has terminated! Ingestion for (filodb.Netflow_protocol_3,0) will stop.
18/05/29 18:12:56 ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 1034)
java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job
18/05/29 18:12:56 WARN TaskSetManager: Lost task 17.0 in stage 4.0 (TID 1048, localhost): TaskKilled (killed intentionally)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 4.0 failed 1 times, most recent failure: Lost task 3.0 in stage 4.0 (TID 1034, localhost): java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1897)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1911)
at org.apache.spark.rdd.RDD.count(RDD.scala:1115)
at filodb.spark.FiloContext$.insertIntoFilo$extension(FiloContext.scala:170)
at filodb.spark.FiloContext$.saveAsFilo$extension(FiloContext.scala:119)
at filodb.spark.DefaultSource.createRelation(DefaultSource.scala:63)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:429)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:211)
... 48 elided
Caused by: java.lang.RuntimeException: Ingestion actors shut down from ref Actor[akka://filo-spark/user/coordinator#48637466], check error logs
at filodb.spark.package$.ingestRddRows(package.scala:105)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:167)
at filodb.spark.FiloContext$$anonfun$insertIntoFilo$extension$1.apply(FiloContext.scala:162)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:801)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Also when i take a look at memory consumption it is taking lot of memory for computation, is it normal behavior with FiloDB?
I tried to use the bulk write option to check whether it appends the duplicate records in FiloDB but it doesn't seems to be working for us.
Can you also provide a solution for this as it will be very useful for us.
Also currently its taking around 7 mins to ingest 18 million records into FiloDB, can you provide some suggestions around this as our expectation are around a min for ingestion.
Appreciate your prompt help on this.
Thanks.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32_hFcZ2xSxfG1naNj3S-BW5uInGNks5t3UXZgaJpZM4TOM-d).
|
Hi, For Bulk write option when we are ingesting duplicate data it is still overwriting the existing primary key records hence this option in not working in our case. The time taken to ingest 18 million is comming around 7 mins, this time is for the round when there were no NPE and entire 18 million records got ingested in one shot. On Memory front what we are observing is when we are ingesting data thorugh spark shell the first time 18 million records gets ingested but on the same shell if we again try to ingest another 18 million records(all records are duplicate) , it gives us JAVA RUNTIME MEMORY errors . When we exit the spark shell and enter it again and then try to load the data it works fine. As part of out POC we will be ingesting around 2.4 million reccords/min in FIloDB for continuos 2 days but since we are getting NPE we are stuck with Ingestion. Can you suggest on the above points and also look at the NPE. |
Hi, PFB the snapshot of the jar imported during initialization. nfo] Including: logback-core-1.0.7.jar As you can see the lates filo-scala jar is created. But i am getting NPE during ingestion of 18 million records. I am attaching the entire run logs for your reference. From it i can confirm that the fix is not working as expected. Could you please check if somthing is missing from my end from the logs. Also can we connect during your working hours so that i can showcase the entire run and if something needs to tried and tested we can do it right away as it will really expedite our work. Thanks. |
Hmmm. I'm sorry to hear the fix did not work.
Connecting during working hours is probably not easy. Which time zone are you located in?
I think the easiest is if there is any way to get some anonymized, smallest subset of data that can reproduce the error. However, let me be honest - the core problem is that we are no longer working on the version that is in master, we are focused on a newer version with very different internals. Thus what I would do is try to apply the data to our newer version and see what works and breaks.
…On Jun 4 2018, at 1:32 am, shiwanshujalan ***@***.***> wrote:
Hi,
I have rebuild the Filo jar using 0.3.7 version in build.sbt.
PFB the snapshot of the jar imported during initialization.
nfo] Including: logback-core-1.0.7.jar
[info] Including: jnr-ffi-2.1.6.jar
[info] Including: jffi-1.2.16-native.jar
[info] Including: filo-scala_2.11-0.3.7.jar
[info] Including: boon-0.33.jar
[info] Including: akka-actor_2.11-2.3.15.jar
As you can see the lates filo-scala jar is created.
But i am getting NPE during ingestion of 18 million records.
I am attaching the entire run logs for your reference.
filodb_run.txt (https://github.com/filodb/FiloDB/files/2067408/filodb_run.txt)
From it i can confirm that the fix is not working as expected. Could you please check if somthing is missing from my end from the logs.
Also can we connect during your working hours so that i can showcase the entire run and if something needs to tried and tested we can do it right away as it will really expedite our work.
Thanks.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32w7ErhbgKtRxKGAD_V5ytNPf3Kyxks5t5PCQgaJpZM4TOM-d).
|
We can try doing that by applying filter on the records and writing the same but the issue is intermittent. Is the new version available on git so that we can use it for our testing. |
Hi, We are in india Time Zone. It will be really helpful if we could connect for one session, We will be able to present the entire scenario since this issue is intermittent and we cannot provide the entire data set. Thanks. |
We could potentially connect at around 9:45am or 10am PST tomorrow (Wed June 6th)... would that work?
I'm honestly not sure how helpful I can be.
…On Jun 4 2018, at 11:35 pm, shiwanshujalan ***@***.***> wrote:
Hi,
We are in india Time Zone. It will be really helpful if we could connect for one session, We will be able to present the entire scenario since this issue is intermittent and we cannot provide the entire data set.
Thanks.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA325KeH0F-_6BttafQq3pCp3UjBln0ks5t5iaggaJpZM4TOM-d).
|
9:45 am pst should be fine with us..Thanks for providing time..shall i send you webex link for tomorrow meet? |
Sure thing, go ahead and send a WebEx.
…On Jun 5 2018, at 11:44 am, shiwanshujalan ***@***.***> wrote:
9:45 am pst should be fine with us..Thanks for providing time..shall i send you webex link for tomorrow meet?
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32y7nFKy9_rz52pmFDhJ5WbihK4wpks5t5tF6gaJpZM4TOM-d).
|
Hi, Let me know in case you have any issues while joining meeting. |
Hi I'm not quite sure if 9:30pm IST is same as 9:45am PST, hope it is :)
…On Jun 5 2018, at 10:48 pm, shiwanshujalan ***@***.***> wrote:
Hi,
PFB the meeting invite.
https://persistent.webex.com/persistent/j.php?MTID=m62e37f692b46aa3c4d713281507ec0fa
Password: Y8iHfTZP
I have scheduled the meeting for 9:30 PM IST.
Let me know in case you have any issues while joining meeting.
Thanks.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA323ekbGZqUaPNl5A6u_SdVP7yUTTBks5t520TgaJpZM4TOM-d).
|
Hi, Our 9:30 PM IST is 10 AM PST. |
Hi, We are on the WEBex. Let me know in case you are facing any trouble in joining WEBex. |
I am trying to join. Wifi here is very slow.
…On Jun 6 2018, at 9:20 am, shiwanshujalan ***@***.***> wrote:
Hi,
We are on the WEBex. Let me know in case you are facing any trouble in joining WEBex.
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA329bqsJT-7nj4RiERmTtZMfN553FZks5t6AFrgaJpZM4TOM-d).
|
Hi Evan, Thanks for your time on the last webex call. As suggested, we tried to perform the ingestion by doing the below So we suspect the issue to be with data type String. We have few columns which are originally String and some which are originally Binary, Byte and have been converted to String. We tried ingesting columns originally String and received NPE. Snapshot of the column values Also we tried to ingest columns converted to String and it again resulted in NPE. Snapshot of the column values Thanks, |
Hmm, weird. I'll try to run a unit test with these exact values and see what happens.
…On Jun 10 2018, at 11:54 pm, PallaviPersistent ***@***.***> wrote:
Hi Evan,
Thanks for your time on the last webex call. As suggested, we tried to perform the ingestion by doing the below
-> Removing Columns having NULL values
We still are getting NPE.
-> We tried removing string columns and then writing the data and we could write 100 million + rows successfully.
So we suspect the issue to be with data type String.
We have few columns which are originally String and some which are originally Binary, Byte and have been converted to String. We tried ingesting columns originally String and received NPE. Snapshot of the column values
ipv6:67
ipv6:118
ipv6:4703
ipv6:80
ipv6:443
ipv6:21
ipv6:43
ipv6:80
ipv6:1039
ipv6:80
ipv6:3125
ipv6:53
ipv6:5804
ipv6:22
ipv6:22
ipv6:21
ipv6:5884
ipv6:5922
ipv6:1308
ipv6:443
ipv6:68
ipv6:194
ipv6:118
ipv6:118
ipv6:53
ipv6:20
ipv6:67
ipv6:194
ipv6:445
ipv6:443
ipv6:43
ipv6:119
ipv6:118
ipv6:20
ipv6:21
ipv6:4560
ipv6:53
ipv6:80
Also we tried to ingest columns converted to String and it again resulted in NPE. Snapshot of the column values
53.218.50.27
96.35.2.107
186.127.105.182
141.195.32.146
183.85.199.206
84.90.252.182
219.123.171.140
194.65.227.130
115.233.179.117
128.50.50.250
Thanks,
Pallavi
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA321akTPl0X9zoNEhcSZMb5kdEdtU5ks5t7hQSgaJpZM4TOM-d).
|
Hi Evan, Any luck with the unit test on the values we provided. Regards, |
Hi,
I added a test just now and it passed on the ipv6 data. :(
I have someone else who uses an older version in production and has seem similar issues. They say they see no issues with smaller or larger amounts of data when only one partition/thread is ingesting, but issues with multiple ones, and also only issues with larger amounts of data. Are you seeing the same thing?
I am in the middle of rewriting the storage format. It might help when the results are done.
…On Jul 11 2018, at 1:27 am, PallaviPersistent ***@***.***> wrote:
Hi Evan,
Any luck with the unit test on the values we provided.
Regards,
Pallavi
—
You are receiving this because you commented.
Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32xn0AyPmiP0jNOhrwlj-0P4pBAloks5uFbbigaJpZM4TOM-d).
|
Would you mind sending the NPE again one more time, the entire stack trace? Thanks.
…On Jul 12 2018, at 8:21 am, Evan Chan ***@***.***> wrote:
Hi,
I added a test just now and it passed on the ipv6 data. :(
I have someone else who uses an older version in production and has seem similar issues. They say they see no issues with smaller or larger amounts of data when only one partition/thread is ingesting, but issues with multiple ones, and also only issues with larger amounts of data. Are you seeing the same thing?
I am in the middle of rewriting the storage format. It might help when the results are done.
On Jul 11 2018, at 1:27 am, PallaviPersistent ***@***.***> wrote:
>
> Hi Evan,
> Any luck with the unit test on the values we provided.
> Regards,
> Pallavi
>
>
> —
> You are receiving this because you commented.
> Reply to this email directly, view it on GitHub (#152 (comment)), or mute the thread (https://github.com/notifications/unsubscribe-auth/ABA32xn0AyPmiP0jNOhrwlj-0P4pBAloks5uFbbigaJpZM4TOM-d).
>
>
|
Hi Evan, Yes the issue mostly appears when we try to insert large amounts of data. Also we see this error if we try inserting small amount of data in the same dataset. PFB the stack trace for NPE [INFO] [05/29/2018 18:10:33.288] [main] [StatsDExtension(akka://kamon)] Starting the Kamon(StatsD) extension 18/05/29 18:12:56 ERROR TaskSetManager: Task 3 in stage 4.0 failed 1 times; aborting job Driver stacktrace: |
Branch, version, commit
OS and Environment
Red Hat Enterprise Linux Server release 7.2 (Maipo)
JVM version
java -version
java version "1.8.0_161"
Java(TM) SE Runtime Environment (build 1.8.0_161-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.161-b12, mixed mode)
Scala version
Scala version 2.11.8
Kafka and Cassandra versions and setup
apache-cassandra-3.11.2
Spark version if used
2.0.0
Deployed mode
(client/cluster on Spark Standalone/YARN/Mesos/EMR or default)
Spark Standalone
Actual (wrong) behavior
parDFnew.write.format("filodb.spark").
option("dataset", "parDFNF").
option("row_keys", "appId").
option("partition_keys", "exportMs").
mode(SaveMode.Overwrite).save()
[Stage 2:> (0 + 2) / 2]18/04/10 02:40:47 ERROR DatasetCoordinatorActor: Error in reprojection task (filodb.parDFNF/0)
java.lang.NullPointerException
at org.velvia.filo.vectors.UTF8PtrAppendable.addData(UTF8Vector.scala:270)
at org.velvia.filo.BinaryAppendableVector$class.addVector(BinaryVector.scala:154)
at org.velvia.filo.vectors.ObjectVector.addVector(ObjectVector.scala:16)
at org.velvia.filo.GrowableVector.addData(BinaryVector.scala:251)
at org.velvia.filo.BinaryVectorBuilder.addData(BinaryVector.scala:308)
at org.velvia.filo.VectorBuilderBase$class.add(VectorBuilder.scala:36)
at org.velvia.filo.BinaryVectorBuilder.add(BinaryVector.scala:303)
at org.velvia.filo.RowToVectorBuilder.addRow(RowToVectorBuilder.scala:70)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:53)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:55)
at filodb.core.store.ChunkSet$.withSkips(ChunkSetInfo.scala:70)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:193)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/04/10 02:40:47 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.velvia.filo.vectors.UTF8PtrAppendable.addData(UTF8Vector.scala:270)
at org.velvia.filo.BinaryAppendableVector$class.addVector(BinaryVector.scala:154)
at org.velvia.filo.vectors.ObjectVector.addVector(ObjectVector.scala:16)
at org.velvia.filo.GrowableVector.addData(BinaryVector.scala:251)
at org.velvia.filo.BinaryVectorBuilder.addData(BinaryVector.scala:308)
at org.velvia.filo.VectorBuilderBase$class.add(VectorBuilder.scala:36)
at org.velvia.filo.BinaryVectorBuilder.add(BinaryVector.scala:303)
at org.velvia.filo.RowToVectorBuilder.addRow(RowToVectorBuilder.scala:70)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:53)
at filodb.core.store.ChunkSet$$anonfun$2.apply(ChunkSetInfo.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at filodb.core.store.ChunkSet$.apply(ChunkSetInfo.scala:55)
at filodb.core.store.ChunkSet$.withSkips(ChunkSetInfo.scala:70)
at filodb.core.store.ChunkSetSegment.addChunkSet(Segment.scala:193)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply$mcV$sp(Reprojector.scala:94)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2$$anonfun$apply$1.apply(Reprojector.scala:93)
at kamon.trace.TraceContext$class.withNewSegment(TraceContext.scala:53)
at kamon.trace.MetricsOnlyContext.withNewSegment(MetricsOnlyContext.scala:28)
at filodb.core.Perftools$.subtrace(Perftools.scala:26)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:92)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1$$anonfun$apply$2.apply(Reprojector.scala:79)
at kamon.trace.Tracer$$anonfun$withNewContext$1.apply(TracerModule.scala:62)
at kamon.trace.Tracer$.withContext(TracerModule.scala:53)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:61)
at kamon.trace.Tracer$.withNewContext(TracerModule.scala:77)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:79)
at filodb.core.reprojector.DefaultReprojector$$anonfun$toSegments$1.apply(Reprojector.scala:78)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at filodb.core.reprojector.DefaultReprojector.toSegments(Reprojector.scala:78)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:118)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1$$anonfun$apply$3.apply(Reprojector.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at filodb.core.reprojector.DefaultReprojector$$anonfun$reproject$1.apply(Reprojector.scala:117)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/04/10 02:40:47 WARN NodeCoordinatorActor: Actor Actor[akka://filo-spark/user/coordinator/ds-coord-parDFNF-0#-1168010970] has terminated! Ingestion for (filodb.parDFNF,0) will stop.
18/04/10 02:41:07 WARN RddRowSourceActor: ==> (filodb.parDFNF_0_0_1) No Acks received for last 20 seconds
18/04/10 02:41:07 WARN RddRowSourceActor: ==> (filodb.parDFNF_0_1_0) No Acks received for last 20 seconds
Steps to reproduce
scala> val files=Seq("/root/FiloDB/FiloDB/parquetfile/fragment1522917434336000000.dat","/root/FiloDB/FiloDB/parquetfile/fragment1522917494312000000.dat")
files: Seq[String] = List(/root/FiloDB/FiloDB/parquetfile/fragment1522917434336000000.dat, /root/FiloDB/FiloDB/parquetfile/fragment1522917494312000000.dat)
scala> val parDF=sqlContext.read.parquet(files:_*)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-core-3.2.10.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-core-3.2.10.jar."
18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus.store.rdbms" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-rdbms-3.2.9.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-rdbms-3.2.9.jar."
18/04/10 02:39:20 WARN General: Plugin (Bundle) "org.datanucleus.api.jdo" is already registered. Ensure you dont have multiple JAR versions of the same plugin in the classpath. The URL "file:/root/FiloDB/spark-2.0.0-bin-hadoop2.7/jars/datanucleus-api-jdo-3.2.6.jar" is already registered, and you are trying to register an identical plugin located at URL "file:/root/FiloDB/spark/jars/datanucleus-api-jdo-3.2.6.jar."
parDF: org.apache.spark.sql.DataFrame = [epoch: bigint, rowNum: bigint ... 38 more fields]
scala> val parDFnew=parDF.withColumn("exporterIp", 'exporterIp.cast("String")).withColumn("srcIp", 'srcIp.cast("String")).withColumn("dstIp", 'dstIp.cast("String")).withColumn("nextHopIp", 'nextHopIp.cast("String")).withColumn("bgpNextHopIp", 'bgpNextHopIp.cast("String")).withColumn("appId", 'appId.cast("String")).withColumn("policyQosClassificationHierarchy", 'policyQosClassificationHierarchy.cast("String")).withColumn("protocolId", 'protocolId.cast("Int")).withColumn("srcTos", 'srcTos.cast("Int")).withColumn("dstTos", 'dstTos.cast("Int")).withColumn("srcMask", 'srcMask.cast("Int")).withColumn("dstMask", 'dstMask.cast("Int")).withColumn("direction", 'direction.cast("String")).select('epoch, 'srcIp, 'dstIp, 'exporterIp, 'rowNum, 'exportMs, 'pktSeqNum, 'flowSeqNum, 'protocolId, 'srcTos, 'dstTos, 'srcMask, 'dstMask, 'tcpBits, 'srcPort, 'inIfId, 'inIfEntityId, 'inIfEnabled, 'dstPort, 'outIfId, 'outIfEntityId, 'outIfEnabled, 'direction, 'inOctets, 'outOctets, 'inPackets, 'outPackets, 'nextHopIp, 'bgpSrcAsNum, 'bgpDstAsNum, 'bgpNextHopIp, 'endMs, 'startMs, 'appId, 'appName, 'srcIpGroup, 'dstIpGroup, 'policyQosClassificationHierarchy, 'policyQosQueueId, 'workerId)
parDFnew: org.apache.spark.sql.DataFrame = [epoch: bigint, srcIp: string ... 38 more fields]
parDFnew.write.format("filodb.spark").
option("dataset", "parDFNF").
option("row_keys", "appId").
option("partition_keys", "exportMs").
mode(SaveMode.Overwrite).save()
Logs
or as attached file (see below)
Unused parts of this template should be removed (including this line).
The text was updated successfully, but these errors were encountered: