RxPY - Creazione di osservabili

creare

Questo metodo viene utilizzato per creare un osservabile. Avrà il metodo dell'osservatore, cioè

  • on_next() - Questa funzione viene chiamata quando Observable emette un elemento.

  • on_completed() - Questa funzione viene chiamata, quando l'Osservabile è completo.

  • on_error() - Questa funzione viene chiamata quando si verifica un errore sull'Observable.

Ecco un esempio funzionante:

testrx.py

from rx import create
def test_observable(observer, scheduler):
   observer.on_next("Hello")
   observer.on_error("Error occured")
   observer.on_completed()
source = create(test_observable)
source.subscribe(
   on_next = lambda i: print("Got - {0}".format(i)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!"),
)

Ecco il file output dell'osservabile creato -

E:\pyrx>python testrx.py
Got - Hello
Job Done!

vuoto

Questo osservabile non produrrà nulla ed emetterà direttamente lo stato completo.

Sintassi

empty()

Valore di ritorno

Restituirà un osservabile senza elementi.

Esempio

from rx import empty
test = empty()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
Job Done!

mai

Questo metodo crea un osservabile che non raggiungerà mai lo stato completo.

Sintassi

never()

Valore di ritorno

Restituirà un osservabile che non verrà mai completato.

Esempio

from rx import never
test = never()
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

It does not show any output.

gettare

Questo metodo creerà un osservabile che genererà un errore.

Sintassi

throw(exception)

Parametri

eccezione: un oggetto che ha dettagli sull'errore.

Valore di ritorno

Viene restituito un osservabile con i dettagli dell'errore.

Esempio

from rx import throw
test = throw(Exception('There is an Error!'))
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
Error: There is an Error!

a partire dal_

Questo metodo convertirà l'array o l'oggetto dato in un osservabile.

Sintassi

from_(iterator)

Parametri

iteratore: questo è un oggetto o un array.

Valore di ritorno

Ciò restituirà un osservabile per l'iteratore dato.

Esempio

from rx import from_
test = from_([1,2,3,4,5,6,7,8,9,10])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
The value is 10
Job Done!

intervallo

Questo metodo darà una serie di valori prodotti dopo un timeout.

Sintassi

interval(period)

Parametri

periodo: per avviare la sequenza di interi.

Valore di ritorno

Restituisce un osservabile con tutti i valori in ordine sequenziale.

Esempio

import rx
from rx import operators as ops
rx.interval(1).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

Produzione

E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64
The value is 81
The value is 100
The value is 121
The value is 144
The value is 169
The value is 196
The value is 225
The value is 256
The value is 289
The value is 324
The value is 361
The value is 400

appena

Questo metodo convertirà il valore dato in un osservabile.

Sintassi

just(value)

Parametri

valore: da convertire in un osservabile.

Valore di ritorno

Restituirà un osservabile con i valori dati.

Esempio

from rx import just
test = just([15, 25,50, 55])
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
The value is [15, 25, 50, 55]
Job Done!

gamma

Questo metodo fornirà un intervallo di numeri interi in base all'input fornito.

Sintassi

range(start, stop=None)

Parametri

inizio: il primo valore da cui partirà l'intervallo.

stop: opzionale, l'ultimo valore dell'intervallo da fermare.

Valore di ritorno

Questo restituirà un osservabile con valore intero basato sull'input fornito.

Esempio

from rx import range
test = range(0,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
The value is 0
The value is 1
The value is 2
The value is 3
The value is 4
The value is 5
The value is 6
The value is 7
The value is 8
The value is 9
Job Done!

repeat_value

Questo metodo creerà un osservabile che ripeterà il valore dato in base al conteggio.

Sintassi

repeat_value(value=None, repeat_count=None)

Parametri

valore: opzionale. Il valore da ripetere.

repeat_count: opzionale. Il numero di volte che il valore specificato deve essere ripetuto.

Valore di ritorno

Restituirà un osservabile che ripeterà il valore dato secondo il conteggio fornito.

Esempio

from rx import repeat_value
test = repeat_value(44,10)
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
The value is 44
Job Done!

inizio

Questo metodo accetta una funzione come input e restituisce un osservabile che restituirà un valore dalla funzione di input.

Sintassi

start(func)

Parametri

func: una funzione che verrà chiamata.

Valore di ritorno

Restituisce un osservabile che avrà un valore di ritorno dalla funzione di input.

Esempio

from rx import start
test = start(lambda : "Hello World")
test.subscribe(
   lambda x: print("The value is {0}".format(x)),
   on_error = lambda e: print("Error : {0}".format(e)),
   on_completed = lambda: print("Job Done!")
)

Produzione

E:\pyrx>python testrx.py
The value is Hello World
Job Done!

Timer

Questo metodo emetterà i valori in sequenza al termine del timeout.

Sintassi

timer(duetime)

Parametri

duetime: tempo dopo il quale dovrebbe emettere il primo valore.

Valore di ritorno

Restituirà un osservabile con valori emessi dopo duetime.

Esempio

import rx
from rx import operators as ops
rx.timer(5.0, 10).pipe(
   ops.map(lambda i: i * i)
).subscribe(lambda x: print("The value is {0}".format(x)))
input("Press any key to exit\n")

Produzione

E:\pyrx>python testrx.py
Press any key to exit
The value is 0
The value is 1
The value is 4
The value is 9
The value is 16
The value is 25
The value is 36
The value is 49
The value is 64