 (instances of :class:`RESetMapReduceWorker`).
 
 Each running map reduce instance works on a :class:`RecursivelyEnumeratedSet of
-forest type<sage.combinat.backtrack.SearchForest>` called here `C` and is coordinated by a
-:class:`RESetMapReduce` object called the *master*. The master is in charge of
-lauching the work, gathering the results and cleaning up at the end of the
-computation. It doesn't perform any computation associated to the generation
-of the element `C` nor the computation of the mapped function. It however
-occasionally perform a reduce, but most reducing is by default done by the
-workers. Also thanks to the work-stealing algorithm, the master is only
-involved in detecting the termination of the computation but all the load
-balancing is done at the level of the worker.
+forest type<sage.combinat.backtrack.SearchForest>` called here `C` and is
+coordinated by a :class:`RESetMapReduce` object called the **master**. The
+master is in charge of launching the work, gathering the results and cleaning
+up at the end of the computation. It doesn't perform any computation
+associated to the generation of the element `C` nor the computation of the
+mapped function. It however occasionally performs a reduce, but most reducing
+is by default done by the workers. Also thanks to the work-stealing algorithm,
+the master is only involved in detecting the termination of the computation
+but all the load balancing is done at the level of the worker.
 
 Workers are instances of :class:`RESetMapReduceWorker`. They are responsible for
 doing the actual computations: elements generation, mapping and reducing. They
 are also responsible for the load balancing thanks to work-stealing.
 
-Here is a description of the attribute of the *master* relevant to the
+Here is a description of the attributes of the **master** relevant to the
 map-reduce protocol:
 
 - ``master._results`` -- a :class:`~multiprocessing.queues.SimpleQueue` where
 The end of the computation
 --------------------------
 
+To detect when a computation is finished, we keep a synchronized integer which
+counts the number of active tasks. This is essentially a semaphore, but semaphores
+are broken on Darwin's OSes so we ship two implementations depending on the OS
+(see :class:`ActiveTaskCounter` and :class:`ActiveTaskCounterDarwin` and the note
+below).
+
 When a worker finishes working on a task, it calls
-:meth:`master._signal_task_done`. This decrease the counter of the semaphore
+:meth:`master._signal_task_done`. This decreases the task counter
 ``master._active_tasks``. When it reaches 0, it means that there are no more
 nodes: the work is done. The worker executes :meth:`master._shutdown` which
 sends ``AbortError`` on all :meth:`worker._request` and
-:meth:`worker._write_task` Queues. Each worker or thief thread receiving such a
-message raise the corresponding exception, stoping therefore its work. A lock
-called ``master._done`` ensures that shutdown is only done once.
+:meth:`worker._write_task` Queues. Each worker or thief thread receiving such
+a message raises the corresponding exception, therefore stopping its work. A
+lock called ``master._done`` ensures that shutdown is only done once.
 
 Finally, it is also possible to interrupt the computation before it ends by
 calling :meth:`master.abort()`. This is done by setting
 ``master._active_tasks`` to 0 and calling :meth:`master._shutdown`.
 
+.. warning:: The MacOSX Semaphore bug
+
+    Darwin's OSes do not correctly implement POSIX's semaphore semantics.
+    Indeed, on this system, acquire may fail and return False not only because
+    the semaphore is equal to zero but also **because someone else is trying to
+    acquire** at the same time. This renders the usage of Semaphore impossible
+    on MacOSX so that on this system we use a synchronized integer.
+
 
 .. _examples:
 
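The termination protocol described in this hunk (a shared counter that goes up when a task starts, down when it finishes, and is forced to 0 on abort) can be illustrated outside Sage. The following is only a minimal sketch, not the code of this commit: the class name `LockedTaskCounter` and the `value` method are hypothetical, and it assumes a platform where `multiprocessing.Lock` and `multiprocessing.Value` are available.

```python
import ctypes
from multiprocessing import Lock, Value


class LockedTaskCounter:
    """Hypothetical stand-in for a lock-guarded active-task counter."""

    def __init__(self, task_number):
        # Shared integer holding the number of active tasks.
        self._active_tasks = Value(ctypes.c_int, task_number)
        # Separate lock so that test-and-update is one atomic step.
        self._lock = Lock()

    def task_start(self):
        with self._lock:
            # A counter at (or below) zero means the computation is over:
            # refuse to start new work and report 0.
            if self._active_tasks.value <= 0:
                return 0
            self._active_tasks.value += 1
            return self._active_tasks.value

    def task_done(self):
        with self._lock:
            self._active_tasks.value -= 1
            return self._active_tasks.value

    def abort(self):
        with self._lock:
            self._active_tasks.value = 0

    def value(self):
        with self._lock:
            return self._active_tasks.value


counter = LockedTaskCounter(4)
after_start = counter.task_start()   # one more active task
after_done = counter.task_done()     # back to the initial count
counter.abort()                      # force termination
start_after_abort = counter.task_start()
done_after_abort = counter.task_done()
```

In this sketch a worker that calls `task_done` after an abort sees a negative value, which matches the `-1` shown in the doctests of the diff; a later `task_start` still returns 0 because the test is `<= 0` rather than `== 0`.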
@@ -564,25 +578,17 @@ class ActiveTaskCounterDarwin(object):
     r"""
     Handling the number of Active Tasks
 
-    A class for handling the number of active task in distributed
-    computation process. The goal is mostly to circumvent the non POSIX
-    compliant implementation of Semaphore under Darwin's OSes.
-
-    .. note:: The MacOSX Semaphore bug
-
-        This raises a big problem on MacOSX since they do not correctly
-        implement POSIX's semaphore semantic. Indeed, on this system, acquire
-        may fail and return False not only because the semaphore is equal to
-        zero but also BECAUSE SOMEONE ELSE IS TRYING TO ACQUIRE at the same
-        time. This renders the usage of Semaphore impossible on MacOSX so that
-        on this system we use a synchronized integer.
+    A class for handling the number of active tasks in distributed computation
+    process. This is essentially a semaphore, but Darwin's OSes do not
+    correctly implement POSIX's semaphore semantics. So we use a shared integer
+    with a lock.
     """
     def __init__(self, task_number):
         r"""
         TESTS::
 
-            sage: from sage.parallel.map_reduce import ActiveTaskCounter
-            sage: t = ActiveTaskCounter(4)
+            sage: from sage.parallel.map_reduce import ActiveTaskCounterDarwin as ATC
+            sage: t = ATC(4)
             sage: TestSuite(t).run(skip="_test_pickling", verbose=True)
         """
         self._active_tasks = Value(ctypes.c_int, task_number)
@@ -592,18 +598,24 @@ def __repr__(self):
         """
         TESTS::
 
-            sage: from sage.parallel.map_reduce import ActiveTaskCounter
-            sage: ActiveTaskCounter(4)
+            sage: from sage.parallel.map_reduce import ActiveTaskCounterDarwin as ATC
+            sage: ATC(4)
             ActiveTaskCounter(value=4)
         """
         return "ActiveTaskCounter(value=%s)" % (self._active_tasks.value)
 
     def task_start(self):
         r"""
+        Increment the task counter by one.
+
+        OUTPUT: Calling :meth:`task_start` on a zero or negative counter
+        returns 0, otherwise it increments the counter and returns its value
+        after the increment.
+
         EXAMPLES::
 
-            sage: from sage.parallel.map_reduce import ActiveTaskCounter
-            sage: c = ActiveTaskCounter(4); c
+            sage: from sage.parallel.map_reduce import ActiveTaskCounterDarwin as ATC
+            sage: c = ATC(4); c
             ActiveTaskCounter(value=4)
             sage: c.task_start()
             5
@@ -612,7 +624,7 @@ def task_start(self):
 
         Calling :meth:`task_start` on a zero counter does nothing::
 
-            sage: c = ActiveTaskCounter(0)
+            sage: c = ATC(0)
             sage: c.task_start()
             0
             sage: c
@@ -622,24 +634,29 @@ def task_start(self):
         with self._lock:
             # The following test is not necessary but it allows active thieves to
             # stop before receiving the poison pill.
-            if self._active_tasks.value == 0:
+            if self._active_tasks.value <= 0:
                 return 0
             self._active_tasks.value += 1
             return self._active_tasks.value
 
     def task_done(self):
         r"""
+        Decrement the task counter by one.
+
+        OUTPUT: Calling :meth:`task_done` decrements the counter and returns
+        its value after the decrement.
+
         EXAMPLES::
 
-            sage: from sage.parallel.map_reduce import ActiveTaskCounter
-            sage: c = ActiveTaskCounter(4); c
+            sage: from sage.parallel.map_reduce import ActiveTaskCounterDarwin as ATC
+            sage: c = ATC(4); c
             ActiveTaskCounter(value=4)
             sage: c.task_done()
             3
             sage: c
             ActiveTaskCounter(value=3)
 
-            sage: c = ActiveTaskCounter(0)
+            sage: c = ATC(0)
             sage: c.task_done()
             -1
         """
@@ -650,10 +667,12 @@ def task_done(self):
 
     def abort(self):
         r"""
+        Set the task counter to 0.
+
         EXAMPLES::
 
-            sage: from sage.parallel.map_reduce import ActiveTaskCounter
-            sage: c = ActiveTaskCounter(4); c
+            sage: from sage.parallel.map_reduce import ActiveTaskCounterDarwin as ATC
+            sage: c = ATC(4); c
             ActiveTaskCounter(value=4)
             sage: c.abort()
             sage: c
@@ -663,24 +682,60 @@ def abort(self):
             self._active_tasks.value = 0
 
 
-class ActiveTaskCounterOther(object):
+class ActiveTaskCounter(object):
     r"""
     Handling the number of Active Tasks
 
-    A class for handling the number of active task in distributed
-    computation process. The goal is mostly to circumvent the non POSIX
-    compliant implementation of Semaphore under Darwin's OSes.
-
+    A class for handling the number of active tasks in distributed computation
+    process. This is the standard implementation on POSIX compliant OSes. We
+    essentially wrap a semaphore.
     """
     def __init__(self, task_number):
         r"""
+        TESTS::
+
+            sage: from sage.parallel.map_reduce import ActiveTaskCounter as ATC
+            sage: t = ATC(4)
+            sage: TestSuite(t).run(skip="_test_pickling", verbose=True)
         """
         self._active_tasks = Semaphore(task_number)
 
     def __repr__(self):
+        """
+        TESTS::
+
+            sage: from sage.parallel.map_reduce import ActiveTaskCounter as ATC
+            sage: ATC(4)
+            ActiveTaskCounter(value=4)
+        """
         return "ActiveTaskCounter(value=%s)" % (self._active_tasks.get_value())
 
     def task_start(self):
+        r"""
+        Increment the task counter by one.
+
+        OUTPUT: Calling :meth:`task_start` on a zero or negative counter
+        returns 0, otherwise it increments the counter and returns its value
+        after the increment.
+
+        EXAMPLES::
+
+            sage: from sage.parallel.map_reduce import ActiveTaskCounter as ATC
+            sage: c = ATC(4); c
+            ActiveTaskCounter(value=4)
+            sage: c.task_start()
+            5
+            sage: c
+            ActiveTaskCounter(value=5)
+
+        Calling :meth:`task_start` on a zero counter does nothing::
+
+            sage: c = ATC(0)
+            sage: c.task_start()
+            0
+            sage: c
+            ActiveTaskCounter(value=0)
+        """
         logger.debug("_signal_task_start called")
         # The following test is not necessary but it allows active thieves to
         # stop before receiving the poison pill.
@@ -693,6 +748,24 @@ def task_start(self):
 
     def task_done(self):
         r"""
+        Decrement the task counter by one.
+
+        OUTPUT: Calling :meth:`task_done` decrements the counter and returns
+        its value after the decrement.
+
+        EXAMPLES::
+
+            sage: from sage.parallel.map_reduce import ActiveTaskCounter as ATC
+            sage: c = ATC(4); c
+            ActiveTaskCounter(value=4)
+            sage: c.task_done()
+            3
+            sage: c
+            ActiveTaskCounter(value=3)
+
+            sage: c = ATC(0)
+            sage: c.task_done()
+            -1
         """
         logger.debug("_signal_task_done called")
         # We test if the semaphore counting the number of active tasks is
@@ -705,14 +778,39 @@ def task_done(self):
 
     def abort(self):
         r"""
+        Set the task counter to 0.
+
+        EXAMPLES::
+
+            sage: from sage.parallel.map_reduce import ActiveTaskCounter as ATC
+            sage: c = ATC(4); c
+            ActiveTaskCounter(value=4)
+            sage: c.abort()
+            sage: c
+            ActiveTaskCounter(value=0)
         """
         while self._active_tasks.acquire(False):
             pass
 
+## Timings:
+#
+# Timings are made with:
+#
+# S = RecursivelyEnumeratedSet( [[]],
+#     lambda l: ([l[:i] + [len(l)] + l[i:] for i in range(len(l)+1)]
+#                if len(l) < NNN else []),
+#     structure='forest', enumeration='depth')
+# sage: %time sp = S.map_reduce(lambda z: x**len(z)); sp
+#
+# For NNN = 10
+#
+# Posix compliant implementation : 17.04 s
+# Darwin's implementation : 18.26 s
+
 
 ActiveTaskCounter = (ActiveTaskCounterDarwin if sys.platform == 'darwin'
-                     else ActiveTaskCounterOther)
-ActiveTaskCounter = ActiveTaskCounterDarwin # to debug DARWIN's implem
+                     else ActiveTaskCounter)
+# ActiveTaskCounter = ActiveTaskCounterDarwin # to debug DARWIN's implem
 
 
 
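The semaphore-based `abort` in the last hunk sets the counter to 0 by repeatedly calling a non-blocking `acquire`. A minimal standalone sketch of that idiom follows; the helper name `drain` is hypothetical, and it assumes a platform where non-blocking `acquire` on a `multiprocessing.Semaphore` fails only when the count is zero (which, per the warning in this commit, does not hold on Darwin).

```python
from multiprocessing import Semaphore


def drain(sem):
    """Bring a semaphore's count down to zero.

    Hypothetical helper: returns how many permits were removed.
    """
    removed = 0
    # acquire(False) is non-blocking: it returns True and decrements the
    # count if the count is positive, and returns False otherwise.
    while sem.acquire(False):
        removed += 1
    return removed


sem = Semaphore(4)
removed = drain(sem)
# Once drained, a further non-blocking acquire fails; this is the
# situation in which task_done reports -1 in the doctests of the diff.
still_zero = not sem.acquire(False)
```

The sketch deliberately avoids `get_value()`, which is not implemented on every platform.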