golang的reflect.Select使用方法 - Wed, Nov 18, 2020
golang的reflect.Select使用方法
1. 概述
总结一下Go的channel使用方法。
1.1 使用select选择channel
package main
import (
"fmt"
"time"
)
// main prints "hello channel" every time the background goroutine signals
// on ch, which happens once every two seconds. It never returns.
func main() {
	ch := make(chan struct{})

	go func() {
		// Create the ticker once. Calling time.Tick inside the loop would
		// start a brand-new, never-stopped ticker on every iteration.
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()

		for range ticker.C {
			ch <- struct{}{}
		}
	}()

	for {
		// select is kept because it is what this section demonstrates; with
		// a single case it behaves like a plain receive from ch.
		select {
		case <-ch:
			fmt.Println("hello channel")
		}
	}
}
1.2 关闭channel能够通知所有接收者
package main
import (
"fmt"
"time"
)
// main demonstrates that closing a channel acts as a broadcast: once quit is
// closed, both the producer goroutine and the main loop see the receive on
// quit succeed and shut down.
func main() {
	ch := make(chan struct{})
	quit := make(chan struct{})

	// Timeout goroutine: wait 10 seconds, then close quit.
	// A plain receive replaces the original for/select with a single case.
	go func() {
		<-time.After(10 * time.Second)
		close(quit)
		fmt.Println("timeout")
	}()

	// Producer goroutine: signal on ch two seconds after each previous
	// signal, until quit is closed.
	go func() {
		for {
			select {
			case <-quit:
				fmt.Println("close timer")
				return
			// time.After gives a one-shot timer per iteration. time.Tick
			// here would start a new, never-stopped ticker on every pass.
			case <-time.After(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

Loop:
	for {
		select {
		case <-quit:
			fmt.Println("close channel")
			break Loop
		case <-ch:
			fmt.Println("hello channel")
		}
	}

	// Keep the process alive long enough for the producer goroutine to
	// notice the closed quit channel and print its message.
	time.Sleep(2 * time.Second)
}
output:
hello channel
hello channel
hello channel
hello channel
timeout
close channel
close timer
1.3 非close信号只有一个接收者
package main
import (
"fmt"
"time"
)
// main demonstrates that a value sent on a channel (as opposed to closing
// it) is delivered to exactly one receiver: both the producer goroutine and
// the main loop wait on quit, but only one of them gets the single value.
func main() {
	ch := make(chan struct{})
	quit := make(chan struct{})

	// Timeout goroutine: after 10 seconds send ONE value on quit. The send
	// is deliberately not a close, because this section shows what happens
	// when a quit signal is sent instead of broadcast.
	go func() {
		<-time.After(10 * time.Second)
		quit <- struct{}{}
		fmt.Println("timeout")
	}()

	// Producer goroutine: signal on ch two seconds after each previous
	// signal, or stop if it happens to be the one that receives quit.
	go func() {
		for {
			select {
			case <-quit:
				fmt.Println("close timer")
				return
			// time.After gives a one-shot timer per iteration. time.Tick
			// here would start a new, never-stopped ticker on every pass.
			case <-time.After(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

Loop:
	for {
		select {
		// ok is true when the value came from a send rather than from a
		// closed channel; printing it yields the "close channel true" line
		// shown in the article's output.
		case _, ok := <-quit:
			fmt.Println("close channel", ok)
			break Loop
		case <-ch:
			fmt.Println("hello channel")
		}
	}

	time.Sleep(2 * time.Second)
}
output:
hello channel
hello channel
hello channel
hello channel
close channel true
timeout
1.4 for range遍历channel
package main
import (
"fmt"
"time"
)
// main repeats the close-as-broadcast example, but the timeout goroutine
// receives from its timer channel with for range instead of select.
func main() {
	ch := make(chan struct{})
	quit := make(chan struct{})

	// Timeout goroutine. The channel returned by time.After delivers a
	// single value, so the loop body runs once; the return then ends the
	// goroutine instead of waiting for a second value that never arrives.
	go func() {
		for range time.After(10 * time.Second) {
			close(quit)
			fmt.Println("timeout")
			return
		}
	}()

	// Producer goroutine: signal on ch two seconds after each previous
	// signal, until quit is closed.
	go func() {
		for {
			select {
			case <-quit:
				fmt.Println("close timer")
				return
			// time.After gives a one-shot timer per iteration. time.Tick
			// here would start a new, never-stopped ticker on every pass.
			case <-time.After(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

Loop:
	for {
		select {
		case <-quit:
			fmt.Println("close channel")
			break Loop
		case <-ch:
			fmt.Println("hello channel")
		}
	}

	// Keep the process alive long enough for the producer goroutine to
	// notice the closed quit channel and print its message.
	time.Sleep(2 * time.Second)
}
output:
hello channel
hello channel
hello channel
hello channel
timeout
close channel
close timer
2. reflect.Select等待多个channel
当channel的数量动态增减时，有两种方法可以消费这些channel的数据：
- 每个channel单独启动一个协程,用于消费数据
- 使用reflect.Select批量接收数据进行消费
// Handle processes a connection (excerpt; the elided parts are marked "...").
// The visible portion registers the channel returned by ss.PubChan() as a
// receive case in s.cases and then sends a notification on s.delivery.
func (s *Server) Handle() (err error) {
	// ...
	pubCase := reflect.SelectCase{
		Dir:  reflect.SelectRecv,
		Chan: reflect.ValueOf(ss.PubChan()),
	}
	s.cases = append(s.cases, pubCase)
	// NOTE(review): presumably this wakes Delivery so that it picks up the
	// grown case list; confirm against the code that sets up s.cases.
	s.delivery <- struct{}{}
	// ...
}
// Delivery dispatches messages. It blocks in reflect.Select on s.cases and
// handles whichever case fires:
//
//   - index 0 is treated as the quit signal and ends the loop;
//   - index 1 is treated as the "cases changed" notification and ends the
//     loop only when that channel has been closed;
//   - any other index is a message channel: a closed channel is removed
//     from s.cases, and a received *packets.PublishPacket is sent to every
//     session that has a topic equal to the packet's TopicName.
//
// The named result err is never assigned, so every return yields nil.
func (s *Server) Delivery() (err error) {
	for {
		idx, recv, recvOK := reflect.Select(s.cases)
		logrus.Debugln("delivery", idx, recvOK, recv)

		if idx == 0 {
			// quit
			return
		}

		if idx == 1 {
			// cases changed
			if !recvOK {
				return
			}
			logrus.Infoln("cases count", len(s.cases))
			continue
		}

		// A closed message channel: drop its case from the list.
		if !recvOK {
			s.cases = append(s.cases[:idx], s.cases[idx+1:]...)
			logrus.Infoln("cases count", len(s.cases))
			continue
		}

		// Ignore anything that is not a publish packet.
		pkt, isPublish := recv.Interface().(*packets.PublishPacket)
		if !isPublish {
			continue
		}

		for clientID, session := range s.sessions {
			for topic := range session.Topics {
				if pkt.TopicName != topic {
					continue
				}
				logrus.Debugf("send %s %v", clientID, pkt)
				// Send errors are deliberately discarded, as in the original.
				_ = session.Send(pkt)
				// One delivery per session is enough; stop scanning topics.
				break
			}
		}
	}
}