-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient-link.5c
145 lines (130 loc) · 3.88 KB
/
client-link.5c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
/*
* Copyright © 2012 Keith Packard <[email protected]>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; version 2 of the License.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
*/
autoload RR;
autoload RR::Lex;
autoload RR::Send;
autoload Client;
autoload Client::Net;
autoload Mutex;
autoload List;
extend namespace Client {
public namespace Link {
/*
 * One message received from the server. The struct extends
 * List::list_t so that a message can be linked onto the per-link
 * 'replies' or 'notices' list with List::append.
 */
public typedef List::list_t + struct {
/* Message contents as returned by RR::Lex::recv; read_replies
 * treats a message whose reply[0] is "NOTICE" as a notice. */
string[*] reply;
} message_struct;
/* Messages are handled by reference throughout this namespace. */
public typedef *message_struct message_t;
/* Raised by command() when writing to the link fails with
 * File::io_error; the string describes the failure. */
public exception link_error(string);
/*
 * State for one connection to the server, shared between the
 * reader thread (read_replies) and the callers of command(),
 * notice() and close().
 */
typedef struct {
/* Connection returned by Net::connect; read by the reader thread
 * and written by command(). */
file f;
/* Host and port passed to new(). */
string host;
int port;
/* Queue of non-NOTICE messages; replies_sem is signalled once for
 * each message appended. */
List::list_t replies;
semaphore replies_sem;
/* Queue of NOTICE messages; notices_sem is signalled once for
 * each message appended. */
List::list_t notices;
semaphore notices_sem;
/* Set by close(), and by the reader thread on end of file or an
 * I/O error; command() and notice() raise link_closing when they
 * see it. */
bool closing;
/* Held by command() from sending the request until the reply has
 * been dequeued. */
Mutex::mutex command_lock;
/* Held by notice() while it waits for and dequeues a notice. */
Mutex::mutex notice_lock;
/* Held around every append to or removal from 'replies' and
 * 'notices'. */
Mutex::mutex reply_lock;
/* Thread running read_replies for this link. */
thread reader;
} link_struct;
/* Links are handled by reference throughout this namespace. */
public typedef *link_struct link_t;
/* Raised by command() and notice() when the link's 'closing' flag
 * is set. */
public exception link_closing();
/*
 * Body of the per-link reader thread.
 *
 * Repeatedly reads one message from the link with RR::Lex::recv,
 * ignores empty messages, and queues the rest: messages whose first
 * word is "NOTICE" go on l->notices, everything else on l->replies.
 * The matching semaphore is signalled once per queued message.
 *
 * The loop ends when a Thread::signal is delivered (close() sends
 * one), or on end of file or an I/O error, in which case the link is
 * also marked as closing. Before returning, both semaphores are
 * signalled until their counts are no longer negative, presumably to
 * release every thread still blocked in command() or notice().
 */
void read_replies(link_t l) {
    /* Signal 'sem' for as long as its count is below zero. */
    void wake_waiters(semaphore sem) {
        while (Semaphore::count(sem) < 0)
            Semaphore::signal(sem);
    }

    try {
        while (true) {
            message_t msg = &(message_struct) { .reply = RR::Lex::recv(l->f) };
            if (dim(msg->reply) != 0) {
                /* Both queues are protected by reply_lock. */
                twixt (Mutex::acquire(l->reply_lock); Mutex::release(l->reply_lock)) {
                    if (msg->reply[0] != "NOTICE") {
                        List::append(msg, &l->replies);
                        Semaphore::signal(l->replies_sem);
                    } else {
                        List::append(msg, &l->notices);
                        Semaphore::signal(l->notices_sem);
                    }
                }
            }
        }
    } catch Thread::signal(int sig) {
        /* Asked to stop; nothing to record here. */
    } catch File::io_eof(file f) {
        l->closing = true;
    } catch File::io_error(string reason, File::error_type error, file f) {
        l->closing = true;
    }

    wake_waiters(l->notices_sem);
    wake_waiters(l->replies_sem);
}
/*
 * Send one command over the link and return the next queued reply.
 *
 * The command is written with RR::Send::send using 'format' and
 * 'args' and the file is flushed; the caller then blocks until the
 * reader thread signals replies_sem. command_lock is held for the
 * whole exchange, so only one command is outstanding at a time
 * (presumably so that replies pair up with commands in order).
 *
 * Raises link_error if writing to the link fails with File::io_error.
 * Raises link_closing if the link is marked as closing.
 */
public message_t command(link_t l, string format, poly args...) {
    message_t reply;
    twixt (Mutex::acquire(l->command_lock); Mutex::release(l->command_lock)) {
        try {
            RR::Send::send(l->f, format, args...);
            File::flush(l->f);
        } catch File::io_error(string reason, File::error_type error, file f) {
            raise link_error(sprintf("I/O error on link: %s", reason));
        }
        /*
         * The reader thread only wakes callers that are already
         * blocked at the moment it exits, so waiting on a link that is
         * already closing would never return. The flag is never
         * cleared, and the check after the wait would raise anyway, so
         * fail now instead of blocking. (A reader that exits between
         * this check and the wait is still not covered.)
         */
        if (l->closing)
            raise link_closing();
        Semaphore::wait(l->replies_sem);
        twixt(Mutex::acquire(l->reply_lock); Mutex::release(l->reply_lock)) {
            /* Woken by the exiting reader rather than by a reply. */
            if (l->closing)
                raise link_closing();
            reply = List::first(&l->replies);
            List::remove(reply);
        }
    }
    return reply;
}
/*
 * Block until the server sends a NOTICE message and return it.
 *
 * notice_lock is held while waiting, so one caller at a time waits
 * on notices_sem. The notice is removed from l->notices under
 * reply_lock before being returned.
 *
 * Raises link_closing if the link is marked as closing.
 */
public message_t notice(link_t l) {
    message_t notice;
    twixt (Mutex::acquire(l->notice_lock); Mutex::release(l->notice_lock)) {
        /*
         * The reader thread only wakes callers that are already
         * blocked at the moment it exits, so waiting on a link that is
         * already closing would never return. The flag is never
         * cleared, and the check after the wait would raise anyway, so
         * fail now instead of blocking. (A reader that exits between
         * this check and the wait is still not covered.)
         */
        if (l->closing)
            raise link_closing();
        Semaphore::wait(l->notices_sem);
        twixt(Mutex::acquire(l->reply_lock); Mutex::release(l->reply_lock)) {
            /* Woken by the exiting reader rather than by a notice. */
            if (l->closing)
                raise link_closing();
            notice = List::first(&l->notices);
            List::remove(notice);
        }
    }
    return notice;
}
/*
 * Create a link to the server at host:port.
 *
 * Allocates the link state with fresh mutexes and semaphores,
 * initialises the two message queues, connects with Net::connect and
 * forks the reader thread running read_replies. Returns the new link.
 */
public link_t new (string host, int port) {
    link_t conn = &(link_struct) {
        .closing = false,
        .host = host,
        .port = port,
        .replies_sem = Semaphore::new(),
        .notices_sem = Semaphore::new(),
        .command_lock = Mutex::new(),
        .notice_lock = Mutex::new(),
        .reply_lock = Mutex::new(),
    };

    /* Both queues start out empty. */
    List::init(&conn->replies);
    List::init(&conn->notices);

    /* Connect first; the reader thread needs the open file. */
    conn->f = Net::connect (host, port);
    conn->reader = fork read_replies(conn);
    return conn;
}
/*
 * Shut the link down.
 *
 * Marks the link as closing, sends signal 1 to the reader thread,
 * waits for that thread to finish and then closes the connection.
 */
public void close(link_t l) {
    thread reader = l->reader;

    /* Set the flag before the signal so that anyone woken by the
     * exiting reader sees the link as closing. */
    l->closing = true;
    Thread::send_signal(reader, 1);
    Thread::join(reader);
    File::close(l->f);
}
}
}