-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathgraph1.scala
131 lines (94 loc) · 2.96 KB
/
graph1.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// Spark 1.x-era spark-shell session: `sc` is the shell-provided SparkContext.
// SQLContext / parquetFile / registerTempTable are the pre-2.0 Spark SQL API.
val sqlCtx = new org.apache.spark.sql.SQLContext(sc)
// Brings in implicits (e.g. RDD[case class] -> SchemaRDD) used further below.
import sqlCtx._
// Load the pre-built graph tables and expose them to SQL by name.
val edge = sqlCtx.parquetFile("graf_edge.parquet")
edge.registerTempTable("edge")
val node = sqlCtx.parquetFile("graf_node.parquet")
node.registerTempTable("node")
// Let's pick one message as an example --
// at scale we'd parallelize this
val msg_id = "CA+B-+fyrBU1yGZAYJM_u=gnBVtzB=sXoBHkhmS-6L1n8K5Hhbw"
// Select the kept nodes (words) for this one message.
// NOTE(review): stripMargin below is a no-op since the string has no '|' markers.
val sql = """
SELECT node_id, root
FROM node
WHERE id='%s' AND keep='1'
""".format(msg_id)
val n = sqlCtx.sql(sql.stripMargin).distinct()
// GraphX vertices are (VertexId, attr) pairs: (node_id, root word).
// assumes column 0 is an Int node_id and column 1 a String — per the schema above
val nodes: RDD[(Long, String)] = n.map{ p =>
  (p(0).asInstanceOf[Int].toLong, p(1).asInstanceOf[String])
}
// Materialize for inspection in the shell (side effect only).
nodes.collect()
// Select the co-occurrence edges for the same message.
// (Redefining `val sql` is legal here because this runs in the spark-shell REPL.)
val sql = """
SELECT node0, node1
FROM edge
WHERE id='%s'
""".format(msg_id)
val e = sqlCtx.sql(sql.stripMargin).distinct()
// Edge attribute is an unused placeholder 0 — PageRank only needs topology.
val edges: RDD[Edge[Int]] = e.map{ p =>
  Edge(p(0).asInstanceOf[Int].toLong, p(1).asInstanceOf[Int].toLong, 0)
}
// Materialize for inspection in the shell (side effect only).
edges.collect()
// run PageRank
val g: Graph[String, Int] = Graph(nodes, edges)
// 0.0001 is the convergence tolerance; result vertices carry (id, rank: Double).
val r = g.pageRank(0.0001).vertices
// Join ranks back to words and print highest-ranked first (shell inspection).
r.join(nodes).sortBy(_._2._1, ascending=false).foreach(println)
// save the ranks
case class Rank(id: Int, rank: Float)
val rank = r.map(p => Rank(p._1.toInt, p._2.toFloat))
// Works via the sqlCtx implicit that converts RDD[case class] to a SchemaRDD.
rank.registerTempTable("rank")
//////////////////////////////////////////////////////////////////////
/**
 * Median of a sequence of fractional numbers.
 *
 * For an odd-sized sequence this is the middle element of the sorted data;
 * for an even-sized sequence it is the mean of the two middle elements.
 *
 * @param s non-empty sequence of values
 * @param n Fractional instance supplying ordering, addition and division
 * @return the median value
 * @throws IllegalArgumentException if `s` is empty (the original version
 *         failed with an opaque NoSuchElementException from `upper.head`)
 */
def median[T](s: Seq[T])(implicit n: Fractional[T]): T = {
  require(s.nonEmpty, "median of an empty sequence is undefined")
  import n._
  // Fractional[T] extends Ordering[T], so sort directly with it
  // instead of re-deriving comparison ops via sortWith(_ < _).
  val (lower, upper) = s.sorted(n).splitAt(s.size / 2)
  if (s.size % 2 == 0) (lower.last + upper.head) / fromInt(2) else upper.head
}
// Shell inspection of the three table schemas (side effect only).
node.schema
edge.schema
rank.schema
// Join each kept word of the message to its PageRank score, in text order.
val sql = """
SELECT n.num, n.raw, r.rank
FROM node n JOIN rank r ON n.node_id = r.id
WHERE n.id='%s' AND n.keep='1'
ORDER BY n.num
""".format(msg_id)
val s = sqlCtx.sql(sql.stripMargin).collect()
// Threshold: words ranking below the median of all vertex ranks break phrases.
val min_rank = median(r.map(_._2).collect())
// Sweep the message in word order, growing a "span" of consecutive
// above-median-rank words; each completed span becomes a candidate keyphrase
// whose score is the sum of its word ranks.
var span:List[String] = List()
var last_index = -1
var rank_sum = 0.0
// NOTE: duplicate phrase strings overwrite earlier sums in this map.
var phrases:collection.mutable.Map[String, Double] = collection.mutable.Map()
s.foreach { x =>
  //println (x)
  val index = x.getInt(0)     // word position within the message
  val word = x.getString(1)   // raw word text
  val rank = x.getFloat(2)    // PageRank score for this word
  var isStop = false
  // test for break from past: low-ranked word, or a gap in word positions
  if (span.size > 0 && rank < min_rank) isStop = true
  if (span.size > 0 && (index - last_index > 1)) isStop = true
  // clear accumulation
  if (isStop) {
    val phrase = span.mkString(" ")
    phrases += (phrase -> rank_sum)
    //println(phrase, rank_sum)
    span = List()
    last_index = index
    rank_sum = 0.0
  }
  // start or append
  if (rank >= min_rank) {
    span = span :+ word
    last_index = index
    rank_sum += rank
  }
}
// BUG FIX: flush the trailing span. The original never emitted a span still
// open when the loop ended, silently dropping any keyphrase that runs to the
// end of the message.
if (span.nonEmpty) {
  phrases += (span.mkString(" ") -> rank_sum)
  span = List()
  rank_sum = 0.0
}
// summarize the text as a list of ranked keyphrases
var summary = sc.parallelize(phrases.toSeq).distinct().sortBy(_._2, ascending=false).cache()
// take top 50 percentile
// NOT USED FOR SMALL MESSAGES
// (REPL redefinition of min_rank: now the median of the phrase scores.)
val min_rank = median(summary.map(_._2).collect().toSeq)
summary = summary.filter(_._2 >= min_rank)
// Normalize the surviving phrase scores so they sum to 1.
val sum = summary.map(_._2).reduce(_ + _)
summary = summary.map(x => (x._1, x._2 / sum))
// Final result: (phrase, normalized score) pairs, highest score first.
summary.collect()