-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathLockService.scala
136 lines (119 loc) · 4.12 KB
/
LockService.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package rings
import akka._
import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.collection.mutable
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, TimeoutException}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
// LockClient sends these messages to LockServer
sealed trait LockServiceAPI
case class Acquire(lockId: Lock, senderId: BigInt) extends LockServiceAPI
case class Release(lockId: Lock, senderId: BigInt) extends LockServiceAPI
case class KeepAlive(lockId: Lock, senderId: BigInt) extends LockServiceAPI
// Responses to the LockClient
sealed trait LockResponseAPI
case class LockGranted(lock: Lock) extends LockResponseAPI
// Required classes
class LockCell(var lock: Lock, var clientId: BigInt, var scheduledTimeout: Cancellable)
class LockServer (system: ActorSystem, t: Int) extends Actor {
import system.dispatcher
val generator = new scala.util.Random
var lockMap = new mutable.HashMap[String, LockCell]()
var clientServers: Seq[ActorRef] = null
implicit val timeout = Timeout(t seconds)
/**
* View: Primes the LockServer with a view of all the clients
* Acquire: Client request for lock, should always return to the client
*
* @return
*/
def receive() = {
case View(clients: Seq[ActorRef]) =>
clientServers = clients
case Acquire(lock: Lock, id: BigInt) =>
if(!clientFailure()) acquire(sender, lock, id)
case Release(lock: Lock, id: BigInt) =>
if(!clientFailure()) release(lock, id)
case KeepAlive(lock: Lock, id: BigInt) =>
if(!clientFailure()) keepAlive(lock, id)
}
def acquire(client: ActorRef, lock: Lock, clientId: BigInt) = {
val name = lock.symbolicName
println(s"Acquire request from: $clientId for lock: $name")
val cell = directRead(lock.symbolicName)
if (!cell.isEmpty) {
val lc = cell.get
if(lc != null) {
lc.scheduledTimeout.cancel()
if(lc.clientId != clientId) { // Get the lock back from this client
recall(lock, clientId)
}
}
}
client ! LockGranted(lock)
startTimeout(lock, clientId)
}
def release(lock: Lock, clientId: BigInt) = {
val name = lock.symbolicName
println(s"Release request from: $clientId for lock: $name")
val cell = directRead(lock.symbolicName)
if (!cell.isEmpty) {
val lc = cell.get
if(lc != null && lc.clientId == clientId) { // Make sure client owns the lock
if(!lc.scheduledTimeout.isCancelled) lc.scheduledTimeout.cancel() // Cancel the timeout
directWrite(lock.symbolicName, null) //
}
}
}
private def recall(lock: Lock, clientId: BigInt) = {
try{
val future = ask(clientServers(clientId.toInt), Recall(lock)).mapTo[LockServiceAPI]
val response = Await.result(future, timeout.duration)
} catch {
case te: TimeoutException =>
println(s"We timed out here. Yay? $clientId")
}
release(lock, clientId)
}
private def keepAlive(lock: Lock, clientId: BigInt): Unit = {
val cell = directRead(lock.symbolicName)
if(!cell.isEmpty) {
val lc = cell.get
if(lc != null && lc.clientId == clientId) {
lc.scheduledTimeout.cancel()
startTimeout(lock, clientId)
}
}
}
private def startTimeout(lock: Lock, id: BigInt): Unit = {
val cancel = system.scheduler.scheduleOnce(timeout.duration) {
recall(lock, id)
}
directWrite(lock.symbolicName, new LockCell(lock, id, cancel))
}
private def clientFailure(): Boolean = {
false
// val sample = generator.nextInt(100)
// sample <= 90
}
def directRead(key: String): Option[LockCell] = {
val result = lockMap.get(key)
if (result.isEmpty) None
else
Some(result.get.asInstanceOf[LockCell])
}
def directWrite(key: String, value: LockCell): Option[LockCell] = {
val result = lockMap.put(key, value)
if (result.isEmpty) None
else
Some(result.get.asInstanceOf[LockCell])
}
}
object LockServer {
def props(system: ActorSystem, t: Int): Props = {
Props(classOf[LockServer], system, t)
}
}