Wednesday, 25 July 2018

Calling Thread.sleep in a Subscriber thread causes the Publisher thread to sleep

I have implemented the publish-subscribe pattern in my application, but when any one subscriber calls Thread.sleep() or throws an exception, all the other subscribers and the publisher are affected. How can I prevent this from happening?

I have created a small demo of the problem:

Publisher Code

import java.sql.Timestamp;
import java.util.Random;

public class Publisher extends Thread {

    Broker broker = Broker.getInstance();
    Random random = new Random();

    @Override
    public void run() {
        while (true) {
            System.out.println("Published " + new Timestamp(System.currentTimeMillis()));
            broker.updateSubscribers(Integer.toString(random.nextInt(250)));
        }

    }
}

Subscriber Interface

public interface Subscriber {

    public void onUpdate(String message);
}

MessageSubscriber code

import java.util.logging.Level;
import java.util.logging.Logger;

public class MessageSubscriber extends Thread implements Subscriber {

    Broker broker = Broker.getInstance();

    @Override
    public void run() {
        System.out.println("MessageSubscriber started...");
        broker.subscribe(this);
    }

    @Override
    public void onUpdate(String message) {
        try {
            System.out.println(message);
            sleep(1000);                    // Thread.sleep is static: it pauses the calling thread (here, the publisher)
        } catch (InterruptedException ex) {
            Logger.getLogger(MessageSubscriber.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

}

As you can see, I call sleep in MessageSubscriber, which also makes the Publisher sleep for the same duration.

Edit: added Broker code

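import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 *
 * @author hemants
 */
public class Broker {

    // Thread-safe list: subscribers register from their own threads while the publisher iterates
    List<Subscriber> subscribersList = new CopyOnWriteArrayList<>();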

    private Broker() {
    }

    public static Broker getInstance() {
        return BrokerHolder.INSTANCE;
    }

    private static class BrokerHolder {

        private static final Broker INSTANCE = new Broker();
    }

    public void subscribe(Subscriber s) {
        subscribersList.add(s);
    }

    public void unsubscribe(Subscriber s) {
        subscribersList.remove(s);
    }

    public void updateSubscribers(String message) {
        subscribersList.stream().forEach(subscriber -> subscriber.onUpdate(message));
    }
}

Main class to run the above code

public class PubSubPattern {

    public static void main(String[] args) {
        Publisher publisher = new Publisher();
        publisher.start();

        MessageSubscriber messageSubscriber = new MessageSubscriber();
        messageSubscriber.start();
    }
}
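
One possible direction, offered as a sketch rather than a definitive fix: in the demo, Broker.updateSubscribers calls each onUpdate directly, so the callback runs on whichever thread published the message. That is why a sleep or an exception inside a subscriber reaches the publisher. The version below hands each message to a single-threaded executor owned by the subscriber, so delivery happens off the publisher thread. The names AsyncBroker, AsyncBrokerDemo and shutdown are hypothetical and not part of the original code, and the Subscriber interface is repeated only to keep the example self-contained.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

interface Subscriber {
    void onUpdate(String message);
}

// Hypothetical replacement for Broker: every subscriber gets its own worker thread.
class AsyncBroker {

    // One single-threaded executor per subscriber keeps that subscriber's messages in order.
    private final Map<Subscriber, ExecutorService> workers = new ConcurrentHashMap<>();

    public void subscribe(Subscriber s) {
        workers.computeIfAbsent(s, key -> Executors.newSingleThreadExecutor());
    }

    public void unsubscribe(Subscriber s) {
        ExecutorService worker = workers.remove(s);
        if (worker != null) {
            worker.shutdown();
        }
    }

    // Returns as soon as the message is queued; it does not wait for any subscriber.
    public void updateSubscribers(String message) {
        workers.forEach((subscriber, worker) -> {
            try {
                worker.execute(() -> {
                    try {
                        subscriber.onUpdate(message);
                    } catch (RuntimeException ex) {
                        // A failing subscriber is logged and does not reach the publisher.
                        System.err.println("Subscriber failed: " + ex);
                    }
                });
            } catch (RejectedExecutionException ex) {
                // The subscriber was unsubscribed concurrently; drop the message for it.
            }
        });
    }

    // Stops all workers and waits briefly for queued messages to be delivered.
    public void shutdown() throws InterruptedException {
        for (ExecutorService worker : workers.values()) {
            worker.shutdown();
        }
        for (ExecutorService worker : workers.values()) {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}

class AsyncBrokerDemo {
    public static void main(String[] args) throws InterruptedException {
        AsyncBroker broker = new AsyncBroker();
        broker.subscribe(message -> {
            try {
                Thread.sleep(1000); // only this subscriber's worker thread sleeps
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
            System.out.println("slow subscriber got " + message);
        });
        broker.subscribe(message -> {
            throw new IllegalStateException("boom");
        });
        broker.subscribe(message -> System.out.println("fast subscriber got " + message));

        long start = System.nanoTime();
        broker.updateSubscribers("42");
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("updateSubscribers returned after " + elapsedMs + " ms");

        broker.shutdown();
    }
}
```

One caveat with this design: each executor's queue is unbounded, so a publisher that loops without pausing, like the one in the demo, can queue messages faster than a sleeping subscriber drains them. A bounded queue or some throttling on the publisher side would probably be needed in a real application.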
