kaisawind's blog
  • 关于
  • 所有帖子

golang的reflect.Select使用方法 - Wed, Nov 18, 2020

golang的reflect.Select使用方法

golang的reflect.Select使用方法

1. 概述

总结一下GO的channel使用方法

1.1 使用select选择channel

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	go func() {
		for {
			select {
			case <-time.Tick(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

	for {
		select {
		case <-ch:
			fmt.Println("hello channel")
		}
	}
}

1.2 关闭管道能够触发所有信号

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	quit := make(chan struct{})
	go func() {
		for {
			select {
			case <-time.After(10 * time.Second):
				close(quit)
				fmt.Println("timeout")
				return
			}
		}
	}()
	go func() {
		for {
			select {
			case <-quit:
				fmt.Println("close timer")
				return
			case <-time.Tick(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

Loop:
	for {
		select {
		case <-quit:
			fmt.Println("close channel")
			break Loop
		case <-ch:
			fmt.Println("hello channel")
		}
	}
	time.Sleep(2 * time.Second)
}

output:

hello channel
hello channel
hello channel
hello channel
timeout
close channel
close timer

1.3 非close信号只有一个接收者

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	quit := make(chan struct{})
	go func() {
		for {
			select {
			case <-time.After(10 * time.Second):
				quit <- struct{}{}
				fmt.Println("timeout")
				return
			}
		}
	}()
	go func() {
		for {
			select {
			case <-quit:
				fmt.Println("close timer")
				return
			case <-time.Tick(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

Loop:
	for {
		select {
		case <-quit:
			fmt.Println("close channel")
			break Loop
		case <-ch:
			fmt.Println("hello channel")
		}
	}
	time.Sleep(2 * time.Second)
}

output:

hello channel
hello channel
hello channel
hello channel
close channel true
timeout

1.4 for range遍历channel

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	quit := make(chan struct{})
	go func() {
		for range time.After(10 * time.Second) {
			close(quit)
			fmt.Println("timeout")
			return
		}
	}()
	go func() {
		for {
			select {
			case <-quit:
				fmt.Println("close timer")
				return
			case <-time.Tick(2 * time.Second):
				ch <- struct{}{}
			}
		}
	}()

Loop:
	for {
		select {
		case <-quit:
			fmt.Println("close channel")
			break Loop
		case <-ch:
			fmt.Println("hello channel")
		}
	}
	time.Sleep(2 * time.Second)
}

output:

hello channel
hello channel
hello channel
hello channel
timeout
close channel
close timer

2. reflect.Select等待多个channel

当channel是动态增减的时候,有两种方法可以消费channel的数据。

  1. 每个channel单独启动一个协程,用于消费数据
  2. 使用reflect.Select批量接收数据进行消费
// Handle 连接处理
func (s *Server) Handle() (err error) {
    // ...
    s.cases = append(s.cases, reflect.SelectCase{
        Dir:  reflect.SelectRecv,
        Chan: reflect.ValueOf(ss.PubChan()),
    })
    s.delivery <- struct{}{}
    // ...
}

// Delivery 分发消息
func (s *Server) Delivery() (err error) {
	for {
		chosen, value, ok := reflect.Select(s.cases)
		logrus.Debugln("delivery", chosen, ok, value)
		switch chosen {
		case 0:
			// quit
			return
		case 1:
			// cases changed
			if !ok {
				return
			}
			logrus.Infoln("cases count", len(s.cases))
		default:
			if !ok {
				s.cases = append(s.cases[:chosen], s.cases[chosen+1:]...)
				logrus.Infoln("cases count", len(s.cases))
				continue
			}
			p, ok := value.Interface().(*packets.PublishPacket)
			if !ok {
				continue
			}
			for clientID, session := range s.sessions {
				for topic := range session.Topics {
					if p.TopicName == topic {
						logrus.Debugf("send %s %v", clientID, p)
						_ = session.Send(p)
						break
					}
				}
			}
		}
	}
}


辽ICP备2021007608号 | © 2026 | kaisawind

Facebook Twitter GitHub