diff --git a/src/main/scala/ch/ninecode/cim/CIMAbout.scala b/src/main/scala/ch/ninecode/cim/CIMAbout.scala
index 565cf897c..b34fba32e 100644
--- a/src/main/scala/ch/ninecode/cim/CIMAbout.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMAbout.scala
@@ -87,7 +87,7 @@ with
     def do_about (): RDD[Element] =
     {
         // get the elements RDD
-        val elements = get[Element]("Elements")
+        val elements = getOrElse[Element]("Elements")
 
         // get the elements flagged as "rdf:about"
         val about_elements = elements.filter (_.about).groupBy (_.id)
diff --git a/src/main/scala/ch/ninecode/cim/CIMDeDup.scala b/src/main/scala/ch/ninecode/cim/CIMDeDup.scala
index c554a120e..5a9585dad 100644
--- a/src/main/scala/ch/ninecode/cim/CIMDeDup.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMDeDup.scala
@@ -95,7 +95,7 @@ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
         log.info ("eliminating duplicates")
 
         // get the elements RDD
-        val elements = get[Element]("Elements")
+        val elements = getOrElse[Element]("Elements")
 
         // deduplicate
         val new_elements = elements.keyBy (_.id).groupByKey ().values.map (deduplicate)
diff --git a/src/main/scala/ch/ninecode/cim/CIMEdges.scala b/src/main/scala/ch/ninecode/cim/CIMEdges.scala
index 2d67651c6..4ddca7bfa 100644
--- a/src/main/scala/ch/ninecode/cim/CIMEdges.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMEdges.scala
@@ -14,7 +14,9 @@ class Extremum (val id_loc: String, var min_index: Int, var x1 : String, var y1
 case class PostEdge (id_seq_1: String, id_seq_2: String, id_equ: String, clazz: String, name: String, aliasName: String, description: String, container: String, length: Double, voltage: String, normalOpen: Boolean, ratedCurrent: Double, power: Double, installationDate: String, receivedDate: String, status: String, x1: String, y1: String, x2: String, y2: String) extends Serializable
 case class TopoEdge (id_seq_1: String, id_island_1: String, id_seq_2: String, id_island_2: String, id_equ: String, clazz: String, name: String, aliasName: String, description: String, container: String, length: Double, voltage: String, normalOpen: Boolean, ratedCurrent: Double, power: Double, installationDate: String, receivedDate: String, status: String, x1: String, y1: String, x2: String, y2: String) extends Serializable
 
-class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with Serializable
+class CIMEdges (spark: SparkSession, storage: StorageLevel)
+extends CIMRDD
+with Serializable
 {
     implicit val session: SparkSession = spark
     implicit val level: StorageLevel = storage
@@ -22,7 +24,7 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
     def get_extremum (): RDD[Extremum] =
     {
-        val points = get[PositionPoint]
+        val points = getOrElse[PositionPoint]
 
         val point_seq_op = (x: Extremum, p: PositionPoint) ⇒
         {
             if (null == x)
@@ -367,10 +369,10 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
         log.info ("making Edges RDD")
 
         // get the elements RDD
-        val elements = get[Element]("Elements")
+        val elements = getOrElse[Element]("Elements")
 
         // get the terminals
-        val terminals = get[Terminal]
+        val terminals = getOrElse[Terminal]
 
         // first get the terminals keyed by equipment
         val terms = terminals.groupBy (_.ConductingEquipment)
@@ -388,8 +390,8 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
         val located_edges = preedges2.keyBy (_.location).leftOuterJoin (extremum.keyBy (_.id_loc)).values
 
         // join assets & lifecycles with edges
-        val asset = get[Asset]
-        val lifecycledate = get[LifecycleDate]
+        val asset = getOrElse[Asset]
+        val lifecycledate = getOrElse[LifecycleDate]
         val assets = asset.keyBy (_.lifecycle).leftOuterJoin (lifecycledate.keyBy (_.id)).values
         def psr (arg: (Asset, Option[LifecycleDate])) =
         {
@@ -399,14 +401,14 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel) extends CIMRDD with
             else
                 List[(String, (Asset, Option[LifecycleDate]))]()
         }
-        val asseted_edges = located_edges.keyBy (_._1.id_equ).leftOuterJoin (assets.flatMap (psr)).map ((x) => (x._2._1._1, x._2._1._2, x._2._2))
+        val asseted_edges = located_edges.keyBy (_._1.id_equ).leftOuterJoin (assets.flatMap (psr)).map (x => (x._2._1._1, x._2._1._2, x._2._2))
 
         // join with topological nodes if requested
         if (topological_nodes)
         {
-            val topologicals = get[TopologicalNode].keyBy (_.id)
-            val topo1 = asseted_edges.keyBy (_._1.cn_1).leftOuterJoin (topologicals).values.map ((x) => (x._1._1, x._1._2, x._1._3, x._2))
-            val topo2 = topo1.keyBy (_._1.cn_2).leftOuterJoin (topologicals).values.map ((x) => (x._1._1, x._1._2, x._1._3, x._1._4, x._2))
+            val topologicals = getOrElse[TopologicalNode].keyBy (_.id)
+            val topo1 = asseted_edges.keyBy (_._1.cn_1).leftOuterJoin (topologicals).values.map (x => (x._1._1, x._1._2, x._1._3, x._2))
+            val topo2 = topo1.keyBy (_._1.cn_2).leftOuterJoin (topologicals).values.map (x => (x._1._1, x._1._2, x._1._3, x._1._4, x._2))
             val edges = topo2.map (topo_edge_op)
 
             // persist and expose it
diff --git a/src/main/scala/ch/ninecode/cim/CIMExport.scala b/src/main/scala/ch/ninecode/cim/CIMExport.scala
index cb3dcc8bf..e5505d38d 100644
--- a/src/main/scala/ch/ninecode/cim/CIMExport.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMExport.scala
@@ -144,93 +144,93 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
      */
     def exportIsland (island: String, filename: String): Unit =
     {
-        val allislands = get[TopologicalIsland]
+        val allislands = getOrElse[TopologicalIsland]
         val someislands = allislands.filter (_.id == island)
         if (someislands.isEmpty())
             log.error (island + " not found")
         else
         {
             // get the topological elements
-            val some_topos = get[TopologicalNode].filter (_.TopologicalIsland == island)
-            val some_nodes = if (null != get[ConnectivityNode]) get[ConnectivityNode].keyBy (_.TopologicalNode).join (some_topos.keyBy (_.id)).map (_._2._1) else spark.sparkContext.emptyRDD[ConnectivityNode]
-            val some_terminals = get[Terminal].keyBy (_.TopologicalNode).join (some_topos.keyBy (_.id)).map (_._2._1)
-            val equipment = get[ConductingEquipment].keyBy (_.id).join (some_terminals.keyBy (_.ConductingEquipment)).map (_._2._1).distinct
-            val terminals = get[Terminal].keyBy (_.ConductingEquipment).join (equipment.keyBy (_.id)).map (_._2._1)
-            val (nodes, topos) = if (null != get[ConnectivityNode])
+            val some_topos = getOrElse[TopologicalNode].filter (_.TopologicalIsland == island)
+            val some_nodes = if (null != getOrElse[ConnectivityNode]) getOrElse[ConnectivityNode].keyBy (_.TopologicalNode).join (some_topos.keyBy (_.id)).map (_._2._1) else spark.sparkContext.emptyRDD[ConnectivityNode]
+            val some_terminals = getOrElse[Terminal].keyBy (_.TopologicalNode).join (some_topos.keyBy (_.id)).map (_._2._1)
+            val equipment = getOrElse[ConductingEquipment].keyBy (_.id).join (some_terminals.keyBy (_.ConductingEquipment)).map (_._2._1).distinct
+            val terminals = getOrElse[Terminal].keyBy (_.ConductingEquipment).join (equipment.keyBy (_.id)).map (_._2._1)
+            val (nodes, topos) = if (null != getOrElse[ConnectivityNode])
             {
-                val n = get[ConnectivityNode].keyBy (_.id).join (terminals.keyBy (_.ConnectivityNode)).map (_._2._1).distinct
-                val t = get[TopologicalNode].keyBy (_.id).join (n.keyBy (_.TopologicalNode)).map (_._2._1).distinct
+                val n = getOrElse[ConnectivityNode].keyBy (_.id).join (terminals.keyBy (_.ConnectivityNode)).map (_._2._1).distinct
+                val t = getOrElse[TopologicalNode].keyBy (_.id).join (n.keyBy (_.TopologicalNode)).map (_._2._1).distinct
                 (n, t)
             }
             else
                 (some_nodes, some_topos)
 
-            val islands = get[TopologicalIsland].keyBy (_.id).join (topos.keyBy (_.TopologicalIsland)).map (_._2._1).distinct
-            val ends = get[PowerTransformerEnd].keyBy (_.PowerTransformer).join (equipment.keyBy (_.id)).map (_._2._1)
-            val status = if (null != get[SvStatus]) get[SvStatus].keyBy (_.id).join (equipment.keyBy (_.SvStatus)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[SvStatus]
+            val islands = getOrElse[TopologicalIsland].keyBy (_.id).join (topos.keyBy (_.TopologicalIsland)).map (_._2._1).distinct
+            val ends = getOrElse[PowerTransformerEnd].keyBy (_.PowerTransformer).join (equipment.keyBy (_.id)).map (_._2._1)
+            val status = if (null != getOrElse[SvStatus]) getOrElse[SvStatus].keyBy (_.id).join (equipment.keyBy (_.SvStatus)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[SvStatus]
 
             // get other elements related to the equipment
-            val voltages = get[BaseVoltage].keyBy (_.id).join (equipment.keyBy (_.BaseVoltage)).map (_._2._1)
-                .union (get[BaseVoltage].keyBy (_.id).join (ends.keyBy (_.TransformerEnd.BaseVoltage)).map (_._2._1)).distinct
-            val containers = get[EquipmentContainer].keyBy (_.id).join (equipment.keyBy (_.Equipment.EquipmentContainer)).map (_._2._1).distinct
-            val infos = if (null != get[AssetInfo]) get[AssetInfo].keyBy (_.id).join (equipment.keyBy (_.Equipment.PowerSystemResource.AssetDatasheet)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[AssetInfo]
-            val locations = if (null != get[Location]) get[Location].keyBy (_.id).join (equipment.keyBy (_.Equipment.PowerSystemResource.Location)).map (_._2._1)
-                .union ( get[Location].keyBy (_.id).join (containers.keyBy (_.ConnectivityNodeContainer.PowerSystemResource.Location)).map (_._2._1)).distinct else spark.sparkContext.emptyRDD[Location]
-            val coordinates = if (null != get[CoordinateSystem]) get[CoordinateSystem].keyBy (_.id).join (locations.keyBy (_.CoordinateSystem)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[CoordinateSystem]
-            val points = if (null != get[PositionPoint]) get[PositionPoint].keyBy (_.Location).join (locations.keyBy (_.id)).map (_._2._1) else spark.sparkContext.emptyRDD[PositionPoint]
-            val psrtypes = if (null != get[PSRType]) get[PSRType].keyBy (_.id).join (equipment.keyBy (_.Equipment.PowerSystemResource.PSRType)).map (_._2._1)
-                .union (get[PSRType].keyBy (_.id).join (containers.keyBy (_.ConnectivityNodeContainer.PowerSystemResource.PSRType)).map (_._2._1)).distinct else spark.sparkContext.emptyRDD[PSRType]
-            val streets = if (null != get[StreetAddress]) get[StreetAddress].keyBy (_.id).join (locations.keyBy (_.mainAddress)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[StreetAddress]
-            val towns = if (null != get[TownDetail]) get[TownDetail].keyBy (_.id).join (streets.keyBy (_.townDetail)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[TownDetail]
-            val attributes = if (null != get[UserAttribute]) get[UserAttribute].keyBy (_.name).join (equipment.keyBy (_.id)).map (_._2._1) else spark.sparkContext.emptyRDD[UserAttribute]
-            val strings = if (null != get[StringQuantity]) get[StringQuantity].keyBy (_.id).join (attributes.keyBy (_.value)).map (_._2._1) else spark.sparkContext.emptyRDD[StringQuantity]
+            val voltages = getOrElse[BaseVoltage].keyBy (_.id).join (equipment.keyBy (_.BaseVoltage)).map (_._2._1)
+                .union (getOrElse[BaseVoltage].keyBy (_.id).join (ends.keyBy (_.TransformerEnd.BaseVoltage)).map (_._2._1)).distinct
+            val containers = getOrElse[EquipmentContainer].keyBy (_.id).join (equipment.keyBy (_.Equipment.EquipmentContainer)).map (_._2._1).distinct
+            val infos = if (null != getOrElse[AssetInfo]) getOrElse[AssetInfo].keyBy (_.id).join (equipment.keyBy (_.Equipment.PowerSystemResource.AssetDatasheet)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[AssetInfo]
+            val locations = if (null != getOrElse[Location]) getOrElse[Location].keyBy (_.id).join (equipment.keyBy (_.Equipment.PowerSystemResource.Location)).map (_._2._1)
+                .union ( getOrElse[Location].keyBy (_.id).join (containers.keyBy (_.ConnectivityNodeContainer.PowerSystemResource.Location)).map (_._2._1)).distinct else spark.sparkContext.emptyRDD[Location]
+            val coordinates = if (null != getOrElse[CoordinateSystem]) getOrElse[CoordinateSystem].keyBy (_.id).join (locations.keyBy (_.CoordinateSystem)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[CoordinateSystem]
+            val points = if (null != getOrElse[PositionPoint]) getOrElse[PositionPoint].keyBy (_.Location).join (locations.keyBy (_.id)).map (_._2._1) else spark.sparkContext.emptyRDD[PositionPoint]
+            val psrtypes = if (null != getOrElse[PSRType]) getOrElse[PSRType].keyBy (_.id).join (equipment.keyBy (_.Equipment.PowerSystemResource.PSRType)).map (_._2._1)
+                .union (getOrElse[PSRType].keyBy (_.id).join (containers.keyBy (_.ConnectivityNodeContainer.PowerSystemResource.PSRType)).map (_._2._1)).distinct else spark.sparkContext.emptyRDD[PSRType]
+            val streets = if (null != getOrElse[StreetAddress]) getOrElse[StreetAddress].keyBy (_.id).join (locations.keyBy (_.mainAddress)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[StreetAddress]
+            val towns = if (null != getOrElse[TownDetail]) getOrElse[TownDetail].keyBy (_.id).join (streets.keyBy (_.townDetail)).map (_._2._1).distinct else spark.sparkContext.emptyRDD[TownDetail]
+            val attributes = if (null != getOrElse[UserAttribute]) getOrElse[UserAttribute].keyBy (_.name).join (equipment.keyBy (_.id)).map (_._2._1) else spark.sparkContext.emptyRDD[UserAttribute]
+            val strings = if (null != getOrElse[StringQuantity]) getOrElse[StringQuantity].keyBy (_.id).join (attributes.keyBy (_.value)).map (_._2._1) else spark.sparkContext.emptyRDD[StringQuantity]
 
             // get implementation specific related elements
             // ServiceLocations
-            val specific = if (null != get[UserAttribute])
+            val specific = if (null != getOrElse[UserAttribute])
             {
-                val mst_has_s = get[StringQuantity].keyBy (_.value).join (equipment.keyBy (_.id)).map (_._2._1)
-                val mst_has = get[UserAttribute].keyBy (_.value).join (mst_has_s.keyBy (_.id)).map (_._2._1)
-                val mst = get[ServiceLocation].keyBy (_.id).join (mst_has.keyBy (_.name)).map (_._2._1)
-                val mst_nam = get[Name].keyBy (_.IdentifiedObject).join (mst.keyBy (_.id)).map (_._2._1)
-                val name_type = get[NameType].keyBy (_.id).join (mst_nam.keyBy (_.NameType)).map (_._2._1).distinct
-                val name_auth = get[NameTypeAuthority].keyBy (_.id).join (name_type.keyBy (_.NameTypeAuthority)).map (_._2._1).distinct
-                val mst_addr = get[StreetAddress].keyBy (_.id).join (mst.keyBy (_.WorkLocation.Location.secondaryAddress)).map (_._2._1)
-                val mst_town = get[TownDetail].keyBy (_.id).join (mst_addr.keyBy (_.townDetail)).map (_._2._1)
-                val mst_point = get[PositionPoint].keyBy (_.Location).join (mst.keyBy (_.id)).map (_._2._1)
-                val mst_street = if (null != get[StreetDetail])
-                    get[StreetDetail].keyBy (_.id).join (mst_addr.keyBy (_.streetDetail)).map (_._2._1)
+                val mst_has_s = getOrElse[StringQuantity].keyBy (_.value).join (equipment.keyBy (_.id)).map (_._2._1)
+                val mst_has = getOrElse[UserAttribute].keyBy (_.value).join (mst_has_s.keyBy (_.id)).map (_._2._1)
+                val mst = getOrElse[ServiceLocation].keyBy (_.id).join (mst_has.keyBy (_.name)).map (_._2._1)
+                val mst_nam = getOrElse[Name].keyBy (_.IdentifiedObject).join (mst.keyBy (_.id)).map (_._2._1)
+                val name_type = getOrElse[NameType].keyBy (_.id).join (mst_nam.keyBy (_.NameType)).map (_._2._1).distinct
+                val name_auth = getOrElse[NameTypeAuthority].keyBy (_.id).join (name_type.keyBy (_.NameTypeAuthority)).map (_._2._1).distinct
+                val mst_addr = getOrElse[StreetAddress].keyBy (_.id).join (mst.keyBy (_.WorkLocation.Location.secondaryAddress)).map (_._2._1)
+                val mst_town = getOrElse[TownDetail].keyBy (_.id).join (mst_addr.keyBy (_.townDetail)).map (_._2._1)
+                val mst_point = getOrElse[PositionPoint].keyBy (_.Location).join (mst.keyBy (_.id)).map (_._2._1)
+                val mst_street = if (null != getOrElse[StreetDetail])
+                    getOrElse[StreetDetail].keyBy (_.id).join (mst_addr.keyBy (_.streetDetail)).map (_._2._1)
                 else
                     spark.sparkContext.emptyRDD[StreetDetail]
-                val mst_status = if (null != get[Status])
-                    get[Status].keyBy (_.id).join (mst_addr.keyBy (_.status)).map (_._2._1)
+                val mst_status = if (null != getOrElse[Status])
+                    getOrElse[Status].keyBy (_.id).join (mst_addr.keyBy (_.status)).map (_._2._1)
                 else
                     spark.sparkContext.emptyRDD[Status]
 
                 // SolarGeneratingUnit
-                val eea_s = get[StringQuantity].keyBy (_.value).join (mst.keyBy (_.id)).map (_._2._1)
-                val eea_a = get[UserAttribute].keyBy (_.value).join (eea_s.keyBy (_.id)).map (_._2._1)
-                val eea = get[SolarGeneratingUnit].keyBy (_.id).join (eea_a.keyBy (_.name)).map (_._2._1)
-                val eea_n = get[Name].keyBy (_.IdentifiedObject).join (eea.keyBy (_.id)).map (_._2._1)
-                val eea_l = get[Location].keyBy (_.id).join (eea.keyBy (_.GeneratingUnit.Equipment.PowerSystemResource.Location)).map (_._2._1)
-                val eea_p = get[PositionPoint].keyBy (_.Location).join (eea_l.keyBy (_.id)).map (_._2._1)
+                val eea_s = getOrElse[StringQuantity].keyBy (_.value).join (mst.keyBy (_.id)).map (_._2._1)
+                val eea_a = getOrElse[UserAttribute].keyBy (_.value).join (eea_s.keyBy (_.id)).map (_._2._1)
+                val eea = getOrElse[SolarGeneratingUnit].keyBy (_.id).join (eea_a.keyBy (_.name)).map (_._2._1)
+                val eea_n = getOrElse[Name].keyBy (_.IdentifiedObject).join (eea.keyBy (_.id)).map (_._2._1)
+                val eea_l = getOrElse[Location].keyBy (_.id).join (eea.keyBy (_.GeneratingUnit.Equipment.PowerSystemResource.Location)).map (_._2._1)
+                val eea_p = getOrElse[PositionPoint].keyBy (_.Location).join (eea_l.keyBy (_.id)).map (_._2._1)
 
                 // get assets
                 val eq: RDD[Equipment] = equipment.map (_.Equipment).union (eea.map (_.GeneratingUnit.Equipment))
-                val assets = if (null != get[Asset])
-                    get[Asset].flatMap ((asset: Asset) ⇒ { val psr = asset.PowerSystemResources; if (null == psr) List() else psr.map (y => (y, asset))}).join (eq.keyBy (_.PowerSystemResource.id)).map (_._2._1)
+                val assets = if (null != getOrElse[Asset])
+                    getOrElse[Asset].flatMap ((asset: Asset) ⇒ { val psr = asset.PowerSystemResources; if (null == psr) List() else psr.map (y => (y, asset))}).join (eq.keyBy (_.PowerSystemResource.id)).map (_._2._1)
                 else
                     spark.sparkContext.emptyRDD[Asset]
-                val lifecycles = if (null != get[LifecycleDate])
-                    get[LifecycleDate].keyBy (_.id).join (assets.keyBy (_.lifecycle)).map (_._2._1)
+                val lifecycles = if (null != getOrElse[LifecycleDate])
+                    getOrElse[LifecycleDate].keyBy (_.id).join (assets.keyBy (_.lifecycle)).map (_._2._1)
                 else
                     spark.sparkContext.emptyRDD[LifecycleDate]
-                val ownership: RDD[Ownership] = if (null != get[Ownership])
-                    get[Ownership].keyBy (_.Asset).join (assets.keyBy (_.id)).map (_._2._1)
+                val ownership: RDD[Ownership] = if (null != getOrElse[Ownership])
+                    getOrElse[Ownership].keyBy (_.Asset).join (assets.keyBy (_.id)).map (_._2._1)
                 else
                     spark.sparkContext.emptyRDD[Ownership]
-                val owners = if (null != get[AssetOwner])
-                    get[AssetOwner].keyBy (_.id).join (ownership.keyBy (_.AssetOwner)).map (_._2._1).distinct
+                val owners = if (null != getOrElse[AssetOwner])
+                    getOrElse[AssetOwner].keyBy (_.id).join (ownership.keyBy (_.AssetOwner)).map (_._2._1).distinct
                 else
                     spark.sparkContext.emptyRDD[AssetOwner]
 
@@ -282,7 +282,7 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
                 .union (specific)
 
             // get the elements
-            val elements = get[Element]("Elements").keyBy (_.id).join (ids).map (_._2._1)
+            val elements = getOrElse[Element]("Elements").keyBy (_.id).join (ids).map (_._2._1)
             export (elements, filename, island, "/tmp/" + island + ".rdf")
         }
     }
@@ -298,7 +298,7 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
      */
     def exportAll (filename: String, about: String = ""):Unit =
     {
-        val elements = get[Element]("Elements")
+        val elements = getOrElse[Element]("Elements")
         export (elements, filename, about)
     }
 
@@ -310,7 +310,7 @@ class CIMExport (spark: SparkSession) extends CIMRDD with Serializable
     def exportAllIslands (directory: String = "simulation/"): Int =
     {
         val dir = if (directory.endsWith ("/")) directory else directory + "/"
-        val allislands = get[TopologicalIsland].map (_.id).collect
+        val allislands = getOrElse[TopologicalIsland].map (_.id).collect
         val islands = allislands.map (island ⇒ { exportIsland (island, dir + island + ".rdf"); 1})
         islands.sum
     }
diff --git a/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala b/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala
index 9ead3f72c..d9c56ee5a 100644
--- a/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMIntegrityChecker.scala
@@ -100,7 +100,7 @@ class CIMIntegrityChecker (spark: SparkSession) extends CIMRDD with Serializable
 
     def check (classes: List[ClassInfo], info: ClassInfo) (relation: Relationship): String =
     {
-        // val equipment: RDD[Equipment] = get[Equipment]
+        // val equipment: RDD[Equipment] = getOrElse[Equipment]
         // val equipment: RDD[Equipment] = spark.sparkContext.getPersistentRDDs.filter (_._2.name == "Equipment").head._2.asInstanceOf[RDD[Equipment]]
 
         type child = info.subsetter.basetype
diff --git a/src/main/scala/ch/ninecode/cim/CIMJoin.scala b/src/main/scala/ch/ninecode/cim/CIMJoin.scala
index 12a3d9fce..18766feb9 100644
--- a/src/main/scala/ch/ninecode/cim/CIMJoin.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMJoin.scala
@@ -190,13 +190,13 @@ class CIMJoin (spark: SparkSession, storage: StorageLevel) extends CIMRDD with S
     {
         log.info ("joining ISU and NIS")
 
-        val names = get[Name]
-        val service_locations = get[ServiceLocation]
-        val points = get[PositionPoint]
-        val attributes = get[UserAttribute]
-        val work_loc = get[WorkLocation]
-        val locations = get[Location]
-        val idobj = get[IdentifiedObject]
+        val names = getOrElse[Name]
+        val service_locations = getOrElse[ServiceLocation]
+        val points = getOrElse[PositionPoint]
+        val attributes = getOrElse[UserAttribute]
+        val work_loc = getOrElse[WorkLocation]
+        val locations = getOrElse[Location]
+        val idobj = getOrElse[IdentifiedObject]
 
         // get only the cim:Name objects pertaining to the ServiceLocation join
         val isusl = names.keyBy (_.name).join (service_locations.keyBy (_.id)).values
@@ -274,7 +274,7 @@ class CIMJoin (spark: SparkSession, storage: StorageLevel) extends CIMRDD with S
             union (updated_locations.asInstanceOf[RDD[Element]])
 
         // replace elements in Elements
-        val old_elements = get[Element]("Elements")
+        val old_elements = getOrElse[Element]("Elements")
         val new_elements = old_elements.keyBy (_.id).leftOuterJoin (newelem.keyBy (_.id)).
             values.flatMap (
                 (arg: (Element, Option[Element])) =>
diff --git a/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala b/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala
index 8f2c06f1d..ce119c23a 100644
--- a/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala
@@ -259,14 +259,14 @@ with
     def make_graph (): Graph[CIMVertexData, CIMEdgeData] =
     {
         // get the terminals keyed by equipment
-        val terms = get[Terminal].groupBy (_.ConductingEquipment)
+        val terms = getOrElse[Terminal].groupBy (_.ConductingEquipment)
 
         // map elements with their terminal 'pairs' to edges
-        val edges: RDD[Edge[CIMEdgeData]] = get[Element]("Elements").keyBy (_.id).join (terms)
+        val edges: RDD[Edge[CIMEdgeData]] = getOrElse[Element]("Elements").keyBy (_.id).join (terms)
             .flatMapValues (edge_operator).values.map (make_graph_edges).persist (storage)
 
         // get the vertices
-        val vertices: RDD[(VertexId, CIMVertexData)] = get[ConnectivityNode].map (to_vertex).persist (storage)
+        val vertices: RDD[(VertexId, CIMVertexData)] = getOrElse[ConnectivityNode].map (to_vertex).persist (storage)
 
         if (debug)
         {
@@ -626,10 +626,10 @@ with
         graph = identifyIslands (graph)
 
         // get the terminals keyed by equipment with equipment
-        val elements = get[Element]("Elements").keyBy (_.id)
-        val terms = get[Terminal].keyBy (_.ConductingEquipment).join (elements).values
+        val elements = getOrElse[Element]("Elements").keyBy (_.id)
+        val terms = getOrElse[Terminal].keyBy (_.ConductingEquipment).join (elements).values
         // map each graph vertex to the terminals
-        val vertices = get[ConnectivityNode].map (x => (vertex_id (x.id), x))
+        val vertices = getOrElse[ConnectivityNode].map (x => (vertex_id (x.id), x))
         val td_plus = graph.vertices.join (vertices).values.filter (_._1.island != 0L).keyBy (_._2.id).leftOuterJoin (terms.keyBy (_._1.ConnectivityNode)).values
         val islands = td_plus.groupBy (_._1._1.island).values.map (to_islands)
 
@@ -671,7 +671,7 @@ with
         // but the other RDD (ConnectivityNode and Terminal also ACDCTerminal) need to be updated in IdentifiedObject and Element
 
         // assign every ConnectivtyNode to a TopologicalNode
-        val old_cn = get[ConnectivityNode]
+        val old_cn = getOrElse[ConnectivityNode]
         val new_cn = old_cn.keyBy (a => vertex_id (a.id)).leftOuterJoin (graph.vertices).values.map (update_cn)
 
         // swap the old ConnectivityNode RDD for the new one
@@ -680,7 +680,7 @@ with
 
         // assign every Terminal with a connectivity node to a TopologicalNode
         // note: keep the original enclosed ACDCTerminal objects
-        val old_terminals = get[Terminal]
+        val old_terminals = getOrElse[Terminal]
         val t_with = old_terminals.filter (null != _.ConnectivityNode)
         val t_without = old_terminals.filter (null == _.ConnectivityNode)
         val new_terminals = t_with.keyBy (a => vertex_id (a.ConnectivityNode)).leftOuterJoin (graph.vertices).values.map (update_terminals)
@@ -705,7 +705,7 @@ with
             union (new_terminals.map (_.ACDCTerminal.IdentifiedObject))
 
         // replace identified objects in IdentifiedObject
-        val old_idobj = get[IdentifiedObject]
+        val old_idobj = getOrElse[IdentifiedObject]
         val new_idobj = old_idobj.keyBy (_.id).subtract (oldobj.keyBy (_.id)).values.union (newobj)
 
         // swap the old IdentifiedObject RDD for the new one
@@ -726,7 +726,7 @@ with
             union (new_terminals.asInstanceOf[RDD[Element]])
 
         // replace elements in Elements
-        val old_elements = get[Element]("Elements")
+        val old_elements = getOrElse[Element]("Elements")
         val new_elements = old_elements.keyBy (_.id).subtract (oldelem.keyBy (_.id)).values.union (newelem)
 
         // swap the old Elements RDD for the new one
diff --git a/src/main/scala/ch/ninecode/cim/CIMNormalize.scala b/src/main/scala/ch/ninecode/cim/CIMNormalize.scala
index 9e8221ea1..2eaad62bc 100644
--- a/src/main/scala/ch/ninecode/cim/CIMNormalize.scala
+++ b/src/main/scala/ch/ninecode/cim/CIMNormalize.scala
@@ -166,7 +166,7 @@ with
     def do_normalization (): RDD[Element] =
     {
         // get the elements RDD keyed by id
-        val old_elements = get[Element]("Elements")
+        val old_elements = getOrElse[Element]("Elements")
         val elements = old_elements.keyBy (_.id)
         val all = elements.count
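
Note: the change is mechanical; every get[T] / get[T](name) lookup becomes the corresponding getOrElse call. As a reading aid, here is a minimal sketch of the presumed distinction, assuming (as the commented-out line in CIMIntegrityChecker hints) that named RDDs are looked up among Spark's persistent RDDs, that get returns null when no such RDD has been registered, and that getOrElse substitutes an empty RDD instead. SketchCIMRDD and these method bodies are illustrative assumptions, not the repository's actual CIMRDD trait.

import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical sketch of the get vs. getOrElse semantics this patch relies on;
// not the verbatim CIMRDD trait from this repository.
trait SketchCIMRDD
{
    implicit val session: SparkSession

    // look up a named, persisted RDD; null when it was never registered
    def get[T : ClassTag] (name: String): RDD[T] =
        session.sparkContext.getPersistentRDDs
            .find (_._2.name == name)
            .map (_._2.asInstanceOf[RDD[T]])
            .orNull

    // no-argument form: derive the name from the runtime class,
    // e.g. get[Terminal] behaves like get("Terminal")
    def get[T : ClassTag]: RDD[T] =
        get (implicitly[ClassTag[T]].runtimeClass.getName.split ('.').last)

    // total variant: a missing RDD becomes an empty RDD, so callers can
    // keyBy/join/filter without first checking for null
    def getOrElse[T : ClassTag] (name: String): RDD[T] =
    {
        val rdd = get[T](name)
        if (null != rdd) rdd else session.sparkContext.emptyRDD[T]
    }

    def getOrElse[T : ClassTag]: RDD[T] =
        getOrElse (implicitly[ClassTag[T]].runtimeClass.getName.split ('.').last)
}

Under these semantics, a call site such as val terminals = getOrElse[Terminal] can be keyed, joined, and filtered directly even when the input file contained no objects of that class, which is presumably the motivation for the rename: the null-guard pattern (if (null != get[X]) ... else emptyRDD[X]) that recurs throughout CIMExport becomes unnecessary at new call sites.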