给 TiCDC 接上一个 unbound 的 channel
最近在改造 TiCDC 的 Sink 组件时需要为 MQ 类型的 Sink 接上一个 unbound 的 channel。 在搜索过程中发现了一个项目叫做 chann,它是一个接口统一并且支持范型的 channel。 虽然这个库看上去实现很简单,但是我在实际使用过程中并不是很顺利。下面我就介绍一下我在使用该库时遇到的问题和进行的思考。 此博客在 GitHub 上公开发布。 如果您有任何问题或疑问,请在此处打开一个 issue。 简介 该库由 golang 社区非常活跃的 changkun 编写,托管在他组建的 golang-design 组织下。 它提供了统一的接口来创建不同类型的 channel,并且支持范型: ch := chann.New[int]() // 无界限, 无容量限制 ch := chann.New[func()](chann.Cap(0)) // 没有缓存, 容量为 0 ch := chann.New[string](chann.Cap(100)) // 有缓存,容量为 100 它的发送和接收操作和原生 channel 一致: ch.In() <- 42 println(<-ch.Out()) // 42 它的关闭接口为: ch.Close() 从接口来看,它几乎能无缝的接入到我当前的项目当中,这也是我选择它的原因。 源码阅读 在开始分析我遇到的问题之前,我们需要先深入阅读一下源码。它的源码不是很长,并且我主要是用的是无界限的 chann,所以下面就着重看一下无界限的 chann 的源码。 type Chann[T any] struct { in, out chan T close chan struct{} cfg *config q []T } Chann 是一个范型结构体,它里面维护了 in 和 out channel,这就是我们能使用原生 channel 语法操作 chann 的入口和出口。 另外一个比较关键的字段是 q,它将负责存储 chann 的缓存。 它的构造方法: type config struct { typ chanType len, cap int64 } type Opt func(*config) func New[T any](opts ...Opt) *Chann[T] { cfg := &config{ cap: -1, len: 0, typ: unbounded, } if len(opts) > 1 { panic("chann: too many arguments") } for _, o := range opts { o(cfg) } ch := &Chann[T]{cfg: cfg, close: make(chan struct{})} switch ch.cfg.typ { case unbuffered: ch.in = make(chan T) ch.out = ch.in case buffered: ch.in = make(chan T, ch.cfg.cap) ch.out = ch.in case unbounded: ch.in = make(chan T, 16) ch.out = make(chan T, 16) go ch.unboundedProcessing() } return ch 它的构造方法抽象出了一个 Opts 可选参数,根据它我们能够统一构造方法,我们不传递 Opts 参数使用默认 config 就表明创建了一个无界限的 channel。注意:如果我们创建的是无界限的 chann,那么它将启动一个 goroutine 来处理发送和接收数据。同时入口和出口 channel 的缓存长度为 16。 ...