我有一个需要从一组阻塞队列中读取的要求。阻塞队列是由我正在使用的库创建的。我的代码必须从队列中读取。我不想为这些阻塞队列中的每一个创建一个读取器线程。相反,我想使用单个线程(或者可能最多使用 2/3 个线程)来轮询它们的数据可用性。由于一些阻塞队列可能长时间没有数据,而其中一些可能会出现数据突发。用小超时轮询队列会起作用,但这根本不是有效的,因为即使其中一些队列长时间没有数据,它仍然需要继续循环遍历所有队列。基本上,我正在寻找一种阻塞队列的选择/epoll(用于套接字)机制。任何线索都非常感谢。
不过,在 Go 中这样做真的很容易。下面的代码对通道和 goroutine 进行了相同的模拟:
package main
import "fmt"
import "time"
import "math/rand"
func sendMessage(sc chan string) {
var i int
for {
i = rand.Intn(10)
for ; i >= 0 ; i-- {
sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
}
i = 1000 + rand.Intn(32000);
time.Sleep(time.Duration(i) * time.Millisecond)
}
}
func sendNum(c chan int) {
var i int
for {
i = rand.Intn(16);
for ; i >= 0; i-- {
time.Sleep(20 * time.Millisecond)
c <- rand.Intn(65534)
}
i = 1000 + rand.Intn(24000);
time.Sleep(time.Duration(i) * time.Millisecond)
}
}
func main() {
msgchan := make(chan string, 32)
numchan := make(chan int, 32)
i := 0
for ; i < 8 ; i++ {
go sendNum(numchan)
go sendMessage(msgchan)
}
for {
select {
case msg := <- msgchan:
fmt.Printf("Worked on %s\n", msg)
case x := <- numchan:
fmt.Printf("I got %d \n", x)
}
}
}
我建议您考虑使用JCSP库。Go 的等价物
select
称为Alternative。您只需要一个消费线程,如果它使用Alternative
. 因此,这将是复用源数据的一种有效方式。如果您能够将 BlockingQueues 替换为 JCSP 通道,将会有很大帮助。通道的行为基本相同,但在通道端共享的扇出或扇入方面提供了更大程度的灵活性,特别是在使用
Alternative
.对于使用示例,这里是一个公平的多路复用器。此示例演示了一个过程,该过程将流量从其输入通道阵列公平地多路复用到其单个输出通道。无论竞争对手多么渴望,任何输入渠道都不会被饿死。
import org.jcsp.lang.*; public class FairPlex implements CSProcess { private final AltingChannelInput[] in; private final ChannelOutput out; public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) { this.in = in; this.out = out; } public void run () { final Alternative alt = new Alternative (in); while (true) { final int index = alt.fairSelect (); out.write (in[index].read ()); } } }
请注意,如果
priSelect
在上面使用,如果索引较低的频道不断要求服务,则索引较高的频道将被饿死。或者可以使用 , 代替fairSelect
,select
但无法进行饥饿分析。select
仅当饥饿不是问题时才应使用该机制。摆脱僵局
与 Go 一样,使用通道的 Java 程序必须设计为不会出现死锁。Java 中低级并发原语的实现很难正确实现,您需要一些可靠的东西。幸运的是,
Alternative
已经通过形式分析验证了,还有 JCSP 渠道。这使它成为一个可靠的选择。只是为了澄清一点混乱,当前的 JCSP 版本在 Maven 存储库中是1.1-rc5,而不是网站上所说的。