diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 52bdce5be2..057f30db33 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -35,13 +35,17 @@ what sort order was used:
from twisted.internet import defer
-from ._base import SQLBaseStore
+from synapse.storage._base import SQLBaseStore
+from synapse.storage.events import EventsWorkerStore
+
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+import abc
import logging
@@ -143,81 +147,28 @@ def filter_to_clause(event_filter):
return " AND ".join(clauses), args
-class StreamStore(SQLBaseStore):
- @defer.inlineCallbacks
- def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
- # NB this lives here instead of appservice.py so we can reuse the
- # 'private' StreamToken class in this file.
- if limit:
- limit = max(limit, MAX_STREAM_SIZE)
- else:
- limit = MAX_STREAM_SIZE
+class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
+ __metaclass__ = abc.ABCMeta
- # From and to keys should be integers from ordering.
- from_id = RoomStreamToken.parse_stream_token(from_key)
- to_id = RoomStreamToken.parse_stream_token(to_key)
-
- if from_key == to_key:
- defer.returnValue(([], to_key))
- return
+ def __init__(self, db_conn, hs):
+ super(StreamWorkerStore, self).__init__(db_conn, hs)
- # select all the events between from/to with a sensible limit
- sql = (
- "SELECT e.event_id, e.room_id, e.type, s.state_key, "
- "e.stream_ordering FROM events AS e "
- "LEFT JOIN state_events as s ON "
- "e.event_id = s.event_id "
- "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
- "ORDER BY stream_ordering ASC LIMIT %(limit)d "
- ) % {
- "limit": limit
- }
-
- def f(txn):
- # pull out all the events between the tokens
- txn.execute(sql, (from_id.stream, to_id.stream,))
- rows = self.cursor_to_dict(txn)
-
- # Logic:
- # - We want ALL events which match the AS room_id regex
- # - We want ALL events which match the rooms represented by the AS
- # room_alias regex
- # - We want ALL events for rooms that AS users have joined.
- # This is currently supported via get_app_service_rooms (which is
- # used for the Notifier listener rooms). We can't reasonably make a
- # SQL query for these room IDs, so we'll pull all the events between
- # from/to and filter in python.
- rooms_for_as = self._get_app_service_rooms_txn(txn, service)
- room_ids_for_as = [r.room_id for r in rooms_for_as]
-
- def app_service_interested(row):
- if row["room_id"] in room_ids_for_as:
- return True
-
- if row["type"] == EventTypes.Member:
- if service.is_interested_in_user(row.get("state_key")):
- return True
- return False
-
- return [r for r in rows if app_service_interested(r)]
-
- rows = yield self.runInteraction("get_appservice_room_stream", f)
-
- ret = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
+ events_max = self.get_room_max_stream_ordering()
+ event_cache_prefill, min_event_val = self._get_cache_dict(
+ db_conn, "events",
+ entity_column="room_id",
+ stream_column="stream_ordering",
+ max_value=events_max,
+ )
+ self._events_stream_cache = StreamChangeCache(
+ "EventsRoomStreamChangeCache", min_event_val,
+ prefilled_cache=event_cache_prefill,
+ )
+ self._membership_stream_cache = StreamChangeCache(
+ "MembershipStreamChangeCache", events_max,
)
- self._set_before_and_after(ret, rows, topo_order=from_id is None)
-
- if rows:
- key = "s%d" % max(r["stream_ordering"] for r in rows)
- else:
- # Assume we didn't get anything because there was nothing to
- # get.
- key = to_key
-
- defer.returnValue((ret, key))
+ self._stream_order_on_start = self.get_room_max_stream_ordering()
@defer.inlineCallbacks
def get_room_events_stream_for_rooms(self, room_ids, from_key, to_key, limit=0,
@@ -381,88 +332,6 @@ class StreamStore(SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def paginate_room_events(self, room_id, from_key, to_key=None,
- direction='b', limit=-1, event_filter=None):
- # Tokens really represent positions between elements, but we use
- # the convention of pointing to the event before the gap. Hence
- # we have a bit of asymmetry when it comes to equalities.
- args = [False, room_id]
- if direction == 'b':
- order = "DESC"
- bounds = upper_bound(
- RoomStreamToken.parse(from_key), self.database_engine
- )
- if to_key:
- bounds = "%s AND %s" % (bounds, lower_bound(
- RoomStreamToken.parse(to_key), self.database_engine
- ))
- else:
- order = "ASC"
- bounds = lower_bound(
- RoomStreamToken.parse(from_key), self.database_engine
- )
- if to_key:
- bounds = "%s AND %s" % (bounds, upper_bound(
- RoomStreamToken.parse(to_key), self.database_engine
- ))
-
- filter_clause, filter_args = filter_to_clause(event_filter)
-
- if filter_clause:
- bounds += " AND " + filter_clause
- args.extend(filter_args)
-
- if int(limit) > 0:
- args.append(int(limit))
- limit_str = " LIMIT ?"
- else:
- limit_str = ""
-
- sql = (
- "SELECT * FROM events"
- " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
- " ORDER BY topological_ordering %(order)s,"
- " stream_ordering %(order)s %(limit)s"
- ) % {
- "bounds": bounds,
- "order": order,
- "limit": limit_str
- }
-
- def f(txn):
- txn.execute(sql, args)
-
- rows = self.cursor_to_dict(txn)
-
- if rows:
- topo = rows[-1]["topological_ordering"]
- toke = rows[-1]["stream_ordering"]
- if direction == 'b':
- # Tokens are positions between events.
- # This token points *after* the last event in the chunk.
- # We need it to point to the event before it in the chunk
- # when we are going backwards so we subtract one from the
- # stream part.
- toke -= 1
- next_token = str(RoomStreamToken(topo, toke))
- else:
- # TODO (erikj): We should work out what to do here instead.
- next_token = to_key if to_key else from_key
-
- return rows, next_token,
-
- rows, token = yield self.runInteraction("paginate_room_events", f)
-
- events = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
-
- self._set_before_and_after(events, rows)
-
- defer.returnValue((events, token))
-
- @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
rows, token = yield self.get_recent_event_ids_for_room(
room_id, limit, end_token, from_token
@@ -542,7 +411,7 @@ class StreamStore(SQLBaseStore):
`room_id` causes it to return the current room specific topological
token.
"""
- token = yield self._stream_id_gen.get_current_token()
+ token = yield self.get_room_max_stream_ordering()
if room_id is None:
defer.returnValue("s%d" % (token,))
else:
@@ -552,11 +421,13 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue("t%d-%d" % (topo, token))
+ @abc.abstractmethod
def get_room_max_stream_ordering(self):
- return self._stream_id_gen.get_current_token()
+ raise NotImplementedError()
+ @abc.abstractmethod
def get_room_min_stream_ordering(self):
- return self._backfill_id_gen.get_current_token()
+ raise NotImplementedError()
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
@@ -832,3 +703,168 @@ class StreamStore(SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+
+
+class StreamStore(StreamWorkerStore):
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
+
+ @defer.inlineCallbacks
+ def get_appservice_room_stream(self, service, from_key, to_key, limit=0):
+ # NB this lives here instead of appservice.py so we can reuse the
+ # 'private' StreamToken class in this file.
+ if limit:
+ limit = max(limit, MAX_STREAM_SIZE)
+ else:
+ limit = MAX_STREAM_SIZE
+
+ # From and to keys should be integers from ordering.
+ from_id = RoomStreamToken.parse_stream_token(from_key)
+ to_id = RoomStreamToken.parse_stream_token(to_key)
+
+ if from_key == to_key:
+ defer.returnValue(([], to_key))
+ return
+
+ # select all the events between from/to with a sensible limit
+ sql = (
+ "SELECT e.event_id, e.room_id, e.type, s.state_key, "
+ "e.stream_ordering FROM events AS e "
+ "LEFT JOIN state_events as s ON "
+ "e.event_id = s.event_id "
+ "WHERE e.stream_ordering > ? AND e.stream_ordering <= ? "
+ "ORDER BY stream_ordering ASC LIMIT %(limit)d "
+ ) % {
+ "limit": limit
+ }
+
+ def f(txn):
+ # pull out all the events between the tokens
+ txn.execute(sql, (from_id.stream, to_id.stream,))
+ rows = self.cursor_to_dict(txn)
+
+ # Logic:
+ # - We want ALL events which match the AS room_id regex
+ # - We want ALL events which match the rooms represented by the AS
+ # room_alias regex
+ # - We want ALL events for rooms that AS users have joined.
+ # This is currently supported via get_app_service_rooms (which is
+ # used for the Notifier listener rooms). We can't reasonably make a
+ # SQL query for these room IDs, so we'll pull all the events between
+ # from/to and filter in python.
+ rooms_for_as = self._get_app_service_rooms_txn(txn, service)
+ room_ids_for_as = [r.room_id for r in rooms_for_as]
+
+ def app_service_interested(row):
+ if row["room_id"] in room_ids_for_as:
+ return True
+
+ if row["type"] == EventTypes.Member:
+ if service.is_interested_in_user(row.get("state_key")):
+ return True
+ return False
+
+ return [r for r in rows if app_service_interested(r)]
+
+ rows = yield self.runInteraction("get_appservice_room_stream", f)
+
+ ret = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
+
+ if rows:
+ key = "s%d" % max(r["stream_ordering"] for r in rows)
+ else:
+ # Assume we didn't get anything because there was nothing to
+ # get.
+ key = to_key
+
+ defer.returnValue((ret, key))
+
+ @defer.inlineCallbacks
+ def paginate_room_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1, event_filter=None):
+ # Tokens really represent positions between elements, but we use
+ # the convention of pointing to the event before the gap. Hence
+ # we have a bit of asymmetry when it comes to equalities.
+ args = [False, room_id]
+ if direction == 'b':
+ order = "DESC"
+ bounds = upper_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, lower_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+ else:
+ order = "ASC"
+ bounds = lower_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
+ if to_key:
+ bounds = "%s AND %s" % (bounds, upper_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
+
+ filter_clause, filter_args = filter_to_clause(event_filter)
+
+ if filter_clause:
+ bounds += " AND " + filter_clause
+ args.extend(filter_args)
+
+ if int(limit) > 0:
+ args.append(int(limit))
+ limit_str = " LIMIT ?"
+ else:
+ limit_str = ""
+
+ sql = (
+ "SELECT * FROM events"
+ " WHERE outlier = ? AND room_id = ? AND %(bounds)s"
+ " ORDER BY topological_ordering %(order)s,"
+ " stream_ordering %(order)s %(limit)s"
+ ) % {
+ "bounds": bounds,
+ "order": order,
+ "limit": limit_str
+ }
+
+ def f(txn):
+ txn.execute(sql, args)
+
+ rows = self.cursor_to_dict(txn)
+
+ if rows:
+ topo = rows[-1]["topological_ordering"]
+ toke = rows[-1]["stream_ordering"]
+ if direction == 'b':
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
+ toke -= 1
+ next_token = str(RoomStreamToken(topo, toke))
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_key if to_key else from_key
+
+ return rows, next_token,
+
+ rows, token = yield self.runInteraction("paginate_room_events", f)
+
+ events = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+
+ self._set_before_and_after(events, rows)
+
+ defer.returnValue((events, token))
|