:mod:`multiprocessing` --- Process-based "threading" interface
==============================================================

.. module:: multiprocessing
   :synopsis: Process-based "threading" interface.

.. versionadded:: 2.6


Introduction
----------------------

:mod:`multiprocessing` is a package that supports spawning processes using an
API similar to the :mod:`threading` module. The :mod:`multiprocessing` package
offers both local and remote concurrency, effectively side-stepping the
:term:`Global Interpreter Lock` by using subprocesses instead of threads. Due
to this, the :mod:`multiprocessing` module allows the programmer to fully
leverage multiple processors on a given machine. It runs on both Unix and
Windows.

The :mod:`multiprocessing` module also introduces APIs which do not have
analogs in the :mod:`threading` module. A prime example of this is the
:class:`Pool` object, which offers a convenient means of parallelizing the
execution of a function across multiple input values, distributing the
input data across processes (data parallelism). The following example
demonstrates the common practice of defining such functions in a module so
that child processes can successfully import that module. This basic example
of data parallelism uses :class:`Pool`::

   from multiprocessing import Pool

   def f(x):
       return x*x

   if __name__ == '__main__':
       p = Pool(5)
       print(p.map(f, [1, 2, 3]))

It prints the following to standard output::

   [1, 4, 9]


The :class:`Process` class
~~~~~~~~~~~~~~~~~~~~~~~~~~

In :mod:`multiprocessing`, processes are spawned by creating a :class:`Process`
object and then calling its :meth:`~Process.start` method. :class:`Process`
follows the API of :class:`threading.Thread`. A trivial example of a
multiprocess program is ::

   from multiprocessing import Process

   def f(name):
       print 'hello', name

   if __name__ == '__main__':
       p = Process(target=f, args=('bob',))
       p.start()
       p.join()

To show the individual process IDs involved, here is an expanded example::

   from multiprocessing import Process
   import os

   def info(title):
       print title
       print 'module name:', __name__
       if hasattr(os, 'getppid'):  # only available on Unix
           print 'parent process:', os.getppid()
       print 'process id:', os.getpid()

   def f(name):
       info('function f')
       print 'hello', name

   if __name__ == '__main__':
       info('main line')
       p = Process(target=f, args=('bob',))
       p.start()
       p.join()

For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
necessary, see :ref:`multiprocessing-programming`.


Exchanging objects between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:mod:`multiprocessing` supports two types of communication channel between
processes:

**Queues**

   The :class:`~multiprocessing.Queue` class is a near clone of :class:`Queue.Queue`. For
   example::

      from multiprocessing import Process, Queue

      def f(q):
          q.put([42, None, 'hello'])

      if __name__ == '__main__':
          q = Queue()
          p = Process(target=f, args=(q,))
          p.start()
          print q.get()    # prints "[42, None, 'hello']"
          p.join()

   Queues are thread and process safe.

**Pipes**

   The :func:`Pipe` function returns a pair of connection objects connected by a
   pipe which by default is duplex (two-way). For example::

      from multiprocessing import Process, Pipe

      def f(conn):
          conn.send([42, None, 'hello'])
          conn.close()

      if __name__ == '__main__':
          parent_conn, child_conn = Pipe()
          p = Process(target=f, args=(child_conn,))
          p.start()
          print parent_conn.recv()   # prints "[42, None, 'hello']"
          p.join()

   The two connection objects returned by :func:`Pipe` represent the two ends of
   the pipe. Each connection object has :meth:`~Connection.send` and
   :meth:`~Connection.recv` methods (among others). Note that data in a pipe
   may become corrupted if two processes (or threads) try to read from or write
   to the *same* end of the pipe at the same time. Of course there is no risk
   of corruption from processes using different ends of the pipe at the same
   time.


Synchronization between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:mod:`multiprocessing` contains equivalents of all the synchronization
primitives from :mod:`threading`. For instance, one can use a lock to ensure
that only one process prints to standard output at a time::

   from multiprocessing import Process, Lock

   def f(l, i):
       l.acquire()
       print 'hello world', i
       l.release()

   if __name__ == '__main__':
       lock = Lock()

       for num in range(10):
           Process(target=f, args=(lock, num)).start()

Without the lock, output from the different processes is liable to get all
mixed up.
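
A lock can also be used as a context manager: the ``with`` statement acquires
it on entry and releases it on exit, even if the body raises an exception. The
following is a minimal sketch of the example above written that way; the
``main()`` helper, which joins the workers and returns their exit codes, is a
hypothetical addition for illustration::

   from multiprocessing import Process, Lock

   def f(lock, i):
       # The lock is released when the block exits, even on an exception.
       with lock:
           print('hello world %d' % i)

   def main():
       lock = Lock()
       procs = [Process(target=f, args=(lock, num)) for num in range(10)]
       for p in procs:
           p.start()
       for p in procs:
           p.join()
       # A child that returns normally from its target has exit code 0.
       return [p.exitcode for p in procs]

   if __name__ == '__main__':
       main()
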


Sharing state between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When doing concurrent programming it is usually best to avoid using shared
state as far as possible. This is particularly true when using multiple
processes.

However, if you really do need to use some shared data, then
:mod:`multiprocessing` provides a couple of ways of doing so.

**Shared memory**

   Data can be stored in a shared memory map using :class:`Value` or
   :class:`Array`. For example, the following code ::

      from multiprocessing import Process, Value, Array

      def f(n, a):
          n.value = 3.1415927
          for i in range(len(a)):
              a[i] = -a[i]

      if __name__ == '__main__':
          num = Value('d', 0.0)
          arr = Array('i', range(10))

          p = Process(target=f, args=(num, arr))
          p.start()
          p.join()

          print num.value
          print arr[:]

   will print ::

      3.1415927
      [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

   The ``'d'`` and ``'i'`` arguments used when creating ``num`` and ``arr`` are
   typecodes of the kind used by the :mod:`array` module: ``'d'`` indicates a
   double precision float and ``'i'`` indicates a signed integer. These shared
   objects will be process and thread-safe.

   For more flexibility in using shared memory one can use the
   :mod:`multiprocessing.sharedctypes` module, which supports the creation of
   arbitrary ctypes objects allocated from shared memory.
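
   That safety applies to each individual read or write of ``value``, so a
   compound update such as ``counter.value += 1`` (a read followed by a write)
   is not atomic, and concurrent increments can be lost. A minimal sketch of
   one fix is to hold the lock that :class:`Value` creates by default (returned
   by its ``get_lock()`` method) across the whole update; ``work()`` and
   ``main()`` are hypothetical helper names::

      from multiprocessing import Process, Value

      def work(counter, n):
          for _ in range(n):
              # get_lock() returns the Value's recursive lock, so the nested
              # acquire performed by ".value" inside the block is safe.
              with counter.get_lock():
                  counter.value += 1

      def main(workers=4, n=1000):
          counter = Value('i', 0)   # synchronized by default (lock=True)
          procs = [Process(target=work, args=(counter, n))
                   for _ in range(workers)]
          for p in procs:
              p.start()
          for p in procs:
              p.join()
          return counter.value

      if __name__ == '__main__':
          print(main())   # 4000 with the default arguments
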


**Server process**

   A manager object returned by :func:`Manager` controls a server process which
   holds Python objects and allows other processes to manipulate them using
   proxies.

   A manager returned by :func:`Manager` will support types :class:`list`,
   :class:`dict`, :class:`~managers.Namespace`, :class:`Lock`, :class:`RLock`,
   :class:`Semaphore`, :class:`BoundedSemaphore`, :class:`Condition`,
   :class:`Event`, :class:`~multiprocessing.Queue`, :class:`Value` and :class:`Array`. For
   example, ::

      from multiprocessing import Process, Manager

      def f(d, l):
          d[1] = '1'
          d['2'] = 2
          d[0.25] = None
          l.reverse()

      if __name__ == '__main__':
          manager = Manager()

          d = manager.dict()
          l = manager.list(range(10))

          p = Process(target=f, args=(d, l))
          p.start()
          p.join()

          print d
          print l

   will print ::

       {0.25: None, 1: '1', '2': 2}
       [9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

   Server process managers are more flexible than using shared memory objects
   because they can be made to support arbitrary object types. Also, a single
   manager can be shared by processes on different computers over a network.
   They are, however, slower than using shared memory.
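
   A :class:`~managers.Namespace` proxy is a convenient way to share a few
   named values. A minimal sketch, in which ``f()`` and ``main()`` are
   hypothetical helpers; note that only assignments to the proxy's attributes
   are forwarded to the manager, so mutating an object stored in an attribute
   (for example appending to a list) would only change a local copy::

      from multiprocessing import Process, Manager

      def f(ns):
          # Attribute assignment on the proxy is sent to the server process.
          ns.x = ns.x + 1
          ns.label = 'updated by child'

      def main():
          manager = Manager()
          ns = manager.Namespace()
          ns.x = 1
          p = Process(target=f, args=(ns,))
          p.start()
          p.join()
          result = (ns.x, ns.label)
          manager.shutdown()   # stop the manager's server process
          return result

      if __name__ == '__main__':
          print(main())   # (2, 'updated by child')
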


Using a pool of workers
~~~~~~~~~~~~~~~~~~~~~~~

The :class:`~multiprocessing.pool.Pool` class represents a pool of worker
processes. It has methods which allow tasks to be offloaded to the worker
processes in a few different ways.

For example::

   from multiprocessing import Pool, TimeoutError
   import time
   import os

   def f(x):
       return x*x

   if __name__ == '__main__':
       pool = Pool(processes=4)              # start 4 worker processes

       # print "[0, 1, 4,..., 81]"
       print pool.map(f, range(10))

       # print same numbers in arbitrary order
       for i in pool.imap_unordered(f, range(10)):
           print i

       # evaluate "f(20)" asynchronously
       res = pool.apply_async(f, (20,))      # runs in *only* one process
       print res.get(timeout=1)              # prints "400"

       # evaluate "os.getpid()" asynchronously
       res = pool.apply_async(os.getpid, ()) # runs in *only* one process
       print res.get(timeout=1)              # prints the PID of that process

       # launching multiple evaluations asynchronously *may* use more processes
       multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
       print [res.get(timeout=1) for res in multiple_results]

       # make a single worker sleep for 10 secs
       res = pool.apply_async(time.sleep, (10,))
       try:
           print res.get(timeout=1)
       except TimeoutError:
           print "We lacked patience and got a multiprocessing.TimeoutError"

Note that the methods of a pool should only ever be used by the
process which created it.
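
When a pool is no longer needed it should be shut down. A minimal sketch,
assuming the pool is used only from the main process: ``close()`` stops the
pool from accepting new tasks, and ``join()`` then waits for the worker
processes to exit (``join()`` may only be called after ``close()`` or
``terminate()``). The ``square()`` and ``main()`` names are hypothetical::

   from multiprocessing import Pool

   def square(x):
       return x * x

   def main():
       pool = Pool(processes=2)
       try:
           # map() blocks until all results are ready, in input order.
           results = pool.map(square, range(5))
       finally:
           pool.close()   # no further tasks will be submitted
           pool.join()    # wait for the workers to exit
       return results

   if __name__ == '__main__':
       print(main())   # [0, 1, 4, 9, 16]
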

.. note::

   Functionality within this package requires that the ``__main__`` module be
   importable by the children. This is covered in :ref:`multiprocessing-programming`;
   however, it is worth pointing out here. This means that some examples, such
   as the :class:`Pool` examples, will not work in the interactive interpreter.
   For example::

      >>> from multiprocessing import Pool
      >>> p = Pool(5)
      >>> def f(x):
      ...     return x*x
      ...
      >>> p.map(f, [1,2,3])
      Process PoolWorker-1:
      Process PoolWorker-2:
      Process PoolWorker-3:
      Traceback (most recent call last):
      Traceback (most recent call last):
      Traceback (most recent call last):
      AttributeError: 'module' object has no attribute 'f'
      AttributeError: 'module' object has no attribute 'f'
      AttributeError: 'module' object has no attribute 'f'

   (If you try this it will actually output three full tracebacks
   interleaved in a semi-random fashion, and then you may have to
   stop the master process somehow.)


Reference
---------

The :mod:`multiprocessing` package mostly replicates the API of the
:mod:`threading` module.


:class:`Process` and exceptions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. class:: Process(group=None, target=None, name=None, args=(), kwargs={})

   Process objects represent activity that is run in a separate process. The
   :class:`Process` class has equivalents of all the methods of
   :class:`threading.Thread`.

   The constructor should always be called with keyword arguments. *group*
   should always be ``None``; it exists solely for compatibility with
   :class:`threading.Thread`. *target* is the callable object to be invoked by
   the :meth:`run()` method. It defaults to ``None``, meaning nothing is
   called. *name* is the process name. By default, a unique name is constructed
   of the form 'Process-N\ :sub:`1`:N\ :sub:`2`:...:N\ :sub:`k`' where N\
   :sub:`1`,N\ :sub:`2`,...,N\ :sub:`k` is a sequence of integers whose length
   is determined by the *generation* of the process. *args* is the argument
   tuple for the target invocation. *kwargs* is a dictionary of keyword
   arguments for the target invocation. By default, no arguments are passed to
   *target*.

   If a subclass overrides the constructor, it must make sure it invokes the
   base class constructor (:meth:`Process.__init__`) before doing anything else
   to the process.
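
   A minimal sketch of such a subclass; the ``Worker`` class, its
   *result_queue* parameter and the ``main()`` helper are hypothetical names
   chosen for illustration. The overridden constructor calls
   ``Process.__init__`` first, and :meth:`run` is overridden to supply the
   process's activity::

      from multiprocessing import Process, Queue

      class Worker(Process):
          def __init__(self, number, result_queue):
              # Initialize the base class before touching anything else.
              Process.__init__(self)
              self.number = number
              self.result_queue = result_queue

          def run(self):
              # Executed in the child process once start() is called.
              self.result_queue.put(self.number * 10)

      def main():
          q = Queue()
          w = Worker(4, q)
          w.start()
          result = q.get()   # read before join() to avoid a queue deadlock
          w.join()
          return result

      if __name__ == '__main__':
          print(main())   # 40
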

   .. method:: run()

      Method representing the process's activity.

      You may override this method in a subclass. The standard :meth:`run`
      method invokes the callable object passed to the object's constructor as
      the target argument, if any, with sequential and keyword arguments taken
      from the *args* and *kwargs* arguments, respectively.

   .. method:: start()

      Start the process's activity.

      This must be called at most once per process object. It arranges for the
      object's :meth:`run` method to be invoked in a separate process.

   .. method:: join([timeout])

      Block the calling thread until the process whose :meth:`join` method is
      called terminates or until the optional timeout occurs.

      If *timeout* is ``None`` then there is no timeout. If *timeout* is a
      positive number, it blocks at most *timeout* seconds. The method returns
      ``None`` in both cases, so check :meth:`is_alive` or :attr:`exitcode` to
      find out whether the process has actually terminated.

      A process can be joined many times.

      A process cannot join itself because this would cause a deadlock. It is
      an error to attempt to join a process before it has been started.

   .. attribute:: name

      The process's name.

      The name is a string used for identification purposes only. It has no
      semantics. Multiple processes may be given the same name. The initial
      name is set by the constructor.

   .. method:: is_alive()

      Return whether the process is alive.

      Roughly, a process object is alive from the moment the :meth:`start`
      method returns until the child process terminates.

   .. attribute:: daemon

      The process's daemon flag, a Boolean value. This must be set before
      :meth:`start` is called.

      The initial value is inherited from the creating process.

      When a process exits, it attempts to terminate all of its daemonic child
      processes.

      Note that a daemonic process is not allowed to create child processes.
      Otherwise a daemonic process would leave its children orphaned if it gets
      terminated when its parent process exits. Additionally, these are **not**
      Unix daemons or services; they are normal processes that will be
      terminated (and not joined) if non-daemonic processes have exited.

   In addition to the :class:`threading.Thread` API, :class:`Process` objects
   also support the following attributes and methods:

   .. attribute:: pid

      Return the process ID. Before the process is spawned, this will be
      ``None``.

   .. attribute:: exitcode

      The child's exit code. This will be ``None`` if the process has not yet
      terminated. A negative value *-N* indicates that the child was terminated
      by signal *N*.
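
      For example, a child whose target calls :func:`sys.exit` with an integer
      argument reports that integer as its exit code, while a child whose
      target returns normally reports ``0``. A minimal sketch; ``succeeds()``,
      ``fails()`` and ``main()`` are hypothetical helpers::

         import sys
         from multiprocessing import Process

         def succeeds():
             pass

         def fails():
             sys.exit(3)

         def main():
             codes = []
             for target in (succeeds, fails):
                 p = Process(target=target)
                 p.start()
                 p.join()   # exitcode stays None until the child terminates
                 codes.append(p.exitcode)
             return codes

         if __name__ == '__main__':
             print(main())   # [0, 3]
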

   .. attribute:: authkey

      The process's authentication key (a byte string).

      When :mod:`multiprocessing` is initialized the main process is assigned a
      random string using :func:`os.urandom`.

      When a :class:`Process` object is created, it will inherit the
      authentication key of its parent process, although this may be changed by
      setting :attr:`authkey` to another byte string.

      See :ref:`multiprocessing-auth-keys`.

   .. method:: terminate()

      Terminate the process. On Unix this is done using the ``SIGTERM`` signal;
      on Windows :c:func:`TerminateProcess` is used. Note that exit handlers and
      finally clauses, etc., will not be executed.

      Note that descendant processes of the process will *not* be terminated --
      they will simply become orphaned.

      .. warning::

         If this method is used when the associated process is using a pipe or
         queue then the pipe or queue is liable to become corrupted and may
         become unusable by other processes. Similarly, if the process has
         acquired a lock or semaphore etc. then terminating it is liable to
         cause other processes to deadlock.

   Note that the :meth:`start`, :meth:`join`, :meth:`is_alive` and
   :meth:`terminate` methods and the :attr:`exitcode` attribute should only be
   used by the process that created the process object.

   Example usage of some of the methods of :class:`Process`:

   .. doctest::

      >>> import multiprocessing, time, signal
      >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
      >>> print p, p.is_alive()
      <Process(Process-1, initial)> False
      >>> p.start()
      >>> print p, p.is_alive()
      <Process(Process-1, started)> True
      >>> p.terminate()
      >>> time.sleep(0.1)
      >>> print p, p.is_alive()
      <Process(Process-1, stopped[SIGTERM])> False
      >>> p.exitcode == -signal.SIGTERM
      True


.. exception:: BufferTooShort

   Exception raised by :meth:`Connection.recv_bytes_into()` when the supplied
   buffer object is too small for the message read.

   If ``e`` is an instance of :exc:`BufferTooShort` then ``e.args[0]`` will give
   the message as a byte string.
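
   A minimal sketch that triggers the exception within a single process,
   assuming a :class:`bytearray` is accepted as the writable buffer; the
   ``read_into()`` helper name is hypothetical::

      from multiprocessing import Pipe, BufferTooShort

      def read_into(size):
          a, b = Pipe()
          a.send_bytes(b'hello world')   # an 11-byte message
          buf = bytearray(size)
          try:
              n = b.recv_bytes_into(buf)
              return bytes(buf[:n])
          except BufferTooShort as e:
              # The complete message is still available from the exception.
              return e.args[0]
          finally:
              a.close()
              b.close()

      short = read_into(4)    # buffer too small: message taken from e.args[0]
      full = read_into(32)    # buffer large enough: message read into buf

   Either way the caller receives the whole 11-byte message.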


Pipes and Queues
~~~~~~~~~~~~~~~~

When using multiple processes, one generally uses message passing for
communication between processes and avoids having to use any synchronization
primitives like locks.

For passing messages one can use :func:`Pipe` (for a connection between two
processes) or a queue (which allows multiple producers and consumers).

The :class:`~multiprocessing.Queue`, :class:`multiprocessing.queues.SimpleQueue`
and :class:`JoinableQueue` types are multi-producer, multi-consumer FIFO queues
modelled on the :class:`Queue.Queue` class in the standard library. They
differ in that :class:`~multiprocessing.Queue` lacks the
:meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join` methods introduced
into Python 2.5's :class:`Queue.Queue` class.

If you use :class:`JoinableQueue` then you **must** call
:meth:`JoinableQueue.task_done` for each task removed from the queue or else the
semaphore used to count the number of unfinished tasks may eventually overflow,
raising an exception.

Note that one can also create a shared queue by using a manager object -- see
:ref:`multiprocessing-managers`.

.. note::

   :mod:`multiprocessing` uses the usual :exc:`Queue.Empty` and
   :exc:`Queue.Full` exceptions to signal a timeout. They are not available in
   the :mod:`multiprocessing` namespace so you need to import them from
   :mod:`Queue`.
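
A minimal sketch of catching the timeout exception; the ``get_or_default()``
helper is hypothetical, and the ``try``/``except ImportError`` import is only
there so the snippet also runs on Python 3, where the module is named
:mod:`queue`::

   from multiprocessing import Queue

   try:
       from Queue import Empty    # module name on Python 2
   except ImportError:
       from queue import Empty    # module name on Python 3

   def get_or_default(q, default, timeout=0.1):
       # get() raises Empty if nothing arrives within *timeout* seconds.
       try:
           return q.get(timeout=timeout)
       except Empty:
           return default

   if __name__ == '__main__':
       q = Queue()
       print(get_or_default(q, 'nothing yet'))              # prints: nothing yet
       q.put('ready')
       print(get_or_default(q, 'nothing yet', timeout=5))   # prints: ready
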

.. note::

   When an object is put on a queue, the object is pickled and a
   background thread later flushes the pickled data to an underlying
   pipe. This has some consequences which are a little surprising,
   but should not cause any practical difficulties -- if they really
   bother you then you can instead use a queue created with a
   :ref:`manager <multiprocessing-managers>`.

   (1) After putting an object on an empty queue there may be an
       infinitesimal delay before the queue's :meth:`~Queue.empty`
       method returns :const:`False` and :meth:`~Queue.get_nowait` can
       return without raising :exc:`Queue.Empty`.

   (2) If multiple processes are enqueuing objects, it is possible for
       the objects to be received at the other end out-of-order.
       However, objects enqueued by the same process will always be in
       the expected order with respect to each other.

.. warning::

   If a process is killed using :meth:`Process.terminate` or :func:`os.kill`
   while it is trying to use a :class:`~multiprocessing.Queue`, then the data in the queue is
   likely to become corrupted. This may cause any other process to get an
   exception when it tries to use the queue later on.

.. warning::

   As mentioned above, if a child process has put items on a queue (and it has
   not used :meth:`JoinableQueue.cancel_join_thread
   <multiprocessing.Queue.cancel_join_thread>`), then that process will
   not terminate until all buffered items have been flushed to the pipe.

   This means that if you try joining that process you may get a deadlock unless
   you are sure that all items which have been put on the queue have been
   consumed. Similarly, if the child process is non-daemonic then the parent
   process may hang on exit when it tries to join all its non-daemonic children.

   Note that a queue created using a manager does not have this issue. See
   :ref:`multiprocessing-programming`.

For an example of the usage of queues for interprocess communication see
:ref:`multiprocessing-examples`.


.. function:: Pipe([duplex])

   Returns a pair ``(conn1, conn2)`` of :class:`Connection` objects representing
   the ends of a pipe.

   If *duplex* is ``True`` (the default) then the pipe is bidirectional. If
   *duplex* is ``False`` then the pipe is unidirectional: ``conn1`` can only be
   used for receiving messages and ``conn2`` can only be used for sending
   messages.
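
   A minimal sketch of a one-way pipe used within a single process; the
   ``reader`` and ``writer`` names are illustrative only::

      from multiprocessing import Pipe

      # With duplex=False the first connection receives and the second sends.
      reader, writer = Pipe(duplex=False)
      writer.send({'status': 'ok'})
      message = reader.recv()
      print(message)   # {'status': 'ok'}
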


.. class:: Queue([maxsize])

   Returns a process shared queue implemented using a pipe and a few
   locks/semaphores. When a process first puts an item on the queue a feeder
   thread is started which transfers objects from a buffer into the pipe.

   The usual :exc:`Queue.Empty` and :exc:`Queue.Full` exceptions from the
   standard library's :mod:`Queue` module are raised to signal timeouts.

   :class:`~multiprocessing.Queue` implements all the methods of :class:`Queue.Queue` except for
   :meth:`~Queue.Queue.task_done` and :meth:`~Queue.Queue.join`.

   .. method:: qsize()

      Return the approximate size of the queue. Because of
      multithreading/multiprocessing semantics, this number is not reliable.

      Note that this may raise :exc:`NotImplementedError` on Unix platforms like
      Mac OS X where ``sem_getvalue()`` is not implemented.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise. Because of
      multithreading/multiprocessing semantics, this is not reliable.

   .. method:: full()

      Return ``True`` if the queue is full, ``False`` otherwise. Because of
      multithreading/multiprocessing semantics, this is not reliable.

   .. method:: put(obj[, block[, timeout]])

      Put *obj* into the queue. If the optional argument *block* is ``True``
      (the default) and *timeout* is ``None`` (the default), block if necessary until
      a free slot is available. If *timeout* is a positive number, it blocks at
      most *timeout* seconds and raises the :exc:`Queue.Full` exception if no
      free slot was available within that time. Otherwise (*block* is
      ``False``), put an item on the queue if a free slot is immediately
      available, else raise the :exc:`Queue.Full` exception (*timeout* is
      ignored in that case).

   .. method:: put_nowait(obj)

      Equivalent to ``put(obj, False)``.

   .. method:: get([block[, timeout]])

      Remove and return an item from the queue. If optional args *block* is
      ``True`` (the default) and *timeout* is ``None`` (the default), block if
      necessary until an item is available. If *timeout* is a positive number,
      it blocks at most *timeout* seconds and raises the :exc:`Queue.Empty`
      exception if no item was available within that time. Otherwise (*block* is
      ``False``), return an item if one is immediately available, else raise the
      :exc:`Queue.Empty` exception (*timeout* is ignored in that case).

   .. method:: get_nowait()

      Equivalent to ``get(False)``.

   :class:`~multiprocessing.Queue` has a few additional methods not found in
   :class:`Queue.Queue`. These methods are usually unnecessary for most
   code:

   .. method:: close()

      Indicate that no more data will be put on this queue by the current
      process. The background thread will quit once it has flushed all buffered
      data to the pipe. This is called automatically when the queue is garbage
      collected.

   .. method:: join_thread()

      Join the background thread. This can only be used after :meth:`close` has
      been called. It blocks until the background thread exits, ensuring that
      all data in the buffer has been flushed to the pipe.

      By default if a process is not the creator of the queue then on exit it
      will attempt to join the queue's background thread. The process can call
      :meth:`cancel_join_thread` to make :meth:`join_thread` do nothing.

   .. method:: cancel_join_thread()

      Prevent :meth:`join_thread` from blocking. In particular, this prevents
      the background thread from being joined automatically when the process
      exits -- see :meth:`join_thread`.

      A better name for this method might be
      ``allow_exit_without_flush()``. It is likely to cause enqueued
      data to be lost, and you almost certainly will not need to use it.
      It is really only there if you need the current process to exit
      immediately without waiting to flush enqueued data to the
      underlying pipe, and you don't care about lost data.

   .. note::

      This class's functionality requires a functioning shared semaphore
      implementation on the host operating system. Without one, the
      functionality in this class will be disabled, and attempts to
      instantiate a :class:`Queue` will result in an :exc:`ImportError`. See
      :issue:`3770` for additional information. The same holds true for any
      of the specialized queue types listed below.


.. class:: multiprocessing.queues.SimpleQueue()

   It is a simplified :class:`~multiprocessing.Queue` type, very close to a locked :class:`Pipe`.

   .. method:: empty()

      Return ``True`` if the queue is empty, ``False`` otherwise.

   .. method:: get()

      Remove and return an item from the queue.

   .. method:: put(item)

      Put *item* into the queue.


.. class:: JoinableQueue([maxsize])

   :class:`JoinableQueue`, a :class:`~multiprocessing.Queue` subclass, is a queue which
   additionally has :meth:`task_done` and :meth:`join` methods.

   .. method:: task_done()

      Indicate that a formerly enqueued task is complete. Used by queue consumer
      threads. For each :meth:`~Queue.get` used to fetch a task, a subsequent
      call to :meth:`task_done` tells the queue that the processing on the task
      is complete.

      If a :meth:`~Queue.Queue.join` is currently blocking, it will resume when all
      items have been processed (meaning that a :meth:`task_done` call was
      received for every item that had been :meth:`~Queue.put` into the queue).

      Raises a :exc:`ValueError` if called more times than there were items
      placed in the queue.


   .. method:: join()

      Block until all items in the queue have been gotten and processed.

      The count of unfinished tasks goes up whenever an item is added to the
      queue. The count goes down whenever a consumer thread calls
      :meth:`task_done` to indicate that the item was retrieved and all work on
      it is complete. When the count of unfinished tasks drops to zero,
      :meth:`~Queue.Queue.join` unblocks.
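
   A minimal sketch of the producer/consumer pattern these two methods
   support, assuming a single daemonic consumer process; ``consumer()`` and
   ``main()`` are hypothetical helpers. The consumer calls :meth:`task_done`
   once per item it takes, so :meth:`join` in the parent returns only after
   every item has been handled::

      from multiprocessing import Process, JoinableQueue, Queue

      def consumer(tasks, results):
          while True:
              item = tasks.get()
              try:
                  results.put(item * item)
              finally:
                  tasks.task_done()   # exactly once per get()

      def main(items=(1, 2, 3, 4)):
          tasks = JoinableQueue()
          results = Queue()
          worker = Process(target=consumer, args=(tasks, results))
          worker.daemon = True   # terminated automatically when the parent exits
          worker.start()
          for item in items:
              tasks.put(item)
          tasks.join()   # returns once task_done() was called for every item
          return sorted(results.get() for _ in items)

      if __name__ == '__main__':
          print(main())   # [1, 4, 9, 16]
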


Miscellaneous
~~~~~~~~~~~~~

.. function:: active_children()

   Return a list of all live children of the current process.

   Calling this has the side effect of "joining" any processes which have
   already finished.

.. function:: cpu_count()

   Return the number of CPUs in the system. May raise
   :exc:`NotImplementedError`.
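
   Because the call can fail, code that sizes a pool from it may want a
   fallback. A minimal sketch; the ``worker_count()`` helper and its default
   of one worker are assumptions for illustration::

      import multiprocessing

      def worker_count(default=1):
          try:
              return multiprocessing.cpu_count()
          except NotImplementedError:
              # The platform could not report its number of CPUs.
              return default

      n = worker_count()   # machine-dependent, at least 1
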

.. function:: current_process()

   Return the :class:`Process` object corresponding to the current process.

   An analogue of :func:`threading.current_thread`.
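
   A minimal sketch showing that the result differs between the parent and a
   child; ``report()`` and ``main()`` are hypothetical helpers, and the
   expected names rely on CPython naming the initial process
   ``'MainProcess'`` and on the *name* passed to the :class:`Process`
   constructor::

      from multiprocessing import Process, Queue, current_process

      def report(q):
          # Runs in the child, so this is the child's Process object.
          q.put(current_process().name)

      def main():
          q = Queue()
          p = Process(target=report, args=(q,), name='reporter')
          p.start()
          child_name = q.get()
          p.join()
          return current_process().name, child_name

      if __name__ == '__main__':
          print(main())   # ('MainProcess', 'reporter')
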

.. function:: freeze_support()

   Add support for when a program which uses :mod:`multiprocessing` has been
   frozen to produce a Windows executable. (Has been tested with **py2exe**,
   **PyInstaller** and **cx_Freeze**.)

   One needs to call this function straight after the ``if __name__ ==
   '__main__'`` line of the main module. For example::

      from multiprocessing import Process, freeze_support

      def f():
          print 'hello world!'

      if __name__ == '__main__':
          freeze_support()
          Process(target=f).start()

   If the ``freeze_support()`` line is omitted then trying to run the frozen
   executable will raise :exc:`RuntimeError`.

   Calling ``freeze_support()`` has no effect when invoked on any operating
   system other than Windows. In addition, if the module is being run
   normally by the Python interpreter on Windows (the program has not been
   frozen), then ``freeze_support()`` has no effect.

.. function:: set_executable(executable)

   Sets the path of the Python interpreter to use when starting a child process.
   (By default :data:`sys.executable` is used). Embedders will probably need to
   do something like ::

      set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

   before they can create child processes. (Windows only)


.. note::

   :mod:`multiprocessing` contains no analogues of
   :func:`threading.active_count`, :func:`threading.enumerate`,
   :func:`threading.settrace`, :func:`threading.setprofile`,
   :class:`threading.Timer`, or :class:`threading.local`.


Connection Objects
~~~~~~~~~~~~~~~~~~

.. currentmodule:: None

Connection objects allow the sending and receiving of picklable objects or
strings. They can be thought of as message-oriented connected sockets.

Connection objects are usually created using
:func:`Pipe <multiprocessing.Pipe>` -- see also
:ref:`multiprocessing-listeners-clients`.

.. class:: Connection

   .. method:: send(obj)

      Send an object to the other end of the connection which should be read
      using :meth:`recv`.

      The object must be picklable. Very large pickles (approximately 32 MB+,
      though it depends on the OS) may raise a :exc:`ValueError` exception.

   .. method:: recv()

      Return an object sent from the other end of the connection using
      :meth:`send`. Blocks until there is something to receive. Raises
      :exc:`EOFError` if there is nothing left to receive
      and the other end was closed.

   .. method:: fileno()

      Return the file descriptor or handle used by the connection.

   .. method:: close()

      Close the connection.

      This is called automatically when the connection is garbage collected.

   .. method:: poll([timeout])

      Return whether there is any data available to be read.

      If *timeout* is not specified then it will return immediately. If
      *timeout* is a number then this specifies the maximum time in seconds to
      block. If *timeout* is ``None`` then an infinite timeout is used.
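
      A minimal sketch within a single process; ``a`` and ``b`` are simply
      the two ends returned by :func:`Pipe <multiprocessing.Pipe>`::

         from multiprocessing import Pipe

         a, b = Pipe()
         print(b.poll())      # False: nothing has been sent yet
         a.send('ping')
         print(b.poll(1.0))   # True: waits up to one second for data
         print(b.recv())      # ping
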
849
850
.. method:: send_bytes(buffer[, offset[, size]])
851
852
Send byte data from an object supporting the buffer interface as a
853
complete message.
854
855
If *offset* is given then data is read from that position in *buffer*. If
856
*size* is given then that many bytes will be read from buffer. Very large
857
buffers (approximately 32 MB+, though it depends on the OS) may raise a
858
:exc:`ValueError` exception
859
860
.. method:: recv_bytes([maxlength])
861
862
Return a complete message of byte data sent from the other end of the
863
connection as a string. Blocks until there is something to receive.
864
Raises :exc:`EOFError` if there is nothing left
865
to receive and the other end has closed.
866
867
If *maxlength* is specified and the message is longer than *maxlength*
868
then :exc:`IOError` is raised and the connection will no longer be
869
readable.

.. method:: recv_bytes_into(buffer[, offset])

Read into *buffer* a complete message of byte data sent from the other end
of the connection and return the number of bytes in the message. Blocks
until there is something to receive. Raises :exc:`EOFError` if there is
nothing left to receive and the other end was closed.

*buffer* must be an object satisfying the writable buffer interface. If
*offset* is given then the message will be written into the buffer from
that position. *offset* must be a non-negative integer less than the
length of *buffer* (in bytes).

If the buffer is too short then a :exc:`BufferTooShort` exception is
raised and the complete message is available as ``e.args[0]`` where ``e``
is the exception instance.
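
The sketch below (both ends in one process for brevity; buffer sizes chosen arbitrarily) shows *offset* placing a message partway into a ``bytearray``, and how the full message can still be recovered from :exc:`BufferTooShort` when the destination is too small:

```python
from multiprocessing import BufferTooShort, Pipe

sender, receiver = Pipe()

# Write a 3-byte message into an 8-byte buffer, starting at offset 2.
buf = bytearray(8)
sender.send_bytes(b'abc')
count = receiver.recv_bytes_into(buf, 2)    # 3
# buf is now bytearray(b'\x00\x00abc\x00\x00\x00')

# A destination smaller than the message raises BufferTooShort; the
# complete message travels with the exception as e.args[0].
small = bytearray(4)
sender.send_bytes(b'hello world')
recovered = None
try:
    receiver.recv_bytes_into(small)
except BufferTooShort as e:
    recovered = e.args[0]                   # b'hello world'
```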


For example:

.. doctest::

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes('thank you')
>>> a.recv_bytes()
'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])


.. warning::

The :meth:`Connection.recv` method automatically unpickles the data it
receives, which can be a security risk unless you can trust the process
which sent the message.

Therefore, unless the connection object was produced using :func:`Pipe`,
you should only use the :meth:`~Connection.recv` and :meth:`~Connection.send`
methods after performing some sort of authentication. See
:ref:`multiprocessing-auth-keys`.
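
Connections obtained through :mod:`multiprocessing.connection` can be authenticated by giving the same *authkey* to :class:`~multiprocessing.connection.Listener` and :func:`~multiprocessing.connection.Client`. Both ends then run a challenge/response handshake before any pickled data is exchanged. This is a minimal sketch, assuming a loopback address with a port chosen by the OS and a hypothetical shared key ``AUTHKEY``. A thread plays the server role only so that the example is self-contained.

```python
import threading
from multiprocessing.connection import Client, Listener

# Hypothetical shared secret; both ends must use the same bytes value.
AUTHKEY = b'example-shared-secret'

# Port 0 asks the OS for any free port; listener.address reports the real one.
listener = Listener(('localhost', 0), authkey=AUTHKEY)
received = []

def serve_one():
    # accept() runs the authentication handshake before returning and
    # raises AuthenticationError if the client's key does not match.
    conn = listener.accept()
    try:
        received.append(conn.recv())   # unpickling happens only after auth succeeded
    finally:
        conn.close()

server = threading.Thread(target=serve_one)
server.start()

client = Client(listener.address, authkey=AUTHKEY)
client.send({'op': 'ping'})
client.close()

server.join()
listener.close()
```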

.. warning::

If a process is killed while it is trying to read or write to a pipe then
the data in the pipe is likely to become corrupted, because it may become
impossible to be sure where the message boundaries lie.


Synchronization primitives
~~~~~~~~~~~~~~~~~~~~~~~~~~

.. currentmodule:: multiprocessing

Generally, synchronization primitives are not as necessary in a multiprocess
program as they are in a multithreaded program. See the documentation for
the :mod:`threading` module.

Note that one can also create synchronization primitives by using a manager
object; see :ref:`multiprocessing-managers`.

.. class:: BoundedSemaphore([value])

A bounded semaphore object: a close analog of
:class:`threading.BoundedSemaphore`.

It differs from its close analog in that its ``acquire`` method's first
argument is named *block* and it accepts an optional second argument,
*timeout*, consistent with :meth:`Lock.acquire`.

.. note::
On Mac OS X, this is indistinguishable from :class:`Semaphore` because
``sem_getvalue()`` is not implemented on that platform.
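
A minimal sketch of the ``acquire`` signature described above, using a bounded semaphore with an initial value of 1; the timeout of 0.1 seconds is arbitrary:

```python
from multiprocessing import BoundedSemaphore

sem = BoundedSemaphore(1)                   # at most one holder at a time

got_first = sem.acquire()                   # True: counter goes from 1 to 0
got_timed = sem.acquire(True, 0.1)          # block=True, timeout=0.1: gives up, False
got_nonblocking = sem.acquire(block=False)  # returns at once, False
sem.release()                               # counter back to 1
got_again = sem.acquire(block=False)        # True
sem.release()
```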

.. class:: Condition([lock])

A condition variable: a clone of :class:`threading.Condition`.

If *lock* is specified then it should be a :class:`Lock` or :class:`RLock`
object from :mod:`multiprocessing`.
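
The sketch below passes an explicit :class:`Lock` from :mod:`multiprocessing` as *lock*. To keep it self-contained, a thread in the same process performs the notification. The ``ready`` list is hypothetical shared state that works here only because a thread, not a separate process, mutates it.

```python
import threading
from multiprocessing import Condition, Lock

cond = Condition(Lock())   # explicit non-recursive multiprocessing lock
ready = []                 # hypothetical shared state (same process only)

def producer():
    with cond:                  # acquires the underlying Lock
        ready.append(True)
        cond.notify()           # wake one waiter; the lock must be held

with cond:
    worker = threading.Thread(target=producer)
    worker.start()              # producer blocks until wait() releases the lock
    while not ready:            # re-check the condition after every wakeup
        cond.wait(1.0)          # releases the lock while waiting, reacquires after
worker.join()
```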

.. class:: Event()

A clone of :class:`threading.Event`.
Its :meth:`wait` method returns the state of the internal semaphore on exit,
so it will always return ``True`` except if a timeout is given and the
operation times out.

.. versionchanged:: 2.7
Previously, :meth:`wait` always returned ``None``.
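
A minimal sketch of the return value of ``wait()`` with and without the flag set; the 0.05 second timeout is arbitrary:

```python
from multiprocessing import Event

event = Event()

timed_out = event.wait(0.05)     # flag not set: returns False after the timeout

event.set()
already_set = event.wait(0.05)   # flag set: returns True immediately
```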


.. class:: Lock()

A non-recursive lock object: a close analog of :class:`threading.Lock`.
Once a process or thread has acquired a lock, subsequent attempts to
acquire it from any process or thread will block until it is released;
any process or thread may release it. The concepts and behaviors of
:class:`threading.Lock` as they apply to threads are replicated here in
:class:`multiprocessing.Lock` as they apply to either processes or threads,
except as noted.

Note that :class:`Lock` is actually a factory function which returns an
instance of ``multiprocessing.synchronize.Lock`` initialized with a
default context.

:class:`Lock` supports the :term:`context manager` protocol and thus may be
used in :keyword:`with` statements.
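
Because :class:`Lock` is a context manager, the ``with`` block below acquires it on entry and releases it on exit, even when the block raises. The sketch also shows that the lock is non-recursive: while it is held, a non-blocking ``acquire`` from the same thread fails. The exception type is an arbitrary choice.

```python
from multiprocessing import Lock

lock = Lock()

inner_attempt = None
try:
    with lock:                                   # acquire() on entry
        # Non-recursive: even the holding thread cannot take it again.
        inner_attempt = lock.acquire(False)      # False
        raise RuntimeError('failure inside the critical section')
except RuntimeError:
    pass                                         # release() already ran on exit

free_again = lock.acquire(False)                 # True: the with block released it
lock.release()
```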

.. method:: acquire(block=True, timeout=None)

Acquire a lock, blocking or non-blocking.

With the *block* argument set to ``True`` (the default), the method call
will block until the lock is in an unlocked state, then set it to locked
and return ``True``. Note that the name of this first argument differs
from that in :meth:`threading.Lock.acquire`.

With the *block* argument set to ``False``, the method call does not
block. If the lock is currently in a locked state, return ``False``;
otherwise set the lock to a locked state and return ``True``.
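
A minimal sketch contrasting the two values of *block*. A helper thread releases the lock after a short, arbitrary delay. This relies on the rule stated for :class:`Lock` that any process or thread, not only the one that acquired it, may release it.

```python
import threading
import time
from multiprocessing import Lock

lock = Lock()
lock.acquire()                            # the lock is now held

busy = lock.acquire(block=False)          # False: returns at once while held

def release_later():
    time.sleep(0.1)                       # keep the lock held briefly
    lock.release()                        # released by a different thread

helper = threading.Thread(target=release_later)
helper.start()

start = time.time()
got_it = lock.acquire(block=True)         # blocks until release_later() runs
waited = time.time() - start              # roughly 0.1 seconds
helper.join()
lock.release()
```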