RxPY - Creazione di osservabili
creare
Questo metodo viene utilizzato per creare un osservabile. Avrà il metodo dell'osservatore, cioè
on_next() - Questa funzione viene chiamata quando Observable emette un elemento.
on_completed() - Questa funzione viene chiamata, quando l'Osservabile è completo.
on_error() - Questa funzione viene chiamata quando si verifica un errore sull'Observable.
Ecco un esempio funzionante:
testrx.py
from rx import create
def test_observable(observer, scheduler):
observer.on_next("Hello")
observer.on_error("Error occured")
observer.on_completed()
source = create(test_observable)
source.subscribe(
on_next = lambda i: print("Got - {0}".format(i)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!"),
)
Ecco il file output dell'osservabile creato -
E:\pyrx>python testrx.py
Got - Hello
Job Done!
vuoto
Questo osservabile non produrrà nulla ed emetterà direttamente lo stato completo.
Sintassi
empty()
Valore di ritorno
Restituirà un osservabile senza elementi.
Esempio
from rx import empty
test = empty()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
Job Done!
mai
Questo metodo crea un osservabile che non raggiungerà mai lo stato completo.
Sintassi
never()
Valore di ritorno
Restituirà un osservabile che non verrà mai completato.
Esempio
from rx import never
test = never()
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
It does not show any output.
gettare
Questo metodo creerà un osservabile che genererà un errore.
Sintassi
throw(exception)
Parametri
eccezione: un oggetto che ha dettagli sull'errore.
Valore di ritorno
Viene restituito un osservabile con i dettagli dell'errore.
Esempio
from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
Error: There is an Error!
a partire dal_
Questo metodo convertirà l'array o l'oggetto dato in un osservabile.
Sintassi
from_(iterator)
Parametri
iteratore: questo è un oggetto o un array.
Valore di ritorno
Ciò restituirà un osservabile per l'iteratore dato.
Esempio
from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!
intervallo
Questo metodo darà una serie di valori prodotti dopo un timeout.
Sintassi
interval(period)
Parametri
periodo: per avviare la sequenza di interi.
Valore di ritorno
Restituisce un osservabile con tutti i valori in ordine sequenziale.
Esempio
import rx
from rx import operators as ops
rx.interval(1).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
Produzione
E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400
appena
Questo metodo convertirà il valore dato in un osservabile.
Sintassi
just(value)
Parametri
valore: da convertire in un osservabile.
Valore di ritorno
Restituirà un osservabile con i valori dati.
Esempio
from rx import just
test = just([15, 25,50, 55])
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!
gamma
Questo metodo fornirà un intervallo di numeri interi in base all'input fornito.
Sintassi
range(start, stop=None)
Parametri
inizio: il primo valore da cui partirà l'intervallo.
stop: opzionale, l'ultimo valore dell'intervallo da fermare.
Valore di ritorno
Questo restituirà un osservabile con valore intero basato sull'input fornito.
Esempio
from rx import range
test = range(0,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!
repeat_value
Questo metodo creerà un osservabile che ripeterà il valore dato in base al conteggio.
Sintassi
repeat_value(value=None, repeat_count=None)
Parametri
valore: opzionale. Il valore da ripetere.
repeat_count: opzionale. Il numero di volte che il valore specificato deve essere ripetuto.
Valore di ritorno
Restituirà un osservabile che ripeterà il valore dato secondo il conteggio fornito.
Esempio
from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!
inizio
Questo metodo accetta una funzione come input e restituisce un osservabile che restituirà un valore dalla funzione di input.
Sintassi
start(func)
Parametri
func: una funzione che verrà chiamata.
Valore di ritorno
Restituisce un osservabile che avrà un valore di ritorno dalla funzione di input.
Esempio
from rx import start
test = start(lambda : "Hello World")
test.subscribe(
lambda x: print("The value is {0}".format(x)),
on_error = lambda e: print("Error : {0}".format(e)),
on_completed = lambda: print("Job Done!")
)
Produzione
E:\pyrx>python testrx.py
The value is Hello World
Job Done!
Timer
Questo metodo emetterà i valori in sequenza al termine del timeout.
Sintassi
timer(duetime)
Parametri
duetime: tempo dopo il quale dovrebbe emettere il primo valore.
Valore di ritorno
Restituirà un osservabile con valori emessi dopo duetime.
Esempio
import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")
Produzione
E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64