diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
index 12aa04fc6e..b61dac7acd 100644
--- a/synapse/api/events/factory.py
+++ b/synapse/api/events/factory.py
@@ -15,7 +15,7 @@
from synapse.api.events.room import (
RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
- InviteJoinEvent, RoomConfigEvent
+ InviteJoinEvent, RoomConfigEvent, RoomNameEvent, GenericEvent,
)
from synapse.util.stringutils import random_string
@@ -25,6 +25,7 @@ class EventFactory(object):
_event_classes = [
RoomTopicEvent,
+ RoomNameEvent,
MessageEvent,
RoomMemberEvent,
FeedbackEvent,
@@ -42,10 +43,9 @@ class EventFactory(object):
if "event_id" not in kwargs:
kwargs["event_id"] = random_string(10)
- try:
+ if etype in self._event_list:
handler = self._event_list[etype]
- except KeyError: # unknown event type
- # TODO allow custom event types.
- raise NotImplementedError("Unknown etype=%s" % etype)
+ else:
+ handler = GenericEvent
return handler(**kwargs)
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
index f3df849af2..42459f3f21 100644
--- a/synapse/api/events/room.py
+++ b/synapse/api/events/room.py
@@ -16,17 +16,45 @@
from . import SynapseEvent
+class GenericEvent(SynapseEvent):
+ def get_content_template(self):
+ return {}
+
+
class RoomTopicEvent(SynapseEvent):
TYPE = "m.room.topic"
+ internal_keys = SynapseEvent.internal_keys + [
+ "topic",
+ ]
+
def __init__(self, **kwargs):
kwargs["state_key"] = ""
+ if "topic" in kwargs["content"]:
+ kwargs["topic"] = kwargs["content"]["topic"]
super(RoomTopicEvent, self).__init__(**kwargs)
def get_content_template(self):
return {"topic": u"string"}
+class RoomNameEvent(SynapseEvent):
+ TYPE = "m.room.name"
+
+ internal_keys = SynapseEvent.internal_keys + [
+ "name",
+ ]
+
+ def __init__(self, **kwargs):
+ kwargs["state_key"] = ""
+ if "name" in kwargs["content"]:
+ kwargs["name"] = kwargs["content"]["name"]
+ super(RoomNameEvent, self).__init__(**kwargs)
+
+ def get_content_template(self):
+ return {"name": u"string"}
+
+
class RoomMemberEvent(SynapseEvent):
TYPE = "m.room.member"
@@ -38,6 +66,8 @@ class RoomMemberEvent(SynapseEvent):
def __init__(self, **kwargs):
if "target_user_id" in kwargs:
kwargs["state_key"] = kwargs["target_user_id"]
+ if "membership" not in kwargs:
+ kwargs["membership"] = kwargs.get("content", {}).get("membership")
super(RoomMemberEvent, self).__init__(**kwargs)
def get_content_template(self):
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
index 65b5a4ebb3..9f622df6bb 100644
--- a/synapse/api/notifier.py
+++ b/synapse/api/notifier.py
@@ -15,6 +15,7 @@
from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent
+from synapse.api.streams.event import EventsStreamData
from twisted.internet import defer
from twisted.internet import reactor
@@ -66,7 +67,7 @@ class Notifier(object):
self._notify_and_callback(
user_id=user_id,
event_data=event.get_dict(),
- stream_type=event.type,
+ stream_type=EventsStreamData.EVENT_TYPE,
store_id=store_id)
def on_new_user_event(self, user_id, event_data, stream_type, store_id):
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
index 4b6d739e54..414b05be30 100644
--- a/synapse/api/streams/event.py
+++ b/synapse/api/streams/event.py
@@ -18,6 +18,7 @@
from twisted.internet import defer
from synapse.api.errors import EventStreamError
+from synapse.api.events import SynapseEvent
from synapse.api.events.room import (
RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent
)
@@ -28,17 +29,17 @@ import logging
logger = logging.getLogger(__name__)
-class MessagesStreamData(StreamData):
- EVENT_TYPE = MessageEvent.TYPE
+class EventsStreamData(StreamData):
+ EVENT_TYPE = "EventsStream"
def __init__(self, hs, room_id=None, feedback=False):
- super(MessagesStreamData, self).__init__(hs)
+ super(EventsStreamData, self).__init__(hs)
self.room_id = room_id
self.with_feedback = feedback
@defer.inlineCallbacks
def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_message_stream(
+ data, latest_ver = yield self.store.get_room_events_stream(
user_id=user_id,
from_key=from_key,
to_key=to_key,
@@ -50,74 +51,7 @@ class MessagesStreamData(StreamData):
@defer.inlineCallbacks
def max_token(self):
- val = yield self.store.get_max_message_id()
- defer.returnValue(val)
-
-
-class RoomMemberStreamData(StreamData):
- EVENT_TYPE = RoomMemberEvent.TYPE
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_member_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key
- )
-
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_member_id()
- defer.returnValue(val)
-
-
-class FeedbackStreamData(StreamData):
- EVENT_TYPE = FeedbackEvent.TYPE
-
- def __init__(self, hs, room_id=None):
- super(FeedbackStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_feedback_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_feedback_id()
- defer.returnValue(val)
-
-
-class RoomDataStreamData(StreamData):
- EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types
-
- def __init__(self, hs, room_id=None):
- super(RoomDataStreamData, self).__init__(hs)
- self.room_id = room_id
-
- @defer.inlineCallbacks
- def get_rows(self, user_id, from_key, to_key, limit):
- (data, latest_ver) = yield self.store.get_room_data_stream(
- user_id=user_id,
- from_key=from_key,
- to_key=to_key,
- limit=limit,
- room_id=self.room_id
- )
- defer.returnValue((data, latest_ver))
-
- @defer.inlineCallbacks
- def max_token(self):
- val = yield self.store.get_max_room_data_id()
+ val = yield self.store.get_room_events_max_id()
defer.returnValue(val)
@@ -227,7 +161,10 @@ class EventStream(PaginationStream):
self.user_id, from_pkey, to_pkey, limit
)
- chunk += event_chunk
+ chunk.extend([
+ e.get_dict() if isinstance(e, SynapseEvent) else e
+ for e in event_chunk
+ ])
next_ver.append(str(max_pkey))
defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
|