-
Notifications
You must be signed in to change notification settings - Fork 201
/
Copy pathDisruptor_SPMC.tla
171 lines (142 loc) · 6.45 KB
/
Disruptor_SPMC.tla
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
----------------------------- MODULE Disruptor_SPMC ------------------------
(***************************************************************************)
(* Models a Single Producer, Multi Consumer Disruptor (SPMC). *)
(* *)
(* The producer publishes the sequence number as value into the ring *)
(* buffer and the model verifies that all consumers read all published *)
(* values. *)
(* *)
(* The model also verifies that no data races occur between the producer *)
(* and consumers and that all consumers eventually read all published *)
(* values (in a Multicast fashion - i.e. all consumers read all events). *)
(* *)
(* To see a data race, try and run the model with two producers. *)
(***************************************************************************)
EXTENDS Integers, FiniteSets, Sequences
(* Model parameters. Their values are supplied by the model configuration. *)
CONSTANTS
MaxPublished, (* Max number of published events. Bounds the model *)
(* through StateConstraint. Must be a positive natural. *)
Writers, (* Writer/producer thread ids. Must be non-empty. *)
Readers, (* Reader/consumer thread ids. Must be non-empty. *)
Size, (* Ringbuffer size. Must be a positive natural. *)
NULL (* Not referenced directly in this module; presumably bound *)
(* to a same-named parameter of the RingBuffer instance - *)
(* TODO confirm against RingBuffer. *)
\* Sanity checks on the model parameters.
\* There must be at least one producer and one consumer.
ASSUME Writers /= {}
ASSUME Readers /= {}
\* pc is a function over Writers \union Readers, so an id that appears in
\* both sets would make a producer and a consumer share a single program
\* counter and silently corrupt the model. Require the id sets to be disjoint.
ASSUME Writers \intersect Readers = {}
\* The ring buffer has at least one slot and at least one event is published.
ASSUME Size \in Nat \ {0}
ASSUME MaxPublished \in Nat \ {0}
VARIABLES
ringbuffer, (* Not accessed directly in this module; presumably the *)
(* state behind the Buffer!... operators - TODO confirm *)
(* against RingBuffer. *)
published, (* Write cursor. One for the producer. Holds the last *)
(* published sequence number; -1 before the first one. *)
read, (* Read cursors. One per consumer. Each holds the last *)
(* sequence number that consumer finished reading. *)
pc, (* Program Counter of each Writer/Reader. *)
consumed (* Sequence of all read events by the Readers. *)
(* This is a history variable used for liveness *)
(* checking (see the Liveliness property). *)
\* Tuple of all state variables. Used as the subscript of [Next]_vars in Spec
\* and of the WF_vars conditions in Fairness.
vars == << ringbuffer, published, read, consumed, pc >>
(***************************************************************************)
(* Each producer/consumer can be in one of two states: *)
(* 1. Accessing a slot in the Disruptor or *)
(* 2. Advancing to the next slot. *)
(***************************************************************************)
\* pc value of a thread between a Begin* action and the matching End* action.
Access == "Access"
\* pc value of a thread between slots; every thread starts in this state.
Advance == "Advance"
\* Moves thread t from program-counter state `from` to state `to`.
\* Enabled only while pc[t] = from; the pc entries of all other threads are
\* left unchanged.
Transition(t, from, to) ==
  pc[t] = from /\ pc' = [ pc EXCEPT ![t] = to ]
\* Instantiates the RingBuffer module with integers as slot values. Only
\* Values is substituted explicitly; any other parameter of RingBuffer is
\* bound to the identically named declaration of this module.
Buffer == INSTANCE RingBuffer WITH Values <- Int
\* The image of function f: the set of all values f takes on its domain.
Range(f) == { f[k] : k \in DOMAIN f }
\* The smallest read cursor over all consumers, i.e. the cursor of the
\* slowest reader.
MinReadSequence ==
  CHOOSE lowest \in Range(read) :
    \A c \in Readers : lowest <= read[c]
(***************************************************************************)
(* Producer Actions: *)
(***************************************************************************)
\* The producer `writer` claims the slot for the next sequence number
\* (published + 1) and starts writing that number into it.
\* Enabled only when every read cursor is at least (next sequence - Size),
\* i.e. no consumer is more than a full buffer behind, and the writer is in
\* the Advance state. The write cursor itself moves only in EndWrite.
BeginWrite(writer) ==
  LET
    seq  == published + 1
    slot == Buffer!IndexOf(seq)
  IN
    \* Are we clear of all consumers? (Potentially a full cycle behind).
    /\ seq - Size <= MinReadSequence
    /\ Transition(writer, Advance, Access)
    /\ Buffer!Write(slot, writer, seq)
    /\ UNCHANGED << published, read, consumed >>
\* The producer `writer` finishes its write and publishes the sequence
\* number by advancing the write cursor to published + 1.
\* Enabled only while the writer is in the Access state.
EndWrite(writer) ==
  LET seq == published + 1 IN
    /\ Transition(writer, Access, Advance)
    /\ Buffer!EndWrite(Buffer!IndexOf(seq), writer)
    /\ published' = seq
    /\ UNCHANGED << read, consumed >>
(***************************************************************************)
(* Consumer Actions: *)
(***************************************************************************)
\* Consumer `reader` starts reading the slot of its next sequence number
\* (read[reader] + 1). Enabled only when that sequence number has already
\* been published and the reader is in the Advance state.
\* The value obtained from Buffer!Read is appended to the reader's history
\* in `consumed`; the read cursor itself moves only in EndRead.
BeginRead(reader) ==
  LET
    seq  == read[reader] + 1
    slot == Buffer!IndexOf(seq)
  IN
    /\ seq <= published
    /\ Transition(reader, Advance, Access)
    /\ Buffer!BeginRead(slot, reader)
    \* Record what was read from the ringbuffer.
    /\ consumed' = [ consumed EXCEPT
                       ![reader] = Append(consumed[reader], Buffer!Read(slot)) ]
    /\ UNCHANGED << read, published >>
\* Consumer `reader` finishes reading and advances its own read cursor by
\* one. Enabled only while the reader is in the Access state.
EndRead(reader) ==
  LET seq == read[reader] + 1 IN
    /\ Transition(reader, Access, Advance)
    /\ Buffer!EndRead(Buffer!IndexOf(seq), reader)
    /\ read' = [ read EXCEPT ![reader] = seq ]
    /\ UNCHANGED << published, consumed >>
(***************************************************************************)
(* Spec: *)
(***************************************************************************)
\* Initial state: the ring buffer is in its own initial state, nothing has
\* been published or read (all cursors are -1), every consumer history is
\* empty and every thread is in the Advance state.
Init ==
  /\ Buffer!Init
  /\ published = -1
  /\ read      = [ c \in Readers |-> -1 ]
  /\ consumed  = [ c \in Readers |-> << >> ]
  /\ pc        = [ t \in Writers \cup Readers |-> Advance ]
\* Next-state relation: some producer begins or ends a write, or some
\* consumer begins or ends a read.
Next ==
  \/ \E w \in Writers :
       \/ BeginWrite(w)
       \/ EndWrite(w)
  \/ \E r \in Readers :
       \/ BeginRead(r)
       \/ EndRead(r)
\* Weak fairness on both read actions of every consumer: a read step that
\* stays enabled must eventually be taken. No fairness is required of the
\* producer actions.
Fairness ==
  \A c \in Readers :
    /\ WF_vars(BeginRead(c))
    /\ WF_vars(EndRead(c))
\* Complete specification: start in Init, take Next steps (or stutter),
\* subject to the Fairness conditions.
Spec ==
  /\ Init
  /\ [][Next]_vars
  /\ Fairness
(***************************************************************************)
(* State constraint - bounds model: *)
(***************************************************************************)
\* Holds while the write cursor is still below MaxPublished.
StateConstraint ==
  MaxPublished > published
(***************************************************************************)
(* Invariants: *)
(***************************************************************************)
\* Type invariant: the ring buffer satisfies its own type invariant, cursors
\* are integers, every consumer history is a sequence of naturals, and every
\* thread's program counter is one of the two pc states.
TypeOk ==
  /\ Buffer!TypeOk
  /\ published \in Int
  /\ read      \in [ Readers -> Int ]
  /\ consumed  \in [ Readers -> Seq(Nat) ]
  /\ pc        \in [ Writers \cup Readers -> { Advance, Access } ]
\* Re-exports the NoDataRaces operator of the RingBuffer instance under a
\* top-level name so it can be referenced from this module.
NoDataRaces == Buffer!NoDataRaces
(***************************************************************************)
(* Properties: *)
(***************************************************************************)
(* Eventually always, consumers must have read all published values.       *)
(* For every consumer r and every sequence number i below MaxPublished:    *)
(* from some point on, whenever i has been published, the (i+1)-th entry   *)
(* of r's history exists and equals i.                                     *)
Liveliness ==
  \A r \in Readers :
    \A i \in 0 .. (MaxPublished - 1) :
      <>[]( i \in 0 .. published =>
              /\ Len(consumed[r]) > i
              /\ consumed[r][i + 1] = i )
=============================================================================