mirror of
https://source.quilibrium.com/quilibrium/ceremonyclient.git
synced 2025-01-23 22:25:19 +00:00
113 lines
2.6 KiB
Go
113 lines
2.6 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
)
|
|
|
|
// ChatRoomBufSize is the capacity of a ChatRoom's Messages channel, i.e. how
// many decoded incoming messages may be queued before the reader has to wait.
const ChatRoomBufSize = 128
|
|
|
|
// ChatRoom represents a subscription to a single PubSub topic. Messages
|
|
// can be published to the topic with ChatRoom.Publish, and received
|
|
// messages are pushed to the Messages channel.
|
|
type ChatRoom struct {
|
|
// Messages is a channel of messages received from other peers in the chat room
|
|
Messages chan *ChatMessage
|
|
|
|
ctx context.Context
|
|
ps *pubsub.PubSub
|
|
topic *pubsub.Topic
|
|
sub *pubsub.Subscription
|
|
|
|
roomName string
|
|
self peer.ID
|
|
nick string
|
|
}
|
|
|
|
// ChatMessage is the payload carried in the body of a pubsub message. It is
// encoded to JSON when published and decoded from JSON when received.
type ChatMessage struct {
	// Message is the chat text.
	Message string
	// SenderID is the string form of the sending peer's ID.
	SenderID string
	// SenderNick is the sender's nickname.
	SenderNick string
}
|
|
|
|
// JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning
|
|
// a ChatRoom on success.
|
|
func JoinChatRoom(ctx context.Context, ps *pubsub.PubSub, selfID peer.ID, nickname string, roomName string) (*ChatRoom, error) {
|
|
// join the pubsub topic
|
|
topic, err := ps.Join(topicName(roomName))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// and subscribe to it
|
|
sub, err := topic.Subscribe()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cr := &ChatRoom{
|
|
ctx: ctx,
|
|
ps: ps,
|
|
topic: topic,
|
|
sub: sub,
|
|
self: selfID,
|
|
nick: nickname,
|
|
roomName: roomName,
|
|
Messages: make(chan *ChatMessage, ChatRoomBufSize),
|
|
}
|
|
|
|
// start reading messages from the subscription in a loop
|
|
go cr.readLoop()
|
|
return cr, nil
|
|
}
|
|
|
|
// Publish sends a message to the pubsub topic.
|
|
func (cr *ChatRoom) Publish(message string) error {
|
|
m := ChatMessage{
|
|
Message: message,
|
|
SenderID: cr.self.String(),
|
|
SenderNick: cr.nick,
|
|
}
|
|
msgBytes, err := json.Marshal(m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return cr.topic.Publish(cr.ctx, msgBytes)
|
|
}
|
|
|
|
func (cr *ChatRoom) ListPeers() []peer.ID {
|
|
return cr.ps.ListPeers(topicName(cr.roomName))
|
|
}
|
|
|
|
// readLoop pulls messages from the pubsub topic and pushes them onto the Messages channel.
|
|
func (cr *ChatRoom) readLoop() {
|
|
for {
|
|
msg, err := cr.sub.Next(cr.ctx)
|
|
if err != nil {
|
|
close(cr.Messages)
|
|
return
|
|
}
|
|
// only forward messages delivered by others
|
|
if msg.ReceivedFrom == cr.self {
|
|
continue
|
|
}
|
|
cm := new(ChatMessage)
|
|
err = json.Unmarshal(msg.Data, cm)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
// send valid messages onto the Messages channel
|
|
cr.Messages <- cm
|
|
}
|
|
}
|
|
|
|
// topicName maps a chat room name to its PubSub topic name by prefixing it
// with "chat-room:".
func topicName(roomName string) string {
	const prefix = "chat-room:"
	return prefix + roomName
}
|