forked from libswift/libswift
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathswift.h
1248 lines (1064 loc) · 50.7 KB
/
swift.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* swift.h
* the main header file for libswift, normally you should only read this one.
*
* This implementation supports 2 versions of swift:
* - the original (legacy version)
* - the IETF Peer-to-Peer Streaming Peer Protocol -03 compliant one.
*
* Arno: libswift clients should call the swift top-level API which consists of
* swift::Listen, Open, Read, etc. The *Transfer interfaces are internal
* to the library, except in a few exceptional cases.
*
* The swift API hides the swarm management mechanism that activates/
* deactivates file-based swarms. A swarm is activated when a *Transfer object
* exists. Live swarms are always active (LiveTransfer). Orthogonal to swarm
* activation is the use of file-based swarms in zero-state mode, which means
* that when activated only the minimal information needed to use the swarm is
* loaded into memory. In particular, content and hashes are read directly
* from disk.
*
* Created by Victor Grishchenko, Arno Bakker, Riccardo Petrocco
* Copyright 2009-2016 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
*
*/
/*
The swift protocol (legacy)
Messages
HANDSHAKE 00, channelid
Communicates the channel id of the sender. The
initial handshake packet also has the root hash
(a HASH message).
DATA 01, bin_32, buffer
1K of data.
ACK 02, bin_32, timestamp_64
HAVE 03, bin_32
Confirms successfull delivery of data. Used for congestion control, as well.
REQUEST 08, bin_32
Practical value of requests aka "hints" is to avoid overlap, mostly.
Hints might be lost in the network or ignored.
Peer might send out data without a hint.
Hint which was not responded (by DATA) in some RTTs
is considered to be ignored.
As peers cant pick randomly kilobyte here and there,
they send out "long hints" for non-base bins.
INTEGRITY 04, bin_32, sha1hash
SHA1 hash tree hashes for data verification. The
connection to a fresh peer starts with bootstrapping
him with peak hashes. Later, before sending out
any data, a peer sends the necessary uncle hashes.
PEX+/PEX- 05/06, ipv4 addr, port
Peer exchange messages; reports all connected and
disconnected peers. Might has special meaning (as
in the case with swarm supervisors).
*/
#ifndef SWIFT_H
#define SWIFT_H
// Arno, 2013-06-11: Must come first to ensure SIZE_MAX etc are defined
#include "compat.h"
#include <deque>
#include <vector>
#include <set>
#include <map>
#include <list>
#include <algorithm>
#include <string>
#include <event2/event.h>
#include <event2/event_struct.h>
#include <event2/buffer.h>
#include "bin.h"
#include "binmap.h"
#include "hashtree.h"
#include "avgspeed.h"
// Arno, 2012-05-21: MacOS X has an Availability.h :-(
#include "avail.h"
#include "address.h"
#include "Socks5Connection.h"
namespace swift {
#define SWIFT_MAX_UDP_OVER_ETH_PAYLOAD (1500-20-8)
// Arno: Maximum size of non-DATA messages in a UDP packet we send.
#define SWIFT_MAX_NONDATA_DGRAM_SIZE (SWIFT_MAX_UDP_OVER_ETH_PAYLOAD-SWIFT_DEFAULT_CHUNK_SIZE-1-4)
// Arno: Maximum size of a UDP packet we send. Note: depends on CHUNKSIZE 8192
#define SWIFT_MAX_SEND_DGRAM_SIZE (SWIFT_MAX_NONDATA_DGRAM_SIZE+1+4+8192)
// Arno: Maximum size of a UDP packet we are willing to accept. Note: depends on CHUNKSIZE 8192
#define SWIFT_MAX_RECV_DGRAM_SIZE (SWIFT_MAX_SEND_DGRAM_SIZE*2)
#define layer2bytes(ln,cs) (uint64_t)( ((double)cs)*pow(2.0,(double)ln))
#define bytes2layer(bn,cs) (int)log2( ((double)bn)/((double)cs) )
// Arno, 2012-12-12: Configure which PPSP version to use by default. Set to 0 for legacy swift.
#define ENABLE_IETF_PPSP_VERSION 1
// Whether to try legacy protocol when PPSP handshakes don't result in response
#define ENABLE_FALLBACK_TO_LEGACY_PROTO 0
// Arno, 2011-12-22: Enable Riccardo's VodPiecePicker
#define ENABLE_VOD_PIECEPICKER 1
#define SWIFT_URI_SCHEME "tswift"
// Value for protocol option: Live Discard Window
#define POPT_LIVE_DISC_WND_ALL 0xFFFFFFFF // automatically truncated for 32-bit
// Max size of the swarm ID protocol option in a HANDSHAKE message.
#define POPT_MAX_SWARMID_SIZE 1024
// Max size of a X.509 certificate in a PEX_REScert message.
#define PEX_RES_MAX_CERT_SIZE 1024
// Ric: allowed hints in the future (e.g., 2 x TINT_SEC)
#define HINT_TIME 2 // seconds
// Arno, 2011-10-03: Use libevent callback functions, no on_error?
#define sockcb_t event_callback_fn
struct sckrwecb_t {
sckrwecb_t (evutil_socket_t s=0, sockcb_t mr=NULL, sockcb_t mw=NULL,
sockcb_t oe=NULL) :
sock(s), may_read(mr), may_write(mw), on_error(oe) {}
evutil_socket_t sock;
sockcb_t may_read;
sockcb_t may_write;
sockcb_t on_error;
};
struct now_t {
static tint now;
};
#define NOW now_t::now
/** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
Most frequently used in different queues (acknowledgements, requests,
etc). */
struct tintbin {
tint time;
bin_t bin;
tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
tintbin() : time(TINT_NEVER), bin(bin_t::NONE) {}
tintbin(tint time_, bin_t bin_) : time(time_), bin(bin_) {}
tintbin(bin_t bin_) : time(NOW), bin(bin_) {}
bool operator < (const tintbin& b) const
{ return time > b.time; }
bool operator == (const tintbin& b) const
{ return time==b.time && bin==b.bin; }
bool operator != (const tintbin& b) const
{ return !(*this==b); }
};
typedef std::deque<tintbin> tbqueue;
typedef std::deque<bin_t> binqueue;
typedef std::vector<bin_t> binvector;
/** A heap (priority queue) for timestamped bin numbers (tintbins). */
class tbheap {
tbqueue data_;
public:
int size () const { return data_.size(); }
bool is_empty () const { return data_.empty(); }
tintbin pop() {
tintbin ret = data_.front();
std::pop_heap(data_.begin(),data_.end());
data_.pop_back();
return ret;
}
void push(const tintbin& tb) {
data_.push_back(tb);
push_heap(data_.begin(),data_.end());
}
const tintbin& peek() const {
return data_.front();
}
};
typedef std::pair<std::string,std::string> stringpair;
typedef std::map<std::string,std::string> parseduri_t;
bool ParseURI(std::string uri,parseduri_t &map);
/** swift protocol message types; these are used on the wire. */
typedef enum {
SWIFT_HANDSHAKE = 0,
SWIFT_DATA = 1,
SWIFT_ACK = 2,
SWIFT_HAVE = 3,
SWIFT_INTEGRITY = 4, // previously SWIFT_HASH
SWIFT_PEX_RESv4 = 5, // previously SWIFT_PEX_ADD
SWIFT_PEX_REQ = 6,
SWIFT_SIGNED_INTEGRITY = 7, // previously SWIFT_SIGNED_HASH
SWIFT_REQUEST = 8, // previously SWIFT_HINT
SWIFT_CANCEL = 9,
SWIFT_CHOKE = 10,
// SWIFT_RANDOMIZE = 10, //FRAGRAND disabled
SWIFT_UNCHOKE = 11,
SWIFT_PEX_RESv6 = 12,
SWIFT_PEX_REScert = 13,
SWIFT_MESSAGE_COUNT = 14
} messageid_t;
typedef enum {
DDIR_UPLOAD,
DDIR_DOWNLOAD
} data_direction_t;
typedef enum {
FILE_TRANSFER,
LIVE_TRANSFER
} transfer_t;
/** Arno: enum to indicate when to send an explicit close to the peer when
* doing a local close.
*/
typedef enum {
CLOSE_DO_NOT_SEND,
CLOSE_SEND,
CLOSE_SEND_IF_ESTABLISHED,
} close_send_t;
typedef enum {
VER_SWIFT_LEGACY=0, //legacy swift
VER_PPSPP_v1=1 // IETF PPSPP compliant
} popt_version_t;
// Protocol options defined by IETF PPSPP
typedef enum {
POPT_VERSION = 0,
POPT_MIN_VERSION = 1,
POPT_SWARMID = 2,
POPT_CONT_INT_PROT = 3, // content integrity protection method
POPT_MERKLE_HASH_FUNC = 4,
POPT_LIVE_SIG_ALG = 5,
POPT_CHUNK_ADDR = 6,
POPT_LIVE_DISC_WND = 7,
POPT_SUPP_MSGS = 8,
POPT_END = 255
} popt_t;
typedef enum {
POPT_CONT_INT_PROT_NONE = 0,
POPT_CONT_INT_PROT_MERKLE = 1,
POPT_CONT_INT_PROT_SIGNALL = 2,
POPT_CONT_INT_PROT_UNIFIED_MERKLE = 3
} popt_cont_int_prot_t;
typedef enum {
POPT_MERKLE_HASH_FUNC_SHA1 = 0,
POPT_MERKLE_HASH_FUNC_SHA224 = 1,
POPT_MERKLE_HASH_FUNC_SHA256 = 2,
POPT_MERKLE_HASH_FUNC_SHA384 = 3,
POPT_MERKLE_HASH_FUNC_SHA512 = 4
} popt_merkle_func_t;
typedef enum {
POPT_CHUNK_ADDR_BIN32 = 0,
POPT_CHUNK_ADDR_BYTE64 = 1,
POPT_CHUNK_ADDR_CHUNK32 = 2,
POPT_CHUNK_ADDR_BIN64 = 3,
POPT_CHUNK_ADDR_CHUNK64 = 4
} popt_chunk_addr_t;
// http://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xml
typedef enum {
POPT_LIVE_SIG_ALG_DH = 2,
POPT_LIVE_SIG_ALG_DSA = 3,
POPT_LIVE_SIG_ALG_RSASHA1 = 5,
POPT_LIVE_SIG_ALG_DSA_NSEC3_SHA1 = 6,
POPT_LIVE_SIG_ALG_RSASHA1_NSEC3_SHA1 = 7,
POPT_LIVE_SIG_ALG_RSASHA256 = 8,
POPT_LIVE_SIG_ALG_RSASHA512 = 10,
POPT_LIVE_SIG_ALG_ECC_GOST = 12,
POPT_LIVE_SIG_ALG_ECDSAP256SHA256 = 13,
POPT_LIVE_SIG_ALG_ECDSAP384SHA384 = 14,
POPT_LIVE_SIG_ALG_PRIVATEDNS = 253 // supported. Hacks ECDSA with SHA1
} popt_live_sig_alg_t;
class Handshake
{
public:
#if ENABLE_IETF_PPSP_VERSION == 1
Handshake() : version_(VER_PPSPP_v1), min_version_(VER_PPSPP_v1), merkle_func_(POPT_MERKLE_HASH_FUNC_SHA1), live_sig_alg_(POPT_LIVE_SIG_ALG_PRIVATEDNS), chunk_addr_(POPT_CHUNK_ADDR_CHUNK32), live_disc_wnd_(POPT_LIVE_DISC_WND_ALL), swarm_id_ptr_(NULL) {}
#else
Handshake() : version_(VER_SWIFT_LEGACY), min_version_(VER_SWIFT_LEGACY), merkle_func_(POPT_MERKLE_HASH_FUNC_SHA1), live_sig_alg_(POPT_LIVE_SIG_ALG_PRIVATEDNS), chunk_addr_(POPT_CHUNK_ADDR_BIN32), live_disc_wnd_(POPT_LIVE_DISC_WND_ALL), swarm_id_ptr_(NULL) {}
#endif
~Handshake() { ReleaseSwarmID(); }
void SetSwarmID(Sha1Hash &swarmid) { swarm_id_ptr_ = new Sha1Hash(swarmid); }
const Sha1Hash &GetSwarmID() { return (swarm_id_ptr_ == NULL) ? Sha1Hash::ZERO : *swarm_id_ptr_; }
void ReleaseSwarmID() { if (swarm_id_ptr_ != NULL) delete swarm_id_ptr_; swarm_id_ptr_ = NULL; }
bool IsSupported()
{
if (cont_int_prot_ == POPT_CONT_INT_PROT_SIGNALL)
return false; // PPSPTODO
else if (merkle_func_ >= POPT_MERKLE_HASH_FUNC_SHA224)
return false; // PPSPTODO
else if (chunk_addr_ == POPT_CHUNK_ADDR_BYTE64 || chunk_addr_ == POPT_CHUNK_ADDR_BIN64 || chunk_addr_ == POPT_CHUNK_ADDR_CHUNK64)
return false; // PPSPTODO
else if (live_sig_alg_ != POPT_LIVE_SIG_ALG_PRIVATEDNS)
return false; // PPSPTODO
return true;
}
void ResetToLegacy()
{
// Do not reset peer_channel_id
version_ = VER_SWIFT_LEGACY;
min_version_ = VER_SWIFT_LEGACY;
cont_int_prot_ = POPT_CONT_INT_PROT_MERKLE;
merkle_func_ = POPT_MERKLE_HASH_FUNC_SHA1;
live_sig_alg_ = POPT_LIVE_SIG_ALG_PRIVATEDNS;
chunk_addr_ = POPT_CHUNK_ADDR_BIN32;
live_disc_wnd_ = (uint32_t)POPT_LIVE_DISC_WND_ALL;
}
/** Peer channel id; zero if we are trying to open a channel. */
uint32_t peer_channel_id_;
popt_version_t version_;
popt_version_t min_version_;
popt_cont_int_prot_t cont_int_prot_;
popt_merkle_func_t merkle_func_;
popt_live_sig_alg_t live_sig_alg_; // PPSPTODO
popt_chunk_addr_t chunk_addr_;
uint64_t live_disc_wnd_;
protected:
/** Dynamically allocated such that we can deallocate it and
* save Sha1Hash::SIZE bytes per channel */
Sha1Hash *swarm_id_ptr_;
};
class PiecePicker;
//class CongestionController; // Arno: Currently part of Channel. See ::NextSendTime()
class Channel;
typedef std::vector<Channel *> channels_t;
typedef void (*ProgressCallback) (int td, bin_t bin);
typedef std::pair<ProgressCallback,uint8_t> progcallbackreg_t;
typedef std::vector<progcallbackreg_t> progcallbackregs_t;
typedef std::vector<int> tdlist_t;
class Storage;
/** Superclass for live and vod */
class ContentTransfer : public Operational {
public:
ContentTransfer(transfer_t ttype);
virtual ~ContentTransfer();
/** Returns the type of transfer, FILE_TRANSFER or LIVE_TRANSFER */
transfer_t ttype() { return ttype_; }
// Overridable methods
/** Returns the global ID for this transfer */
virtual const Sha1Hash& swarm_id() const = 0;
/** The binmap pointer for data already retrieved and checked. */
virtual binmap_t * ack_out() = 0;
/** Returns the number of bytes in a chunk for this transfer */
virtual uint32_t chunk_size() = 0;
/** Check whether all components still in working state */
virtual void UpdateOperational() = 0;
/** Piece picking strategy used by this transfer. */
PiecePicker * picker() { return picker_; }
/** Returns the local ID for this transfer. */
int td() const { return td_; }
/** Sets the ID for this transfer post create (used by SwarmManager) */
void SetTD(int td);
// Gertjan fix: return bool
bool OnPexIn(const Address& addr);
// Gertjan
Channel * RandomChannel(Channel *notc);
/** Arno: Return the Channel to peer "addr" that is not equal to "notc". */
Channel * FindChannel(const Address &addr, Channel *notc);
void CloseChannels(channels_t delset); // do not pass by reference
void GarbageCollectChannels();
// RATELIMIT
/** Arno: Call when n bytes are received. */
void OnRecvData(int n);
/** Arno: Call when n bytes are sent. */
void OnSendData(int n);
/** Arno: Call when no bytes are sent due to rate limiting. */
void OnSendNoData();
/** Ric: Call when no bytes are received. */
void OnRecvNoData();
/** Arno: Return current speed for the given direction in bytes/s */
double GetCurrentSpeed(data_direction_t ddir);
/** Arno: Return maximum speed for the given direction in bytes/s */
double GetMaxSpeed(data_direction_t ddir);
/** Arno: Set maximum speed for the given direction in bytes/s */
void SetMaxSpeed(data_direction_t ddir, double m);
/** Arno: Return the number of non-seeders current channeled with. */
uint32_t GetNumLeechers();
/** Arno: Return the number of seeders current channeled with. */
uint32_t GetNumSeeders();
/** Arno: Return (pointer to) the list of Channels for this transfer. MORESTATS */
channels_t * GetChannels() { return &mychannels_; }
/** Arno: Return the list of callbacks for this transfer */
progcallbackregs_t GetProgressCallbackRegistrations() { return callbacks_; }
// MULTIFILE
Storage * GetStorage() { return storage_; }
/** Add a peer to the set of addresses to connect to */
void AddPeer(Address &peer);
/** Ric: add number of hints for slow start scenario */
void SetSlowStartHints(uint32_t hints) { slow_start_hints_ += hints; }
/** Ric: get the # of slow start hints */
uint32_t GetSlowStartHints() { return slow_start_hints_; }
/** Arno: set the tracker for this transfer. Reseting it won't kill
* any existing connections. */
void SetTracker(Address tracker) { tracker_ = tracker; }
/** Arno: (Re)Connect to tracker for this transfer, or global Channel::tracker if not set */
void ConnectToTracker();
/** Arno: Reconnect to the tracker if no established peers and
* exp backoff allows it. */
void ReConnectToTrackerIfAllowed(bool hasestablishedpeers);
/** Progress callback management **/
void AddProgressCallback(ProgressCallback cb, uint8_t agg);
void RemoveProgressCallback(ProgressCallback cb);
void Progress(bin_t bin); /** Called by channels when data comes in */
/** Arno: Callback to do maintenance for all transfers */
static void LibeventGlobalCleanCallback(int fd, short event, void *arg);
static struct event evclean; // Global for all Transfers
static uint64_t cleancounter;
protected:
transfer_t ttype_;
int td_; // transfer descriptor as used by swift API.
/** Channels working for this transfer. */
channels_t mychannels_;
/** Progress callback management **/
progcallbackregs_t callbacks_;
/** Piece picker strategy. */
PiecePicker* picker_;
// RATELIMIT
MovingAverageSpeed cur_speed_[2];
double max_speed_[2];
uint32_t speedupcount_;
uint32_t speeddwcount_;
// MULTIFILE
Storage *storage_;
Address tracker_; // Tracker for this transfer
tint tracker_retry_interval_;
tint tracker_retry_time_;
// Ric: slow start 4 requesting hints
uint32_t slow_start_hints_;
};
/** A class representing a file/VOD transfer of one or multiple files */
class FileTransfer : public ContentTransfer {
public:
/** A constructor. Open/submit/retrieve a file.
* @param file_name the name of the file
* @param root_hash the root hash of the file; zero hash if the file
* is newly submitted
* @param force_check_diskvshash whether to force a check of disk versus hashes
* @param check_netwvshash whether to hash check chunk on receipt
* @param chunk_size size of chunk to use
* @param zerostate whether to serve the hashes + content directly from disk
*/
FileTransfer(int td, std::string file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO, bool force_check_diskvshash=true, bool check_netwvshash=true, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE, bool zerostate=false);
/** Close everything. */
~FileTransfer();
// ContentTransfer overrides
const Sha1Hash& swarm_id() const { return hashtree_->root_hash(); }
/** The binmap pointer for data already retrieved and checked. */
binmap_t * ack_out () { return hashtree_->ack_out(); }
/** Piece picking strategy used by this transfer. */
uint32_t chunk_size() { return hashtree_->chunk_size(); }
/** Check whether all components still in working state */
void UpdateOperational();
// FileTransfer specific methods
/** Hash tree checked file; all the hashes and data are kept here. */
HashTree * hashtree() { return hashtree_; }
/** Ric: the availability in the swarm */
Availability* availability() { return availability_; }
//ZEROSTATE
/** Returns whether this FileTransfer is running in zero-state mode,
* meaning that the hash tree is not mmapped into memory but read
* directly from disk, and other memory saving measures.
*/
bool IsZeroState() { return zerostate_; }
protected:
/** HashTree for transfer (either MmapHashTree or ZeroHashTree) */
HashTree* hashtree_;
// Ric: PPPLUG
/** Availability in the swarm */
Availability* availability_;
//ZEROSTATE
bool zerostate_;
};
/** A class representing a live transfer. */
class LiveTransfer : public ContentTransfer {
public:
/** A constructor. */
LiveTransfer(std::string filename, const Sha1Hash& swarm_id=Sha1Hash::ZERO,bool amsource=false,uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
/** Close everything. */
~LiveTransfer();
// ContentTransfer overrides
const Sha1Hash& swarm_id() const { return swarm_id_; }
/** The binmap for data already retrieved and checked. */
binmap_t * ack_out () { return &ack_out_; }
/** Returns the number of bytes in a chunk for this transmission */
uint32_t chunk_size() { return chunk_size_; }
/** Check whether all components still in working state */
void UpdateOperational();
// LiveTransfer specific methods
/** Returns the number of bytes that are complete sequentially, starting from the
hookin offset, till the first not-yet-retrieved packet. */
uint64_t SeqComplete();
/** Returns whether this transfer is the source */
bool am_source() { return am_source_; }
/** Source: add a chunk to the swarm */
int AddData(const void *buf, size_t nbyte);
/** Returns the byte offset at which we hooked into the live stream */
uint64_t GetHookinOffset();
// Arno: FileTransfers are managed by the SwarmManager which
// activates/deactivates them as required. LiveTransfers are unmanaged.
/** Find transfer by the transfer descriptor. */
static LiveTransfer* FindByTD(int td);
/** Find transfer by the swarm ID. */
static LiveTransfer* FindBySwarmID(const Sha1Hash& swarmid);
/** Return list of transfer descriptors of all LiveTransfers */
static tdlist_t GetTransferDescriptors();
/** Add this LiveTransfer to the global list */
void GlobalAdd();
/** Remove this LiveTransfer for the global list */
void GlobalDel();
protected:
/** Swarm Identifier. E.g hash of public key */
Sha1Hash swarm_id_;
/** Binmap of own chunk availability */
binmap_t ack_out_;
// CHUNKSIZE
/** Arno: configurable fixed chunk size in bytes */
uint32_t chunk_size_;
/** Source: Am I a source */
bool am_source_;
/** Name of file used for storing live chunks */
std::string filename_;
/** Source: ID of last generated chunk */
uint64_t last_chunkid_;
/** Source: Current write position in storage file */
uint64_t offset_;
/** Arno: global list of LiveTransfers, which are not managed */
static std::vector<LiveTransfer*> liveswarms;
};
/** PiecePicker implements some strategy of choosing (picking) what
to request next, given the possible range of choices:
data acknowledged by the peer minus data already retrieved.
May pick sequentially, do rarest first or in some other way. */
class PiecePicker {
public:
virtual void Randomize (uint64_t twist) = 0;
/** The piece picking method itself.
* @param offered the data acknowledged by the peer
* @param max_width maximum number of packets to ask for
* @param expires (not used currently) when to consider request expired
* @return the bin number to request */
virtual bin_t Pick (binmap_t& offered, uint64_t max_width, tint expires) = 0;
virtual void LimitRange (bin_t range) = 0;
/** updates the playback position for streaming piece picking.
* @param offbin bin number of new playback pos
* @param whence only SEEK_CUR supported */
virtual int Seek(bin_t offbin, int whence) = 0;
virtual ~PiecePicker() {}
};
class LivePiecePicker : public PiecePicker {
public:
/** Arno: Register which bins a peer has, to be able to choose a hook-in
* point. Because multiple HAVE messages may be encoded in single
* datagram make this a transaction like (Start/End) */
virtual void StartAddPeerPos(uint32_t channelid, bin_t peerpos, bool peerissource) = 0;
virtual void EndAddPeerPos(uint32_t channelid) = 0;
/** Returns the bin at which we hooked into the live stream. */
virtual bin_t GetHookinPos() = 0;
/** Returns the bin in the live stream we currently want to download. */
virtual bin_t GetCurrentPos() = 0;
};
/** swift channel's "control block"; channels loosely correspond to TCP
connections or FTP sessions; one channel is created for one file
being transferred between two peers. As we don't need buffers and
lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
Normally, API users do not deal with this class. */
class Channel {
public:
Channel( ContentTransfer* transfer, int socket=INVALID_SOCKET, Address peer=Address(), bool peerissource=false);
~Channel();
typedef enum {
KEEP_ALIVE_CONTROL,
PING_PONG_CONTROL,
SLOW_START_CONTROL,
AIMD_CONTROL,
LEDBAT_CONTROL,
CLOSE_CONTROL
} send_control_t;
#define DGRAM_MAX_SOCK_OPEN 128
static int sock_count;
static sckrwecb_t sock_open[DGRAM_MAX_SOCK_OPEN];
static Socks5Connection socks5_connection;
static Address tracker; // Global tracker for all transfers
static struct event_base *evbase;
static struct event evrecv;
static const char* SEND_CONTROL_MODES[];
static tint epoch, start;
static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up, global_bytes_down;
static void CloseChannelByAddress(const Address &addr);
// SOCKMGMT
// Arno: channel is also a "singleton" class that manages all sockets
// for a swift process
static void LibeventSendCallback(int fd, short event, void *arg);
static void LibeventReceiveCallback(int fd, short event, void *arg);
static void RecvDatagram (evutil_socket_t socket); // Called by LibeventReceiveCallback
static int RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer *evb); // Called by RecvDatagram
static int SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer *evb); // Called by Channel::Send()
static evutil_socket_t Bind(Address address, sckrwecb_t callbacks=sckrwecb_t());
static Address BoundAddress(evutil_socket_t sock);
static evutil_socket_t default_socket()
{ return sock_count ? sock_open[0].sock : INVALID_SOCKET; }
/** close the port */
static void CloseSocket(evutil_socket_t sock);
static void Shutdown();
/** the current time */
static tint Time();
static tint last_tick;
// Ric: used for testing LEDBAT's behaviour
float GetCwnd() { return cwnd_; }
uint64_t GetHintSize() { return hint_in_size_; }
bool Totest;
bool Tocancel;
// Arno: Per instance methods
void Recv (struct evbuffer *evb);
void Send (); // Called by LibeventSendCallback
void Close (close_send_t closesend);
void OnAck (struct evbuffer *evb);
void OnHave (struct evbuffer *evb);
bin_t OnData (struct evbuffer *evb);
void OnHint (struct evbuffer *evb);
void OnHash (struct evbuffer *evb);
void OnPexAdd(struct evbuffer *evb, int family);
void OnPexAddCert (struct evbuffer *evb);
static Handshake *StaticOnHandshake( Address &addr, uint32_t cid, bool ver_known, popt_version_t ver, struct evbuffer *evb);
void OnHandshake (Handshake *hishs);
void OnCancel(struct evbuffer *evb); // PPSP
void OnChoke(struct evbuffer *evb);
void OnUnchoke(struct evbuffer *evb);
void OnSignedHash (struct evbuffer *evb);
void AddHandshake (struct evbuffer *evb);
bin_t AddData (struct evbuffer *evb);
void AddAck (struct evbuffer *evb);
void AddHave (struct evbuffer *evb);
void AddHint (struct evbuffer *evb);
void AddCancel (struct evbuffer *evb);
void AddUncleHashes (struct evbuffer *evb, bin_t pos);
void AddPeakHashes (struct evbuffer *evb);
void AddPex (struct evbuffer *evb);
void OnPexReq(void);
void AddPexReq(struct evbuffer *evb);
void BackOffOnLosses (float ratio=0.5);
tint SwitchSendControl (send_control_t control_mode);
tint NextSendTime ();
tint KeepAliveNextSendTime ();
tint PingPongNextSendTime ();
tint CwndRateNextSendTime ();
tint SlowStartNextSendTime ();
tint AimdNextSendTime ();
tint LedbatNextSendTime ();
/** Arno: return true if this peer has complete file. May be fuzzy if Peak Hashes not in */
bool IsComplete();
/** Arno: return (UDP) port for this channel */
uint16_t GetMyPort();
bool IsDiffSenderOrDuplicate(Address addr, uint32_t chid);
static int MAX_REORDERING;
static tint TIMEOUT;
static tint MIN_DEV;
static tint MAX_SEND_INTERVAL;
static tint LEDBAT_TARGET;
static float LEDBAT_GAIN;
static tint LEDBAT_DELAY_BIN;
static bool SELF_CONN_OK;
static tint MAX_POSSIBLE_RTT;
static tint MIN_PEX_REQUEST_INTERVAL;
static FILE* debug_file;
const std::string id_string () const;
/** A channel is "established" if had already sent and received packets. */
bool is_established () { return (hs_in_ == NULL) ? false : hs_in_->peer_channel_id_ && own_id_mentioned_; }
HashTree * hashtree();
ContentTransfer *transfer() { return transfer_; }
const Address& peer() const { return peer_; }
const Address& recv_peer() const { return recv_peer_; }
tint ack_timeout () {
tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
tint tmo = rtt_avg_ + dev * 4;
return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
}
uint32_t id () const { return id_; }
const binmap_t& ack_in() const { return ack_in_; }
// MORESTATS
uint64_t raw_bytes_up() { return raw_bytes_up_; }
uint64_t raw_bytes_down() { return raw_bytes_down_; }
uint64_t bytes_up() { return bytes_up_; }
uint64_t bytes_down() { return bytes_down_; }
static int DecodeID(int scrambled);
static int EncodeID(int unscrambled);
static Channel* channel(int i) {
return i<channels.size()?channels[i]:NULL;
}
// SAFECLOSE
void ClearEvents();
void Schedule4Delete() { scheduled4del_ = true; }
bool IsScheduled4Delete() { return scheduled4del_; }
//ZEROSTATE
// Message handler replacements
void OnDataZeroState(struct evbuffer *evb);
void OnHaveZeroState(struct evbuffer *evb);
void OnHashZeroState(struct evbuffer *evb);
void OnPexAddZeroState(struct evbuffer *evb, int family);
void OnPexAddCertZeroState(struct evbuffer *evb);
void OnPexReqZeroState(struct evbuffer *evb);
tint GetOpenTime() { return open_time_; }
// LIVE
/** Arno: Called when source generates chunk. */
void LiveSend();
void CloseOnError();
protected:
struct event *evsend_ptr_; // Arno: timer per channel // SAFECLOSE
//LIVE
struct event *evsendlive_ptr_; // Arno: timer per channel
/** Channel id: index in the channel array. */
uint32_t id_;
/** Socket address of the peer. */
Address peer_;
/** The UDP socket fd. */
evutil_socket_t socket_;
/** Descriptor of the file in question. */
ContentTransfer* transfer_;
bool own_id_mentioned_;
/** Peer's progress, based on acknowledgements. */
binmap_t ack_in_;
/** Last data received; needs to be acked immediately. */
tintbin data_in_;
bin_t data_in_dbl_;
/** The history of data sent and still unacknowledged. */
tbqueue data_out_;
/** Timeouted data (potentially to be retransmitted). */
tbqueue data_out_tmo_;
bin_t data_out_cap_;
/** Index in the history array. */
binmap_t have_out_;
/** Transmit schedule: in most cases filled with the peer's hints */
tbqueue hint_in_;
uint64_t hint_in_size_;
/** Hints sent (to detect and reschedule ignored hints). */
tbqueue hint_out_;
uint64_t hint_out_size_;
/** Ric: hints that are removed from the hint_out_ queue and need to be canceled */
std::deque<bin_t> cancel_out_;
/** Types of messages the peer accepts. */
uint64_t cap_in_;
/** PEX progress */
bool pex_requested_;
tint last_pex_request_time_;
tint next_pex_request_time_;
bool pex_request_outstanding_;
tbqueue reverse_pex_out_; // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
int useless_pex_count_;
/** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
tint rtt_avg_, dev_avg_, dip_avg_;
tint last_send_time_;
tint last_recv_time_;
tint last_data_out_time_;
tint last_data_in_time_;
tint last_loss_time_;
tint next_send_time_;
tint open_time_;
/** Congestion window; TODO: int, bytes. */
float cwnd_;
int cwnd_count1_;
/** Data sending interval. */
tint send_interval_;
/** The congestion control strategy. */
send_control_t send_control_;
/** Datagrams (not data) sent since last recv. */
int sent_since_recv_;
/** Arno: Fix for KEEP_ALIVE_CONTROL */
bool lastrecvwaskeepalive_;
bool lastsendwaskeepalive_;
/** Arno: For live, we may receive a HAVE but have no hints
outstanding. In that case we should not wait till next_send_time_
but request directly. See send_control.cpp */
bool live_have_no_hint_;
/** Recent acknowlegements for data previously sent. */
int ack_rcvd_recent_;
/** Recent non-acknowlegements (losses) of data previously sent. */
int ack_not_rcvd_recent_;
/** LEDBAT one-way delay machinery */
tint owd_min_bins_[4];
int owd_min_bin_;
tint owd_min_bin_start_;
tint owd_current_[4];
int owd_cur_bin_;
/** Stats */
int dgrams_sent_;
int dgrams_rcvd_;
// Arno, 2011-11-28: for detailed, per-peer stats. MORESTATS
uint64_t raw_bytes_up_, raw_bytes_down_, bytes_up_, bytes_down_;
// SAFECLOSE
bool scheduled4del_;
/** Arno: Socket address of the peer where packets are received from,
* when an IANA private address, otherwise 0.
* May not be equal to peer_. 2PEERSBEHINDSAMENAT */
Address recv_peer_;
bool direct_sending_;
//LIVE
bool peer_is_source_;
// PPSP
/** Handshake I sent to peer. swarmid not set. */
Handshake *hs_out_;
/** Handshake I got from peer. */
Handshake *hs_in_;
// RTTCS
tintbin rtt_hint_tintbin_;
int PeerBPS() const { return TINT_SEC / dip_avg_ * 1024; }
/** Get a request for one packet from the queue of peer's requests. */
bin_t DequeueHint(bool *retransmitptr);
bin_t ImposeHint();
void TimeoutDataOut ();
void CleanStaleHintOut();
void CleanHintOut(bin_t pos);
void Reschedule();
void UpdateDIP(bin_t pos); // RETRANSMIT
// Arno, 2012-06-14: Replace with hashtable (unsorted_map). This
// currently grows for ever, filling with NULLs for old channels
// and results in channel IDs with are not really random.
//
static channels_t channels;
};
// MULTIFILE
/*
* Class representing a single file in a multi-file swarm.
*/
class StorageFile : public Operational
{
public:
StorageFile(std::string specpath, int64_t start, int64_t size, std::string ospath);
~StorageFile();
int64_t GetStart() { return start_; }
int64_t GetEnd() { return end_; }
int64_t GetSize() { return end_+1-start_; }
std::string GetSpecPathName() { return spec_pathname_; }
std::string GetOSPathName() { return os_pathname_; }
ssize_t Write(const void *buf, size_t nbyte, int64_t offset) { return pwrite(fd_,buf,nbyte,offset); }
ssize_t Read(void *buf, size_t nbyte, int64_t offset) { return pread(fd_,buf,nbyte,offset); }
int ResizeReserved() { return file_resize(fd_,GetSize()); }
protected:
std::string spec_pathname_;
std::string os_pathname_;
int64_t start_;
int64_t end_;
int fd_; // actual fd
};
typedef std::vector<StorageFile *> storage_files_t;
/*
* Class representing the persistent storage layer. Supports a swarm
* stored as multiple files.
*
* This is implemented by storing a multi-file specification in chunk 0
* (and further if needed). This spec lists what other files the swarm
* contains and their sizes. E.g.
*
* META-INF-multifilespec.txt 113
* seeder/190557.ts 249798796
* seeder/berta.dat 2395920988
* seeder/bunny.ogg 166825767
*
* The concatenation of these files (starting with the multi-file spec with
* pseudo filename META-INF-multifile-spec.txt) are the contents of the
* swarm.
*/
class Storage : public Operational {
public:
static const std::string MULTIFILE_PATHNAME;
static const std::string MULTIFILE_PATHNAME_FILE_SEP;
static const int MULTIFILE_MAX_PATH = 2048;
static const int MULTIFILE_MAX_LINE = MULTIFILE_MAX_PATH+1+32+1;
typedef enum {
STOR_STATE_INIT,
STOR_STATE_MFSPEC_SIZE_KNOWN,
STOR_STATE_MFSPEC_COMPLETE,
STOR_STATE_SINGLE_FILE
} storage_state_t;
/** StorageFile for every file in this transfer */
typedef std::vector<StorageFile *> storage_files_t;
/** convert multi-file spec filename (UTF-8 encoded Unicode) to OS name and vv. */
static std::string spec2ospn(std::string specpn);
static std::string os2specpn(std::string ospn);
/** Create Storage from specified path and destination dir if content turns about to be a multi-file */
Storage(std::string ospathname, std::string destdir, int td);
~Storage();
/** UNIX pread approximation. Does change file pointer. Thread-safe if no concurrent writes */
ssize_t Read(void *buf, size_t nbyte, int64_t offset); // off_t not 64-bit dynamically on Win32
/** UNIX pwrite approximation. Does change file pointer. Is not thread-safe */
ssize_t Write(const void *buf, size_t nbyte, int64_t offset);
/** Link to HashTree */
void SetHashTree(HashTree *ht) { ht_ = ht; }