diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 78df9ac53e..85a0f5dff5 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -31,7 +31,8 @@ class BaseHandler(object):
class BaseRoomHandler(BaseHandler):
@defer.inlineCallbacks
- def _on_new_room_event(self, event, snapshot, extra_destinations=[]):
+ def _on_new_room_event(self, event, snapshot, extra_destinations=[],
+ extra_users=[]):
snapshot.fill_out_prev_events(event)
store_id = yield self.store.persist_event(event)
@@ -43,7 +44,7 @@ class BaseRoomHandler(BaseHandler):
)))
event.destinations = list(destinations)
- self.notifier.on_new_room_event(event, store_id)
+ self.notifier.on_new_room_event(event, extra_users=[])
federation_handler = self.hs.get_handlers().federation_handler
yield federation_handler.handle_new_event(event, snapshot)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 1bd173acd8..e08231406d 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -15,19 +15,17 @@
from twisted.internet import defer
+from synapse.api.events import SynapseEvent
+
from ._base import BaseHandler
-from synapse.api.streams.event import (
- EventStream, EventsStreamData
-)
-from synapse.handlers.presence import PresenceStreamData
+import logging
-class EventStreamHandler(BaseHandler):
- stream_data_classes = [
- EventsStreamData,
- PresenceStreamData,
- ]
+logger = logging.getLogger(__name__)
+
+
+class EventStreamHandler(BaseHandler):
def __init__(self, hs):
super(EventStreamHandler, self).__init__(hs)
@@ -43,45 +41,12 @@ class EventStreamHandler(BaseHandler):
self.clock = hs.get_clock()
- def get_event_stream_token(self, stream_type, store_id, start_token):
- """Return the next token after this event.
-
- Args:
- stream_type (str): The StreamData.EVENT_TYPE
- store_id (int): The new storage ID assigned from the data store.
- start_token (str): The token the user started with.
- Returns:
- str: The end token.
- """
- for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
- if stream_cls.EVENT_TYPE == stream_type:
- # this is the stream for this event, so replace this part of
- # the token
- store_ids = start_token.split(EventStream.SEPARATOR)
- store_ids[i] = str(store_id)
- return EventStream.SEPARATOR.join(store_ids)
- raise RuntimeError("Didn't find a stream type %s" % stream_type)
+ self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def get_stream(self, auth_user_id, pagin_config, timeout=0):
- """Gets events as an event stream for this user.
-
- This function looks for interesting *events* for this user. This is
- different from the notifier, which looks for interested *users* who may
- want to know about a single event.
-
- Args:
- auth_user_id (str): The user requesting their event stream.
- pagin_config (synapse.api.streams.PaginationConfig): The config to
- use when obtaining the stream.
- timeout (int): The max time to wait for an incoming event in ms.
- Returns:
- A pagination stream API dict
- """
auth_user = self.hs.parse_userid(auth_user_id)
- stream_id = object()
-
try:
if auth_user not in self._streams_per_user:
self._streams_per_user[auth_user] = 0
@@ -94,41 +59,30 @@ class EventStreamHandler(BaseHandler):
)
self._streams_per_user[auth_user] += 1
- # construct an event stream with the correct data ordering
- stream_data_list = []
- for stream_class in EventStreamHandler.stream_data_classes:
- stream_data_list.append(stream_class(self.hs))
- event_stream = EventStream(auth_user_id, stream_data_list)
-
- # fix unknown tokens to known tokens
- pagin_config = yield event_stream.fix_tokens(pagin_config)
-
- # register interest in receiving new events
- self.notifier.store_events_for(user_id=auth_user_id,
- stream_id=stream_id,
- from_tok=pagin_config.from_tok)
-
- # see if we can grab a chunk now
- data_chunk = yield event_stream.get_chunk(config=pagin_config)
-
- # if there are previous events, return those. If not, wait on the
- # new events for 'timeout' seconds.
- if len(data_chunk["chunk"]) == 0 and timeout != 0:
- results = yield defer.maybeDeferred(
- self.notifier.get_events_for,
- user_id=auth_user_id,
- stream_id=stream_id,
- timeout=timeout
- )
- if results:
- defer.returnValue(results)
+ if pagin_config.from_token is None:
+ pagin_config.from_token = None
- defer.returnValue(data_chunk)
- finally:
- # cleanup
- self.notifier.purge_events_for(user_id=auth_user_id,
- stream_id=stream_id)
+ rm_handler = self.hs.get_handlers().room_member_handler
+ room_ids = yield rm_handler.get_rooms_for_user(auth_user)
+ events, tokens = yield self.notifier.get_events_for(
+ auth_user, room_ids, pagin_config, timeout
+ )
+
+ chunks = [
+ e.get_dict() if isinstance(e, SynapseEvent) else e
+ for e in events
+ ]
+
+ chunk = {
+ "chunk": chunks,
+ "start": tokens[0].to_string(),
+ "end": tokens[1].to_string(),
+ }
+
+ defer.returnValue(chunk)
+
+ finally:
self._streams_per_user[auth_user] -= 1
if not self._streams_per_user[auth_user]:
del self._streams_per_user[auth_user]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 7253f56322..9023c3d403 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -176,7 +176,7 @@ class FederationHandler(BaseHandler):
)
if not backfilled:
- yield self.notifier.on_new_room_event(event, store_id)
+ yield self.notifier.on_new_room_event(event)
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index be10162db5..c479908f61 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState
-from synapse.api.streams import StreamData
from ._base import BaseHandler
@@ -677,46 +676,10 @@ class PresenceHandler(BaseHandler):
statuscache.make_event(user=observed_user, clock=self.clock)
self.notifier.on_new_user_event(
- observer_user.to_string(),
- event_data=statuscache.make_event(
- user=observed_user,
- clock=self.clock
- ),
- stream_type=PresenceStreamData,
- store_id=statuscache.serial
+ [observer_user],
)
-class PresenceStreamData(StreamData):
- def __init__(self, hs):
- super(PresenceStreamData, self).__init__(hs)
- self.presence = hs.get_handlers().presence_handler
-
- def get_rows(self, user_id, from_key, to_key, limit, direction):
- from_key = int(from_key)
- to_key = int(to_key)
-
- cachemap = self.presence._user_cachemap
-
- # TODO(paul): limit, and filter by visibility
- updates = [(k, cachemap[k]) for k in cachemap
- if from_key < cachemap[k].serial <= to_key]
-
- if updates:
- clock = self.presence.clock
-
- latest_serial = max([x[1].serial for x in updates])
- data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
- return ((data, latest_serial))
- else:
- return (([], self.presence._user_cachemap_latest_serial))
-
- def max_token(self):
- return self.presence._user_cachemap_latest_serial
-
-PresenceStreamData.EVENT_TYPE = PresenceStreamData
-
-
class UserPresenceCache(object):
"""Store an observed user's state and status message.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b4b051888..f01349b339 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,8 +22,7 @@ from synapse.api.errors import RoomError, StoreError, SynapseError
from synapse.api.events.room import (
RoomTopicEvent, RoomMemberEvent, RoomConfigEvent
)
-from synapse.api.streams.event import EventStream, EventsStreamData
-from synapse.handlers.presence import PresenceStreamData
+from synapse.streams.config import PaginationConfig
from synapse.util import stringutils
from ._base import BaseRoomHandler
@@ -107,13 +106,24 @@ class MessageHandler(BaseRoomHandler):
"""
yield self.auth.check_joined_room(room_id, user_id)
- data_source = [
- EventsStreamData(self.hs, room_id=room_id, feedback=feedback)
- ]
- event_stream = EventStream(user_id, data_source)
- pagin_config = yield event_stream.fix_tokens(pagin_config)
- data_chunk = yield event_stream.get_chunk(config=pagin_config)
- defer.returnValue(data_chunk)
+ data_source = self.hs.get_event_sources().sources["room"]
+
+ if not pagin_config.from_token:
+ pagin_config.from_token = yield self.hs.get_event_sources().get_current_token()
+
+ user = self.hs.parse_userid(user_id)
+
+ events, next_token = yield data_source.get_pagination_rows(
+ user, pagin_config, room_id
+ )
+
+ chunk = {
+ "chunk": [e.get_dict() for e in events],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ defer.returnValue(chunk)
@defer.inlineCallbacks
def store_room_data(self, event=None, stamp_event=True):
@@ -235,20 +245,18 @@ class MessageHandler(BaseRoomHandler):
membership_list=[Membership.INVITE, Membership.JOIN]
)
+ user = self.hs.parse_userid(user_id)
+
rooms_ret = []
- now_rooms_token = yield self.store.get_room_events_max_id()
+ now_token = yield self.hs.get_event_sources().get_current_token()
- # FIXME (erikj): Fix this.
- presence_stream = PresenceStreamData(self.hs)
- now_presence_token = yield presence_stream.max_token()
- presence = yield presence_stream.get_rows(
- user_id, 0, now_presence_token, None, None
+ presence_stream = self.hs.get_event_sources().sources["presence"]
+ pagination_config = PaginationConfig(from_token=now_token)
+ presence, _ = yield presence_stream.get_pagination_rows(
+ user, pagination_config, None
)
- # FIXME (erikj): We need to not generate this token,
- now_token = "%s_%s" % (now_rooms_token, now_presence_token)
-
limit = pagin_config.limit
if not limit:
limit = 10
@@ -270,7 +278,7 @@ class MessageHandler(BaseRoomHandler):
messages, token = yield self.store.get_recent_events_for_room(
event.room_id,
limit=limit,
- end_token=now_rooms_token,
+ end_token=now_token.events_key,
)
d["messages"] = {
@@ -279,14 +287,18 @@ class MessageHandler(BaseRoomHandler):
"end": token[1],
}
- current_state = yield self.store.get_current_state(event.room_id)
+ current_state = yield self.store.get_current_state(
+ event.room_id
+ )
d["state"] = [c.get_dict() for c in current_state]
except:
logger.exception("Failed to get snapshot")
- ret = {"rooms": rooms_ret, "presence": presence[0], "end": now_token}
-
- # logger.debug("snapshot_all_rooms returning: %s", ret)
+ ret = {
+ "rooms": rooms_ret,
+ "presence": presence,
+ "end": now_token.to_string()
+ }
defer.returnValue(ret)
@@ -381,7 +393,6 @@ class RoomCreationHandler(BaseRoomHandler):
federation_handler = self.hs.get_handlers().federation_handler
yield federation_handler.handle_new_event(config_event, snapshot)
- # self.notifier.on_new_room_event(event, store_id)
content = {"membership": Membership.JOIN}
join_event = self.event_factory.create_event(
@@ -477,7 +488,7 @@ class RoomMemberHandler(BaseRoomHandler):
for entry in member_list
]
chunk_data = {
- "start": "START", # FIXME (erikj): START is no longer a valid value
+ "start": "START", # FIXME (erikj): START is no longer valid
"end": "END",
"chunk": event_list
}
@@ -701,17 +712,19 @@ class RoomMemberHandler(BaseRoomHandler):
# If we're inviting someone, then we should also send it to that
# HS.
target_user_id = event.state_key
+ target_user = self.hs.parse_userid(target_user_id)
if membership == Membership.INVITE:
- host = UserID.from_string(target_user_id, self.hs).domain
+ host = target_user.domain
destinations.append(host)
# If we are joining a remote HS, include that.
if membership == Membership.JOIN:
- host = UserID.from_string(target_user_id, self.hs).domain
+ host = target_user.domain
destinations.append(host)
return self._on_new_room_event(
- event, snapshot, extra_destinations=destinations
+ event, snapshot, extra_destinations=destinations,
+ extra_users=[target_user]
)
class RoomListHandler(BaseRoomHandler):
|