97国产精品视频人人做人人爱,久久精品亚洲日本,亚洲国产精品线播放,亚洲另类欧洲综合久久

      Go中響應(yīng)式編程庫(kù)RxGo詳細(xì)介紹 熱頭條
      2023-04-23 01:47:48 來源:博客園

      最近的項(xiàng)目用到了 RxGo ,因?yàn)橹皬臎]有接觸過,特意去學(xué)了學(xué),特此記錄下。文章很多內(nèi)容是復(fù)制了參考資料或者官方文檔。如果涉及侵權(quán),請(qǐng)聯(lián)系刪除,謝謝。

      1、RxGo簡(jiǎn)介

      1.1 基礎(chǔ)介紹

      RxGo是一個(gè)基于Go語言的響應(yīng)式編程庫(kù),它提供了一種簡(jiǎn)單而強(qiáng)大的方式來處理異步事件流和數(shù)據(jù)流。RxGo的設(shè)計(jì)靈感來自于ReactiveX,它提供了類似于ReactiveX的操作符和概念,如Observable、Observer、Subject、Scheduler等。

      RxGo的目標(biāo)是提供一種簡(jiǎn)單而強(qiáng)大的方式來處理異步事件流和數(shù)據(jù)流,使得開發(fā)人員可以更容易地編寫高效、可維護(hù)和可擴(kuò)展的代碼。RxGo的特點(diǎn)包括:


      (資料圖片僅供參考)

      1. 響應(yīng)式編程:RxGo提供了Observable和Observer兩個(gè)核心概念,使得開發(fā)人員可以更容易地處理異步事件流和數(shù)據(jù)流。
      2. 操作符:RxGo提供了類似于ReactiveX的操作符,如map、filter、reduce等,使得開發(fā)人員可以更容易地對(duì)事件流進(jìn)行轉(zhuǎn)換、過濾和聚合等操作。
      3. 調(diào)度器:RxGo提供了調(diào)度器,使得開發(fā)人員可以更容易地控制事件流的執(zhí)行線程和順序。
      4. 可組合性:RxGo的操作符具有可組合性,使得開發(fā)人員可以更容易地組合多個(gè)操作符來實(shí)現(xiàn)復(fù)雜的操作。
      5. 高效性:RxGo的設(shè)計(jì)和實(shí)現(xiàn)都非常高效,可以處理大量的事件流和數(shù)據(jù)流。

      總之,RxGo是一個(gè)非常強(qiáng)大和實(shí)用的響應(yīng)式編程庫(kù),它可以幫助開發(fā)人員更容易地處理異步事件流和數(shù)據(jù)流,提高代碼的可維護(hù)性和可擴(kuò)展性。

      1.2 RxGo 數(shù)據(jù)流程圖

      RxGo的實(shí)現(xiàn)基于管道的概念。管道是由通道連接的一系列階段,其中每個(gè)階段是運(yùn)行相同功能的一組goroutine。

      • 使用Just操作符創(chuàng)建一個(gè)基于固定列表的靜態(tài)可觀測(cè)數(shù)據(jù)。
      • 使用Map操作符定義了一個(gè)轉(zhuǎn)換函數(shù)(把圓形變成方形)。
      • Filter操作符過濾掉黃色方形。

      從上面的例子中可以看出來,最終生成的數(shù)據(jù)被發(fā)送到一個(gè)通道中,消費(fèi)者讀取數(shù)據(jù)進(jìn)行消費(fèi)。RxGo中有很多種消費(fèi)和生成數(shù)據(jù)的方式,發(fā)布結(jié)果到通道中只是其中一種方式。

      2、快速入門

      2.1 安裝 RxGo v2

      go get -u github.com/reactivex/rxgo/v2

      2.2 簡(jiǎn)單案例

      我們先寫一個(gè)簡(jiǎn)單的案例,來學(xué)習(xí)RxGo的簡(jiǎn)單使用。

      package mainimport (  "fmt"  "github.com/reactivex/rxgo/v2")func main() {  observable := rxgo.Just(1, 2, 3, 4, 5)()  ch := observable.Observe()  for item := range ch {    fmt.Println(item.V)  }}

      使用 RxGo 的一般流程如下:

      • 使用相關(guān)的 Operator創(chuàng)建 Observable,Operator就是用來創(chuàng)建 Observable的。
      • 中間各個(gè)階段可以使用過濾操作篩選出我們想要的數(shù)據(jù),使用轉(zhuǎn)換操作對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換;
      • 調(diào)用 ObservableObserve()方法,該方法返回一個(gè)<- chan rxgo.Item。然后for range遍歷即可。

      結(jié)合上面的這張圖,我們就比較容易理解RxGo的數(shù)據(jù)處理流程。因?yàn)槔颖容^簡(jiǎn)單,沒有用到Map、Filter操作。

      執(zhí)行結(jié)果:

      $ go run main.go 12345

      Just使用到柯里化的編程思想??吕锘–urrying)是一種函數(shù)式編程的技術(shù),它將一個(gè)接受多個(gè)參數(shù)的函數(shù)轉(zhuǎn)換成一系列接受單個(gè)參數(shù)的函數(shù)。這些單參數(shù)函數(shù)可以被組合起來,以便在后續(xù)的計(jì)算中使用。

      柯里化的主要優(yōu)點(diǎn)是它可以使函數(shù)更加靈活和可復(fù)用。通過將函數(shù)分解為一系列單參數(shù)函數(shù),我們可以更容易地組合和重用這些函數(shù),從而減少代碼的重復(fù)性和冗余性。

      例如:

      //柯里化的例子func addCurried(x int) func(int) int {return func(y int) int {return x + y}}func main()  {add5 := addCurried(5)fmt.Println(add5(10))}

      由于 Go 不支持多個(gè)可變參數(shù),Just通過柯里化迂回地實(shí)現(xiàn)了這個(gè)功能:

      //Just creates an Observable with the provided items.func Just(items ...interface{}) func(opts ...Option) Observable {  return func(opts ...Option) Observable {    return &ObservableImpl{      iterable: newJustIterable(items...)(opts...),    }  }}

      Observe()返回一個(gè) Item 的chan ,Item的結(jié)構(gòu)如下:

      // Item is a wrapper having either a value or an error.typeItem struct {V interface{}E error}

      所以通過Just生成observable對(duì)象時(shí),傳入的數(shù)據(jù)可以包含錯(cuò)誤,在使用時(shí)通過 item.Error() 來區(qū)分。

      func main() {  observable := rxgo.Just(1, 2, errors.New("unknown"), 3, 4, 5)()  ch := observable.Observe()  for item := range ch {    if item.Error() {      fmt.Println("error:", item.E)    } else {      fmt.Println(item.V)    }  }}

      我們使用item.Error()檢查是否出現(xiàn)錯(cuò)誤。然后使用item.V訪問數(shù)據(jù),item.E訪問錯(cuò)誤。

      除了使用for range之外,我們還可以調(diào)用 ObservableForEach()方法來實(shí)現(xiàn)遍歷。ForEach()接受 3 個(gè)回調(diào)函數(shù):

      • NextFunc:類型為func (v interface {}),傳入的數(shù)據(jù)不包含錯(cuò)誤類型時(shí)走此函數(shù)處理。
      • ErrFunc:類型為func (err error),當(dāng)傳入的數(shù)據(jù)包含錯(cuò)誤時(shí)走此函數(shù);
      • CompletedFunc:類型為func (),Observable完成時(shí)調(diào)用。

      有點(diǎn)Promise那味了。使用ForEach(),可以將上面的示例改寫為:

      func main() {  observable := rxgo.Just(1, 2, errors.New("這是一個(gè)測(cè)試錯(cuò)誤!"), 4, 5)()  <-observable.ForEach(func(v interface{}) {    fmt.Println("received:", v)  }, func(err error) {    fmt.Println("error:", err)  }, func() {    fmt.Println("completed")  })}
      $ go run main.go received: 1received: 2error: 這是一個(gè)測(cè)試錯(cuò)誤!received: 4received: 5completed

      ForEach()返回的是一個(gè) chan,用于當(dāng) observable 關(guān)閉時(shí)會(huì)向此chan發(fā)送數(shù)據(jù)。所以在 observable前面加了<-來阻塞等待 ForEach()處理完數(shù)據(jù)。

      3、RxGo 深入學(xué)習(xí)

      上面的簡(jiǎn)單案例,我們是使用Just來創(chuàng)建observable。其實(shí)還有其他的方式創(chuàng)建observable。一起來看一看。

      3.1 rxgo.Create

      傳入一個(gè)[]rxgo.Producer的切片,其中rxgo.Producer的類型為func(ctx context.Context, next chan<- Item)。我們可以在代碼中調(diào)用rxgo.Of(value)生成數(shù)據(jù),rxgo.Error(err)生成錯(cuò)誤,然后發(fā)送到next通道中:

      package mainimport ("context""errors""fmt""github.com/reactivex/rxgo/v2")func main()  {observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {next <- rxgo.Of(1)next <- rxgo.Of("aaa")next <- rxgo.Of(errors.New("test"))}})ch := observable.Observe()for item := range ch {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}

      因?yàn)?code>rxgo.Create中的參數(shù)是[]rxgo.Producer,所以分成兩個(gè)rxgo.Producer也是一樣的效果:

      observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {  next <- rxgo.Of(1)  next <- rxgo.Of(2)  next <- rxgo.Of(3)  next <- rxgo.Error(errors.New("unknown"))  }, func(ctx context.Context, next chan<- rxgo.Item) {  next <- rxgo.Of(4)  next <- rxgo.Of(5)}})

      3.2 rxgo.FromChannel

      FromChannel可以直接從一個(gè)已存在的<-chan rxgo.Item對(duì)象中創(chuàng)建 Observable

      package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main()  {ch := make(chan rxgo.Item)go func() {for i := 0; i < 5; i++ {ch <- rxgo.Of(i)}//需要手動(dòng)關(guān)閉 ch 通道close(ch)}()observable := rxgo.FromChannel(ch)for item := range observable.Observe() {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}

      注意:

      通道需要手動(dòng)調(diào)用close()關(guān)閉,上面Create()方法內(nèi)部rxgo自動(dòng)幫我們執(zhí)行了這個(gè)步驟。

      func newCreateIterable(fs []Producer, opts ...Option) Iterable {...go func() {// Create方法內(nèi)部自動(dòng)關(guān)閉了 next 通道defer close(next)for _, f := range fs {f(ctx, next)}}()...}

      3.3 rxgo.Interval

      Interval以傳入的時(shí)間間隔生成一個(gè)無窮的數(shù)字序列,從 0 開始:

      func main()  {observable := rxgo.Interval(rxgo.WithDuration(time.Second))for item := range observable.Observe() {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}

      運(yùn)行后,第一秒輸出 0,第二秒輸出 1,以此類推。

      3.4 rxgo.Range

      func main() {  observable := rxgo.Range(0, 3)  for item := range observable.Observe() {    fmt.Println(item.V)  }}

      Range可以生成一個(gè)范圍內(nèi)的數(shù)字:

      上面代碼依次輸出 0,1,2,3。

      3.5 Repeat

      這個(gè)和之前的不太一樣,這個(gè)是對(duì)已經(jīng)存在的 observable對(duì)象調(diào)用 Repeat方法,從而實(shí)現(xiàn)重復(fù)生成數(shù)據(jù)。

      package mainimport ("fmt""github.com/reactivex/rxgo/v2""time")func main()  {observable := rxgo.Range(0,3).Repeat(2, rxgo.WithDuration(time.Second))for item := range observable.Observe() {if item.Error() {fmt.Println("err:", item.E)}else {fmt.Println(item.V)}}}

      輸出:

      012012012

      注意:這里執(zhí)行的次數(shù)一共是3次,Repeat中的參數(shù)是2,重復(fù)2次,一共3次。

      3.6 rxgo.Start

      可以給Start方法傳入[]rxgo.Supplier作為參數(shù),它可以包含任意數(shù)量的rxgo.Supplier類型。rxgo.Supplier的底層類型為:

      var Supplier func(ctx context.Context) rxgo.Item

      Observable內(nèi)部會(huì)依次調(diào)用這些rxgo.Supplier生成rxgo.Item

      package mainimport ("context""fmt""github.com/reactivex/rxgo/v2""time")func Supplier1(ctx context.Context) rxgo.Item {deadline, ok  := ctx.Deadline()fmt.Println("Supplier1", deadline, ok)time.Sleep(time.Second)return rxgo.Of(1)}func Supplier2(ctx context.Context) rxgo.Item {deadline, ok  := ctx.Deadline()fmt.Println("Supplier2", deadline, ok)time.Sleep(time.Second)return rxgo.Of(2)}func Supplier3(ctx context.Context) rxgo.Item {deadline, ok  := ctx.Deadline()fmt.Println("Supplier3", deadline, ok)time.Sleep(time.Second)return rxgo.Of(3)}func main() {ctx, _ := context.WithTimeout(context.Background(), time.Second*2)observable := rxgo.Start([]rxgo.Supplier{Supplier1, Supplier2, Supplier3}, rxgo.WithContext(ctx))for item := range observable.Observe() {fmt.Println(item.V)}}

      4、Observable 分類

      根據(jù)數(shù)據(jù)在何處生成,Observable被分為 HotCold兩種類型。

      • Hot Observable:熱可觀測(cè)量,數(shù)據(jù)由可觀測(cè)量外部產(chǎn)生。
      • Cold Observable:冷可觀測(cè)量,數(shù)據(jù)由可觀測(cè)量內(nèi)部產(chǎn)生。

      通常不想一次性的創(chuàng)建所有的數(shù)據(jù),使用 熱可觀測(cè)量。

      4.1 熱可觀測(cè)量示例

      func main() {  ch := make(chan rxgo.Item)  go func() {    for i := 0; i < 3; i++ {      ch <- rxgo.Of(i)    }    close(ch)  }()  observable := rxgo.FromChannel(ch)  for item := range observable.Observe() {    fmt.Println(item.V)  }  for item := range observable.Observe() {    fmt.Println(item.V)  }}

      結(jié)果:

      012

      上面創(chuàng)建的是 Hot Observable。但是有個(gè)問題,第一次Observe()消耗了所有的數(shù)據(jù),第二個(gè)就沒有數(shù)據(jù)輸出了。(可以用可連接的觀測(cè)量來修改這一行為,后面再說)。

      4.2 冷可觀測(cè)量示例

      Cold Observable就不會(huì)有這個(gè)問題,因?yàn)樗鼊?chuàng)建的流是獨(dú)立于每個(gè)觀察者的。即每次調(diào)用Observe()都創(chuàng)建一個(gè)新的 channel。我們使用Defer()方法創(chuàng)建 Cold Observable,它的參數(shù)與Create()方法一樣。

      func main() {  observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {    for i := 0; i < 3; i++ {      ch <- rxgo.Of(i)    }  }})  for item := range observable.Observe() {    fmt.Println(item.V)  }  for item := range observable.Observe() {    fmt.Println(item.V)  }}

      Defer源碼介紹:

      // Defer does not create the Observable until the observer subscribes,// and creates a fresh Observable for each observer.func Defer(f []Producer, opts ...Option) Observable {return &ObservableImpl{iterable: newDeferIterable(f, opts...),}}

      執(zhí)行結(jié)果:

      $ go run main.go012012

      4.3 可連接的 Observable

      可連接的(Connectable)Observable對(duì)普通的 Observable進(jìn)行了一層組裝。調(diào)用它的Observe()方法時(shí)并不會(huì)立刻產(chǎn)生數(shù)據(jù)。使用它,我們可以等所有的觀察者都準(zhǔn)備就緒了(即調(diào)用了Observe()方法)之后,再調(diào)用其Connect()方法開始生成數(shù)據(jù)。我們通過兩個(gè)示例比較使用普通的 Observable和可連接的 Observable有何不同。

      4.3.1 普通的Observable,并不是可連接的Observable
      func main() {  ch := make(chan rxgo.Item)  go func() {    for i := 1; i <= 3; i++ {      ch <- rxgo.Of(i)    }    close(ch)  }()  observable := rxgo.FromChannel(ch)  observable.DoOnNext(func(i interface{}) {    fmt.Printf("First observer: %d\n", i)  })  time.Sleep(3 * time.Second)  fmt.Println("before subscribe second observer")  observable.DoOnNext(func(i interface{}) {    fmt.Printf("Second observer: %d\n", i)  })  time.Sleep(3 * time.Second)}

      上例中我們使用DoOnNext()方法來注冊(cè)觀察者。由于DoOnNext()方法是異步執(zhí)行的,所以為了等待結(jié)果輸出,在最后增加了一行time.Sleep。運(yùn)行結(jié)果:

      First observer: 1First observer: 2First observer: 3before subscribe second observer

      由輸出可以看出,注冊(cè)第一個(gè)觀察者之后就開始產(chǎn)生數(shù)據(jù)了。第二個(gè)觀察者并不會(huì)得到數(shù)據(jù)。

      4.3.2 可連接的Observable

      通過在創(chuàng)建 Observable的方法中指定rxgo.WithPublishStrategy()選項(xiàng)就可以創(chuàng)建可連接的 Observable

      • 重點(diǎn)是傳入rxgo.WithPublishStrategy()
      func main() {  ch := make(chan rxgo.Item)  go func() {    for i := 1; i <= 3; i++ {      ch <- rxgo.Of(i)    }    close(ch)  }()  observable := rxgo.FromChannel(ch, rxgo.WithPublishStrategy())  observable.DoOnNext(func(i interface{}) {    fmt.Printf("First observer: %d\n", i)  })  time.Sleep(3 * time.Second)  fmt.Println("before subscribe second observer")  observable.DoOnNext(func(i interface{}) {    fmt.Printf("Second observer: %d\n", i)  })  //需要手動(dòng)調(diào)用 observable.Connect 才會(huì)產(chǎn)生數(shù)據(jù)  observable.Connect(context.Background())  time.Sleep(3 * time.Second)}

      運(yùn)行輸出:

      $ go run main.gobefore subscribe second observerSecond observer: 1First observer: 1First observer: 2First observer: 3Second observer: 2Second observer: 3

      上面是等兩個(gè)觀察者都注冊(cè)之后,并且手動(dòng)調(diào)用了 Observable 的Connect()方法才產(chǎn)生數(shù)據(jù)。而且可連接的 Observable有一個(gè)特性:它是冷啟動(dòng)的?。?!,即每個(gè)觀察者都會(huì)收到一份相同的拷貝。

      5、轉(zhuǎn)換 Observable

      通過 RxGo 數(shù)據(jù)流程圖我們知道,我們可以對(duì)rxgo.Item進(jìn)行轉(zhuǎn)換。rxgo 提供了很多轉(zhuǎn)換函數(shù),下面一起來學(xué)一學(xué)這些轉(zhuǎn)換函數(shù)。

      5.1 Map

      Map()方法簡(jiǎn)單修改它收到的rxgo.Item然后發(fā)送到下一個(gè)階段(轉(zhuǎn)換或過濾)。Map()接受一個(gè)類型為func (context.Context, interface{}) (interface{}, error)的函數(shù)。第二個(gè)參數(shù)就是rxgo.Item中的數(shù)據(jù),返回轉(zhuǎn)換后的數(shù)據(jù)。如果出錯(cuò),則返回錯(cuò)誤。

      func main() {observable := rxgo.Just(1, 2, 3)()observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {return i.(int), nil}).Map(func(_ context.Context, i interface{}) (interface{}, error) {b := i.(int)if b % 2 == 0 {return nil, errors.New("test")} else {return i, nil}})for item := range observable.Observe() {fmt.Println(item.V)}}

      上例中每個(gè)數(shù)字經(jīng)過兩個(gè)Map,第一個(gè)Map邏輯是原樣輸出,第二個(gè)Map邏輯是判斷i是不是偶數(shù),如果是偶數(shù),就返回錯(cuò)誤,否則原樣輸出。運(yùn)行結(jié)果:

      1

      我們將第一個(gè)Map中的語句改為下面的邏輯:

      return i.(int) + 1, nil

      運(yùn)行結(jié)果:

      我們可以知道,數(shù)據(jù)的處理是串行的,第一個(gè)數(shù)據(jù)執(zhí)行完所有的Map過后,第二個(gè)數(shù)據(jù)才會(huì)執(zhí)行,當(dāng)其中某一個(gè)執(zhí)行返回的結(jié)果包含錯(cuò)誤,就不會(huì)繼續(xù)進(jìn)行轉(zhuǎn)換了,即不會(huì)數(shù)據(jù)不會(huì)進(jìn)入到 Observe() 中的通道中去。

      5.2 Marshal

      Marshal對(duì)經(jīng)過它的數(shù)據(jù)進(jìn)行一次Marshal。這個(gè)Marshal可以是json.Marshal/proto.Marshal,甚至我們自己寫的Marshal函數(shù)。它接受一個(gè)類型為func(interface{}) ([]byte, error)的函數(shù)用于對(duì)數(shù)據(jù)進(jìn)行處理。

      type User struct {  Name string `json:"name"`  Age  int    `json:"age"`}func main() {  observable := rxgo.Just(    User{      Name: "dj",      Age:  18,    },    User{      Name: "jw",      Age:  20,    },  )()  observable = observable.Marshal(json.Marshal)  for item := range observable.Observe() {    fmt.Println(string(item.V.([]byte)))  }}

      執(zhí)行結(jié)果:

      {"name":"dj","age":18}{"name":"jw","age":20}

      由于Marshal操作返回的是[]byte類型,我們需要進(jìn)行類型轉(zhuǎn)換之后再輸出。

      5.3 Unmarshal

      既然有Marshal,也就有它的相反操作Unmarshal。Unmarshal用于將一個(gè)[]byte類型轉(zhuǎn)換為相應(yīng)的結(jié)構(gòu)體或其他類型。與Marshal不同,Unmarshal需要知道轉(zhuǎn)換的目標(biāo)類型,所以需要提供一個(gè)函數(shù)用于生成該類型的對(duì)象。然后將[]byte數(shù)據(jù)Unmarshal到該對(duì)象中。Unmarshal接受兩個(gè)參數(shù),參數(shù)一是類型為func([]byte, interface{}) error的函數(shù),參數(shù)二是func () interface{}用于生成實(shí)際類型的對(duì)象。我們拿上面的例子中生成的 JSON 字符串作為數(shù)據(jù),將它們重新UnmarshalUser對(duì)象:

      type User struct {  Name string `json:"name"`  Age  int    `json:"age"`}func main() {  observable := rxgo.Just(    `{"name":"dj","age":18}`,    `{"name":"jw","age":20}`,  )()  observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {    return []byte(i.(string)), nil  }).Unmarshal(json.Unmarshal, func() interface{} {    return &User{}  })  for item := range observable.Observe() {    fmt.Println(item.V)  }}

      由于Unmarshaller接受[]byte類型的參數(shù),我們?cè)?code>Unmarshal之前加了一個(gè)Map用于將string轉(zhuǎn)為[]byte。運(yùn)行結(jié)果:

      &{dj 18}&{jw 20}

      5.4 Buffer

      Buffer按照一定的規(guī)則收集接收到的數(shù)據(jù),然后一次性發(fā)送出去(作為切片),而不是收到一個(gè)發(fā)送一個(gè)。有 3 種類型的Buffer

      • BufferWithCount(n):每收到n個(gè)數(shù)據(jù)發(fā)送一次,最后一次可能少于n個(gè);
      • BufferWithTime(n):發(fā)送在一個(gè)時(shí)間間隔n內(nèi)收到的數(shù)據(jù);
      • BufferWithTimeOrCount(d, n):收到n個(gè)數(shù)據(jù),或經(jīng)過d時(shí)間間隔,發(fā)送當(dāng)前收到的數(shù)據(jù)。
      5.4.1 BufferWithCount
      func main() {observable := rxgo.Range(0, 5)observable = observable.BufferWithCount(2)for item := range observable.Observe() {fmt.Println(item.V)}}

      執(zhí)行結(jié)果:

      [0 1][2 3][4]

      最后一組只有一個(gè)。

      5.4.2 BufferWithTime
      unc main() {ch := make(chan rxgo.Item, 1)go func() {i := 0for range time.Tick(time.Second) {ch <- rxgo.Of(i)i++}}()observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(2 * time.Second))layout := "2006-01-02 13:04:05"fmt.Println("startTime", time.Now().Format(layout))for item := range observable.Observe() {fmt.Println(item.V)fmt.Println("nextTime", time.Now().Format(layout))}}

      執(zhí)行結(jié)果是不確定的,這里需要注意:

      startTime 2023-04-22 44:15:49[0]nextTime 2023-04-22 44:15:51[1 2]nextTime 2023-04-22 44:15:53[3 4 5]nextTime 2023-04-22 44:15:55...
      5.4.3 BufferWithTimeOrCount
      func main() {ch := make(chan rxgo.Item, 1)go func() {i := 0for range time.Tick(time.Second) {ch <- rxgo.Of(i)i++}}()observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(2*time.Second), 2)layout := "2006-01-02 13:04:05"fmt.Println("startTime", time.Now().Format(layout))for item := range observable.Observe() {fmt.Println(item.V)fmt.Println("nextTime", time.Now().Format(layout))}}

      執(zhí)行結(jié)果:

      startTime 2023-04-22 44:18:48[0]nextTime 2023-04-22 44:18:50[1 2]nextTime 2023-04-22 44:18:51[3 4]nextTime 2023-04-22 44:18:53

      BufferWithTimeOrCount是以BufferWithCount、BufferWithTime誰先滿足條件為準(zhǔn),誰先滿足誰就先執(zhí)行。

      5.5 GroupBy

      ``GroupBy將一個(gè)Observable分成多個(gè)子Observable,每個(gè)子Observable`包含相同的索引值的元素。

      GroupBy函數(shù)定義如下:

      GroupBy(length int, distribution func(Item) int, opts ...Option) Observable

      即將一個(gè)Observable分成length個(gè)子Observable,根據(jù)distribution函數(shù)返回的int作為分組的依據(jù)。

      package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {// 創(chuàng)建一個(gè)Observable,它發(fā)出一些整數(shù)值source := rxgo.Just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)()// 使用GroupBy操作符將整數(shù)值按照奇偶性進(jìn)行分組grouped := source.GroupBy(2, func(item rxgo.Item) int {return item.V.(int) % 2}, rxgo.WithBufferedChannel(10))for subObservable := range grouped.Observe() {fmt.Println("new subObservable ------ ")for item := range subObservable.V.(rxgo.Observable).Observe() {fmt.Printf("%v\n", item.V)}}}

      上面根據(jù)每個(gè)數(shù)模 3 的余數(shù)將整個(gè)流分為 3 組。運(yùn)行:

      new subObservable ------ 246810new subObservable ------ 13579

      注意rxgo.WithBufferedChannel(10)的使用,由于我們的數(shù)字是連續(xù)生成的,依次為 0->1->2->…->9->10。而 Observable默認(rèn)是惰性的,即由Observe()驅(qū)動(dòng)。內(nèi)層的Observe()在返回一個(gè) 0 之后就等待下一個(gè)數(shù),但是下一個(gè)數(shù) 1 不在此 Observable中。所以會(huì)陷入死鎖。使用rxgo.WithBufferedChannel(10),設(shè)置它們之間的連接 channel 緩沖區(qū)大小為 10,這樣即使我們未取出 channel 里面的數(shù)字,上游還是能發(fā)送數(shù)字進(jìn)來。

      6、并行操作

      默認(rèn)情況下,這些轉(zhuǎn)換操作都是串行的,即只有一個(gè) goroutine 負(fù)責(zé)執(zhí)行轉(zhuǎn)換函數(shù)。從上面的Map操作也可以得知默認(rèn)是串行執(zhí)行的??梢愿淖冞@一默認(rèn)行為,使用rxgo.WithPool(n)選項(xiàng)設(shè)置運(yùn)行n個(gè) goroutine,或者rxgo.WitCPUPool()選項(xiàng)設(shè)置運(yùn)行與邏輯 CPU 數(shù)量相等的 goroutine。

      package mainimport ("context""fmt""github.com/reactivex/rxgo/v2""math/rand""time")func main() {observable := rxgo.Range(1, 10)observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {time.Sleep(time.Duration(rand.Int31()))return i.(int) + 1, nil}, rxgo.WithCPUPool())for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      891065112473

      由于是并行運(yùn)算,所以結(jié)果是不固定的。

      我們可以直接看官網(wǎng)的介紹:https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

      7、過濾 Observable

      我們可以對(duì)Observable中發(fā)送過來的數(shù)據(jù)進(jìn)行過濾,過濾掉不需要的數(shù)據(jù),有以下方式:

      • Filter

      • ElementAt

      • Debounce

      • Distinct

      • Skip

      • Take

      下面的內(nèi)容大多來自官方的示例,地址:https://github.com/ReactiveX/RxGo/tree/v2.5.0/doc

      7.1 Filter

      Filter()接受一個(gè)類型為func (i interface{}) bool的參數(shù),通過的數(shù)據(jù)使用這個(gè)函數(shù)斷言,返回true的將發(fā)送給下一個(gè)階段。否則,丟棄。

      package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 3)().Filter(func(i interface{}) bool {return i != 2})for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      13

      7.2 ElementAt

      ElementAt()只發(fā)送指定索引的數(shù)據(jù),如ElementAt(2)只發(fā)送索引為 2 的數(shù)據(jù),即第 3 個(gè)數(shù)據(jù)。

      package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(0, 1, 2, 3, 4)().ElementAt(2)for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      2

      7.3 Debounce

      只有當(dāng)特定的時(shí)間跨度已經(jīng)過去而沒有發(fā)出另一個(gè)Item時(shí),才從Observable發(fā)出一個(gè)Item

      package mainimport ("fmt""github.com/reactivex/rxgo/v2""time")func main() {ch := make(chan rxgo.Item)go func() {ch <- rxgo.Of(1)time.Sleep(2 * time.Second)ch <- rxgo.Of(2)ch <- rxgo.Of(3)time.Sleep(2 * time.Second)close(ch)}()observable := rxgo.FromChannel(ch).Debounce(rxgo.WithDuration(1 * time.Second))for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      13

      上面示例,先收到 1,然后 2s 內(nèi)沒收到數(shù)據(jù),所以發(fā)送 1。接著收到了數(shù)據(jù) 2,由于馬上又收到了 3,所以 2 不會(huì)發(fā)送。收到 3 之后 2s 內(nèi)沒有收到數(shù)據(jù),發(fā)送了 3。所以最后輸出為 1,3。

      7.4 Distinct

      Distinct()會(huì)記錄它發(fā)送的所有數(shù)據(jù),它不會(huì)發(fā)送重復(fù)的數(shù)據(jù)。由于數(shù)據(jù)格式多樣,Distinct()要求我們提供一個(gè)函數(shù),根據(jù)原數(shù)據(jù)返回一個(gè)唯一標(biāo)識(shí)碼(有點(diǎn)類似哈希值)?;谶@個(gè)標(biāo)識(shí)碼去重。

      package mainimport ("context""fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 2, 3, 4, 4, 5)().Distinct(func(_ context.Context, i interface{}) (interface{}, error) {return i, nil})for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      12345

      7.5 Skip

      Skip可以跳過前若干個(gè)數(shù)據(jù)。

      package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 3, 4, 5)().Skip(2)for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      345

      7.6 Take

      Take只取前若干個(gè)數(shù)據(jù)。

      package mainimport ("fmt""github.com/reactivex/rxgo/v2")func main() {observable := rxgo.Just(1, 2, 3, 4, 5)().Take(2)for item := range observable.Observe() {fmt.Println(item.V)}}

      結(jié)果:

      12

      8、選項(xiàng)

      因?yàn)間olang中不支持默認(rèn)參數(shù),所以我們經(jīng)常會(huì)用到選項(xiàng)設(shè)計(jì)模式,rxgo中也大量使用到了此模式。

      • rxgo.WithBufferedChannel(10):設(shè)置 channel 的緩存大?。?/li>
      • rxgo.WithPool(n)/rxgo.WithCpuPool():使用多個(gè) goroutine 執(zhí)行轉(zhuǎn)換操作;
      • rxgo.WithPublishStrategy():使用發(fā)布策略,即創(chuàng)建可連接的 Observable。

      rxgo還有很多其他選項(xiàng),具體看官方文檔,地址:

      https://github.com/ReactiveX/RxGo/blob/v2.5.0/doc/options.md

      9、簡(jiǎn)化的真實(shí)案例

      假設(shè)現(xiàn)在有一個(gè)定時(shí)處理任務(wù),結(jié)構(gòu)如下:

      type ScheduledTask struct {RecordId intHandleStartTime time.TimeStatus bool}

      在執(zhí)行具體的任務(wù)時(shí),需要去數(shù)據(jù)庫(kù)查詢下是否已經(jīng)被取消了,如果已經(jīng)被取消掉的,則不再執(zhí)行。

      完整代碼如下:

      package mainimport ("fmt""github.com/reactivex/rxgo/v2""time")type ScheduledTask struct {RecordId intHandleStartTime stringStatus bool}func main() {ch := make(chan rxgo.Item)go producer(ch)time.Sleep(time.Second*3)observable := rxgo.FromChannel(ch)observable = observable.Filter(func(i interface{}) bool {st := i.(*ScheduledTask)return st.Status}, rxgo.WithBufferedChannel(1))// 消費(fèi)可觀測(cè)量for customer := range observable.Observe() {st := customer.V.(*ScheduledTask)fmt.Printf("resutl: --> %+v\n", st)}}func producer(ch chan <- rxgo.Item)  {for i := 0; i < 10; i++ {status := falseif i % 2 == 0 {status = true}st := &ScheduledTask{RecordId: i,HandleStartTime: time.Now().Format("2006-01-02 13:04:05"),Status: status,}ch <- rxgo.Of(st)}  // 這里千萬不要忘記了close(ch)}

      結(jié)果:

      resutl: --> &{RecordId:0 HandleStartTime:2023-04-22 46:04:07 Status:true}resutl: --> &{RecordId:2 HandleStartTime:2023-04-22 46:04:10 Status:true}resutl: --> &{RecordId:4 HandleStartTime:2023-04-22 46:04:10 Status:true}resutl: --> &{RecordId:6 HandleStartTime:2023-04-22 46:04:10 Status:true}resutl: --> &{RecordId:8 HandleStartTime:2023-04-22 46:04:10 Status:true}

      參考鏈接

      Go 每日一庫(kù)之 rxgo

      [官方例子](

      關(guān)鍵詞:

      相關(guān)閱讀
      分享到:
      版權(quán)和免責(zé)申明

      凡注有"實(shí)況網(wǎng)-重新發(fā)現(xiàn)生活"或電頭為"實(shí)況網(wǎng)-重新發(fā)現(xiàn)生活"的稿件,均為實(shí)況網(wǎng)-重新發(fā)現(xiàn)生活獨(dú)家版權(quán)所有,未經(jīng)許可不得轉(zhuǎn)載或鏡像;授權(quán)轉(zhuǎn)載必須注明來源為"實(shí)況網(wǎng)-重新發(fā)現(xiàn)生活",并保留"實(shí)況網(wǎng)-重新發(fā)現(xiàn)生活"的電頭。