RxPY - Guida rapida

Questo capitolo spiega cos'è la programmazione reattiva, cos'è RxPY, i suoi operatori, caratteristiche, vantaggi e svantaggi.

Cos'è la programmazione reattiva?

La programmazione reattiva è un paradigma di programmazione, che si occupa del flusso di dati e della propagazione del cambiamento. Significa che, quando un flusso di dati viene emesso da un componente, la modifica verrà propagata ad altri componenti da una libreria di programmazione reattiva. La propagazione del cambiamento continuerà fino a raggiungere il ricevitore finale.

Utilizzando RxPY, hai un buon controllo sui flussi di dati asincroni, ad esempio, una richiesta effettuata all'URL può essere tracciata utilizzando osservabile e usa l'osservatore per ascoltare quando la richiesta è completa per la risposta o l'errore.

RxPY ti offre la possibilità di gestire flussi di dati asincroni utilizzando Observables, interroga i flussi di dati usando Operators vale a dire filtro, somma, concatenazione, mappa e anche utilizzare la concorrenza per i flussi di dati utilizzando Schedulers. La creazione di un Observable fornisce un oggetto osservatore con i metodi on_next (v), on_error (e) e on_completed (), che deve esseresubscribed in modo da ricevere una notifica quando si verifica un evento.

L'Observable può essere interrogato utilizzando più operatori in un formato a catena utilizzando l'operatore pipe.

RxPY offre operatori in varie categorie come: -

  • Operatori matematici

  • Operatori di trasformazione

  • Operatori di filtraggio

  • Operatori di gestione degli errori

  • Operatori di servizi

  • Operatori condizionali

  • Operatori di creazione

  • Operatori collegabili

Questi operatori sono spiegati in dettaglio in questo tutorial.

Cos'è RxPy?

RxPY è definito come a library for composing asynchronous and event-based programs using observable collections and pipable query operators in Python come da sito web ufficiale di RxPy, che è https://rxpy.readthedocs.io/en/latest/.

RxPY è una libreria python per supportare la programmazione reattiva. RxPy sta perReactive Extensions for Python. È una libreria che utilizza osservabili per lavorare con la programmazione reattiva che si occupa di chiamate dati asincrone, callback e programmi basati su eventi.

Caratteristiche di RxPy

In RxPy, i seguenti concetti si occupano della gestione dell'attività asincrona -

Osservabile

Un osservabile è una funzione che crea un osservatore e lo collega alla sorgente con flussi di dati attesi, ad esempio, da Tweet, eventi relativi al computer, ecc.

Osservatore

È un oggetto con metodi on_next (), on_error () e on_completed (), che verrà chiamato quando c'è interazione con l'osservabile, cioè la fonte interagisce per un esempio di Tweet in arrivo, ecc.

Sottoscrizione

Quando l'osservabile viene creato, per eseguire l'osservabile dobbiamo sottoscriverlo.

Operatori

Un operatore è una funzione pura che accetta l'osservabile come input e anche l'output è osservabile. È possibile utilizzare più operatori su un dato osservabile utilizzando l'operatore pipe.

Soggetto

Un soggetto è una sequenza osservabile così come un osservatore che può trasmettere in multicast, cioè parlare con molti osservatori che si sono iscritti. Il soggetto è un osservabile freddo, cioè i valori saranno condivisi tra gli osservatori che sono stati sottoscritti.

Schedulatori

Una caratteristica importante di RxPy è la concorrenza, ovvero consentire l'esecuzione del compito in parallelo. Per far sì che ciò accada, RxPy ha due operatori subscribe_on () e explore_on () che lavorano con gli scheduler e decideranno l'esecuzione dell'attività sottoscritta.

Vantaggi dell'utilizzo di RxPY

I seguenti sono i vantaggi di RxPy:

  • RxPY è una libreria fantastica quando si tratta della gestione di flussi di dati ed eventi asincroni. RxPY utilizza osservabili per lavorare con la programmazione reattiva che si occupa di chiamate dati asincrone, callback e programmi basati su eventi.

  • RxPY offre una vasta collezione di operatori in categorie matematiche, trasformazione, filtraggio, utilità, condizionale, gestione degli errori, join che semplifica la vita se utilizzato con la programmazione reattiva.

  • La concorrenza, ovvero il lavoro di più attività insieme, si ottiene utilizzando gli scheduler in RxPY.

  • Le prestazioni vengono migliorate utilizzando RxPY poiché la gestione delle attività asincrone e dell'elaborazione parallela è semplificata.

Svantaggio dell'utilizzo di RxPY

  • Il debug del codice con osservabili è un po 'difficile.

In questo capitolo lavoreremo all'installazione di RxPy. Per iniziare a lavorare con RxPY, dobbiamo prima installare Python. Quindi, lavoreremo su quanto segue:

  • Installa Python
  • Installa RxPy

Installazione di Python

Vai al sito ufficiale di Python: https://www.python.org/downloads/.come mostrato di seguito e fare clic sull'ultima versione disponibile per Windows, Linux / Unix e mac os. Scarica Python secondo il tuo sistema operativo a 64 o 32 bit disponibile con te.

Una volta scaricato, fai clic sul file .exe file e segui i passaggi per installare python sul tuo sistema.

Anche il gestore di pacchetti python, ovvero pip, verrà installato di default con l'installazione precedente. Per farlo funzionare globalmente sul tuo sistema, aggiungi direttamente la posizione di python alla variabile PATH, la stessa viene mostrata all'inizio dell'installazione, ricordati di spuntare la casella di spunta, che dice ADD to PATH. Nel caso in cui ti dimentichi di controllarlo, segui i passaggi indicati di seguito per aggiungere a PATH.

Per aggiungere a PATH seguire i passaggi seguenti:

Fare clic con il pulsante destro del mouse sull'icona del computer e fare clic su proprietà → Impostazioni di sistema avanzate.

Verrà visualizzata la schermata come mostrato di seguito:

Fare clic su Variabili d'ambiente come mostrato sopra. Verrà visualizzata la schermata come mostrato di seguito:

Seleziona Percorso e fai clic sul pulsante Modifica, aggiungi il percorso della posizione del tuo pitone alla fine. Ora, controlliamo la versione di Python.

Controllo della versione di Python

E:\pyrx>python --version
Python 3.7.3

Installa RxPY

Ora che abbiamo installato Python, installeremo RxPy.

Una volta installato python, verrà installato anche il gestore pacchetti python, ovvero pip. Di seguito è riportato il comando per verificare la versione di pip -

E:\pyrx>pip --version
pip 19.1.1 from c:\users\xxxx\appdata\local\programs\python\python37\lib\site-
packages\pip (python 3.7)

Abbiamo pip installato e la versione è 19.1.1. Ora useremo pip per installare RxPy

Il comando è il seguente:

pip install rx

In questo tutorial, stiamo usando RxPY versione 3 e python versione 3.7.3. Il funzionamento di RxPY versione 3 differisce leggermente dalla versione precedente, ovvero RxPY versione 1.

In questo capitolo, discuteremo le differenze tra le 2 versioni e le modifiche che devono essere eseguite nel caso in cui si stiano aggiornando le versioni Python e RxPY.

Osservabile in RxPY

Nella versione 1 di RxPy, Observable era una classe separata -

from rx import Observable

Per usare l'osservabile, devi usarlo come segue:

Observable.of(1,2,3,4,5,6,7,8,9,10)

In RxPy versione 3, Observable è direttamente una parte del pacchetto rx.

Example

import rx
rx.of(1,2,3,4,5,6,7,8,9,10)

Operatori in RxPy

Nella versione 1, l'operatore era metodi nella classe Observable. Ad esempio, per utilizzare gli operatori dobbiamo importare Observable come mostrato di seguito:

from rx import Observable

Gli operatori vengono utilizzati come Observable.operator, ad esempio, come mostrato di seguito:

Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Nel caso di RxPY versione 3, gli operatori sono funzionali e vengono importati e utilizzati come segue:

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Concatenamento di operatori utilizzando il metodo Pipe ()

In RxPy versione 1, nel caso in cui dovessi usare più operatori su un osservabile, doveva essere fatto come segue:

Example

from rx import Observable
Observable.of(1,2,3,4,5,6,7,8,9,10)\
   .filter(lambda i: i %2 == 0) \
   .sum() \
   .subscribe(lambda x: print("Value is {0}".format(x)))

Ma, in caso di RxPY versione 3, puoi usare il metodo pipe () e più operatori come mostrato di seguito -

Example

import rx
from rx import operators as ops
rx.of(1,2,3,4,5,6,7,8,9,10).pipe(
   ops.filter(lambda i: i %2 == 0),
   ops.sum()
).subscribe(lambda x: print("Value is {0}".format(x)))

Un osservabile è una funzione che crea un osservatore e lo collega alla sorgente in cui sono previsti valori, ad esempio clic, eventi del mouse da un elemento dom, ecc.

Gli argomenti menzionati di seguito saranno studiati in dettaglio in questo capitolo.

  • Crea osservabili

  • Sottoscrivi ed esegui un osservabile

Crea osservabili

Per creare un osservabile useremo create() e passagli la funzione che ha i seguenti elementi.

  • on_next() - Questa funzione viene chiamata quando l'osservabile emette un oggetto.

  • on_completed() - Questa funzione viene chiamata quando l'Observable è completo.

  • on_error() - Questa funzione viene chiamata quando si verifica un errore sull'Observable.

Per lavorare con il metodo create () prima importa il metodo come mostrato di seguito -

from rx import create

Ecco un esempio funzionante, per creare un osservabile:

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error")
   observer.on_completed()
source = create(test_observable).

Sottoscrivi ed esegui un osservabile

Per iscriversi a un osservabile, è necessario utilizzare la funzione subscribe () e passare la funzione di callback on_next, on_error e on_completed.

Ecco un esempio funzionante:

testrx.py

from rx import create
deftest_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Il metodo subscribe () si occupa di eseguire l'osservabile. La funzione di callbackon_next, on_error e on_completeddeve essere passato al metodo di sottoscrizione. La chiamata al metodo di sottoscrizione, a sua volta, esegue la funzione test_observable ().

Non è obbligatorio passare tutte e tre le funzioni di callback al metodo subscribe (). Puoi passare secondo i tuoi requisiti on_next (), on_error () e on_completed ().

La funzione lambda viene utilizzata per on_next, on_error e on_completed. Prenderà gli argomenti ed eseguirà l'espressione data.

Ecco l'output, dell'osservabile creato -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

Questo capitolo spiega in dettaglio gli operatori in RxPY. Questi operatori includono:

  • Lavorare con gli operatori
  • Operatori matematici
  • Operatori di trasformazione
  • Operatori di filtraggio
  • Operatori di gestione degli errori
  • Operatori di servizi
  • Operatori condizionali
  • Operatori di creazione
  • Operatori collegabili
  • Combinare gli operatori

Python reattivo (Rx) ha quasi molti operatori, che semplificano la vita con il codice Python. È possibile utilizzare questi più operatori insieme, ad esempio, mentre si lavora con le stringhe è possibile utilizzare operatori di mappatura, filtro e unione.

Lavorare con gli operatori

Puoi lavorare con più operatori insieme usando il metodo pipe (). Questo metodo consente di concatenare più operatori insieme.

Ecco un esempio funzionante dell'utilizzo degli operatori:

test = of(1,2,3) // an observable
subscriber = test.pipe(
   op1(),
   op2(),
   op3()
)

Nell'esempio sopra, abbiamo creato un osservabile utilizzando il metodo of () che accetta i valori 1, 2 e 3. Ora, su questo osservabile, puoi eseguire un'operazione diversa, utilizzando qualsiasi numero di operatori utilizzando il metodo pipe () come mostrato sopra. L'esecuzione degli operatori proseguirà sequenzialmente sull'osservabile dato.

Per lavorare con gli operatori, importalo prima come mostrato di seguito:

from rx import of, operators as op

Ecco un esempio funzionante:

testrx.py

from rx import of, operators as op
test = of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
sub1 = test.pipe(
   op.filter(lambda s: s%2==0),
   op.reduce(lambda acc, x: acc + x)
)
sub1.subscribe(lambda x: print("Sum of Even numbers is {0}".format(x)))

Nell'esempio precedente, è presente un elenco di numeri, da cui filtriamo i numeri pari utilizzando un operatore di filtro e successivamente lo aggiungiamo utilizzando un operatore di riduzione.

Output

E:\pyrx>python testrx.py
Sum of Even numbers is 30

Ecco un elenco di operatori di cui parleremo:

  • Creazione di osservabili
  • Operatori matematici
  • Operatori di trasformazione
  • Operatori di filtraggio
  • Operatori di gestione degli errori
  • Operatori di servizi
  • Conditional
  • Connectable
  • Combinare gli operatori

Creazione di osservabili

Di seguito sono riportati gli osservabili, che discuteremo nella categoria Creazione

Mostra esempi

Osservabile Descrizione
creare Questo metodo viene utilizzato per creare un osservabile.
vuoto Questo osservabile non produrrà nulla ed emetterà direttamente lo stato completo.
mai Questo metodo crea un osservabile che non raggiungerà mai lo stato completo.
gettare Questo metodo creerà un osservabile che genererà un errore.
a partire dal_ Questo metodo convertirà l'array o l'oggetto dato in un osservabile.
intervallo Questo metodo darà una serie di valori prodotti dopo un timeout.
appena Questo metodo convertirà il valore dato in un osservabile.
gamma Questo metodo fornirà un intervallo di numeri interi in base all'input fornito.
repeat_value Questo metodo creerà un osservabile che ripeterà il valore dato in base al conteggio.
inizio Questo metodo accetta una funzione come input e restituisce un osservabile che restituirà un valore dalla funzione di input.
Timer Questo metodo emetterà i valori in sequenza al termine del timeout.

Operatori matematici

Gli operatori di cui parleremo nella categoria Operatore matematico sono i seguenti: -

Mostra esempi

Operatore Descrizione
media Questo operatore calcolerà la media dalla sorgente osservabile data e produrrà un osservabile che avrà il valore medio.
concat Questo operatore prenderà due o più osservabili e gli verrà assegnato un unico osservabile con tutti i valori nella sequenza.
contare

Questo operatore accetta un osservabile con valori e lo converte in un osservabile che avrà un unico valore. La funzione count accetta la funzione predicato come argomento opzionale.

La funzione è di tipo booleano e aggiungerà valore all'output solo se soddisfa la condizione.

max Questo operatore fornirà un osservabile con il valore massimo dalla sorgente osservabile.
min Questo operatore darà un osservabile con valore minimo dalla sorgente osservabile.
ridurre Questo operatore accetta una funzione chiamata funzione accumulator che viene utilizzata sui valori provenienti dalla sorgente osservabile e restituisce i valori accumulati sotto forma di osservabile, con un valore seed opzionale passato alla funzione accumulator.
somma Questo operatore restituirà un osservabile con la somma di tutti i valori delle osservabili di origine.

Operatori di trasformazione

Gli operatori di cui parleremo nella categoria Operatore di trasformazione sono menzionati di seguito:

Mostra esempi

Operatore Categoria
buffer Questo operatore raccoglierà tutti i valori dalla sorgente osservabile e li emetterà a intervalli regolari una volta soddisfatta la data condizione al contorno.
ground_by Questo operatore raggrupperà i valori provenienti dalla sorgente osservabile in base alla funzione key_mapper data.
carta geografica Questo operatore cambierà ogni valore dalla sorgente osservabile in un nuovo valore basato sull'output di mapper_func fornito.
scansione Questo operatore applicherà una funzione accumulatore ai valori provenienti dalla sorgente osservabile e restituirà un osservabile con nuovi valori.

Operatori di filtraggio

Gli operatori di cui parleremo nella categoria Operatori di filtraggio sono riportati di seguito:

Mostra esempi

Operatore Categoria
antirimbalzo Questo operatore fornirà i valori dalla sorgente osservabile, fino al periodo di tempo specificato e ignorerà il resto del tempo.
distinto Questo operatore darà tutti i valori che sono distinti dalla fonte osservabile.
element_at Questo operatore fornirà un elemento dalla fonte osservabile per l'indice dato.
filtro Questo operatore filtrerà i valori dalla sorgente osservabile in base alla funzione del predicato fornita.
primo Questo operatore darà il primo elemento osservabile dalla sorgente.
ignore_elements Questo operatore ignorerà tutti i valori dalla fonte osservabile ed eseguirà solo chiamate per completare o generare errori nelle funzioni di callback.
scorso Questo operatore fornirà l'ultimo elemento dalla sorgente osservabile.
Salta Questo operatore restituirà un osservabile che salterà la prima occorrenza degli elementi di conteggio presi come input.
skip_last Questo operatore restituirà un osservabile che salterà l'ultima occorrenza degli elementi di conteggio presi come input.
prendere Questo operatore fornirà un elenco di valori di origine in ordine continuo in base al conteggio fornito.
take_last Questo operatore fornirà un elenco di valori di origine in ordine continuo dall'ultimo in base al conteggio fornito.

Operatori di gestione degli errori

Gli operatori di cui parleremo nella categoria Operatore di gestione degli errori sono: -

Mostra esempi

Operatore Descrizione
catturare Questo operatore terminerà la sorgente osservabile quando si verifica un'eccezione.
riprova Questo operatore riproverà sull'origine osservabile quando si verifica un errore e una volta terminato il conteggio dei tentativi, terminerà.

Operatori di servizi

I seguenti sono gli operatori di cui discuteremo nella categoria Operatore di utilità.

Mostra esempi

Operatore Descrizione
ritardo Questo operatore ritarderà l'emissione osservabile della fonte in base all'ora o alla data fornita.
materializzarsi Questo operatore convertirà i valori dalla sorgente osservabile con i valori emessi sotto forma di valori di notifica esplicita.
Intervallo di tempo Questo operatore fornirà il tempo trascorso tra i valori osservabili dalla sorgente.
tempo scaduto Questo operatore fornirà tutti i valori dalla sorgente osservabili dopo il tempo trascorso oppure attiverà un errore.
timestamp Questo operatore allegherà un timestamp a tutti i valori dalla fonte osservabile.

Operatori condizionali e booleani

Gli operatori di cui parleremo nella categoria Operatore condizionale e Operatore booleano sono i seguenti:

Mostra esempi

Operatore Descrizione
tutti Questo operatore verificherà se tutti i valori della sorgente osservabile soddisfano la condizione data.
contiene Questo operatore restituirà un osservabile con il valore vero o falso se il valore dato è presente e se è il valore della sorgente osservabile.
default_if_empty Questo operatore restituirà un valore predefinito se l'osservabile di origine è vuoto.
sequenza_equal Questo operatore confronterà due sequenze di osservabili o una matrice di valori e restituirà un'osservabile con il valore vero o falso.
skip_until Questo operatore scarterà i valori dalla sorgente osservabile fino a quando la seconda osservabile non emetterà un valore.
skip_ while Questo operatore restituirà un osservabile con valori dalla sorgente osservabile che soddisfa la condizione passata.
take_until Questo operatore scarterà i valori dalla sorgente osservabile dopo che la seconda osservabile emette un valore o viene terminata.
prendere_tempo Questo operatore scarterà i valori dalla fonte osservabile quando la condizione fallisce.

Operatori collegabili

Gli operatori di cui parleremo nella categoria Operatore collegabile sono:

Mostra esempi

Operatore Descrizione
pubblicare Questo metodo convertirà l'osservabile in un osservabile collegabile.
ref_count Questo operatore renderà l'osservabile una normale osservabile.
replay Questo metodo funziona in modo simile a replaySubject. Questo metodo restituirà gli stessi valori, anche se l'osservabile ha già emesso e alcuni degli abbonati sono in ritardo nella sottoscrizione.

Combinazione di operatori

I seguenti sono gli operatori che discuteremo nella categoria Operatore di combinazione.

Mostra esempi

Operatore Descrizione
combinare_latest Questo operatore creerà una tupla per l'osservabile dato come input.
unire Questo operatore unirà dati osservabili.
iniziare con Questo operatore prenderà i valori dati e aggiungerà all'inizio della sorgente osservabile per restituire l'intera sequenza.
cerniera lampo Questo operatore restituisce un osservabile con valori in una forma tupla che è formata prendendo il primo valore dell'osservabile dato e così via.

Un soggetto è una sequenza osservabile, così come un osservatore che può trasmettere in multicast, cioè parlare con molti osservatori che si sono iscritti.

Discuteremo i seguenti argomenti sull'argomento:

  • Crea un soggetto
  • Iscriviti a un argomento
  • Passaggio dei dati al soggetto
  • BehaviorSubject
  • ReplaySubject
  • AsyncSubject

Crea un soggetto

Per lavorare con un oggetto, dobbiamo importare l'oggetto come mostrato di seguito:

from rx.subject import Subject

Puoi creare un soggetto-oggetto come segue:

subject_test = Subject()

L'oggetto è un osservatore che ha tre metodi:

  • on_next(value)
  • on_error (errore) e
  • on_completed()

Iscriviti a un argomento

È possibile creare più abbonamenti sull'argomento come mostrato di seguito:

subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)

Passaggio di dati al soggetto

È possibile passare i dati all'oggetto creato utilizzando il metodo on_next (valore) come mostrato di seguito -

subject_test.on_next("A")
subject_test.on_next("B")

I dati verranno passati a tutti gli iscritti, aggiunti sull'argomento.

Ecco un esempio funzionante dell'argomento.

Esempio

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_next("B")

L'oggetto subject_test viene creato chiamando un Subject (). L'oggetto subject_test fa riferimento ai metodi on_next (value), on_error (error) e on_completed (). L'output dell'esempio precedente è mostrato di seguito:

Produzione

E:\pyrx>python testrx.py
The value is A
The value is A
The value is B
The value is B

Possiamo usare il metodo on_completed (), per interrompere l'esecuzione del soggetto come mostrato di seguito.

Esempio

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("The value is {0}".format(x))
)
subject_test.on_next("A")
subject_test.on_completed()
subject_test.on_next("B")

Una volta chiamato complete, il metodo successivo chiamato in seguito non viene richiamato.

Produzione

E:\pyrx>python testrx.py
The value is A
The value is A

Vediamo ora come chiamare il metodo on_error (error).

Esempio

from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.subscribe(
   on_error = lambda e: print("Error : {0}".format(e))
)
subject_test.on_error(Exception('There is an Error!'))

Produzione

E:\pyrx>python testrx.py
Error: There is an Error!
Error: There is an Error!

BehaviorSubject

BehaviorSubject ti darà l'ultimo valore quando viene chiamato. Puoi creare un soggetto comportamentale come mostrato di seguito:

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject"); // initialized the behaviour subject with value:Testing Behaviour Subject

Ecco un esempio funzionante per utilizzare Behavior Subject

Esempio

from rx.subject import BehaviorSubject
behavior_subject = BehaviorSubject("Testing Behaviour Subject");
behavior_subject.subscribe(
   lambda x: print("Observer A : {0}".format(x))
)
behavior_subject.on_next("Hello")
behavior_subject.subscribe(
   lambda x: print("Observer B : {0}".format(x))
)
behavior_subject.on_next("Last call to Behaviour Subject")

Produzione

E:\pyrx>python testrx.py
Observer A : Testing Behaviour Subject
Observer A : Hello
Observer B : Hello
Observer A : Last call to Behaviour Subject
Observer B : Last call to Behaviour Subject

Replay oggetto

Un replaysubject è simile al behavior subject, in cui può memorizzare i valori e riprodurre lo stesso ai nuovi abbonati. Ecco un esempio funzionante di soggetto replay.

Esempio

from rx.subject import ReplaySubject
replay_subject = ReplaySubject(2)
replay_subject.subscribe(lambda x: print("Testing Replay Subject A: {0}".format(x)))
replay_subject.on_next(1)
replay_subject.on_next(2)
replay_subject.on_next(3)
replay_subject.subscribe(lambda x: print("Testing Replay Subject B: {0}".format(x)));
replay_subject.on_next(5)

Il valore del buffer utilizzato è 2 sull'oggetto della riproduzione. Quindi, gli ultimi due valori verranno memorizzati nel buffer e utilizzati per i nuovi abbonati chiamati.

Produzione

E:\pyrx>python testrx.py
Testing Replay Subject A: 1
Testing Replay Subject A: 2
Testing Replay Subject A: 3
Testing Replay Subject B: 2
Testing Replay Subject B: 3
Testing Replay Subject A: 5
Testing Replay Subject B: 5

AsyncSubject

Nel caso di AsyncSubject, l'ultimo valore chiamato viene passato al sottoscrittore e verrà eseguito solo dopo che il metodo complete () è stato chiamato.

Esempio

from rx.subject import AsyncSubject
async_subject = AsyncSubject()
async_subject.subscribe(lambda x: print("Testing Async Subject A: {0}".format(x)))
async_subject.on_next(1)
async_subject.on_next(2)
async_subject.on_completed()
async_subject.subscribe(lambda x: print("Testing Async Subject B: {0}".format(x)))
Here, before complete is called, the last value passed to the subject is 2, and the same is given to the subscribers.

Produzione

E:\pyrx>python testrx.py
Testing Async Subject A: 2
Testing Async Subject B: 2

Una caratteristica importante di RxPy è la concorrenza, ovvero consentire l'esecuzione del task in parallelo. Per fare ciò, abbiamo due operatori subscribe_on () e explore_on () che lavoreranno con uno scheduler, che deciderà l'esecuzione del task sottoscritto.

Ecco un esempio funzionante che mostra la necessità di subscibe_on (), Observ_on () e scheduler.

Esempio

import random
import time
import rx
from rx import operators as ops
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a))
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
) 
input("Press any key to exit\n")

Nell'esempio precedente, ho 2 attività: Attività 1 e Attività 2. L'esecuzione dell'attività è in sequenza. La seconda attività viene avviata solo al termine della prima.

Produzione

E:\pyrx>python testrx.py
From Task 1: 1
From Task 1: 2
From Task 1: 3
From Task 1: 4
From Task 1: 5
Task 1 complete
From Task 2: 1
From Task 2: 2
From Task 2: 3
From Task 2: 4
Task 2 complete

RxPy supporta molti Scheduler e qui utilizzeremo ThreadPoolScheduler. ThreadPoolScheduler proverà principalmente a gestire con i thread della CPU disponibili.

Nell'esempio che abbiamo visto prima, utilizzeremo un modulo multiprocessing che ci darà il cpu_count. Il conteggio verrà assegnato al ThreadPoolScheduler che riuscirà a far funzionare l'attività in parallelo in base ai thread disponibili.

Ecco un esempio funzionante:

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
input("Press any key to exit\n")

Nell'esempio sopra, ho 2 attività e cpu_count è 4. Poiché l'attività è 2 e i thread disponibili con noi sono 4, entrambe le attività possono iniziare in parallelo.

Produzione

E:\pyrx>python testrx.py
Cpu count is : 4
Press any key to exit
From Task 1: 1
From Task 2: 1
From Task 1: 2
From Task 2: 2
From Task 2: 3
From Task 1: 3
From Task 2: 4
Task 2 complete
From Task 1: 4
From Task 1: 5
Task 1 complete

Se vedi l'output, entrambe le attività sono state avviate in parallelo.

Ora, considera uno scenario in cui l'attività è maggiore del conteggio della CPU, cioè il conteggio della CPU è 4 e le attività sono 5. In questo caso, dovremmo controllare se qualche thread è libero dopo il completamento dell'attività, in modo che assegnato alla nuova attività disponibile in coda.

A questo scopo, possiamo usare l'operatore Observ_on () che osserverà lo scheduler se qualche thread è libero. Ecco un esempio funzionante che utilizza osservare_on ()

Esempio

import multiprocessing
import random
import time
from threading import current_thread
import rx
from rx.scheduler import ThreadPoolScheduler
from rx import operators as ops
# calculate cpu count, using which will create a ThreadPoolScheduler
thread_count = multiprocessing.cpu_count()
thread_pool_scheduler = ThreadPoolScheduler(thread_count)
print("Cpu count is : {0}".format(thread_count))
def adding_delay(value):
   time.sleep(random.randint(5, 20) * 0.1)
   return value
# Task 1
rx.of(1,2,3,4,5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 1: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 1 complete")
)
# Task 2
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 2: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 2 complete")
)
#Task 3
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 3: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 3 complete")
)
#Task 4
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.subscribe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 4: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 4 complete")
)
#Task 5
rx.range(1, 5).pipe(
   ops.map(lambda a: adding_delay(a)),
   ops.observe_on(thread_pool_scheduler)
).subscribe(
   lambda s: print("From Task 5: {0}".format(s)),
   lambda e: print(e),
   lambda: print("Task 5 complete")
)
input("Press any key to exit\n")

Produzione

E:\pyrx>python testrx.py
Cpu count is : 4
From Task 4: 1
From Task 4: 2
From Task 1: 1
From Task 2: 1
From Task 3: 1
From Task 1: 2
From Task 3: 2
From Task 4: 3
From Task 3: 3
From Task 2: 2
From Task 1: 3
From Task 4: 4
Task 4 complete
From Task 5: 1
From Task 5: 2
From Task 5: 3
From Task 3: 4
Task 3 complete
From Task 2: 3
Press any key to exit
From Task 5: 4
Task 5 complete
From Task 1: 4
From Task 2: 4
Task 2 complete
From Task 1: 5
Task 1 complete

Se vedi l'output, nel momento in cui l'attività 4 è completa, il thread viene passato all'attività successiva, cioè l'attività 5 e la stessa inizia l'esecuzione.

In questo capitolo, discuteremo in dettaglio i seguenti argomenti:

  • Esempio di base che mostra il funzionamento di osservabile, operatori e sottoscrizione all'osservatore.
  • Differenza tra osservabile e soggetto.
  • Capire le osservabili fredde e calde.

Di seguito è riportato un esempio di base che mostra il funzionamento di osservabili, operatori e sottoscrizione all'osservatore.

Esempio

test.py

import requests
import rx
import json
from rx import operators as ops
def filternames(x):
   if (x["name"].startswith("C")):
      return x["name"]
   else :
      return ""
content = requests.get('https://jsonplaceholder.typicode.com/users')
y = json.loads(content.text)
source = rx.from_(y)
case1 = source.pipe(
   ops.filter(lambda c: filternames(c)),
   ops.map(lambda a:a["name"])
)
case1.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)), 8. RxPy — Examples
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Ecco un esempio molto semplice, in cui ottengo i dati dell'utente da questo URL:

https://jsonplaceholder.typicode.com/users.

Filtrare i dati, per dare i nomi che iniziano con "C", e successivamente utilizzare la mappa per restituire solo i nomi. Ecco l'output per lo stesso -

E:\pyrx\examples>python test.py
Got - Clementine Bauch
Got - Chelsey Dietrich
Got - Clementina DuBuque
Job Done!

Differenza tra osservabile e soggetto

In questo esempio, vedremo la differenza tra un osservabile e un soggetto.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Produzione

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Nell'esempio sopra, ogni volta che ti iscrivi all'osservabile, ti darà nuovi valori.

Oggetto Esempio

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Produzione

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Se vedi i valori sono condivisi, tra entrambi gli iscritti utilizzando l'oggetto.

Comprensione degli osservabili freddi e caldi

Un osservabile è classificato come

  • Osservabili freddi
  • Osservabili caldi

La differenza nelle osservabili sarà notata quando più abbonati si stanno abbonando.

Osservabili freddi

Osservabili a freddo, sono osservabili che vengono eseguiti e rende i dati ogni volta che vengono sottoscritti. Quando viene sottoscritto, l'osservabile viene eseguito e vengono forniti i nuovi valori.

Il seguente esempio fornisce la comprensione del freddo osservabile.

from rx import of, operators as op
import random
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
print("From first subscriber")
subscriber1 = sub1.subscribe(lambda i: print("From sub1 {0}".format(i)))
print("From second subscriber")
subscriber2 = sub1.subscribe(lambda i: print("From sub2 {0}".format(i)))

Produzione

E:\pyrx>python testrx.py
From first subscriber
From sub1 1.610450821095726
From sub1 2.9567564032037335
From sub1 3.933217537811936
From sub1 4.82444905626622
From sub1 5.929414892567188
From second subscriber
From sub2 1.8573813517529874
From sub2 2.902433239469483
From sub2 3.2289868093016825
From sub2 4.050413890694411
From sub2 5.226515068012821

Nell'esempio sopra, ogni volta che ti iscrivi all'osservabile, eseguirà l'osservabile ed emetterà valori. I valori possono anche differire da abbonato ad abbonato come mostrato nell'esempio precedente.

Osservabili caldi

Nel caso di osservabili a caldo, emetteranno i valori quando saranno pronti e non aspetteranno sempre un abbonamento. Quando i valori vengono emessi, tutti gli abbonati riceveranno lo stesso valore.

È possibile utilizzare l'osservabile a caldo quando si desidera che i valori vengano emessi quando l'osservabile è pronto o si desidera condividere gli stessi valori con tutti i propri iscritti.

Un esempio di caldo osservabile è Soggetto e operatori collegabili.

from rx import of, operators as op
import random
from rx.subject import Subject
subject_test = Subject()
subject_test.subscribe(
   lambda x: print("From sub1 {0}".format(x))
)
subject_test.subscribe(
   lambda x: print("From sub2 {0}".format(x))
)
test1 = of(1,2,3,4,5)
sub1 = test1.pipe(
   op.map(lambda a : a+random.random())
)
subscriber = sub1.subscribe(subject_test)

Produzione

E:\pyrx>python testrx.py
From sub1 1.1789422863284509
From sub2 1.1789422863284509
From sub1 2.5525627903260153
From sub2 2.5525627903260153
From sub1 3.4191549324778325
From sub2 3.4191549324778325
From sub1 4.644042420199624
From sub2 4.644042420199624
From sub1 5.079896897489065
From sub2 5.079896897489065

Se vedi, lo stesso valore è condiviso tra gli iscritti. È possibile ottenere lo stesso risultato utilizzando l'operatore osservabile collegabile publish ().