A Chat System

In this tutorial, we will build a simple, distributed chat system based on RSB.

The initial goal of the first part is having a chat client which sends and receives text messages to and from other clients without the need for a server. A session could look like this:

$ rsb-chat-client
> hi, anyone listening?
other-user: hi, i thought, i was the only one :)
> /quit
$

As an extension, in the second part the chat program should be able to send and receive avatar images to and from other chat clients.

Part 1: Sending and Receiving Text Messages

The distributed chat system can be organized by assigning a scope of the form /chat/text/NICKNAME to each participating nickname. This allows receiving messages from a particular sender by listening on /chat/text/NICKNAME and receiving all messages by listening on the superscope /chat/text/.

Implementation-wise, sending and receiving textual chat messages requires an informer and a listener, each on the appropriate scope:

  • The informer publishes messages on /chat/text/NICKNAME

  • The listener receives all messages on /chat/text/.

    Note

    This includes one’s own published messages, so these have to be filtered out to prevent an “echo” effect.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import sys
import rsb
import rsb.filter

def chatClient(nick):
    """Run an interactive chat session under the nickname ``nick``.

    Every non-empty line read from standard input is published on the
    scope ``/chat/text/NICK``.  Every event received on ``/chat/text`` is
    printed as ``SENDER: TEXT``, except events that originate from this
    client's own informer.

    The session ends when the user enters ``/quit`` or when standard
    input reaches end-of-file.
    """
    informer = rsb.createInformer('/chat/text/%s' % nick)

    listener = rsb.createListener('/chat/text')
    def printMessage(event):
        # chr(13) is a carriage return: move to the start of the line so the
        # incoming message overwrites the pending '> ' prompt, then print a
        # fresh prompt after it.  The last scope component is the sender's
        # nickname because senders publish on /chat/text/NICKNAME.
        sys.stdout.write('%s%s: %s\n> '
                         % (chr(13),
                            event.scope.components[-1],
                            event.data))
        sys.stdout.flush()
    # Filter out our own messages to prevent an "echo" effect.
    listener.addFilter(rsb.filter.OriginFilter(informer.id, invert = True))
    listener.addHandler(printMessage)

    while True:
        sys.stdout.write('> ')
        sys.stdout.flush()
        rawLine = sys.stdin.readline()
        # readline() returns '' only at end-of-file (an empty input line is
        # '\n').  Without this check the loop would spin forever on EOF.
        if not rawLine:
            return
        line = rawLine.strip()
        if line == '/quit':
            return
        if line:
            informer.publishData(line)

# Script entry point: expects the nickname as first command line argument.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        # Write the usage message to stderr (as the C++ and Java versions
        # of this example do); sys.stderr.write also works on both
        # Python 2 and Python 3, unlike the print statement.
        sys.stderr.write('usage: %s NICKNAME\n' % sys.argv[0])
        sys.exit(1)
    nick = sys.argv[1]

    chatClient(nick)

Download this example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
#include <string>

#include <boost/shared_ptr.hpp>

#include <rsb/Event.h>
#include <rsb/Factory.h>
#include <rsb/Handler.h>
#include <rsb/filter/OriginFilter.h>

using namespace std;

using namespace boost;

// Handler for incoming chat events: prints "<sender>: <text>" over the
// pending prompt and then re-displays the prompt.
void printMessage(rsb::EventPtr event) {
    // The payload is accessed as a string.
    shared_ptr<string> text = static_pointer_cast<string>(event->getData());

    // The last scope component is used as the sender's name.
    const string nickname = event->getScope().getComponents().back();

    // "\r" returns to the start of the line so the message replaces the
    // "> " prompt that is currently displayed.
    cout << "\r" << nickname << ": " << *text << endl;
    cout << "> ";
    cout.flush();
}

// Entry point of the text-only chat client.
//
// Expects exactly one argument, the nickname. Publishes every line read
// from standard input on /chat/text/NICKNAME and prints all messages
// received on /chat/text that were not sent by this client. Terminates
// when the user enters "/quit" or when standard input is exhausted.
int main(int argc, char *argv[]) {
    if (argc != 2) {
        cerr << "usage: " << argv[0] << " NICKNAME" << endl;
        return EXIT_FAILURE;
    }
    string nick = argv[1];

    rsb::Factory &factory = rsb::Factory::getInstance();

    rsb::Informer<string>::Ptr informer
        = factory.createInformer<string>("/chat/text/" + nick);
    rsb::ListenerPtr listener = factory.createListener("/chat/text");
    // Filter out events originating from our own informer to avoid an echo.
    listener->addFilter(rsb::filter::FilterPtr(new rsb::filter::OriginFilter(informer->getId(), true)));
    listener->addHandler(rsb::HandlerPtr(new rsb::EventFunctionHandler(&printMessage)));

    while (true) {
        cout << "> ";
        cout.flush();
        shared_ptr<string> message(new string());
        // getline() fails once standard input is at end-of-file (or
        // otherwise broken). Without checking its result the loop would
        // publish empty messages forever.
        if (!getline(cin, *message) || *message == "/quit") {
            break;
        }
        informer->publish(message);
    }

    return EXIT_SUCCESS;
}

Download this example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package chat1;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.BufferedReader;
import java.util.List;

import rsb.RSBException;
import rsb.Event;
import rsb.Listener;
import rsb.Informer;
import rsb.Factory;
import rsb.Handler;

import rsb.filter.OriginFilter;

/**
 * Text-only chat client.
 *
 * <p>Sends every line read from standard input on the scope
 * {@code /chat/text/NICKNAME} and prints all messages received on
 * {@code /chat/text} that were not sent by this client itself.
 */
public class Chat1 {

    /** Prints each received event as {@code SENDER: DATA} and re-displays the prompt. */
    private static class MessagePrinter implements Handler {
        @Override
        public void internalNotify(Event e) {
            // Senders publish on /chat/text/NICKNAME, so the last scope
            // component is the sender's nickname.
            List<String> results = e.getScope().getComponents();
            // "\r" returns to the start of the line so that the message
            // replaces the currently displayed "> " prompt.
            System.out.print("\r" + results.get(results.size() - 1) + ": " + e.getData() + "\n> ");
            System.out.flush();
        }
    };

    /**
     * Runs the chat client until the user enters {@code /quit} or standard
     * input reaches its end.
     *
     * @param args exactly one element: the nickname of this participant
     * @throws IOException if reading from standard input fails
     * @throws RSBException if an RSB operation fails
     */
    public static void main(String args[]) throws IOException, RSBException {
        if (args.length != 1) {
            System.err.println("usage: <java command> NICKNAME");
            System.exit(1);
        }
        String nick = args[0];

        Informer informer = Factory.getInstance().createInformer("/chat/text/" + nick);
        informer.activate();

        Listener listener = Factory.getInstance().createListener("/chat/text");
        listener.activate();
        // Filter out events originating from our own informer to avoid an echo.
        listener.addFilter(new OriginFilter(informer.getId(), true));
        listener.addHandler(new MessagePrinter(), true);

        InputStreamReader converter = new InputStreamReader(System.in);
        BufferedReader in = new BufferedReader(converter);
        while (true) {
            System.out.print("> ");
            System.out.flush();
            String line = in.readLine();
            // readLine() returns null at end of input (e.g. Ctrl-D or a
            // closed pipe); treat that like "/quit" instead of failing
            // with a NullPointerException.
            if (line == null || line.equals("/quit")) {
                break;
            }
            informer.send(line);
        }
    }
};

Download this example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
(cl:in-package #:chat)

(defvar *base-url*
  "/chat/"
  "Base URL against which the relative URLs of the chat system are merged.")

(defun chat (nick)
  "Run an interactive chat session under the nickname NICK.

Each line read from standard input is sent via an informer on the URL
obtained by merging NICK with the \"text/\" URL below *BASE-URL*.
Events received on the \"text/\" URL are printed together with the last
component of their scope, except events originating from this
session's own informer.

The session ends when the user enters \"/quit\" or when standard input
reaches end-of-file."
  (let* ((text-url  (puri:merge-uris "text/" *base-url*))
         (speak-url (puri:merge-uris nick text-url)))
    (with-informer (i speak-url t)
      (with-listener (l text-url)
        ;; Only accept events that do NOT originate from our own
        ;; informer; otherwise our own messages would be echoed.
        (setf (receiver-filters l)
              (list (complement (filter :origin :origin (participant-id i)))))
        (with-handler l
            ((event)
             ;; #\Return moves to the start of the line so the message
             ;; is printed over the pending prompt.
             (format *standard-output* "~C ~A: ~A~%> "
                     #\Return
                     (lastcar (scope-components (event-scope event)))
                     (event-data event))
             (finish-output *standard-output*))
          (loop (format *standard-output* "> ")
             (finish-output *standard-output*)
             ;; EOF-ERROR-P is NIL so that end of input yields NIL
             ;; instead of signaling END-OF-FILE; NIL ends the session
             ;; just like "/quit".
             (let ((line (read-line *standard-input* nil nil)))
               (when (or (null line) (string= line "/quit"))
                 (return))
               (send i line))))))))

Download this example

Part 2: Avatar Images

Avatar images are exchanged between participants of the distributed chat via RSB's RPC mechanism. In order to implement this, each chat program

  • creates a local server providing the avatar image of the participant via a get method under the scope /chat/avatar/NICKNAME.
  • creates a remote server for downloading avatar images from other participants by calling the methods mentioned above.
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import sys

import rsb
from rsb.transport.converter import ProtocolBufferConverter, registerGlobalConverter

from rst.vision.Image_pb2 import Image

import chat1

# Register a ProtocolBufferConverter for the Image message class via
# registerGlobalConverter.
registerGlobalConverter(ProtocolBufferConverter(messageClass = Image))
# Replace the rsb module's default participant configuration with one
# freshly built by ParticipantConfig.fromDefaultSources().
# NOTE(review): presumably this is needed so that the default configuration
# picks up the converter registered above -- confirm against the rsb API.
rsb.__defaultParticipantConfig = rsb.ParticipantConfig.fromDefaultSources()

class AvatarServer (object):
    """Offers one avatar image via RPC and retrieves avatars of others.

    On construction, a server is created on the scope
    ``/chat/avatar/NICK`` and a method named ``get`` is added to it which
    returns the image passed to the constructor.
    """

    def __init__(self, nick, image):
        """Create the server for ``nick`` which hands out ``image``."""
        self.__image = image
        server = rsb.createServer('/chat/avatar/%s' % nick)
        self.__localServer = server
        server.addMethod('get', self.sendAvatar)

    def sendAvatar(self, ignored):
        """Return the stored avatar image; the argument is not used."""
        return self.__image

    def getAvatar(self, nick):
        """Call ``get`` on the remote server of ``nick`` and return the result."""
        scope = '/chat/avatar/%s' % nick
        return rsb.createRemoteServer(scope).get(None)

# Placeholder avatar: a 32x32 image whose data is 32 * 32 * 3 bytes, all 'c'.
# NOTE(review): the factor 3 presumably means three bytes per pixel -- confirm.
__image = Image()
__image.width = 32
__image.height = 32
# Use a bytes literal: identical to the str literal on Python 2 and the
# type a bytes field expects on Python 3.
__image.data = b'c'*(32 * 32 * 3)

# Script entry point: expects the nickname as first command line argument.
if __name__ == '__main__':
    if len(sys.argv) < 2:
        # Write the usage message to stderr (as the C++ version of this
        # example does); sys.stderr.write also works on both Python 2 and
        # Python 3, unlike the print statement.
        sys.stderr.write('usage: %s NICKNAME\n' % sys.argv[0])
        sys.exit(1)
    nick = sys.argv[1]

    # Keep a reference to the avatar server while the chat client runs.
    __server = AvatarServer(nick, __image)
    chat1.chatClient(nick)

Download this example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
#include <cstdlib>
#include <exception>
#include <iostream>
#include <string>

#include <boost/shared_ptr.hpp>

#include <rsb/Event.h>
#include <rsb/Factory.h>
#include <rsb/Handler.h>
#include <rsb/filter/OriginFilter.h>

#include <rsb/converter/Repository.h>
#include <rsb/converter/ProtocolBufferConverter.h>

#include <rsb/patterns/Server.h>

#include <rst/vision/Image.pb.h>

using namespace std;

using namespace boost;

// Handler for incoming chat events.
//
// Requests the sender's avatar image by calling the "get" method of the
// remote server on /chat/avatar/SENDER, prints the image dimensions and
// then the chat message itself, followed by a fresh prompt.
//
// If the avatar cannot be obtained (for example because the sender does
// not provide an avatar server), a note is printed instead of the image
// dimensions and the chat message is still displayed.
void printMessage(rsb::EventPtr event) {
    shared_ptr<string> message
        = static_pointer_cast<string>(event->getData());

    // The last scope component is used as the sender's name.
    string sender = event->getScope().getComponents().back();

    // "\r" returns to the start of the line so the output replaces the
    // "> " prompt that is currently displayed.
    cout << "\r";
    try {
        rsb::patterns::RemoteServerPtr rms = rsb::Factory::getInstance().createRemoteServer("/chat/avatar/" +sender);
        // The "get" method ignores its argument; a dummy string is passed.
        shared_ptr<rst::vision::Image> image = rms->call<rst::vision::Image>("get", shared_ptr<string>(new string("bla")));
        cout << "-- Image width is: "<< image->width() << " and height: " << image->height() << endl;
    } catch (const std::exception &e) {
        // Do not let a failed avatar request swallow the chat message.
        cout << "-- No avatar available for " << sender << ": " << e.what() << endl;
    }
    cout << sender << ": " << *message  << endl
         << "> ";
    cout.flush();
}

// Shared pointer to an avatar image message.
typedef shared_ptr<rst::vision::Image> ImagePtr;

// Server callback that answers every call with the avatar image it was
// constructed with; method name and request argument are not inspected.
class AvatarCallback: public rsb::patterns::Server::Callback<std::string, rst::vision::Image> {
private:
    ImagePtr avatar;

public:
    AvatarCallback(ImagePtr image): avatar(image) {}

    ImagePtr call(const string & /*methodName*/, shared_ptr<string> /*request*/) {
        return avatar;
    }
};

// Entry point of the chat client with avatar support.
//
// Expects exactly one argument, the nickname. In addition to exchanging
// text messages on /chat/text, it provides a 32x32 avatar image through a
// "get" method of a server on /chat/avatar/NICKNAME. Terminates when the
// user enters "/quit" or when standard input is exhausted.
int main(int argc, char *argv[]) {
    // Register a protocol buffer converter for the avatar image type.
    rsb::converter::stringConverterRepository()->registerConverter(rsb::converter::Converter<string>::Ptr(new rsb::converter::ProtocolBufferConverter<rst::vision::Image>()));

    if (argc != 2) {
        cerr << "usage: " << argv[0] << " NICKNAME" << endl;
        return EXIT_FAILURE;
    }
    string nick = argv[1];

    rsb::Factory &factory = rsb::Factory::getInstance();

    rsb::Informer<string>::Ptr informer
        = factory.createInformer<string>("/chat/text/" + nick);
    rsb::ListenerPtr listener = factory.createListener("/chat/text");
    // Filter out events originating from our own informer to avoid an echo.
    listener->addFilter(rsb::filter::FilterPtr(new rsb::filter::OriginFilter(informer->getId(), true)));
    listener->addHandler(rsb::HandlerPtr(new rsb::EventFunctionHandler(&printMessage)));

    // Placeholder avatar: 32x32 with a data buffer of 32 * 32 * 3 bytes.
    ImagePtr avatarImage(new rst::vision::Image());
    avatarImage->set_width(32);
    avatarImage->set_height(32);
    avatarImage->mutable_data()->resize(32 * 32 * 3);
    rsb::patterns::ServerPtr avatarServer
        = factory.createServer("/chat/avatar/"  + nick);
    avatarServer->registerMethod("get", rsb::patterns::Server::CallbackPtr(new AvatarCallback(avatarImage)));

    while (true) {
        cout << "> ";
        cout.flush();
        shared_ptr<string> message(new string());
        // getline() fails once standard input is at end-of-file (or
        // otherwise broken). Without checking its result the loop would
        // publish empty messages forever.
        if (!getline(cin, *message) || *message == "/quit") {
            break;
        }
        informer->publish(message);
    }

    return EXIT_SUCCESS;
}

Download this example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package chat2;

import com.google.protobuf.ByteString;
import java.util.logging.Level;
import java.util.logging.Logger;
import rsb.Factory;
import rsb.InitializeException;
import rsb.patterns.DataCallback;

import rst.vision.ImageType;
import rst.vision.ImageType.Image;

/**
 * Provides an avatar image through a server method named "get".
 */
public class AvatarServer {

    /** The avatar image returned by the "get" method; built in the constructor. */
    private ImageType.Image image;

    /** Callback for the "get" method: returns the enclosing server's image. */
    private class Get implements DataCallback<ImageType.Image, Object> {
        // The request argument is not used.
        public ImageType.Image invoke(Object ignored) {
            return AvatarServer.this.image;
        }

    }

    /**
     * Builds the avatar image, adds the "get" method to the server and
     * activates the server.
     *
     * @param nickname nickname of the participant; not referenced in the
     *                 visible code, see the review note below
     */
    AvatarServer(String nickname) {
        // NOTE(review): the initializer is elided ("...") in this listing.
        // Presumably the server is created for the scope
        // /chat/avatar/<nickname> via Factory -- confirm against the
        // complete example.
        rsb.patterns.LocalServer server = ...
            try {
                Image.Builder builder  = Image.newBuilder();
                // NOTE(review): the data is empty although 32x32 is
                // declared; the other language versions of this example
                // allocate 32 * 32 * 3 bytes -- confirm this is intended.
                this.image =  builder.setWidth(32).setHeight(32).setData(ByteString.EMPTY).build();
                server.addMethod("get", new Get());
                server.activate();
            } catch (InitializeException ex) {
                // An initialization failure is only logged; the constructor
                // still completes normally.
                Logger.getLogger(AvatarServer.class.getName()).log(Level.SEVERE, null, ex);
            }
    }

};

Download this example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
(cl:in-package #:chat)

(defvar *avatar*
  (let ((width  32)
        (height 32))
    (make-instance 'rst.vision:image
                   :width  width
                   :height height
                   :data   (nibbles:make-octet-vector (* width height 3))))
  "Default avatar: a 32x32 image whose data is an octet vector of
32 * 32 * 3 elements.")

;;; Both functions close over AVATAR-URL, the "avatar/" URL merged
;;; with *BASE-URL*.
(let ((avatar-url (puri:merge-uris "avatar/" *base-url*)))

  (defun start-avatar-server (nick avatar)
    "Make a local server on the URL obtained by merging NICK with the
avatar URL, install a \"get\" method returning AVATAR and return the
server."
    (let ((server (make-local-server (puri:merge-uris nick avatar-url))))
      (setf (server-method server "get")
            (lambda () avatar))
      server))

  (defun get-avatar (nick)
    "Call the \"get\" method of the remote server on the URL obtained by
merging NICK with the avatar URL and return the result of the call."
    (let ((remote-url (puri:merge-uris nick avatar-url)))
      (with-remote-server (remote remote-url)
        (call remote "get" rsb.converter:+no-value+)))))

Download this example