replace all get[T] with getOrElse[T] in CIMRDD subclasses
derrickoswald committed Aug 27, 2018
1 parent e6eed02 commit c6f6f42
Showing 8 changed files with 91 additions and 89 deletions.
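
Apart from minor formatting cleanups, the change is mechanical: every get[T] call in the CIMRDD subclasses below becomes getOrElse[T]. The presumed difference is the failure mode when the named RDD has not been registered with the Spark context: get[T] leaves the caller with a missing RDD, while getOrElse[T] falls back to an empty RDD so downstream filters, joins and maps simply produce nothing. The following is a minimal sketch of that contract, not the actual CIMRDD trait; the NamedRDDAccess name, the lookup helper and the use of getPersistentRDDs are assumptions.

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// minimal sketch of the presumed CIMRDD access contract; the lookup strategy is an assumption
trait NamedRDDAccess
{
    implicit val session: SparkSession

    // find a persisted RDD by its registered name
    private def lookup[T : ClassTag] (name: String): Option[RDD[T]] =
        session.sparkContext.getPersistentRDDs.values
            .find (_.name == name)
            .map (_.asInstanceOf[RDD[T]])

    // strict access: a missing RDD surfaces as null and every caller must guard against it
    def get[T : ClassTag] (name: String): RDD[T] =
        lookup[T] (name).orNull

    // lenient access: a missing RDD becomes an empty RDD,
    // so downstream filters, joins and maps simply yield no results
    def getOrElse[T : ClassTag] (name: String): RDD[T] =
        lookup[T] (name).getOrElse (session.sparkContext.emptyRDD[T])

    // convenience overload matching calls like getOrElse[Terminal]:
    // use the simple class name as the RDD name
    def getOrElse[T : ClassTag]: RDD[T] =
        getOrElse[T] (implicitly[ClassTag[T]].runtimeClass.getSimpleName)
}
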
2 changes: 1 addition & 1 deletion src/main/scala/ch/ninecode/cim/CIMAbout.scala
@@ -87,7 +87,7 @@ with
def do_about (): RDD[Element] =
{
// get the elements RDD
-val elements = get[Element]("Elements")
+val elements = getOrElse[Element]("Elements")

// get the elements flagged as "rdf:about"
val about_elements = elements.filter (_.about).groupBy (_.id)
2 changes: 1 addition & 1 deletion src/main/scala/ch/ninecode/cim/CIMDeDup.scala
@@ -95,7 +95,7 @@ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
log.info ("eliminating duplicates")

// get the elements RDD
-val elements = get[Element]("Elements")
+val elements = getOrElse[Element]("Elements")

// deduplicate
val new_elements = elements.keyBy (_.id).groupByKey ().values.map (deduplicate)
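
The CIMDeDup hunk leaves the existing de-duplication pattern untouched: key the elements by id, group, and map each group through deduplicate. Below is a self-contained toy version of that RDD pattern, assuming deduplicate merely keeps one representative per group; the real deduplicate method is not shown in this diff.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object DeDupSketch
{
    // stand-in for the CIM Element hierarchy: just an id and a payload
    case class Element (id: String, payload: String)

    def main (args: Array[String]): Unit =
    {
        val spark = SparkSession.builder.master ("local[*]").appName ("DeDupSketch").getOrCreate ()
        val elements: RDD[Element] = spark.sparkContext.parallelize (Seq (
            Element ("a", "first"),
            Element ("a", "duplicate of first"),
            Element ("b", "second")))

        // assumption: deduplicate keeps one representative per group of duplicates
        val deduplicate = (duplicates: Iterable[Element]) => duplicates.head

        // same shape as the CIMDeDup line: key by id, group, reduce each group to one element
        val new_elements = elements.keyBy (_.id).groupByKey ().values.map (deduplicate)
        new_elements.collect ().foreach (println) // two elements remain, ids "a" and "b"
        spark.stop ()
    }
}
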
22 changes: 12 additions & 10 deletions src/main/scala/ch/ninecode/cim/CIMEdges.scala
@@ -14,15 +14,17 @@ class Extremum (val id_loc: String, var min_index: Int, var x1 : String, var y1
case class PostEdge (id_seq_1: String, id_seq_2: String, id_equ: String, clazz: String, name: String, aliasName: String, description: String, container: String, length: Double, voltage: String, normalOpen: Boolean, ratedCurrent: Double, power: Double, installationDate: String, receivedDate: String, status: String, x1: String, y1: String, x2: String, y2: String) extends Serializable
case class TopoEdge (id_seq_1: String, id_island_1: String, id_seq_2: String, id_island_2: String, id_equ: String, clazz: String, name: String, aliasName: String, description: String, container: String, length: Double, voltage: String, normalOpen: Boolean, ratedCurrent: Double, power: Double, installationDate: String, receivedDate: String, status: String, x1: String, y1: String, x2: String, y2: String) extends Serializable

-class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with Serializable
+class CIMEdges (spark: SparkSession, storage: StorageLevel)
+extends CIMRDD
+with Serializable
{
implicit val session: SparkSession = spark
implicit val level: StorageLevel = storage
implicit val log: Logger = LoggerFactory.getLogger (getClass)

def get_extremum (): RDD[Extremum] =
{
-val points = get[PositionPoint]
+val points = getOrElse[PositionPoint]
val point_seq_op = (x: Extremum, p: PositionPoint) ⇒
{
if (null == x)
@@ -367,10 +369,10 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
log.info ("making Edges RDD")

// get the elements RDD
-val elements = get[Element]("Elements")
+val elements = getOrElse[Element]("Elements")

// get the terminals
-val terminals = get[Terminal]
+val terminals = getOrElse[Terminal]

// first get the terminals keyed by equipment
val terms = terminals.groupBy (_.ConductingEquipment)
@@ -388,8 +390,8 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
val located_edges = preedges2.keyBy (_.location).leftOuterJoin (extremum.keyBy (_.id_loc)).values

// join assets & lifecycles with edges
-val asset = get[Asset]
-val lifecycledate = get[LifecycleDate]
+val asset = getOrElse[Asset]
+val lifecycledate = getOrElse[LifecycleDate]
val assets = asset.keyBy (_.lifecycle).leftOuterJoin (lifecycledate.keyBy (_.id)).values
def psr (arg: (Asset, Option[LifecycleDate])) =
{
@@ -399,14 +401,14 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
else
List[(String, (Asset, Option[LifecycleDate]))]()
}
-val asseted_edges = located_edges.keyBy (_._1.id_equ).leftOuterJoin (assets.flatMap (psr)).map ((x) => (x._2._1._1, x._2._1._2, x._2._2))
+val asseted_edges = located_edges.keyBy (_._1.id_equ).leftOuterJoin (assets.flatMap (psr)).map (x => (x._2._1._1, x._2._1._2, x._2._2))

// join with topological nodes if requested
if (topological_nodes)
{
-val topologicals = get[TopologicalNode].keyBy (_.id)
-val topo1 = asseted_edges.keyBy (_._1.cn_1).leftOuterJoin (topologicals).values.map ((x) => (x._1._1, x._1._2, x._1._3, x._2))
-val topo2 = topo1.keyBy (_._1.cn_2).leftOuterJoin (topologicals).values.map ((x) => (x._1._1, x._1._2, x._1._3, x._1._4, x._2))
+val topologicals = getOrElse[TopologicalNode].keyBy (_.id)
+val topo1 = asseted_edges.keyBy (_._1.cn_1).leftOuterJoin (topologicals).values.map (x => (x._1._1, x._1._2, x._1._3, x._2))
+val topo2 = topo1.keyBy (_._1.cn_2).leftOuterJoin (topologicals).values.map (x => (x._1._1, x._1._2, x._1._3, x._1._4, x._2))
val edges = topo2.map (topo_edge_op)

// persist and expose it
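
The CIMEdges hunks above repeat one join shape: key the edge records by a foreign key, leftOuterJoin against a keyed lookup RDD, and carry any match along as an Option (None where, for example, a topological node is missing). Below is a reduced, hypothetical version of that pattern; the Edge and Node classes are illustrative stand-ins, not the types touched by this commit.

import org.apache.spark.sql.SparkSession

object EdgeJoinSketch
{
    // illustrative stand-ins, not the classes in this commit
    case class Edge (id: String, cn_1: String, cn_2: String)
    case class Node (id: String, island: String)

    def main (args: Array[String]): Unit =
    {
        val spark = SparkSession.builder.master ("local[*]").appName ("EdgeJoinSketch").getOrCreate ()
        val sc = spark.sparkContext

        val edges = sc.parallelize (Seq (Edge ("e1", "n1", "n2"), Edge ("e2", "n2", "n9")))
        val nodes = sc.parallelize (Seq (Node ("n1", "islandA"), Node ("n2", "islandA"))).keyBy (_.id)

        // attach the node (if any) on each end of the edge, mirroring the topo1/topo2 steps
        val topo1 = edges.keyBy (_.cn_1).leftOuterJoin (nodes).values // RDD[(Edge, Option[Node])]
        val topo2 = topo1.keyBy (_._1.cn_2).leftOuterJoin (nodes).values
            .map (x => (x._1._1, x._1._2, x._2))                      // RDD[(Edge, Option[Node], Option[Node])]

        topo2.collect ().foreach (println) // edge e2 keeps a None for its unmatched n9 end
        spark.stop ()
    }
}
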
