-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathhdRDMAThread.cc
800 lines (693 loc) · 27.1 KB
/
hdRDMAThread.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
#include <sstream>
#include <iostream>
#include <atomic>
#include <strings.h>
#include <sys/stat.h>
#include <errno.h>
#include <zlib.h>
#include "hdRDMA.h"
using std::cout;
using std::cerr;
using std::endl;
using std::atomic;
using std::chrono::duration;
using std::chrono::duration_cast;
using std::chrono::high_resolution_clock;
extern atomic<uint64_t> BYTES_RECEIVED_TOT;
extern std::string HDRDMA_REMOTE_ADDR;
//
// Some notes on server mode:
//
// - To utilize the full bandwidth of QDR or better IB, multiple
// streams must be used. It is not enough to post multiple sends
// to one QP.
//
// - A choice is made here to use reliably connected (RC) connection
// types. This means a new QP is made for each remote connection.
//
// - We use a separate thread for each remote connection. The thread
// creates and maintains its own completion queue.
//
// - A single MR is created for all receives, but the memory is broken
// up into smaller buffers that are maintainined in a pool for use
// by all threads. This is done so that the "small" buffers can still
// be quite large, but we can support multiple simultaneous connections.
//
//-----------------------------------------
// hdRDMAThread (constructor)
//-----------------------------------------
hdRDMAThread::hdRDMAThread(hdRDMA *hdrdma)
{
// Keep copy of pointer to hdRDMA object
this->hdrdma = hdrdma;
t1 = high_resolution_clock::now();
t_last = t1;
}
//-----------------------------------------
// ~hdRDMAThread (destructor)
//-----------------------------------------
hdRDMAThread::~hdRDMAThread()
{
// Put QP insto RESET state so it releases all outstanding work requests
if( qp!=nullptr ){
struct ibv_qp_attr qp_attr;
bzero( &qp_attr, sizeof(qp_attr) );
qp_attr.qp_state = IBV_QPS_RESET;
ibv_modify_qp (qp, &qp_attr, IBV_QP_STATE);
}
// Delete all of our allocated objects
// n.b. order here matters! If the qp is destroyed after the
// comp_channel it will leave open a file descriptor pointing
// to [infinibandevent] that we have no way of closing!
if( qp!=nullptr ) ibv_destroy_qp( qp );
if( cq!=nullptr ) ibv_destroy_cq ( cq );
if( comp_channel!=nullptr ) ibv_destroy_comp_channel( comp_channel );
if( ofs!=nullptr ) delete ofs;
// Return MR buffers to pool
hdrdma->ReturnBuffers( buffers );
}
//----------------------------------------------------------------------
// ThreadRun
//
// This is run in a dedicated thread in server mode as soon as a
// TCP connection is established. It will exchange RDMA connection
// information over the given socket and then loop continously until
// the client signals it is done or the "stop" flag is set by the
// hdRDMA object.
//----------------------------------------------------------------------
void hdRDMAThread::ThreadRun(int sockfd)
{
// The first thing we send via TCP is a 3 byte message indicating
// success or failure. This really just allows us to inform the client
// if the server cannot accept another connection right now due to
// limited RDMA resources.
// The client will read in 3 bytes from the socket. If they are "OK:"
// then it knows the next thing to come is the QPInfo structure.
// If it is "BD:" then it knows then next thing to follow is a message
// string describing the error.
// This bit of magic ensures that the sockfd is closed and our "stopped"
// flag is set before leaving this method, even if early due to error.
std::shared_ptr<int> x(NULL, [&](int*){ close(sockfd); stopped=true;});
// Get pool buffers (up to 4). If none are available then tell
// remote client we have too many RDMA connections.
hdrdma->GetBuffers(buffers, 4);
if( buffers.empty() ){
// No buffers in MR available. Notify remote peer and exit thread
std::string mess("BD: RDMA server has no more MR buffers (too many connections)");
cerr << mess << endl;
write(sockfd, mess.c_str(), mess.length()+1);
return;
}
// Create completion channel and completion queue.
//
// TODO:
// The cq_size can be used to cause an error if too many WR are placed
// in it. The error would come in the form of an async event (see
// ibv_get_async_event). If I understand correctly, we would need to
// make cq_size smaller than the number of WRs and then check for async
// errors in a separate thread if we wanted to guarantee that we were
// processing the data as fast as it is coming in. That adds some
// significant complication so we skip it for now.
int cq_size = buffers.size();
comp_channel = ibv_create_comp_channel( hdrdma->ctx );
cq = ibv_create_cq( hdrdma->ctx, cq_size, NULL, comp_channel, 0);
if( !cq ){
std::stringstream ss;
ss << "BD: ERROR: Unable to create Completion Queue! errno=" << errno;
cerr << ss.str() << endl;
write(sockfd, ss.str().c_str(), ss.str().length()+1);
return;
}
// Tell remote peer we are ready to exchange QPInfo
std::string mess("OK:");
write(sockfd, mess.c_str(), mess.length());
// Exchange QP info over TCP socket so we can transmit via RDMA
try{
ExchangeQPInfo( sockfd );
}catch( Exception &e){
cerr << e.what() << endl;
return;
}
// Loop until we're told to stop by either the master thread or the
// remote peer declaring the connection is closing.
int num_wc = 1;
struct ibv_wc wc;
auto t_last_received = high_resolution_clock::now(); // time we last received a wc
while( !stop ){
// Check to see if a work completion notification has come in
int n = ibv_poll_cq(cq, num_wc, &wc);
if( n<0 ){
cerr << "ERROR: ibv_poll_cq returned " << n << " - closing connection" << endl;
break;
}
if( n == 0 ){
std::this_thread::sleep_for(std::chrono::microseconds(1));
// Timeout if nothing recieved for more than 30 seconds
auto t_now = high_resolution_clock::now();
duration<double> duration_since_receive = duration_cast<duration<double>>(t_now - t_last_received);
auto delta_t = duration_since_receive.count();
if( delta_t > 30.0 ){
cout << "TIMEOUT: no RDMA buffers received in more than 30 secs (" << delta_t << "). Closing connection." << endl;
cout << " (filename=" << ofilename <<" Ntransferred=" << Ntransferred << ")" << endl;
stop = true;
}
continue;
}
// Work completed!
if( wc.status != IBV_WC_SUCCESS ){
cerr << "ERROR: Status of WC not zero (" << wc.status << ") - closing connection" << endl;
break;
}
// Make sure this is a IBV_WC_RECV opcode
if( wc.opcode != IBV_WC_RECV ){
cerr << " This is strange... I should only be getting IBV_WC_RECV here! - closing connection" << endl;
break;
}
// Process the received data
auto id = wc.wr_id;
if( id >= buffers.size() ){
cerr << "ERROR: Bad id in wc (" << id << ") expected it to be < " << buffers.size() << endl;
break; // exit thread
}
auto &buffer = buffers[id];
auto buff = std::get<0>(buffer);
//auto buff_len = std::get<1>(buffer);
BYTES_RECEIVED_TOT += wc.byte_len;
ReceiveBuffer( buff, wc.byte_len ); //n.b. do NOT use buff_len here!
t_last_received = high_resolution_clock::now();
// Re-post the receive request
PostWR( id );
} // while( !stop )
}
//-------------------------------------------------------------
// PostWR
//
// Post a receive work request for our QP using the buffer
// parameters associated with the given id.
//-------------------------------------------------------------
void hdRDMAThread::PostWR( int id )
{
//cout << "Posting WR for id: " << id << endl;
auto &buffer = buffers[id];
auto buff = std::get<0>(buffer);
auto buff_len = std::get<1>(buffer);
struct ibv_recv_wr wr;
struct ibv_sge sge;
bzero( &wr, sizeof(wr));
bzero( &sge, sizeof(sge));
wr.wr_id = id;
wr.sg_list = &sge;
wr.num_sge = 1;
sge.addr = (uint64_t)buff;
sge.length = buff_len;
sge.lkey = hdrdma->mr->lkey;
auto ret = ibv_post_recv( qp, &wr, &bad_wr);
if( ret != 0 ){
cout << "ERROR: ibv_post_recv returned non zero value (" << ret << ")" << endl;
}
}
//-------------------------------------------------------------
// ExchangeQPInfo
//
// This will create a new QP and send the information to the remote
// peer. It will then receive the QP info from the peer so that the
// two can be linked. It will then call SetToRTS to set the local
// QP to the RTS (Ready To Send) state and RTR (Ready to Receive)
// state.
//-------------------------------------------------------------
void hdRDMAThread::ExchangeQPInfo( int sockfd )
{
int n;
struct QPInfo tmp_qp_info;
// Create a new QP to use with the remote peer.
CreateQP();
// Create a work receive request for each MR buffer we have
for( uint32_t id=0; id<buffers.size(); id++ ) PostWR( id );
tmp_qp_info.lid = htons(qpinfo.lid);
tmp_qp_info.qp_num = htonl(qpinfo.qp_num);
// n.b. we assume below that the remote peer and host pad the QPInfo structure
// the same. This will be true if we're using the same executable.
//------ Send QPInfo ---------
n = write(sockfd, (char *)&tmp_qp_info, sizeof(struct QPInfo));
if( n!= sizeof(struct QPInfo) ){
std::stringstream ss;
ss << "ERROR: Sending QPInfo! Tried sending " << sizeof(struct QPInfo) << " bytes but only " << n << " were sent!";
throw Exception( ss.str() );
}
//------ Receive QPInfo ---------
n = read(sockfd, (char *)&tmp_qp_info, sizeof(struct QPInfo));
if( n!= sizeof(struct QPInfo) ){
std::stringstream ss;
ss << "ERROR: Sending QPInfo! Tried reading " << sizeof(struct QPInfo) << " bytes but only " << n << " were read!!";
throw Exception( ss.str() );
}
remote_qpinfo.lid = ntohs(tmp_qp_info.lid);
remote_qpinfo.qp_num = ntohl(tmp_qp_info.qp_num);
// cout << "local lid: " << qpinfo.lid << " qp_num: " << qpinfo.qp_num << endl;
// cout << "remote lid: " << remote_qpinfo.lid << " qp_num: " << remote_qpinfo.qp_num << endl;
// Set QP state to RTS
auto ret = SetToRTS();
if( ret != 0 ) cout << "ERROR: Unable to set QP to RTS state!" << endl;
}
//-------------------------------------------------------------
// CreateQP
//
// This is used by both client and server modes.
//-------------------------------------------------------------
void hdRDMAThread::CreateQP(void)
{
// Check if QP already created
if( qp != nullptr ) return;
// Set up attributes for creating a QP.
struct ibv_qp_init_attr qp_init_attr;
bzero( &qp_init_attr, sizeof(qp_init_attr) );
qp_init_attr.send_cq = cq;
qp_init_attr.recv_cq = cq;
qp_init_attr.cap.max_send_wr = 1;
qp_init_attr.cap.max_recv_wr = buffers.size();
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.qp_type = IBV_QPT_RC;
// Create Queue Pair
qp = ibv_create_qp( hdrdma->pd, &qp_init_attr );
if( !qp ){
std::stringstream ss;
ss << "ERROR: Unable to create QP! errno=" << errno;
throw Exception(ss.str().c_str());
}
// Set our QP info so it can be sent to remote hosts when
// socket connection is established.
qpinfo.lid = hdrdma->port_attr.lid;
qpinfo.qp_num = qp->qp_num;
}
//-------------------------------------------------------------
// SetToRTS
//-------------------------------------------------------------
int hdRDMAThread::SetToRTS(void)
{
#define IB_MTU IBV_MTU_4096
#define IB_SL 0
int ret = 0;
/* change QP state to INIT */
{
struct ibv_qp_attr qp_attr;
bzero( &qp_attr, sizeof(qp_attr) );
qp_attr.qp_state = IBV_QPS_INIT,
qp_attr.pkey_index = 0,
qp_attr.port_num = hdrdma->port_num,
qp_attr.qp_access_flags = IBV_ACCESS_LOCAL_WRITE |
IBV_ACCESS_REMOTE_READ |
IBV_ACCESS_REMOTE_ATOMIC |
IBV_ACCESS_REMOTE_WRITE;
ret = ibv_modify_qp (qp, &qp_attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX |
IBV_QP_PORT | IBV_QP_ACCESS_FLAGS);
if( ret!=0 ){
cout << "ERROR: Unable to set QP to INIT state!" << endl;
return ret;
}
}
/* Change QP state to RTR */
{
struct ibv_qp_attr qp_attr;
bzero( &qp_attr, sizeof(qp_attr) );
qp_attr.qp_state = IBV_QPS_RTR,
qp_attr.path_mtu = IB_MTU,
qp_attr.dest_qp_num = remote_qpinfo.qp_num,
qp_attr.rq_psn = 0,
qp_attr.max_dest_rd_atomic = 1,
qp_attr.min_rnr_timer = 12,
qp_attr.ah_attr.is_global = 0,
qp_attr.ah_attr.dlid = remote_qpinfo.lid,
qp_attr.ah_attr.sl = IB_SL,
qp_attr.ah_attr.src_path_bits = 0,
qp_attr.ah_attr.port_num = hdrdma->port_num,
ret = ibv_modify_qp(qp, &qp_attr,
IBV_QP_STATE | IBV_QP_AV |
IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC |
IBV_QP_MIN_RNR_TIMER);
if( ret!=0 ){
cout << "ERROR: Unable to set QP to RTR state!" << endl;
return ret;
}
}
/* Change QP state to RTS */
{
struct ibv_qp_attr qp_attr;
bzero( &qp_attr, sizeof(qp_attr) );
qp_attr.qp_state = IBV_QPS_RTS,
qp_attr.timeout = 14,
qp_attr.retry_cnt = 7,
qp_attr.rnr_retry = 7,
qp_attr.sq_psn = 0,
qp_attr.max_rd_atomic = 1,
ret = ibv_modify_qp (qp, &qp_attr,
IBV_QP_STATE | IBV_QP_TIMEOUT |
IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC);
if( ret!=0 ){
cout << "ERROR: Unable to set QP to RTS state!" << endl;
return ret;
}
}
return ret;
}
//-------------------------------------------------------------
// ReceiveBuffer
//-------------------------------------------------------------
void hdRDMAThread::ReceiveBuffer(uint8_t *buff, uint32_t buff_len)
{
hdrdma->Ntransferred += (uint64_t)buff_len;
auto hi = (HeaderInfo*)buff;
if( hi->buff_type == 1 ){
// Buffer holds file information
if( hi->flags & HI_FIRST_BUFFER ){
if( ofs != nullptr ) {
cout << "ERROR: Received new file buffer while file " << ofilename << " already open!" << endl;
ofs->close();
delete ofs;
ofs = nullptr;
}
ofilename = (char*)&hi->payload;
cout << "Receiving file: " << ofilename << endl;
// Create parent directory path if specified by remote sender
cout << "hi->flags: 0x" << std::hex << hi->flags << std::dec << endl;
if( hi->flags & HI_MAKE_PARENT_DIRS ){
auto pos = ofilename.find_last_of('/');
if( pos != std::string::npos ) makePath( ofilename.substr(0, pos) );
}
ofs = new std::ofstream( ofilename.c_str() );
ofilesize = 0;
crcsum = adler32( 0L, Z_NULL, 0 );
calculate_checksum = (hi->flags & HI_CALCULATE_CHECKSUM); // optionally calculate checksum
t1 = high_resolution_clock::now();
t_last = t1; // used for intermediate rate calculations
delta_t_io = 0.0;
Ntransferred = 0;
}
if( !ofs ){
cout << "ERROR: Received file buffer with no file open!" << endl;
return;
}
// Write buffer payload to file
auto data = &buff[hi->header_len];
auto data_len = buff_len - hi->header_len;
if( calculate_checksum ) crcsum = adler32( crcsum, data, data_len );
auto t_io_start = high_resolution_clock::now();
ofs->write( (const char*)data, data_len );
auto t_io_end = high_resolution_clock::now();
duration<double> duration_io = duration_cast<duration<double>>(t_io_end-t_io_start);
delta_t_io += duration_io.count();
ofilesize += data_len;
if( (hi->flags & HI_FIRST_BUFFER) == 0 ) Ntransferred += data_len; // exclude first buffer where we don't have timing info
// If last buffer for file then close it and print stats
if( hi->flags & HI_LAST_BUFFER ){
if( t_last != t1 ) cout << endl; // print carriage return if we printed any intermediate progress
if( ofs ){
auto t_io_start = high_resolution_clock::now();
ofs->close();
auto t_io_end = high_resolution_clock::now();
duration<double> duration_io = duration_cast<duration<double>>(t_io_end-t_io_start);
delta_t_io += duration_io.count();
ofs->close();
delete ofs;
ofs = nullptr;
}
// auto t2 = high_resolution_clock::now();
// duration<double> delta_t = duration_cast<duration<double>>(t2-t1);
// double rate_GBps = (double)Ntransferred/delta_t.count()/1.0E9;
// double rate_io_GBps = (double)ofilesize/delta_t_io/1.0E9;
//
// cout << " Closed file " << ofilename << " with " << ofilesize/1000000 << " MB" << endl;
// cout << " Transferred the last " << ((double)Ntransferred*1.0E-9) << " GB in " << delta_t.count() << " sec (" << rate_GBps << " GB/s)" << endl;
// cout << " I/O rate writing to file: " << delta_t_io << " sec (" << rate_io_GBps << " GB/s)" << endl;
if( calculate_checksum ) cout << " checksum: " << std::hex << crcsum << std::dec << endl;
// cout << "-----------------------------------------------------------" << endl;
// Tell ThreadRun to stop
stop = true;
}else{
// Report progress (n.b. this is now reported from hdrdma::Poll() )
auto t2 = high_resolution_clock::now();
//duration<double> delta_t = duration_cast<duration<double>>(t2-t_last);
//double rate_Gbps = (double)buff_len/delta_t.count()*8.0/1.0E9;
//cout << "\r received " << buff_len/1000000 << " MB (" << Ntransferred/1000000 << " MB total) - " << rate_Gbps << " Gbps ";
//cout.flush();
t_last = t2;
}
// Check if flag set indicating the remote side is finished with the connection.
// n.b. remote client can also send separate message with buff_type==2 for same effect.
if( hi->flags & HI_LAST_FILE ){
stop = true;
}
}else if( hi->buff_type == 2 ){
// Remote client is done with this connection so tell ThreadRun to stop
// n.b. remote client can also set flag in hi->flags when buff_type==1 for same effect.
stop = true;
}
}
//-------------------------------------------------------------
// ClientConnect
//
// This is called when in client mode to finish setting everything
// up for the connection. The given sockfd will already be connected
// via TCP to the server, but nothing will have been read/written
// yet.
//-------------------------------------------------------------
void hdRDMAThread::ClientConnect( int sockfd )
{
// This bit of magic ensures that the sockfd is closed and our "stopped"
// flag is set before leaving this method, even if early due to error.
std::shared_ptr<int> x(NULL, [&](int*){ close(sockfd); stopped=true;});
// Get pool buffers (all of them). If none are available then throw exception
hdrdma->GetBuffers( buffers );
if( buffers.empty() ) throw Exception("ERROR: no MR buffers available in client mode!");
// Create completion channel and completion queue.
//
// TODO:
// The cq_size can be used to cause an error if too many WR are placed
// in it. The error would come in the form of an async event (see
// ibv_get_async_event). If I understand correctly, we would need to
// make cq_size smaller than the number of WRs and then check for async
// errors in a separate thread if we wanted to guarantee that we were
// processing the data as fast as it is coming in. That adds some
// significant complication so we skip it for now.
int cq_size = buffers.size();
comp_channel = ibv_create_comp_channel( hdrdma->ctx );
cq = ibv_create_cq( hdrdma->ctx, cq_size, NULL, comp_channel, 0);
if( !cq ){
std::stringstream ss;
ss << "ERROR: Unable to create Completion Queue! errno=" << errno;
throw Exception(ss.str());
}
// Set the socket to timeout if unable to read in 10 seconds
struct timeval tv ={10, 0};
setsockopt( sockfd, SOL_SOCKET, SO_RCVTIMEO, (const char*)&tv, sizeof(tv));
// Read first 3 bytes from TCP socket to make sure the server is able to
// send us QPInfo.
char str[256];
bzero(str, 256); // status code does not include terminating null
auto n = read(sockfd, str, 3);
if( n!= 3 ) throw Exception("ERROR: Unable to read 3 byte status code from TCP socket!" );
if( std::string(str) != "OK:" ){
auto n = read(sockfd, str, 256);
if( n<=0 ) sprintf(str, "Unknown error status from server");
throw Exception( str );
}
// Exchange QP info over TCP socket so we can transmit via RDMAcout << __FILE__ << ":" << __LINE__ << endl;
ExchangeQPInfo( sockfd );
}
//-------------------------------------------------------------
// SendFile
//-------------------------------------------------------------
void hdRDMAThread::SendFile(std::string srcfilename, std::string dstfilename, bool delete_after_send, bool calculate_checksum, bool makeparentdirs)
{
// Open local file
std::ifstream ifs(srcfilename.c_str());
if( !ifs.is_open() ){
cerr <<"ERROR: Unable to open file \"" << srcfilename << "\"!" << endl;
exit(-40);
}
// Get filesize
ifs.seekg(0, ifs.end);
auto filesize = ifs.tellg();
ifs.seekg(0, ifs.beg);
double filesize_GB = (double)filesize*1.0E-9;
std::string mess = delete_after_send ? " - will be deleted after send":"";
cout << "Sending file: " << srcfilename << "-> (" << HDRDMA_REMOTE_ADDR << ":)" << dstfilename << " (" << filesize_GB << " GB)" << mess << endl;
struct ibv_send_wr wr, *bad_wr = nullptr;
struct ibv_sge sge;
bzero( &wr, sizeof(wr) );
bzero( &sge, sizeof(sge) );
wr.opcode = IBV_WR_SEND;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.send_flags = IBV_SEND_SIGNALED,
sge.lkey = hdrdma->mr->lkey;
// Send buffers
crcsum = adler32( 0L, Z_NULL, 0 );
t1 = high_resolution_clock::now();
t_last = t1;
uint64_t Ntransferred = 0;
uint64_t bytes_left = filesize;
uint32_t Noutstanding_writes = 0;
double delta_t_io = 0.0;
for(int i=0; i<1000; i++){ // if sending more than 1000 buffers something is wrong!
auto id = i%buffers.size();
auto &buffer = buffers[id];
auto buff = std::get<0>(buffer);
auto buff_len = std::get<1>(buffer);
sge.addr = (uint64_t)buff;
HeaderInfo *hi = (HeaderInfo*)sge.addr;
hi->buff_type = 1; // buffer holds data for file transfer
hi->flags = 0x0;
// First buffer must contain destination file name.
// Subsequent buffers don't.
if( i==0 ){
hi->header_len = 256;
hi->flags |= HI_FIRST_BUFFER; // first buffer of file
if( calculate_checksum ) hi->flags |= HI_CALCULATE_CHECKSUM; // tell remote server to calculate checksum
if( makeparentdirs ) hi->flags |= HI_MAKE_PARENT_DIRS; // tell remote server to make directory path if needed
sprintf( (char*)&hi->payload, dstfilename.c_str() );
}else{
hi->header_len = sizeof(*hi) - sizeof(hi->payload);
}
// Calculate bytes to be sent in this buffer
auto bytes_available = buff_len - hi->header_len;
uint64_t bytes_payload = 0;
if( bytes_available >= bytes_left ){
// last buffer of file
hi->flags |= (HI_LAST_BUFFER | HI_LAST_FILE); // flag as last buffer and last file so connection is closed
bytes_payload = bytes_left;
}else{
// intermediate buffer of file
bytes_payload = bytes_available;
}
// Read next block of data directly into mr memory
auto payload_ptr = &((char*)sge.addr)[hi->header_len];
auto t_io_start = high_resolution_clock::now();
ifs.read( payload_ptr, bytes_payload );
auto t_io_end = high_resolution_clock::now();
duration<double> duration_io = duration_cast<duration<double>>(t_io_end-t_io_start);
delta_t_io += duration_io.count();
// Total length of buffer we are actually sending
sge.length = hi->header_len + bytes_payload;
// Optionally calculate cehcksum
if( calculate_checksum ) crcsum = adler32( crcsum, (uint8_t*)payload_ptr, bytes_payload );
// Post write
auto ret = ibv_post_send( qp, &wr, &bad_wr );
if( ret != 0 ){
cout << "ERROR: ibv_post_send returned non zero value (" << ret << ")" << endl;
break;
}
Noutstanding_writes++;
Ntransferred += bytes_payload;
bytes_left -= bytes_payload;
// Report progress
auto t2 = high_resolution_clock::now();
duration<double> delta_t = duration_cast<duration<double>>(t2-t_last);
double rate_Gbps = (double)sge.length/delta_t.count()*8.0/1.0E9;
cout << "\r queued " << sge.length/1000000 << "MB (" << Ntransferred/1000000 << "/" << filesize/1000000 << " MB -- " << (100.0*Ntransferred/filesize) <<"% - " << rate_Gbps << " Gbps) ";
cout.flush();
t_last = t2;
// If we've posted data using all available sections of the mr
// then we need to wait for one to finish so we can recycle it.
if( Noutstanding_writes>=buffers.size() ){
PollCQ();
Noutstanding_writes--;
}
if( hi->flags & HI_LAST_BUFFER ) break; // this was last buffer of file
}
// Wait for final buffers to transfer
if( Noutstanding_writes != 0 ) cout << endl;
while( Noutstanding_writes > 0 ){
cout << "\r waiting for final " << Noutstanding_writes << " transfers to complete ...";
cout .flush();
PollCQ();
Noutstanding_writes--;
}
cout << endl;
// Calculate total transfer rate and report.
auto t2 = high_resolution_clock::now();
duration<double> delta_t = duration_cast<duration<double>>(t2-t1);
double rate_Gbps = (double)Ntransferred/delta_t.count()*8.0/1.0E9;
double rate_io_Gbps = (double)Ntransferred/delta_t_io*8.0/1.0E9;
//double rate_ib_Gbps = (double)Ntransferred/(delta_t.count()-delta_t_io)*8.0/1.0E9;
if( Ntransferred>2E8 ){
cout << " Transferred " << ((double)Ntransferred*1.0E-9) << " GB in " << delta_t.count() << " sec (" << rate_Gbps << " Gbps)" << endl;
cout << " I/O rate reading from file: " << delta_t_io << " sec (" << rate_io_Gbps << " Gbps)" << endl;
}else{
cout << " Transferred " << ((double)Ntransferred*1.0E-6) << " MB in " << delta_t.count() << " sec (" << rate_Gbps*1000.0 << " Mbps)" << endl;
cout << " I/O rate reading from file: " << delta_t_io << " sec (" << rate_io_Gbps*1000.0 << " Mbps)" << endl;
}
if( calculate_checksum ) cout << " checksum: " << std::hex << crcsum << std::dec << endl;
//cout << " IB rate sending file: " << delta_t.count()-delta_t_io << " sec (" << rate_ib_Gbps << " Gbps) - n.b. don't take this seriously!" << endl;
if( delete_after_send ){
unlink( srcfilename.c_str() );
cout <<" Deleted src file: " << srcfilename << endl;
}
}
//-------------------------------------------------------------
// PollCQ
//
// This is called from SendFile to check for completed send requests.
// It will block until a WR is received or the stop flag is set. Only
// one WR will be pulled from the CQ so the caller may keep track of
// number of outstanding WRs.
//-------------------------------------------------------------
void hdRDMAThread::PollCQ(void)
{
// Loop until a WR is received
int num_wc = 1;
struct ibv_wc wc;
while( !stop ){
// Check to see if a work completion notification has come in
int n = ibv_poll_cq(cq, num_wc, &wc);
if( n<0 ){
std::stringstream ss;
ss << "ERROR: ibv_poll_cq returned " << n << " - closing connection";
throw Exception( ss.str() );
}
if( n == 0 ){
std::this_thread::sleep_for(std::chrono::microseconds(1));
continue;
}
break;
}
}
//-------------------------------------------------------------
// makePath
//
// Make all directories needed for a given path to exist
//
// (copied from SO question 675039 and streamlined for Linux only)
//-------------------------------------------------------------
bool hdRDMAThread::makePath( const std::string &path )
{
mode_t mode = 0777;
int ret = mkdir( path.c_str(), mode );
if( ret == 0 ) {
cout << "mkdir: " << path << endl;
return true;
}
switch( errno ){
case ENOENT:
// parent didn't exist, try to create it
{
auto pos = path.find_last_of('/');
if( pos == std::string::npos ) return false;
if( !makePath( path.substr(0, pos) ) ) return false;
}
// now, try to create again
return 0 == mkdir( path.c_str(), mode );
case EEXIST:
// Path already exists! Verify it is actually a directory
{
struct stat info;
if( stat( path.c_str(), &info) != 0 ) return false;
return (info.st_mode & S_IFDIR) != 0;
}
default:
return false;
}
}