Concorrenza in Python - Pool di thread

Supponiamo di dover creare un gran numero di thread per le nostre attività multithread. Sarebbe computazionalmente più costoso in quanto possono esserci molti problemi di prestazioni, a causa di troppi thread. Un grosso problema potrebbe essere la limitazione del throughput. Possiamo risolvere questo problema creando un pool di thread. Un pool di thread può essere definito come il gruppo di thread pre-istanziati e inattivi, che sono pronti per ricevere il lavoro. La creazione di pool di thread è preferibile rispetto all'istanza di nuovi thread per ogni attività quando è necessario eseguire un numero elevato di attività. Un pool di thread può gestire l'esecuzione simultanea di un numero elevato di thread come segue:

  • Se un thread in un pool di thread completa la sua esecuzione, quel thread può essere riutilizzato.

  • Se un thread viene terminato, verrà creato un altro thread per sostituire quel thread.

Modulo Python - Concurrent.futures

La libreria standard di Python include l'estensione concurrent.futuresmodulo. Questo modulo è stato aggiunto in Python 3.2 per fornire agli sviluppatori un'interfaccia di alto livello per l'avvio di attività asincrone. È un livello di astrazione in cima ai moduli di threading e multiprocessing di Python per fornire l'interfaccia per eseguire le attività utilizzando pool di thread o processi.

Nelle sezioni successive impareremo a conoscere le diverse classi del modulo concurrent.futures.

Classe esecutore

Executorè una classe astratta di concurrent.futuresModulo Python. Non può essere utilizzato direttamente e dobbiamo utilizzare una delle seguenti sottoclassi concrete:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor - Una sottoclasse concreta

È una delle sottoclassi concrete della classe Executor. La sottoclasse utilizza il multi-threading e otteniamo un pool di thread per inviare le attività. Questo pool assegna le attività ai thread disponibili e ne pianifica l'esecuzione.

Come creare un ThreadPoolExecutor?

Con l'aiuto di concurrent.futures modulo e la sua sottoclasse concreta Executor, possiamo creare facilmente un pool di thread. Per questo, dobbiamo costruire un fileThreadPoolExecutorcon il numero di thread che vogliamo nel pool. Per impostazione predefinita, il numero è 5. Quindi possiamo inviare un'attività al pool di thread. Quando noisubmit() un compito, torniamo a Future. L'oggetto Future ha un metodo chiamatodone(), che dice se il futuro si è risolto. Con questo, è stato impostato un valore per quel particolare oggetto futuro. Al termine di un'attività, l'esecutore del pool di thread imposta il valore sull'oggetto futuro.

Esempio

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Produzione

False
True
Completed

Nell'esempio sopra, a ThreadPoolExecutorè stato costruito con 5 fili. Quindi un'attività, che attenderà 2 secondi prima di inviare il messaggio, viene inviata all'esecutore del pool di thread. Come si vede dall'output, l'attività non viene completata fino a 2 secondi, quindi la prima chiamata adone()restituirà False. Dopo 2 secondi, l'attività è completata e otteniamo il risultato del futuro chiamando ilresult() metodo su di esso.

Creazione di istanze di ThreadPoolExecutor - Gestore contesto

Un altro modo per creare un'istanza ThreadPoolExecutorè con l'aiuto di context manager. Funziona in modo simile al metodo utilizzato nell'esempio precedente. Il vantaggio principale dell'utilizzo di context manager è che sembra sintatticamente buono. La creazione di istanze può essere eseguita con l'aiuto del codice seguente:

with ThreadPoolExecutor(max_workers = 5) as executor

Esempio

Il seguente esempio è preso in prestito dalla documentazione di Python. In questo esempio, prima di tutto il fileconcurrent.futuresil modulo deve essere importato. Quindi una funzione denominataload_url()viene creato che caricherà l'URL richiesto. La funzione quindi creaThreadPoolExecutor con i 5 fili in piscina. IlThreadPoolExecutorè stato utilizzato come gestore di contesto. Possiamo ottenere il risultato del futuro chiamando ilresult() metodo su di esso.

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

Produzione

Di seguito sarebbe l'output dello script Python sopra -

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Uso della funzione Executor.map ()

Il pitone map()è ampiamente utilizzata in una serie di attività. Uno di questi compiti è applicare una determinata funzione a ogni elemento all'interno degli iterabili. Allo stesso modo, possiamo mappare tutti gli elementi di un iteratore su una funzione e inviarli come lavori indipendenti a outThreadPoolExecutor. Considera il seguente esempio di script Python per capire come funziona la funzione.

Esempio

In questo esempio di seguito, la funzione map viene utilizzata per applicare l'estensione square() funzione su ogni valore nell'array di valori.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

Produzione

Lo script Python precedente genera il seguente output:

4
9
16
25