-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbasicKMeans.txt
1 lines (1 loc) · 2.39 KB
/
basicKMeans.txt
1
/**
* Created by erimkardes
*/
//********************************************************************************************************************
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import util.Random
/**
 * Basic k-means clustering driver on top of Spark RDDs.
 *
 * Reads whitespace-separated numeric points from a text file (one point per
 * line), picks `numClusters` random points as initial centers, and then
 * alternates between assigning every point to its nearest center and moving
 * each center to the mean of its assigned points. The loop stops when no
 * assignment changes or after `numIterations` passes. Centers and per-point
 * cluster ids are printed after every pass.
 */
object tMain {

  /**
   * Program entry point: runs k-means on the configured input file.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kmeans")
    val sc = new SparkContext(conf)
    try {
      val data = sc.textFile("path to input txt file")
      // Skip blank lines and tolerate repeated whitespace so that "".toDouble
      // cannot blow up a task; cache because every pass re-scans the points.
      val parsedData = data
        .map(_.trim)
        .filter(_.nonEmpty)
        .map(_.split("\\s+").map(_.toDouble))
        .cache()

      val numClusters = 2
      val numIterations = 20
      val seed = Random.nextInt()

      var centers = parsedData.takeSample(withReplacement = false, num = numClusters, seed = seed)
      require(
        centers.length == numClusters,
        s"Need at least $numClusters input points, found only ${centers.length}"
      )

      var moved = true
      var iteration = 1
      // Empty on purpose: it can never equal the first (non-empty) assignment.
      var prevIndex = Array.empty[Int]

      // `<=` so that exactly numIterations passes are allowed.
      while (moved && iteration <= numIterations) {
        // Immutable snapshot captured by the closures shipped to the executors.
        val current = centers

        // (cluster id, (point, 1)) for every input point.
        val assigned = parsedData.map { p => (closestCenter(p, current), (p, 1)) }

        val index = assigned.map(_._1).collect()

        // Compare element-wise: summing signed differences lets opposite
        // moves (0 -> 1 and 1 -> 0) cancel out and fake convergence.
        moved = !index.sameElements(prevIndex)

        // One shuffle producing both the coordinate sums and the counts,
        // keyed by cluster id so ordering of the result does not matter.
        val stats = assigned
          .reduceByKey((a, b) => (addVectors(a._1, b._1), a._2 + b._2))
          .collectAsMap()

        //update centers
        centers = Array.tabulate(numClusters) { c =>
          stats.get(c) match {
            case Some((sum, count)) => sum.map(_ / count)
            // A cluster that lost all its points keeps its previous center.
            case None => current(c)
          }
        }

        prevIndex = index
        iteration += 1

        println("Centers : \n")
        println(centers.map(_.mkString("Array(", ", ", ")")).mkString("\n"))
        println("Clusters : \n")
        println(index.mkString("\n"))
      }
    } finally {
      // Always release the Spark context, even when a job fails.
      sc.stop()
    }
  }

  /**
   * Euclidean distance between a point and a center.
   *
   * Only the first `point.length` coordinates of `center` are read.
   *
   * @param point  coordinates of the point
   * @param center coordinates of the center
   * @return square root of the summed squared coordinate differences
   */
  def getDistance(point: Array[Double], center: Array[Double]): Double = {
    var sumOfSquares = 0.0
    var i = 0
    while (i < point.length) {
      val delta = point(i) - center(i)
      sumOfSquares += delta * delta
      i += 1
    }
    math.sqrt(sumOfSquares)
  }

  /**
   * Index of the center nearest to `point`; ties go to the lowest index.
   *
   * @param point   coordinates of the point
   * @param centers candidate centers
   * @return position in `centers` of the closest one (0 when nothing compares smaller)
   */
  private def closestCenter(point: Array[Double], centers: Array[Array[Double]]): Int = {
    var best = 0
    var bestDistance = Double.PositiveInfinity
    var c = 0
    while (c < centers.length) {
      val d = getDistance(point, centers(c))
      if (d < bestDistance) {
        bestDistance = d
        best = c
      }
      c += 1
    }
    best
  }

  /**
   * Element-wise sum of two vectors, returned as a new array.
   *
   * Allocates a fresh array so that the cached input points are never
   * mutated. The result has the length of the shorter argument.
   */
  private def addVectors(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }
}