Concorrenza in Python - Pool di processi
Il pool di processi può essere creato e utilizzato nello stesso modo in cui abbiamo creato e utilizzato il pool di thread. Il pool di processi può essere definito come il gruppo di processi preistanziati e inattivi, che sono pronti per ricevere il lavoro. La creazione di un pool di processi è preferibile rispetto all'istanza di nuovi processi per ogni attività quando è necessario eseguire un numero elevato di attività.
Modulo Python - Concurrent.futures
La libreria standard di Python ha un modulo chiamato concurrent.futures. Questo modulo è stato aggiunto in Python 3.2 per fornire agli sviluppatori un'interfaccia di alto livello per l'avvio di attività asincrone. È un livello di astrazione in cima ai moduli di threading e multiprocessing di Python per fornire l'interfaccia per eseguire le attività utilizzando pool di thread o processi.
Nelle sezioni successive, esamineremo le diverse sottoclassi del modulo concurrent.futures.
Classe esecutore
Executor è una classe astratta di concurrent.futuresModulo Python. Non può essere utilizzato direttamente e dobbiamo utilizzare una delle seguenti sottoclassi concrete:
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor - Una sottoclasse concreta
È una delle sottoclassi concrete della classe Executor. Utilizza l'elaborazione multipla e otteniamo un pool di processi per l'invio delle attività. Questo pool assegna le attività ai processi disponibili e ne pianifica l'esecuzione.
Come creare un ProcessPoolExecutor?
Con l'aiuto di concurrent.futures modulo e la sua sottoclasse concreta Executor, possiamo facilmente creare un pool di processi. Per questo, dobbiamo costruire un fileProcessPoolExecutorcon il numero di processi che vogliamo nel pool. Per impostazione predefinita, il numero è 5. Segue l'invio di un'attività al pool di processi.
Esempio
Considereremo ora lo stesso esempio che abbiamo usato durante la creazione del pool di thread, l'unica differenza è che ora useremo ProcessPoolExecutor invece di ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def task(message):
sleep(2)
return message
def main():
executor = ProcessPoolExecutor(5)
future = executor.submit(task, ("Completed"))
print(future.done())
sleep(2)
print(future.done())
print(future.result())
if __name__ == '__main__':
main()
Produzione
False
False
Completed
Nell'esempio sopra, un ProcessPoolExecutorè stato costruito con 5 fili. Quindi un'attività, che attenderà 2 secondi prima di inviare il messaggio, viene inviata all'esecutore del pool di processi. Come si vede dall'output, l'attività non viene completata fino a 2 secondi, quindi la prima chiamata adone()restituirà False. Dopo 2 secondi, l'attività è completata e otteniamo il risultato del futuro chiamando ilresult() metodo su di esso.
Creazione di istanze di ProcessPoolExecutor - Gestore contesto
Un altro modo per istanziare ProcessPoolExecutor è con l'aiuto del gestore di contesto. Funziona in modo simile al metodo utilizzato nell'esempio precedente. Il vantaggio principale dell'utilizzo di context manager è che sembra sintatticamente buono. La creazione di istanze può essere eseguita con l'aiuto del codice seguente:
with ProcessPoolExecutor(max_workers = 5) as executor
Esempio
Per una migliore comprensione, stiamo prendendo lo stesso esempio utilizzato durante la creazione del pool di thread. In questo esempio, dobbiamo iniziare importando il fileconcurrent.futuresmodulo. Quindi una funzione denominataload_url()viene creato che caricherà l'URL richiesto. IlProcessPoolExecutorviene quindi creato con il numero 5 di thread nel pool. Il processoPoolExecutorè stato utilizzato come gestore di contesto. Possiamo ottenere il risultato del futuro chiamando ilresult() metodo su di esso.
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout = timeout) as conn:
return conn.read()
def main():
with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
if __name__ == '__main__':
main()
Produzione
Lo script Python precedente genererà il seguente output:
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229476 bytes
'http://www.cnn.com/' page is 165323 bytes
'http://www.bbc.co.uk/' page is 284981 bytes
'http://europe.wsj.com/' page is 967575 bytes
Utilizzo della funzione Executor.map ()
Il pitone map()è ampiamente utilizzata per eseguire una serie di attività. Uno di questi compiti è applicare una determinata funzione a ogni elemento all'interno degli iterabili. Allo stesso modo, possiamo mappare tutti gli elementi di un iteratore su una funzione e inviarli come lavori indipendenti aProcessPoolExecutor. Considera il seguente esempio di script Python per capirlo.
Esempio
Considereremo lo stesso esempio che abbiamo usato durante la creazione del pool di thread utilizzando il Executor.map()funzione. Nell'esempio riportato di seguito, la funzione map viene utilizzata per applicaresquare() funzione su ogni valore nell'array di valori.
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
return n * n
def main():
with ProcessPoolExecutor(max_workers = 3) as executor:
results = executor.map(square, values)
for result in results:
print(result)
if __name__ == '__main__':
main()
Produzione
Lo script Python precedente genererà il seguente output
4
9
16
25
Quando utilizzare ProcessPoolExecutor e ThreadPoolExecutor?
Ora che abbiamo studiato entrambe le classi Executor - ThreadPoolExecutor e ProcessPoolExecutor, abbiamo bisogno di sapere quando utilizzare quale executor. Dobbiamo scegliere ProcessPoolExecutor in caso di carichi di lavoro associati a CPU e ThreadPoolExecutor in caso di carichi di lavoro associati a I / O.
Se usiamo ProcessPoolExecutor, quindi non dobbiamo preoccuparci di GIL perché utilizza il multiprocessing. Inoltre, il tempo di esecuzione sarà inferiore rispetto aThreadPoolExecution. Considera il seguente esempio di script Python per capirlo.
Esempio
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Produzione
Start: 8000000 Time taken: 1.5509998798370361
Start: 7000000 Time taken: 1.3259999752044678
Total time taken: 2.0840001106262207
Example- Python script with ThreadPoolExecutor:
import time
import concurrent.futures
value = [8000000, 7000000]
def counting(n):
start = time.time()
while n > 0:
n -= 1
return time.time() - start
def main():
start = time.time()
with concurrent.futures.ThreadPoolExecutor() as executor:
for number, time_taken in zip(value, executor.map(counting, value)):
print('Start: {} Time taken: {}'.format(number, time_taken))
print('Total time taken: {}'.format(time.time() - start))
if __name__ == '__main__':
main()
Produzione
Start: 8000000 Time taken: 3.8420000076293945
Start: 7000000 Time taken: 3.6010000705718994
Total time taken: 3.8480000495910645
Dalle uscite di entrambi i programmi sopra, possiamo vedere la differenza del tempo di esecuzione durante l'utilizzo ProcessPoolExecutor e ThreadPoolExecutor.