Java 中 Go 通道的等价物

IT小君   2022-01-14T02:32:43

我有一个需要从一组阻塞队列中读取的要求。阻塞队列是由我正在使用的库创建的。我的代码必须从队列中读取。我不想为这些阻塞队列中的每一个创建一个读取器线程。相反,我想使用单个线程(或者可能最多使用 2/3 个线程)来轮询它们的数据可用性。由于一些阻塞队列可能长时间没有数据,而其中一些可能会出现数据突发。用小超时轮询队列会起作用,但这根本不是有效的,因为即使其中一些队列长时间没有数据,它仍然需要继续循环遍历所有队列。基本上,我正在寻找一种阻塞队列的选择/epoll(用于套接字)机制。任何线索都非常感谢。

不过,在 Go 中这样做真的很容易。下面的代码对通道和 goroutine 进行了相同的模拟:

package main

import "fmt"
import "time"
import "math/rand"

func sendMessage(sc chan string) {
    var i int

    for {
        i =  rand.Intn(10)
        for ; i >= 0 ; i-- {
            sc <- fmt.Sprintf("Order number %d",rand.Intn(100))
        }
        i = 1000 + rand.Intn(32000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func sendNum(c chan int) {
    var i int 
    for  {
        i = rand.Intn(16);
        for ; i >=  0; i-- {
            time.Sleep(20 * time.Millisecond)
            c <- rand.Intn(65534)
        }
        i = 1000 + rand.Intn(24000);
        time.Sleep(time.Duration(i) * time.Millisecond)
    }
}

func main() {
    msgchan := make(chan string, 32)
    numchan := make(chan int, 32)
    i := 0
    for ; i < 8 ; i++ {
        go sendNum(numchan)
        go sendMessage(msgchan)
    }
    for {
        select {
        case msg := <- msgchan:
            fmt.Printf("Worked on  %s\n", msg)
        case x := <- numchan:
            fmt.Printf("I got %d \n", x)
        }
    }
}
评论(4)
IT小君

我建议您考虑使用JCSP库。Go 的等价物select称为Alternative您只需要一个消费线程,如果它使用Alternative. 因此,这将是复用源数据的一种有效方式。

如果您能够将 BlockingQueues 替换为 JCSP 通道,将会有很大帮助。通道的行为基本相同,但在通道端共享的扇出或扇入方面提供了更大程度的灵活性,特别是在使用Alternative.

对于使用示例,这里是一个公平的多路复用器。此示例演示了一个过程,该过程将流量从其输入通道阵列公平地多路复用到其单个输出通道。无论竞争对手多么渴望,任何输入渠道都不会被饿死。

import org.jcsp.lang.*;

public class FairPlex implements CSProcess {

   private final AltingChannelInput[] in;
   private final ChannelOutput out;

   public FairPlex (final AltingChannelInput[] in, final ChannelOutput out) {
     this.in = in;
     this.out = out;
   }

   public void run () {

     final Alternative alt = new Alternative (in);

     while (true) {
       final int index = alt.fairSelect ();
       out.write (in[index].read ());
     }
   }
 }

请注意,如果priSelect在上面使用,如果索引较低的频道不断要求服务,则索引较高的频道将被饿死。或者可以使用 , 代替fairSelect,select但无法进行饥饿分析。select仅当饥饿不是问题时才应使用该机制。

摆脱僵局

与 Go 一样,使用通道的 Java 程序必须设计为不会出现死锁。Java 中低级并发原语的实现很难正确实现,您需要一些可靠的东西。幸运的是,Alternative已经通过形式分析验证了,还有 JCSP 渠道。这使它成为一个可靠的选择。

只是为了澄清一点混乱,当前的 JCSP 版本在 Maven 存储库中是1.1-rc5,而不是网站上所说的。

2022-01-14T02:32:43   回复
IT小君

唯一的方法是用功能更强大的类的对象替换标准队列,当数据插入空队列时通知消费者。这个类仍然可以实现BlockingQueue接口,所以对方(生产者)看不出区别。诀窍是put操作还应该引发一个标志并通知消费者。消费者在轮询所有线程后,清除标志并调用Object.wait().

2022-01-14T02:32:43   回复
IT小君

Java6+ 的另一种选择

一个 BlockingDeque 实现类:

import java.lang.ref.WeakReference;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

class GoChannelPool {

    private final static GoChannelPool defaultInstance = newPool();

    private final AtomicLong serialNumber = new AtomicLong();
    private final WeakHashMap<Long, WeakReference<GoChannel>> channelWeakHashMap = new WeakHashMap<>();
    private final LinkedBlockingDeque<GoChannelObject> totalQueue = new LinkedBlockingDeque<>();

    public <T> GoChannel<T> newChannel()  {
        GoChannel<T> channel = new GoChannel<>();
        channelWeakHashMap.put(channel.getId(), new WeakReference<GoChannel>(channel));
        return channel;
    }

    public void select(GoSelectConsumer consumer) throws InterruptedException {
        consumer.accept(getTotalQueue().take());
    }

    public int size() {
        return getTotalQueue().size();
    }

    public int getChannelCount() {
        return channelWeakHashMap.values().size();
    }

    private LinkedBlockingDeque<GoChannelObject> getTotalQueue() {
        return totalQueue;
    }

    public static GoChannelPool getDefaultInstance() {
        return defaultInstance;
    }

    public static GoChannelPool newPool()  {
        return new GoChannelPool();
    }

    private GoChannelPool() {}

    private long getSerialNumber() {
        return serialNumber.getAndIncrement();
    }

    private synchronized void syncTakeAndDispatchObject() throws InterruptedException {
        select(new GoSelectConsumer() {
            @Override
            void accept(GoChannelObject t) {

                WeakReference<GoChannel> goChannelWeakReference = channelWeakHashMap.get(t.channel_id);
                GoChannel channel = goChannelWeakReference != null ? goChannelWeakReference.get() : null;
                if (channel != null) {
                    channel.offerBuffer(t);
                }
            }
        });
    }

    class GoChannel<E> {
        // Instance
        private final long id;
        private final LinkedBlockingDeque<GoChannelObject<E>> buffer = new LinkedBlockingDeque<>();

        public GoChannel() {
            this(getSerialNumber());
        }

        private GoChannel(long id) {
            this.id = id;
        }

        public long getId() {
            return id;
        }

        public E take() throws InterruptedException {
            GoChannelObject object;
            while((object = pollBuffer()) == null) {
                syncTakeAndDispatchObject();
            }

            return (E) object.data;
        }

        public void offer(E object) {
            GoChannelObject<E> e = new GoChannelObject();
            e.channel_id = getId();
            e.data = object;

            getTotalQueue().offer(e);
        }

        protected void offerBuffer(GoChannelObject<E> data) {
            buffer.offer(data);
        }

        protected GoChannelObject<E> pollBuffer() {
            return buffer.poll();
        }

        public int size() {
            return buffer.size();
        }

        @Override
        protected void finalize() throws Throwable {
            super.finalize();

            channelWeakHashMap.remove(getId());
        }
    }

    class GoChannelObject<E> {
        long channel_id;
        E data;

        boolean belongsTo(GoChannel channel) {
            return channel != null && channel_id == channel.id;
        }
    }

    abstract static class GoSelectConsumer{
        abstract void accept(GoChannelObject t);
    }
}

那么我们可以这样使用它:

GoChannelPool pool = GoChannelPool.getDefaultInstance();
final GoChannelPool.GoChannel<Integer> numberCh = pool.newChannel();
final GoChannelPool.GoChannel<String> stringCh = pool.newChannel();
final GoChannelPool.GoChannel<String> otherCh = pool.newChannel();

ExecutorService executorService = Executors.newCachedThreadPool();
int times;
times = 2000;
final CountDownLatch countDownLatch = new CountDownLatch(times * 2);

final AtomicInteger numTimes = new AtomicInteger();
final AtomicInteger strTimes = new AtomicInteger();
final AtomicInteger defaultTimes = new AtomicInteger();

final int finalTimes = times;
executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            numberCh.offer(i);

            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});
executorService.submit(new Runnable() {
    @Override
    public void run() {
        for (int i = 0; i < finalTimes; i++) {
            stringCh.offer("s"+i+"e");

            try {
                Thread.sleep((long) (Math.random() * 10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
});

int otherTimes = 3;
for (int i = 0; i < otherTimes; i++) {
    otherCh.offer("a"+i);
}

for (int i = 0; i < times*2 + otherTimes; i++) {
    pool.select(new GoChannelPool.GoSelectConsumer() {
        @Override
        void accept(GoChannelPool.GoChannelObject t) {
            // The data order should be randomized.
            System.out.println(t.data);

            countDownLatch.countDown();

            if (t.belongsTo(stringCh)) {
                strTimes.incrementAndGet();
                return;
            }
            else if (t.belongsTo(numberCh)) {
                numTimes.incrementAndGet();
                return;
            }

            defaultTimes.incrementAndGet();
        }
    });
}
countDownLatch.await(10, TimeUnit.SECONDS);

/**
The console output of data should be randomized.
numTimes.get() should be 2000
strTimes.get() should be 2000
defaultTimes.get() should be 3
*/

并注意只有当频道属于同一个 GoChannelPool 时,选择才有效,或者只使用默认的 GoChannelPool(但是如果太多频道共享同一个 GoChannelPool,性能会降低)

2022-01-14T02:32:43   回复
IT小君

我记得当我对 Java 很陌生时,不知道线程可以共享进程的内存,我会让我的线程使用(TCP/本地)套接字进行通信。也许这也可以。

2022-01-14T02:32:44   回复