diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b73ad62147..82c8cb5f0c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import SynapseError, AuthError, Codes
+from synapse.api.errors import AuthError, Codes
from synapse.streams.config import PaginationConfig
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
@@ -119,9 +119,12 @@ class MessageHandler(BaseHandler):
if source_config.direction == 'b':
# if we're going backwards, we might need to backfill. This
# requires that we have a topo token.
- if room_token.topological is None:
- raise SynapseError(400, "Invalid token: cannot paginate "
- "backwards from a stream token")
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
+ room_id, room_token.stream
+ )
if membership == Membership.LEAVE:
# If they have left the room then clamp the token to be before
@@ -131,11 +134,11 @@ class MessageHandler(BaseHandler):
member_event_id
)
leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < room_token.topological:
+ if leave_token.topological < max_topo:
source_config.from_key = str(leave_token)
yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, room_token.topological
+ room_id, max_topo
)
events, next_key = yield data_source.get_pagination_rows(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 328c049b03..51ec4702db 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -72,7 +72,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
)
-class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
"room_id", # str
"timeline", # TimelineBatch
"state", # dict[(str, str), FrozenEvent]
@@ -298,46 +298,19 @@ class SyncHandler(BaseHandler):
room_id, sync_config, now_token, since_token=timeline_since_token
)
- notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config, ephemeral_by_room
- )
-
- unread_notifications = {}
- if notifs is not None:
- unread_notifications["notification_count"] = len(notifs)
- unread_notifications["highlight_count"] = len([
- 1 for notif in notifs if _action_has_highlight(notif["actions"])
- ])
-
- current_state = yield self.get_state_at(room_id, now_token)
-
- current_state = {
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(
- current_state.values()
- )
- }
-
- account_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
-
- account_data = sync_config.filter_collection.filter_room_account_data(
- account_data
+ room_sync = yield self.incremental_sync_with_gap_for_room(
+ room_id, sync_config,
+ now_token=now_token,
+ since_token=timeline_since_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ all_ephemeral_by_room=ephemeral_by_room,
+ batch=batch,
+ full_state=True,
)
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(
- ephemeral_by_room.get(room_id, [])
- )
-
- defer.returnValue(JoinedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=current_state,
- ephemeral=ephemeral,
- account_data=account_data,
- unread_notifications=unread_notifications,
- ))
+ defer.returnValue(room_sync)
def account_data_for_user(self, account_data):
account_data_events = []
@@ -429,44 +402,20 @@ class SyncHandler(BaseHandler):
defer.returnValue((now_token, ephemeral_by_room))
- @defer.inlineCallbacks
def full_state_sync_for_archived_room(self, room_id, sync_config,
leave_event_id, leave_token,
timeline_since_token, tags_by_room,
account_data_by_room):
"""Sync a room for a client which is starting without any state
Returns:
- A Deferred JoinedSyncResult.
+ A Deferred ArchivedSyncResult.
"""
- batch = yield self.load_filtered_recents(
- room_id, sync_config, leave_token, since_token=timeline_since_token
- )
-
- leave_state = yield self.store.get_state_for_event(leave_event_id)
-
- leave_state = {
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(
- leave_state.values()
- )
- }
-
- account_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
-
- account_data = sync_config.filter_collection.filter_room_account_data(
- account_data
+ return self.incremental_sync_for_archived_room(
+ sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
+ account_data_by_room, full_state=True, leave_token=leave_token,
)
- defer.returnValue(ArchivedSyncResult(
- room_id=room_id,
- timeline=batch,
- state=leave_state,
- account_data=account_data,
- ))
-
@defer.inlineCallbacks
def incremental_sync_with_gap(self, sync_config, since_token):
""" Get the incremental delta needed to bring the client up to
@@ -512,154 +461,127 @@ class SyncHandler(BaseHandler):
sync_config.user
)
- timeline_limit = sync_config.filter_collection.timeline_limit()
+ user_id = sync_config.user.to_string()
- room_events, _ = yield self.store.get_room_events_stream(
- sync_config.user.to_string(),
- from_key=since_token.room_key,
- to_key=now_token.room_key,
- limit=timeline_limit + 1,
- )
+ timeline_limit = sync_config.filter_collection.timeline_limit()
tags_by_room = yield self.store.get_updated_tags(
- sync_config.user.to_string(),
+ user_id,
since_token.account_data_key,
)
account_data, account_data_by_room = (
yield self.store.get_updated_account_data_for_user(
- sync_config.user.to_string(),
+ user_id,
since_token.account_data_key,
)
)
- joined = []
- archived = []
- if len(room_events) <= timeline_limit:
- # There is no gap in any of the rooms. Therefore we can just
- # partition the new events by room and return them.
- logger.debug("Got %i events for incremental sync - not limited",
- len(room_events))
-
- invite_events = []
- leave_events = []
- events_by_room_id = {}
- for event in room_events:
- events_by_room_id.setdefault(event.room_id, []).append(event)
- if event.room_id not in joined_room_ids:
- if (event.type == EventTypes.Member
- and event.state_key == sync_config.user.to_string()):
- if event.membership == Membership.INVITE:
- invite_events.append(event)
- elif event.membership in (Membership.LEAVE, Membership.BAN):
- leave_events.append(event)
-
- for room_id in joined_room_ids:
- recents = events_by_room_id.get(room_id, [])
- logger.debug("Events for room %s: %r", room_id, recents)
- state = {
- (event.type, event.state_key): event
- for event in recents if event.is_state()}
- limited = False
+ # Get a list of membership change events that have happened.
+ rooms_changed = yield self.store.get_room_changes_for_user(
+ user_id, since_token.room_key, now_token.room_key
+ )
- if recents:
- prev_batch = now_token.copy_and_replace(
- "room_key", recents[0].internal_metadata.before
- )
- else:
- prev_batch = now_token
-
- just_joined = yield self.check_joined_room(sync_config, state)
- if just_joined:
- logger.debug("User has just joined %s: needs full state",
- room_id)
- state = yield self.get_state_at(room_id, now_token)
- # the timeline is inherently limited if we've just joined
- limited = True
-
- recents = sync_config.filter_collection.filter_room_timeline(recents)
-
- state = {
- (e.type, e.state_key): e
- for e in sync_config.filter_collection.filter_room_state(
- state.values()
- )
- }
-
- acc_data = self.account_data_for_room(
- room_id, tags_by_room, account_data_by_room
- )
+ mem_change_events_by_room_id = {}
+ for event in rooms_changed:
+ mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
- acc_data = sync_config.filter_collection.filter_room_account_data(
- acc_data
- )
-
- ephemeral = sync_config.filter_collection.filter_room_ephemeral(
- ephemeral_by_room.get(room_id, [])
- )
+ newly_joined_rooms = []
+ archived = []
+ invited = []
+ for room_id, events in mem_change_events_by_room_id.items():
+ non_joins = [e for e in events if e.membership != Membership.JOIN]
+ has_join = len(non_joins) != len(events)
+
+ # We want to figure out if we joined the room at some point since
+ # the last sync (even if we have since left). This is to make sure
+ # we do send down the room, and with full state, where necessary
+ if room_id in joined_room_ids or has_join:
+ old_state = yield self.get_state_at(room_id, since_token)
+ old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+ if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+ newly_joined_rooms.append(room_id)
+
+ if room_id in joined_room_ids:
+ continue
+
+ if not non_joins:
+ continue
- room_sync = JoinedSyncResult(
- room_id=room_id,
- timeline=TimelineBatch(
- events=recents,
- prev_batch=prev_batch,
- limited=limited,
- ),
- state=state,
- ephemeral=ephemeral,
- account_data=acc_data,
- unread_notifications={},
+ # Only bother if we're still currently invited
+ should_invite = non_joins[-1].membership == Membership.INVITE
+ if should_invite:
+ room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+ if room_sync:
+ invited.append(room_sync)
+
+ # Always include leave/ban events. Just take the last one.
+ # TODO: How do we handle ban -> leave in same batch?
+ leave_events = [
+ e for e in non_joins
+ if e.membership in (Membership.LEAVE, Membership.BAN)
+ ]
+
+ if leave_events:
+ leave_event = leave_events[-1]
+ room_sync = yield self.incremental_sync_for_archived_room(
+ sync_config, room_id, leave_event.event_id, since_token,
+ tags_by_room, account_data_by_room,
+ full_state=room_id in newly_joined_rooms
)
- logger.debug("Result for room %s: %r", room_id, room_sync)
-
if room_sync:
- notifs = yield self.unread_notifs_for_room_id(
- room_id, sync_config, all_ephemeral_by_room
- )
+ archived.append(room_sync)
- if notifs is not None:
- notif_dict = room_sync.unread_notifications
- notif_dict["notification_count"] = len(notifs)
- notif_dict["highlight_count"] = len([
- 1 for notif in notifs
- if _action_has_highlight(notif["actions"])
- ])
+ # Get all events for rooms we're currently joined to.
+ room_to_events = yield self.store.get_room_events_stream_for_rooms(
+ room_ids=joined_room_ids,
+ from_key=since_token.room_key,
+ to_key=now_token.room_key,
+ limit=timeline_limit + 1,
+ )
- joined.append(room_sync)
+ joined = []
+ # We loop through all room ids, even if there are no new events, in case
+ # there are non room events taht we need to notify about.
+ for room_id in joined_room_ids:
+ room_entry = room_to_events.get(room_id, None)
- else:
- logger.debug("Got %i events for incremental sync - hit limit",
- len(room_events))
+ if room_entry:
+ events, start_key = room_entry
- invite_events = yield self.store.get_invites_for_user(
- sync_config.user.to_string()
- )
+ prev_batch_token = now_token.copy_and_replace("room_key", start_key)
- leave_events = yield self.store.get_leave_and_ban_events_for_user(
- sync_config.user.to_string()
- )
+ newly_joined_room = room_id in newly_joined_rooms
+ full_state = newly_joined_room
- for room_id in joined_room_ids:
- room_sync = yield self.incremental_sync_with_gap_for_room(
- room_id, sync_config, since_token, now_token,
- ephemeral_by_room, tags_by_room, account_data_by_room,
- all_ephemeral_by_room=all_ephemeral_by_room,
+ batch = yield self.load_filtered_recents(
+ room_id, sync_config, prev_batch_token,
+ since_token=since_token,
+ recents=events,
+ newly_joined_room=newly_joined_room,
)
- if room_sync:
- joined.append(room_sync)
+ else:
+ batch = TimelineBatch(
+ events=[],
+ prev_batch=since_token,
+ limited=False,
+ )
+ full_state = False
- for leave_event in leave_events:
- room_sync = yield self.incremental_sync_for_archived_room(
- sync_config, leave_event, since_token, tags_by_room,
- account_data_by_room
+ room_sync = yield self.incremental_sync_with_gap_for_room(
+ room_id=room_id,
+ sync_config=sync_config,
+ since_token=since_token,
+ now_token=now_token,
+ ephemeral_by_room=ephemeral_by_room,
+ tags_by_room=tags_by_room,
+ account_data_by_room=account_data_by_room,
+ all_ephemeral_by_room=all_ephemeral_by_room,
+ batch=batch,
+ full_state=full_state,
)
if room_sync:
- archived.append(room_sync)
-
- invited = [
- InvitedSyncResult(room_id=event.room_id, invite=event)
- for event in invite_events
- ]
+ joined.append(room_sync)
account_data_for_user = sync_config.filter_collection.filter_account_data(
self.account_data_for_user(account_data)
@@ -680,28 +602,40 @@ class SyncHandler(BaseHandler):
@defer.inlineCallbacks
def load_filtered_recents(self, room_id, sync_config, now_token,
- since_token=None):
+ since_token=None, recents=None, newly_joined_room=False):
"""
:returns a Deferred TimelineBatch
"""
- limited = True
- recents = []
filtering_factor = 2
timeline_limit = sync_config.filter_collection.timeline_limit()
- load_limit = max(timeline_limit * filtering_factor, 100)
- max_repeat = 3 # Only try a few times per room, otherwise
+ load_limit = max(timeline_limit * filtering_factor, 10)
+ max_repeat = 5 # Only try a few times per room, otherwise
room_key = now_token.room_key
end_key = room_key
+ limited = recents is None or newly_joined_room or timeline_limit < len(recents)
+
+ if recents is not None:
+ recents = sync_config.filter_collection.filter_room_timeline(recents)
+ recents = yield self._filter_events_for_client(
+ sync_config.user.to_string(),
+ recents,
+ is_peeking=sync_config.is_guest,
+ )
+ else:
+ recents = []
+
+ since_key = None
+ if since_token and not newly_joined_room:
+ since_key = since_token.room_key
+
while limited and len(recents) < timeline_limit and max_repeat:
- events, keys = yield self.store.get_recent_events_for_room(
+ events, end_key = yield self.store.get_room_events_stream_for_room(
room_id,
limit=load_limit + 1,
- from_token=since_token.room_key if since_token else None,
- end_token=end_key,
+ from_key=since_key,
+ to_key=end_key,
)
- room_key, _ = keys
- end_key = "s" + room_key.split('-')[-1]
loaded_recents = sync_config.filter_collection.filter_room_timeline(events)
loaded_recents = yield self._filter_events_for_client(
sync_config.user.to_string(),
@@ -710,8 +644,10 @@ class SyncHandler(BaseHandler):
)
loaded_recents.extend(recents)
recents = loaded_recents
+
if len(events) <= load_limit:
limited = False
+ break
max_repeat -= 1
if len(recents) > timeline_limit:
@@ -724,7 +660,9 @@ class SyncHandler(BaseHandler):
)
defer.returnValue(TimelineBatch(
- events=recents, prev_batch=prev_batch_token, limited=limited
+ events=recents,
+ prev_batch=prev_batch_token,
+ limited=limited or newly_joined_room
))
@defer.inlineCallbacks
@@ -732,25 +670,12 @@ class SyncHandler(BaseHandler):
since_token, now_token,
ephemeral_by_room, tags_by_room,
account_data_by_room,
- all_ephemeral_by_room):
- """ Get the incremental delta needed to bring the client up to date for
- the room. Gives the client the most recent events and the changes to
- state.
- Returns:
- A Deferred JoinedSyncResult
- """
- logger.debug("Doing incremental sync for room %s between %s and %s",
- room_id, since_token, now_token)
-
- # TODO(mjark): Check for redactions we might have missed.
-
- batch = yield self.load_filtered_recents(
- room_id, sync_config, now_token, since_token,
- )
-
- logger.debug("Recents %r", batch)
+ all_ephemeral_by_room,
+ batch, full_state=False):
+ if full_state:
+ state = yield self.get_state_at(room_id, now_token)
- if batch.limited:
+ elif batch.limited:
current_state = yield self.get_state_at(room_id, now_token)
state_at_previous_sync = yield self.get_state_at(
@@ -814,43 +739,48 @@ class SyncHandler(BaseHandler):
defer.returnValue(room_sync)
@defer.inlineCallbacks
- def incremental_sync_for_archived_room(self, sync_config, leave_event,
+ def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
since_token, tags_by_room,
- account_data_by_room):
+ account_data_by_room, full_state,
+ leave_token=None):
""" Get the incremental delta needed to bring the client up to date for
the archived room.
Returns:
A Deferred ArchivedSyncResult
"""
- stream_token = yield self.store.get_stream_token_for_event(
- leave_event.event_id
- )
+ if not leave_token:
+ stream_token = yield self.store.get_stream_token_for_event(
+ leave_event_id
+ )
- leave_token = since_token.copy_and_replace("room_key", stream_token)
+ leave_token = since_token.copy_and_replace("room_key", stream_token)
- if since_token.is_after(leave_token):
+ if since_token and since_token.is_after(leave_token):
defer.returnValue(None)
batch = yield self.load_filtered_recents(
- leave_event.room_id, sync_config, leave_token, since_token,
+ room_id, sync_config, leave_token, since_token,
)
logger.debug("Recents %r", batch)
state_events_at_leave = yield self.store.get_state_for_event(
- leave_event.event_id
+ leave_event_id
)
- state_at_previous_sync = yield self.get_state_at(
- leave_event.room_id, stream_position=since_token
- )
+ if not full_state:
+ state_at_previous_sync = yield self.get_state_at(
+ room_id, stream_position=since_token
+ )
- state_events_delta = yield self.compute_state_delta(
- since_token=since_token,
- previous_state=state_at_previous_sync,
- current_state=state_events_at_leave,
- )
+ state_events_delta = yield self.compute_state_delta(
+ since_token=since_token,
+ previous_state=state_at_previous_sync,
+ current_state=state_events_at_leave,
+ )
+ else:
+ state_events_delta = state_events_at_leave
state_events_delta = {
(e.type, e.state_key): e
@@ -860,7 +790,7 @@ class SyncHandler(BaseHandler):
}
account_data = self.account_data_for_room(
- leave_event.room_id, tags_by_room, account_data_by_room
+ room_id, tags_by_room, account_data_by_room
)
account_data = sync_config.filter_collection.filter_room_account_data(
@@ -868,7 +798,7 @@ class SyncHandler(BaseHandler):
)
room_sync = ArchivedSyncResult(
- room_id=leave_event.room_id,
+ room_id=room_id,
timeline=batch,
state=state_events_delta,
account_data=account_data,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 298cb9bada..80187722ea 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -128,6 +128,9 @@ class EventsStore(SQLBaseStore):
is_new_state=is_new_state,
current_state=current_state,
)
+ self._events_stream_cache.room_has_changed(
+ None, event.room_id, stream_ordering
+ )
except _RollbackButIsFineException:
pass
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index c0593e23ee..7118368d97 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -15,11 +15,10 @@
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList, cached
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from twisted.internet import defer
-from blist import sorteddict
import logging
import ujson as json
@@ -31,8 +30,8 @@ class ReceiptsStore(SQLBaseStore):
def __init__(self, hs):
super(ReceiptsStore, self).__init__(hs)
- self._receipts_stream_cache = _RoomStreamChangeCache(
- self._receipts_id_gen.get_max_token(None)
+ self._receipts_stream_cache = RoomStreamChangeCache(
+ "ReceiptsRoomChangeCache", self._receipts_id_gen.get_max_token(None)
)
@cached(num_args=2)
@@ -370,63 +369,3 @@ class ReceiptsStore(SQLBaseStore):
"data": json.dumps(data),
}
)
-
-
-class _RoomStreamChangeCache(object):
- """Keeps track of the stream_id of the latest change in rooms.
-
- Given a list of rooms and stream key, it will give a subset of rooms that
- may have changed since that key. If the key is too old then the cache
- will simply return all rooms.
- """
- def __init__(self, current_key, size_of_cache=10000):
- self._size_of_cache = size_of_cache
- self._room_to_key = {}
- self._cache = sorteddict()
- self._earliest_key = current_key
- self.name = "ReceiptsRoomChangeCache"
- caches_by_name[self.name] = self._cache
-
- @defer.inlineCallbacks
- def get_rooms_changed(self, store, room_ids, key):
- """Returns subset of room ids that have had new receipts since the
- given key. If the key is too old it will just return the given list.
- """
- if key > (yield self._get_earliest_key(store)):
- keys = self._cache.keys()
- i = keys.bisect_right(key)
-
- result = set(
- self._cache[k] for k in keys[i:]
- ).intersection(room_ids)
-
- cache_counter.inc_hits(self.name)
- else:
- result = room_ids
- cache_counter.inc_misses(self.name)
-
- defer.returnValue(result)
-
- @defer.inlineCallbacks
- def room_has_changed(self, store, room_id, key):
- """Informs the cache that the room has been changed at the given key.
- """
- if key > (yield self._get_earliest_key(store)):
- old_key = self._room_to_key.get(room_id, None)
- if old_key:
- key = max(key, old_key)
- self._cache.pop(old_key, None)
- self._cache[key] = room_id
-
- while len(self._cache) > self._size_of_cache:
- k, r = self._cache.popitem()
- self._earliest_key = max(k, self._earliest_key)
- self._room_to_key.pop(r, None)
-
- @defer.inlineCallbacks
- def _get_earliest_key(self, store):
- if self._earliest_key is None:
- self._earliest_key = yield store.get_max_receipt_stream_id()
- self._earliest_key = int(self._earliest_key)
-
- defer.returnValue(self._earliest_key)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index e31bad258a..5096b46864 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -37,6 +37,7 @@ from twisted.internet import defer
from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.room_change_cache import RoomStreamChangeCache
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logutils import log_function
@@ -77,6 +78,12 @@ def upper_bound(token):
class StreamStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(StreamStore, self).__init__(hs)
+
+ self._events_stream_cache = RoomStreamChangeCache(
+ "EventsRoomStreamChangeCache", self._stream_id_gen.get_max_token(None)
+ )
@defer.inlineCallbacks
def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
@@ -157,6 +164,134 @@ class StreamStore(SQLBaseStore):
results = yield self.runInteraction("get_appservice_room_stream", f)
defer.returnValue(results)
+ @defer.inlineCallbacks
+ def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0):
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+
+ room_ids = yield self._events_stream_cache.get_rooms_changed(
+ self, room_ids, from_id
+ )
+
+ if not room_ids:
+ defer.returnValue({})
+
+ results = {}
+ room_ids = list(room_ids)
+ for rm_ids in (room_ids[i:i+20] for i in xrange(0, len(room_ids), 20)):
+ res = yield defer.gatherResults([
+ self.get_room_events_stream_for_room(
+ room_id, from_key, to_key, limit
+ ).addCallback(lambda r, rm: (rm, r), room_id)
+ for room_id in room_ids
+ ])
+ results.update(dict(res))
+
+ defer.returnValue(results)
+
+ @defer.inlineCallbacks
+ def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0):
+ if from_key is not None:
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ else:
+ from_id = None
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+ if from_key == to_key:
+ defer.returnValue(([], from_key))
+
+ has_changed = yield self._events_stream_cache.get_room_has_changed(
+ room_id, from_id
+ )
+
+ if not has_changed:
+ defer.returnValue(([], from_key))
+
+ def f(txn):
+ if from_id is not None:
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering > ? AND stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC LIMIT ?"
+ )
+ txn.execute(sql, (room_id, from_id, to_id, limit))
+ else:
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering <= ?"
+ " ORDER BY stream_ordering DESC LIMIT ?"
+ )
+ txn.execute(sql, (room_id, to_id, limit))
+
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(ret, rows, topo_order=False)
+
+ ret.reverse()
+
+ if rows:
+ key = "s%d" % min(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = from_key
+
+ return ret, key
+ res = yield self.runInteraction("get_room_events_stream_for_room", f)
+ defer.returnValue(res)
+
+ def get_room_changes_for_user(self, user_id, from_key, to_key):
+ if from_key is not None:
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ else:
+ from_id = None
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
+
+ if from_key == to_key:
+ return defer.succeed([])
+
+ def f(txn):
+ if from_id is not None:
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e,"
+ " room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, from_id, to_id,))
+ else:
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e,"
+ " room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND stream_ordering <= ?"
+ " ORDER BY stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, to_id,))
+ rows = self.cursor_to_dict(txn)
+
+ ret = self._get_events_txn(
+ txn,
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ return ret
+
+ return self.runInteraction("get_room_changes_for_user", f)
+
@log_function
def get_room_events_stream(
self,
@@ -174,7 +309,8 @@ class StreamStore(SQLBaseStore):
"SELECT c.room_id FROM history_visibility AS h"
" INNER JOIN current_state_events AS c"
" ON h.event_id = c.event_id"
- " WHERE c.room_id IN (%s) AND h.history_visibility = 'world_readable'" % (
+ " WHERE c.room_id IN (%s)"
+ " AND h.history_visibility = 'world_readable'" % (
",".join(map(lambda _: "?", room_ids))
)
)
@@ -434,6 +570,18 @@ class StreamStore(SQLBaseStore):
row["topological_ordering"], row["stream_ordering"],)
)
+ def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
+ sql = (
+ "SELECT max(topological_ordering) FROM events"
+ " WHERE room_id = ? AND stream_ordering < ?"
+ )
+ return self._execute(
+ "get_max_topological_token_for_stream_and_room", None,
+ sql, room_id, stream_key,
+ ).addCallback(
+ lambda r: r[0][0] if r else 0
+ )
+
def _get_max_topological_txn(self, txn):
txn.execute(
"SELECT MAX(topological_ordering) FROM events"
@@ -445,10 +593,13 @@ class StreamStore(SQLBaseStore):
return rows[0][0] if rows else 0
@staticmethod
- def _set_before_and_after(events, rows):
+ def _set_before_and_after(events, rows, topo_order=True):
for event, row in zip(events, rows):
stream = row["stream_ordering"]
- topo = event.depth
+ if topo_order:
+ topo = event.depth
+ else:
+ topo = None
internal = event.internal_metadata
internal.before = str(RoomStreamToken(topo, stream - 1))
internal.after = str(RoomStreamToken(topo, stream))
diff --git a/synapse/util/caches/room_change_cache.py b/synapse/util/caches/room_change_cache.py
new file mode 100644
index 0000000000..3a873c9c30
--- /dev/null
+++ b/synapse/util/caches/room_change_cache.py
@@ -0,0 +1,86 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.caches import cache_counter, caches_by_name
+
+
+from blist import sorteddict
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class RoomStreamChangeCache(object):
+ """Keeps track of the stream_id of the latest change in rooms.
+
+ Given a list of rooms and stream key, it will give a subset of rooms that
+ may have changed since that key. If the key is too old then the cache
+ will simply return all rooms.
+ """
+ def __init__(self, name, current_key, size_of_cache=10000):
+ self._size_of_cache = size_of_cache
+ self._room_to_key = {}
+ self._cache = sorteddict()
+ self._earliest_known_key = current_key
+ self.name = name
+ caches_by_name[self.name] = self._cache
+
+ def get_room_has_changed(self, room_id, key):
+ if key <= self._earliest_known_key:
+ return True
+
+ room_key = self._room_to_key.get(room_id, None)
+ if room_key is None:
+ return True
+
+ if key < room_key:
+ return True
+
+ return False
+
+ def get_rooms_changed(self, store, room_ids, key):
+ """Returns subset of room ids that have had new things since the
+ given key. If the key is too old it will just return the given list.
+ """
+ if key > self._earliest_known_key:
+ keys = self._cache.keys()
+ i = keys.bisect_right(key)
+
+ result = set(
+ self._cache[k] for k in keys[i:]
+ ).intersection(room_ids)
+
+ cache_counter.inc_hits(self.name)
+ else:
+ result = room_ids
+ cache_counter.inc_misses(self.name)
+
+ return result
+
+ def room_has_changed(self, store, room_id, key):
+ """Informs the cache that the room has been changed at the given key.
+ """
+ if key > self._earliest_known_key:
+ old_key = self._room_to_key.get(room_id, None)
+ if old_key:
+ key = max(key, old_key)
+ self._cache.pop(old_key, None)
+ self._cache[key] = room_id
+
+ while len(self._cache) > self._size_of_cache:
+ k, r = self._cache.popitem()
+ self._earliest_key = max(k, self._earliest_key)
+ self._room_to_key.pop(r, None)
|